赞
踩
checkpoint可以定时将flink任务的状态持久化到hdfs中,任务执行失败重启可以保证中间结果不丢失
- # 修改flink配置文件
- vim flink-conf.yaml
-
- # checkppint 间隔时间
- execution.checkpointing.interval: 1min
- # 任务手动取消时保存checkpoint
- execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
- # 同时允许1个checkpoint执行
- execution.checkpointing.max-concurrent-checkpoints: 1
- execution.checkpointing.min-pause: 0
- # 数据处理的语义
- execution.checkpointing.mode: EXACTLY_ONCE
- # checkpoint超时时间
- execution.checkpointing.timeout: 10min
- execution.checkpointing.tolerable-failed-checkpoints: 0
- execution.checkpointing.unaligned: false
- # 状态后端(保存状态的位置,hashmap:内存)
- state.backend: hashmap
- # checkpoint路径
- state.checkpoints.dir: hdfs://master:9000/flink/checkpoint

vim word_count.sql
- -- 实时从kafka中读取单词,统计单词的数量,将结果保存到mysql中
-
- -- 1、创建source表
- CREATE TABLE words (
- word STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'words', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv' -- 读取数据的格式
- );
-
- -- 2、创建sink表
- CREATE TABLE word_count (
- word STRING,
- num BIGINT,
- PRIMARY KEY (word) NOT ENFORCED -- 主键
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://master:3306/student',
- 'table-name' = 'word_count', -- 需要手动到mysql中创建表
- 'username' ='root',
- 'password' ='123456'
- );
-
- -- 3、编写sql处理数据将结果保存到sink表中
- insert into word_count
- select
- word,
- count(1) as num
- from
- words
- group by word;

sql-client.sh -f word_count.sql
- -- 1、获取checkpoint的路径
- /file/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23
-
- -- 2、再sql脚本中增加参数,增加到sql脚本的insert into 的前面
- -- 指定任务会的checkpoint的地址
- SET'execution.savepoint.path'='hdfs://master:9000/file/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23';
-
- -- 3、启动sql任务
- sql-client.sh -f word_count.sql
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。