- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-sql-connector-mysql-cdc</artifactId>
- <version>2.2.1</version>
- </dependency>
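Scenario:
Snapshot step 3 locks the captured tables,
the log shows the global read lock being released only after snapshot step 6,
and snapshot step 7 reads the full table contents.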
- According to the connector configuration both schema and data will be snapshotted
- Snapshot step 1 - Preparing
- Snapshot step 2 - Determining captured tables
- Snapshot step 3 - Locking captured tables [hello.order]
- Snapshot step 4 - Determining snapshot offset
- Read binlog position of MySQL primary server
- Snapshot step 5 - Reading structure of captured tables
- All eligible tables schema should be captured, capturing: [tcec.hello....
- Snapshot step 6 - Persisting schema history
- Releasing global read lock to enable MySQL writes
- Writes to MySQL tables prevented for a total of 00:00:13.912
- Snapshot step 7 - Snapshotting data
- Snapshotting contents of 1 tables while still in transaction
- For table 'hello.order' using select statement: 'SELECT * FROM `hello`.`order`'
- Finished exporting 195 records for table
MySqlSnapshotChangeEventSource.class:147
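The log above reports how long writes were blocked (`00:00:13.912`). To track that window across runs, a small parser along these lines could pull the duration out of the log line. This is only a sketch: the class name `LockWindowParser` is hypothetical, and it assumes the message keeps the exact wording shown in the log above.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LockWindowParser {
    // Matches the HH:MM:SS.mmm duration in the "Writes to MySQL tables prevented" line.
    private static final Pattern PREVENTED = Pattern.compile(
            "Writes to MySQL tables prevented for a total of (\\d+):(\\d{2}):(\\d{2})\\.(\\d{3})");

    /** Returns how long writes were blocked, or empty if the line is not the lock summary. */
    public static Optional<Duration> parse(String logLine) {
        Matcher m = PREVENTED.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        Duration blocked = Duration.ofHours(Long.parseLong(m.group(1)))
                .plusMinutes(Long.parseLong(m.group(2)))
                .plusSeconds(Long.parseLong(m.group(3)))
                .plusMillis(Long.parseLong(m.group(4)));
        return Optional.of(blocked);
    }

    public static void main(String[] args) {
        String line = "- Writes to MySQL tables prevented for a total of 00:00:13.912";
        // Prints the blocked window in milliseconds when the line matches.
        parse(line).ifPresent(d -> System.out.println("Writes blocked for " + d.toMillis() + " ms"));
    }
}
```

For the line in the log above this yields 13912 ms, which is the window the locking settings below are meant to shrink.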
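- import java.util.Properties;
- import com.ververica.cdc.connectors.mysql.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.DebeziumSourceFunction;
-
- Properties properties = new Properties();
- // The DataStream API hands these keys to Debezium directly, so no "debezium." prefix
- // (that prefix is the convention for SQL/Table options)
- properties.setProperty("snapshot.locking.mode", "none");
- // Use this setting to turn off all table locking
- properties.setProperty("test.disable.global.locking", "true");
-
- DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
- .hostname("127.0.0.1")
- .port(3306)
- .username("root")
- .password("123456")
- .databaseList("hello")
- .tableList("hello.order") // database-qualified name; separate multiple tables with commas
- .debeziumProperties(properties)
- .deserializer(new Config()) // Config is the author's own deserializer class (not shown)
- .startupOptions(StartupOptions.specificOffset("mysql-bin.000002", 123123))
- .build();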
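One thing that is easy to trip over: as far as I can tell, the `debezium.` prefix belongs to the SQL/Table option style, while properties given to `debeziumProperties(...)` are expected under Debezium's own key names. If the keys were copied from a SQL DDL, a helper like the one below could normalize them first. This is a sketch under that assumption; `DebeziumProps` and `stripPrefix` are hypothetical names, not part of the connector.

```java
import java.util.Properties;

public class DebeziumProps {
    private static final String PREFIX = "debezium.";

    /** Copies the properties, dropping a leading "debezium." from any key that has it. */
    public static Properties stripPrefix(Properties in) {
        Properties out = new Properties();
        for (String key : in.stringPropertyNames()) {
            String target = key.startsWith(PREFIX) ? key.substring(PREFIX.length()) : key;
            out.setProperty(target, in.getProperty(key));
        }
        return out;
    }

    public static void main(String[] args) {
        Properties sqlStyle = new Properties();
        sqlStyle.setProperty("debezium.snapshot.locking.mode", "none");
        sqlStyle.setProperty("test.disable.global.locking", "true");

        // Keys without the prefix are copied unchanged.
        Properties direct = stripPrefix(sqlStyle);
        System.out.println(direct.getProperty("snapshot.locking.mode"));
    }
}
```

The result of `stripPrefix` can then be passed to `.debeziumProperties(...)` in place of the original `Properties` object.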

- // Incremental mode from a specified offset; no existing data is read at initialization
- builder.startupOptions(StartupOptions.specificOffset("mysql-bin.000002",123123));
-
-
- // Default: full snapshot
- public static StartupOptions initial() {
- return new StartupOptions(StartupMode.INITIAL, (String)null, (Integer)null, (Long)null);
- }
-
- // Start from the earliest binlog
- public static StartupOptions earliest() {
- return new StartupOptions(StartupMode.EARLIEST_OFFSET, (String)null, (Integer)null, (Long)null);
- }
-
- // Start from the latest binlog
- public static StartupOptions latest() {
- return new StartupOptions(StartupMode.LATEST_OFFSET, (String)null, (Integer)null, (Long)null);
- }
-
- // Start from a specified binlog file and position
- public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) {
- return new StartupOptions(StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, specificOffsetPos, (Long)null);
- }
-
- // Start from a specified binlog timestamp
- public static StartupOptions timestamp(long startupTimestampMillis) {
- return new StartupOptions(StartupMode.TIMESTAMP, (String)null, (Integer)null, startupTimestampMillis);
- }
