当前位置:   article > 正文

Flink SQL -- CheckPoint_flinksql checkpoint

flinksql checkpoint
1、开启CheckPoint

checkpoint可以定时将flink任务的状态持久化到hdfs中,任务执行失败重启可以保证中间结果不丢失

  1. # 修改flink配置文件
  2. vim flink-conf.yaml
  3. # checkppint 间隔时间
  4. execution.checkpointing.interval: 1min
  5. # 任务手动取消时保存checkpoint
  6. execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
  7. # 同时允许1个checkpoint执行
  8. execution.checkpointing.max-concurrent-checkpoints: 1
  9. execution.checkpointing.min-pause: 0
  10. # 数据处理的语义
  11. execution.checkpointing.mode: EXACTLY_ONCE
  12. # checkpoint超时时间
  13. execution.checkpointing.timeout: 10min
  14. execution.checkpointing.tolerable-failed-checkpoints: 0
  15. execution.checkpointing.unaligned: false
  16. # 状态后端(保存状态的位置,hashmap:内存)
  17. state.backend: hashmap
  18. # checkpoint路径
  19. state.checkpoints.dir: hdfs://master:9000/flink/checkpoint
2、编写一个Flnik SQL 脚本:
vim word_count.sql
  1. -- 实时从kafka中读取单词,统计单词的数量,将结果保存到mysql中
  2. -- 1、创建source表
  3. CREATE TABLE words (
  4. word STRING
  5. ) WITH (
  6. 'connector' = 'kafka',
  7. 'topic' = 'words', -- 数据的topic
  8. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  9. 'properties.group.id' = 'testGroup', -- 消费者组
  10. 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  11. 'format' = 'csv' -- 读取数据的格式
  12. );
  13. -- 2、创建sink表
  14. CREATE TABLE word_count (
  15. word STRING,
  16. num BIGINT,
  17. PRIMARY KEY (word) NOT ENFORCED -- 主键
  18. ) WITH (
  19. 'connector' = 'jdbc',
  20. 'url' = 'jdbc:mysql://master:3306/student',
  21. 'table-name' = 'word_count', -- 需要手动到mysql中创建表
  22. 'username' ='root',
  23. 'password' ='123456'
  24. );
  25. -- 3、编写sql处理数据将结果保存到sink表中
  26. insert into word_count
  27. select
  28. word,
  29. count(1) as num
  30. from
  31. words
  32. group by word;
3、使用sq-client.sh -f 启动脚本
sql-client.sh -f word_count.sql
 4、当任务失败的时候再重新启动任务:
  1. -- 1、获取checkpoint的路径
  2. /file/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23
  3. -- 2、再sql脚本中增加参数,增加到sql脚本的insert into 的前面
  4. -- 指定任务会的checkpoint的地址
  5. SET'execution.savepoint.path'='hdfs://master:9000/file/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23';
  6. -- 3、启动sql任务
  7. sql-client.sh -f word_count.sql

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/876337?site
推荐阅读
相关标签
  

闽ICP备14008679号