
FlinkCDC 2.0: Capturing MySQL with Flink SQL (sql-client + checkpoints)

1. Dependency Management

Place the following dependency JARs under FLINK_HOME/lib:

flink-sql-connector-mysql-cdc-2.2.0.jar

flink-connector-jdbc_2.11-1.14.3.jar

flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

2. Global Flink Configuration

Edit flink-conf.yaml:

  execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
  execution.checkpointing.max-concurrent-checkpoints: 1
  execution.checkpointing.mode: EXACTLY_ONCE
  execution.checkpointing.timeout: 10min
  state.backend: filesystem
  state.checkpoints.dir: hdfs://mycluster/flinkcdc-checkpoints

3. Submitting Jobs via sql-client

1. Standalone mode

Start the sql-client: bin/sql-client.sh embedded

Note: to run in standalone mode, you must first start a Flink standalone cluster:

bin/start-cluster.sh

2. yarn-session mode (the approach used in this walkthrough)

First start a Flink yarn-session cluster: bin/yarn-session.sh -s 1 -jm 1024 -tm 1024

Then start the sql-client: bin/sql-client.sh embedded -s yarn-session

4. Checkpoint Configuration

Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#checkpointing

# Set checkpoint parameters in the sql-client

  SET 'execution.checkpointing.interval' = '10s';
  SET 'parallelism.default' = '3';

5. Create the Source Table

  CREATE TABLE `cars`(
    `id` BIGINT,
    `owerId` BIGINT,
    `carCode` STRING,
    `carColor` STRING,
    `type` BIGINT,
    `remark` STRING,
    PRIMARY KEY(id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hadoop1',
    'port' = '3306',
    'username' = 'hive',
    'password' = 'hive',
    'database-name' = 'sca',
    'table-name' = 'cars',
    'connect.timeout' = '60s'
  );

6. Create the Sink Table

  CREATE TABLE `cars_copy`(
    `id` BIGINT,
    `owerId` BIGINT,
    `carCode` STRING,
    `carColor` STRING,
    `type` BIGINT,
    `remark` STRING,
    PRIMARY KEY(id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8',
    'username' = 'hive',
    'password' = 'hive',
    'table-name' = 'cars_copy',
    'sink.parallelism' = '2'
  );

7. Source to Sink

Write the captured data into MySQL:

INSERT INTO cars_copy SELECT * FROM cars;

Count the rows in the result table:

SELECT COUNT(*) FROM cars_copy;

Insert test rows (then check the result table again):

  insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096006','10244815','港T·7RONE','红色','1',NULL);
  insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096007','10244816','港T·7RONE','黄色','1',NULL);

Note: if you cancel the job manually, a restarted job will re-capture the table's data from the beginning.
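The full re-read happens because the mysql-cdc connector's default startup mode takes an initial snapshot before switching to the binlog. As a sketch, assuming Flink CDC 2.x option names, a source that skips the snapshot and reads only new binlog events could be declared like this (the table name `cars_latest` is hypothetical):

```sql
-- Variant of the step-5 source: 'latest-offset' skips the initial
-- snapshot and starts from the current binlog position
-- (assumes Flink CDC 2.x's scan.startup.mode option).
CREATE TABLE `cars_latest`(
  `id` BIGINT,
  `owerId` BIGINT,
  `carCode` STRING,
  `carColor` STRING,
  `type` BIGINT,
  `remark` STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop1',
  'port' = '3306',
  'username' = 'hive',
  'password' = 'hive',
  'database-name' = 'sca',
  'table-name' = 'cars',
  'scan.startup.mode' = 'latest-offset'
);
```

That said, the savepoint-based stop/restore in steps 8-9 is usually the better fix, since it resumes from the exact binlog offset rather than dropping history.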

8. Take a Savepoint when Cancelling the Job

bin/flink stop --savepointPath hdfs://mycluster/flinkcdc-savepoints -Dyarn.application.id=application_1658045078748_0001 79ce915e39fc1d18a194b6a464d7c3fd

Note: the -Dyarn.application.id value is the YARN application ID, and the trailing argument (79ce915e39fc1d18a194b6a464d7c3fd) is the Flink job ID.
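If you don't have the Flink job ID at hand, it can be listed from the running session first (a sketch; assumes the same YARN application ID as above and requires the running cluster):

```shell
# List running jobs in the yarn-session; the printed JobID is the
# trailing argument passed to "bin/flink stop" above.
bin/flink list -Dyarn.application.id=application_1658045078748_0001
```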

9. Restore the Job after Cancelling

# Configure the job to resume from the last savepoint

  SET 'execution.checkpointing.interval' = '10s';
  SET 'parallelism.default' = '3';
  SET 'execution.savepoint.path' = 'hdfs://mycluster/flinkcdc-savepoints/savepoint-79ce91-92206bcaaad2';

Note: the value of execution.savepoint.path is the savepoint path printed when the job was stopped.

# Run the Flink SQL job

INSERT INTO cars_copy SELECT * FROM cars;

# Insert more test rows

  insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096008','10244815','港T·7RONE','红色','1',NULL);
  insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096009','10244816','港T·7RONE','黄色','1',NULL);

# Count the rows in the result table again

SELECT COUNT(*) FROM cars_copy;

If everything is working, only the newly inserted rows are captured this time; historical data is not re-read.

Note: capturing MySQL data with Flink SQL is convenient, but each job covers only a single table.
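Within SQL, one way to bundle several table pipelines into a single job is a statement set, which the sql-client supports since Flink 1.13. Each table still needs its own source/sink pair, but the INSERTs are compiled and submitted together (the `owners`/`owners_copy` tables below are hypothetical, for illustration only):

```sql
-- Sketch: submit both pipelines as one Flink job.
BEGIN STATEMENT SET;
INSERT INTO cars_copy SELECT * FROM cars;
INSERT INTO owners_copy SELECT * FROM owners;
END;
```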
