Place the following dependency jars in FLINK_HOME/lib:
flink-sql-connector-mysql-cdc-2.2.0.jar
flink-connector-jdbc_2.11-1.14.3.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
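The copy step can be scripted, for example as below. This is a sketch that assumes the three jars have already been downloaded to the current directory and that FLINK_HOME points at the Flink installation (defaulting to /opt/flink here as an assumption):

```shell
# Copy the connector jars into Flink's lib directory.
# Assumption: jars were downloaded to the current directory;
# FLINK_HOME falls back to /opt/flink if unset.
FLINK_HOME=${FLINK_HOME:-/opt/flink}
for jar in flink-sql-connector-mysql-cdc-2.2.0.jar \
           flink-connector-jdbc_2.11-1.14.3.jar \
           flink-shaded-hadoop-2-uber-2.8.3-10.0.jar; do
  if [ -f "$jar" ]; then
    cp "$jar" "$FLINK_HOME/lib/"
  else
    echo "not found in current directory: $jar"
  fi
done
```

Jars in lib are only picked up at startup, so restart the cluster (or yarn-session) after copying.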
Modify flink-conf.yaml:
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 10min
state.backend: filesystem
state.checkpoints.dir: hdfs://mycluster/flinkcdc-checkpoints
1. Standalone mode
Start the SQL client: bin/sql-client.sh embedded
Note: to run in standalone mode, a Flink standalone cluster must be started first:
bin/start-cluster.sh
2. yarn-session mode (the one used in this walkthrough)
First start a Flink yarn-session cluster: bin/yarn-session.sh -s 1 -jm 1024 -tm 1024
Then start the SQL client: bin/sql-client.sh embedded -s yarn-session
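To confirm the session cluster is actually up before attaching the SQL client, the YARN application list can be checked. A sketch, assuming the Hadoop `yarn` CLI is on the PATH of the machine you run it on:

```shell
# Check whether the yarn-session cluster is running.
# Assumption: the Hadoop `yarn` CLI is installed and on PATH.
YARN_CLI=$(command -v yarn || true)
if [ -n "$YARN_CLI" ]; then
  # A successful yarn-session.sh start shows up as a RUNNING application.
  yarn application -list -appStates RUNNING
else
  echo "yarn CLI not found on this machine"
fi
```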
Official configuration reference: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#checkpointing
# Set checkpoint parameters in the SQL client
SET 'execution.checkpointing.interval' = '10s';
SET 'parallelism.default' = '3';
CREATE TABLE `cars` (
    `id` BIGINT,
    `owerId` BIGINT,
    `carCode` STRING,
    `carColor` STRING,
    `type` BIGINT,
    `remark` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hadoop1',
    'port' = '3306',
    'username' = 'hive',
    'password' = 'hive',
    'database-name' = 'sca',
    'table-name' = 'cars',
    'connect.timeout' = '60s'
);

CREATE TABLE `cars_copy` (
    `id` BIGINT,
    `owerId` BIGINT,
    `carCode` STRING,
    `carColor` STRING,
    `type` BIGINT,
    `remark` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8',
    'username' = 'hive',
    'password' = 'hive',
    'table-name' = 'cars_copy',
    'sink.parallelism' = '2'
);

Write the captured data to MySQL:
INSERT INTO cars_copy SELECT * FROM cars;
Count the rows in the result table:
SELECT COUNT(*) FROM cars_copy;
Insert test data (then check the result table again):
INSERT INTO `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) VALUES ('10096006', '10244815', '港T·7RONE', '红色', '1', NULL);
INSERT INTO `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) VALUES ('10096007', '10244816', '港T·7RONE', '黄色', '1', NULL);
Note: if the job is cancelled manually, the next run will capture the table from the very beginning again. To preserve the job's progress, stop it with a savepoint instead:
bin/flink stop --savepointPath hdfs://mycluster/flinkcdc-savepoints -Dyarn.application.id=application_1658045078748_0001 79ce915e39fc1d18a194b6a464d7c3fd
Note: the value of -Dyarn.application.id is the YARN application ID, and the trailing argument is the Flink job ID.
# Configure the job to resume from the last savepoint
SET 'execution.checkpointing.interval' = '10s';
SET 'parallelism.default' = '3';
SET 'execution.savepoint.path' = 'hdfs://mycluster/flinkcdc-savepoints/savepoint-79ce91-92206bcaaad2';
Note: the value of execution.savepoint.path is the savepoint path returned by the stop command.
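Once the restored job has been submitted, the savepoint path can be cleared in the same SQL client session so that later statements do not also try to restore from it (the SQL client has supported RESET since Flink 1.13):

```sql
-- Clear the savepoint path after submitting the restored job,
-- so subsequent INSERT statements start fresh instead of restoring again.
RESET 'execution.savepoint.path';
```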
# Submit the Flink SQL job
INSERT INTO cars_copy SELECT * FROM cars;
# Insert more test data
INSERT INTO `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) VALUES ('10096008', '10244815', '港T·7RONE', '红色', '1', NULL);
INSERT INTO `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) VALUES ('10096009', '10244816', '港T·7RONE', '黄色', '1', NULL);
# Count the rows in the result table again
SELECT COUNT(*) FROM cars_copy;
If everything is working, only the newly inserted rows are captured this time; the historical data is not re-read.
Note: capturing MySQL data with Flink SQL is easy to use, but each source definition supports only a single table.
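A common way to work around the one-table-per-statement limitation is to declare one CDC source table per MySQL table and group the INSERTs into a single job with a statement set (supported by the SQL client since Flink 1.13). The `drivers`/`drivers_copy` tables below are hypothetical, used only for illustration:

```sql
-- Hypothetical: assumes a second source table `drivers` and sink `drivers_copy`
-- have been declared with CREATE TABLE, in the same way as `cars`/`cars_copy`.
BEGIN STATEMENT SET;
INSERT INTO cars_copy SELECT * FROM cars;
INSERT INTO drivers_copy SELECT * FROM drivers;
END;
```

All INSERTs in the set run as one Flink job, so they share a single checkpoint/savepoint lifecycle.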