当前位置:   article > 正文

Flink-CDC 关闭初始化读表锁, DDL 锁 Schema 锁, 使用增量或者全量或者指定偏移量_flinkcdc锁表问题

flinkcdc锁表问题
  1. <dependency>
  2. <groupId>com.ververica</groupId>
  3. <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  4. <version>2.2.1</version>
  5. </dependency>

1, 关闭全局锁

情形: 

Snapshot step 3  的时候会进行锁表, 

Snapshot step 6  的时候会锁库

Snapshot step 7  的时候读全量表

  1. According to the connector configuration both schema and data will be snapshotted
  2. Snapshot step 1 - Preparing
  3. Snapshot step 2 - Determining captured tables
  4. Snapshot step 3 - Locking captured tables [hello.order]
  5. Snapshot step 4 - Determining snapshot offset
  6. Read binlog position of MySQL primary server
  7. Snapshot step 5 - Reading structure of captured tablesAll eligible tables schema should be captured, capturing: [tcec.hello....
  8. Snapshot step 6 - Persisting schema history
  9. Releasing global read lock to enable MySQL writes
  10. Writes to MySQL tables prevented for a total of 00:00:13.912
  11. Snapshot step 7 - Snapshotting data
  12. Snapshotting contents of 1 tables while still in transaction
  13. For table 'hello.order' using select statement: 'SELECT * FROM `hello`.`order`'
  14. Finished exporting 195 records for table

MySqlSnapshotChangeEventSource.class:147

  1. Properties properties = new Properties();
  2. properties.setProperty("debezium.snapshot.locking.mode", "none");
  3. // 使用这个配置关闭全部锁表
  4. properties.setProperty("test.disable.global.locking", "true");
  5. DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
  6. .hostname("127.0.0.1")
  7. .port(3306)
  8. .username("root")
  9. .password("123456")
  10. .databaseList("hello")
  11. .tableList("order") //多个表逗号分隔
  12. .debeziumProperties(properties)
  13. .deserializer(new Config())
  14. .startupOptions(StartupOptions.specificOffset("mysql-bin.000002",123123));
  15. .build();

2,  初始化不读取全量数据, 使用增量模式

  1. // 指定偏移量增量模式, 初始化不会读取数据
  2. builder.startupOptions(StartupOptions.specificOffset("mysql-bin.000002",123123));
  3. // 默认全量
  4. public static StartupOptions initial() {
  5. return new StartupOptions(StartupMode.INITIAL, (String)null, (Integer)null, (Long)null);
  6. }
  7. // 从最开始的binlog 开始
  8. public static StartupOptions earliest() {
  9. return new StartupOptions(StartupMode.EARLIEST_OFFSET, (String)null, (Integer)null, (Long)null);
  10. }
  11. // 从最后的binlog开始
  12. public static StartupOptions latest() {
  13. return new StartupOptions(StartupMode.LATEST_OFFSET, (String)null, (Integer)null, (Long)null);
  14. }
  15. // 从指定的binlog开始
  16. public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) {
  17. return new StartupOptions(StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, specificOffsetPos, (Long)null);
  18. }
  19. // 从指定binlog时间开始
  20. public static StartupOptions timestamp(long startupTimestampMillis) {
  21. return new StartupOptions(StartupMode.TIMESTAMP, (String)null, (Integer)null, startupTimestampMillis);
  22. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/244766
推荐阅读
相关标签
  

闽ICP备14008679号