canal-adapter is an open-source tool from Alibaba that subscribes to MySQL binlog events through canal server and synchronizes the incremental data to downstream stores. It consists of two modules, a launcher and the adapters. The launcher is a Spring Boot application that uses an SPI-style extension mechanism to dynamically load the different adapters (plugins); rdb, es and hbase adapters are currently supported, among others. Loading external adapters (plugins) dynamically makes the project highly extensible, and users can plug in an adapter that matches their own requirements. In addition, a FileAlterationMonitor watches the configuration files and reloads them when they change, so configuration can be updated without stopping the process.
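As a rough sketch of that hot-reload idea (this is a minimal sketch, not the project's actual listener; the conf directory and the reload action are hypothetical), Apache Commons IO's FileAlterationMonitor can be wired up like this:

import java.io.File;

import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;

// Minimal sketch of a FileAlterationMonitor-based hot reload. canal-adapter's real
// listener re-parses the changed YAML and refreshes the affected adapter instead of
// just printing a message.
public class ConfigWatcherSketch {

    public static void main(String[] args) throws Exception {
        File confDir = new File("conf"); // hypothetical configuration directory
        FileAlterationObserver observer = new FileAlterationObserver(confDir);
        observer.addListener(new FileAlterationListenerAdaptor() {
            @Override
            public void onFileChange(File file) {
                // reload the changed configuration file here
                System.out.println("config changed, reloading: " + file.getName());
            }
        });
        FileAlterationMonitor monitor = new FileAlterationMonitor(3000, observer); // poll every 3 seconds
        monitor.start();
    }
}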
1. The main entry point of the system is com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java:
@SpringBootApplication
public class CanalAdapterApplication {

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(CanalAdapterApplication.class);
        application.setBannerMode(Banner.Mode.OFF);
        application.run(args);
    }
}
2. At startup, the @PostConstruct annotation triggers the init() method of com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java, which initializes the CanalAdapterLoader.
@PostConstruct
public synchronized void init() {
    if (running) {
        return;
    }
    try {
        logger.info("## start the canal client adapters.");
        adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
        adapterLoader.init();
        running = true;
        logger.info("## the canal client adapters are running now ......");
    } catch (Exception e) {
        logger.error("## something goes wrong when starting up the canal client adapters:", e);
    }
}
3. com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java. The init() method of CanalAdapterLoader determines the adapter worker mode from canal.conf.mode configured in canal-adapter's application.yml. Since tcp mode is used in this walkthrough, the listener is started through worker.start().
canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
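The srcDataSources block lists the source JDBC connections used by the rdb adapter. A typical entry looks roughly like the following (connection details are hypothetical):

srcDataSources:
  defaultDS:
    url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
    username: root
    password: 121212

With this configuration in place, CanalAdapterLoader.init() selects and starts the worker that matches the configured mode: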
public void init() {
    loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);
    String canalServerHost = this.canalClientConfig.getCanalServerHost();
    SocketAddress sa = null;
    if (canalServerHost != null) {
        String[] ipPort = canalServerHost.split(":");
        sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
    }
    String zkHosts = this.canalClientConfig.getZookeeperHosts();
    if ("tcp".equalsIgnoreCase(canalClientConfig.getMode())) {
        // initialize the canal-client (tcp) adapter
        // ..... part of the code omitted
        worker.start();
        logger.info("Start adapter for canal instance: {} succeed", canalAdapter.getInstance());
    } else if ("kafka".equalsIgnoreCase(canalClientConfig.getMode())) {
        // initialize the canal-client-kafka adapter
        // ..... part of the code omitted
        canalKafkaWorker.start();
        logger.info("Start adapter for canal-client mq topic: {} succeed",
            canalAdapter.getInstance() + "-" + group.getGroupId());
    } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode())) {
        // initialize the canal-client-rocketMQ adapter
        // ..... part of the code omitted
        rocketMQWorker.start();
        logger.info("Start adapter for canal-client mq topic: {} succeed",
            canalAdapter.getInstance() + "-" + group.getGroupId());
    }
}
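The ExtensionLoader obtained at the top of init() is canal-adapter's SPI-style plugin loader for OuterAdapter implementations. It plays a role similar to Java's built-in java.util.ServiceLoader; the following is only a rough analogy, not canal's actual loader:

import java.util.ServiceLoader;

// Rough analogy: ServiceLoader discovers implementations of an interface from
// descriptor files on the classpath, which is the same idea canal-adapter's
// ExtensionLoader uses to find OuterAdapter plugins.
public class SpiAnalogy {

    public interface Plugin {
        String name();
    }

    public static void main(String[] args) {
        ServiceLoader<Plugin> loader = ServiceLoader.load(Plugin.class);
        for (Plugin plugin : loader) {
            System.out.println("discovered plugin: " + plugin.name());
        }
    }
}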
4. com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java. worker.start() is a method of this abstract class; it starts a dedicated thread to process messages. process() is an abstract method with several implementations, and the concrete message-handling logic is chosen by client mode; the implementations include CanalAdapterWorker, CanalAdapterKafkaWorker and CanalAdapterRocketMQWorker.
public void start() {
    if (!running) {
        thread = new Thread(this::process);
        thread.setUncaughtExceptionHandler(handler);
        thread.start();
        running = true;
    }
}

protected abstract void process();
5. com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java is the implementation that handles messages in tcp mode, and CanalAdapterWorker.process() is a concrete implementation of AbstractCanalAdapterWorker.process(). In this method:
(1) It first loads some basic canal-adapter client settings, including retries, the maximum number of retries for a failed sync (retries = -1 means retry indefinitely), and batchSize, the number of entries the adapter pulls from the canal server instance in one batch.
(2) It contains two while loops. The outer loop (2.1) waits on the sync switch of the canal adapter instance, (2.2) initializes and connects the canal server connector, and (2.3) subscribes to the destination so the outerAdapters configured for the adapter can receive data. The inner loop fetches a batch of incremental data with Message message = connector.getWithoutAck(batchSize) and hands it to the configured outerAdapters via writeOut(message). One point worth noting: if an exception occurs during synchronization, the batch is retried up to the maximum retry count, but once that count is exceeded the adapter still acknowledges the batch with connector.ack(batchId), telling the connector that all messages up to and including this batchId were synced successfully. As I understand it, this can lose data (my own reading, corrections welcome).
@Override
protected void process() {
    while (!running) { // waiting until running == true
        while (!running) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
    }

    int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
    if (retry == -1) {
        // retries == -1 means keep blocking and retrying on errors
        retry = Integer.MAX_VALUE;
    }
    // long timeout = canalClientConfig.getTimeout() == null ? 300000 :
    // canalClientConfig.getTimeout(); // default timeout: 5 minutes
    Integer batchSize = canalClientConfig.getBatchSize();
    if (batchSize == null) {
        batchSize = BATCH_SIZE;
    }

    while (running) {
        try {
            syncSwitch.get(canalDestination);
            logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
            connector.connect();
            logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
            connector.subscribe();
            logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination);
            while (running) {
                try {
                    syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES);
                } catch (TimeoutException e) {
                    break;
                }
                if (!running) {
                    break;
                }
                for (int i = 0; i < retry; i++) {
                    if (!running) {
                        break;
                    }
                    Message message = connector.getWithoutAck(batchSize); // fetch the specified number of entries
                    long batchId = message.getId();
                    try {
                        int size = message.getEntries().size();
                        if (batchId == -1 || size == 0) {
                            Thread.sleep(500);
                        } else {
                            if (logger.isDebugEnabled()) {
                                logger.debug("destination: {} batchId: {} batchSize: {} ", canalDestination, batchId, size);
                            }
                            long begin = System.currentTimeMillis();
                            writeOut(message);
                            if (logger.isDebugEnabled()) {
                                logger.debug("destination: {} batchId: {} elapsed time: {} ms",
                                    canalDestination, batchId, System.currentTimeMillis() - begin);
                            }
                        }
                        connector.ack(batchId); // commit the acknowledgement
                        break;
                    } catch (Exception e) {
                        if (i != retry - 1) {
                            connector.rollback(batchId); // processing failed, roll back the batch
                            logger.error(e.getMessage() + " Error sync and rollback, execute times: " + (i + 1));
                        } else {
                            connector.ack(batchId);
                            logger.error(e.getMessage() + " Error sync but ACK!");
                        }
                        Thread.sleep(500);
                    }
                }
            }
        } catch (Throwable e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            logger.info("=============> Disconnect destination: {} <=============", this.canalDestination);
        }
        if (running) { // is reconnect
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }
}
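For reference, the getWithoutAck/ack/rollback protocol the worker relies on can be seen in a minimal standalone canal client. This is only a sketch using the standard canal client API; host, port, destination and credentials are hypothetical:

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

// Minimal consumption loop: fetch a batch without acknowledging it, process it,
// then ack on success or roll back on failure so the batch is redelivered.
public class SimpleCanalClientSketch {

    public static void main(String[] args) throws InterruptedException {
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        connector.subscribe();
        while (true) {
            Message message = connector.getWithoutAck(500); // fetch up to 500 entries
            long batchId = message.getId();
            if (batchId == -1 || message.getEntries().isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500); // nothing new, back off briefly
                continue;
            }
            try {
                // hand the batch to downstream processing here
                connector.ack(batchId); // confirm only after successful processing
            } catch (Exception e) {
                connector.rollback(batchId); // have the batch redelivered on failure
            }
        }
    }
}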
6. writeOut(message) is also a method of the abstract class AbstractCanalAdapterWorker. It uses a fixed-size thread pool created by this class, with as many threads as canalOuterAdapters.size(), to submit the parsed messages asynchronously to batchSync(dmls, adapter).
protected void writeOut(final Message message) {
    List<Future<Boolean>> futures = new ArrayList<>();
    // adapter groups run in parallel
    canalOuterAdapters.forEach(outerAdapters -> {
        final List<OuterAdapter> adapters = outerAdapters;
        futures.add(groupInnerExecutorService.submit(() -> {
            try {
                // adapters within a group run serially; avoid configuring multiple adapters in one group
                adapters.forEach(adapter -> {
                    long begin = System.currentTimeMillis();
                    List<Dml> dmls = MessageUtil.parse4Dml(canalDestination, groupId, message);
                    if (dmls != null) {
                        batchSync(dmls, adapter);
                        if (logger.isDebugEnabled()) {
                            logger.debug("{} elapsed time: {}", adapter.getClass().getName(),
                                (System.currentTimeMillis() - begin));
                        }
                    }
                });
                return true;
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return false;
            }
        }));
    });
    // ... part of the code omitted
}
7. batchSync() is another method of the abstract class AbstractCanalAdapterWorker. When the number of dmls is no larger than syncBatchSize, the data is synced in one go; otherwise it is synced in batches, but in both cases adapter.sync() is what actually gets called. (The check that decides between one-shot and batched sync looks questionable to me: dml.getData() is itself a List containing multiple rows, so dmls.size() <= canalClientConfig.getSyncBatchSize() is not a reliable way to decide whether the payload fits within syncBatchSize; the sum of dml.getData().size() over the list should be used instead, as sketched after the code below.)
private void batchSync(List<Dml> dmls, OuterAdapter adapter) {
    // sync in batches
    if (dmls.size() <= canalClientConfig.getSyncBatchSize()) {
        adapter.sync(dmls);
    } else {
        int len = 0;
        List<Dml> dmlsBatch = new ArrayList<>();
        for (Dml dml : dmls) {
            dmlsBatch.add(dml);
            if (dml.getData() == null || dml.getData().isEmpty()) {
                len += 1;
            } else {
                len += dml.getData().size();
            }
            if (len >= canalClientConfig.getSyncBatchSize()) {
                adapter.sync(dmlsBatch);
                dmlsBatch.clear();
                len = 0;
            }
        }
        if (!dmlsBatch.isEmpty()) {
            adapter.sync(dmlsBatch);
        }
    }
}
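Below is a sketch of the row-count-based check suggested above. It is a hypothetical helper, not part of canal-adapter, and it assumes the Dml class from canal's client-adapter support package:

import java.util.List;

import com.alibaba.otter.canal.client.adapter.support.Dml;

// Decide between one-shot and batched sync by the total number of rows rather
// than the number of Dml objects, since one Dml can carry many rows in getData().
public final class BatchSizeCheck {

    private BatchSizeCheck() {
    }

    static int totalRows(List<Dml> dmls) {
        int rows = 0;
        for (Dml dml : dmls) {
            rows += (dml.getData() == null || dml.getData().isEmpty()) ? 1 : dml.getData().size();
        }
        return rows;
    }

    static boolean fitsInOneBatch(List<Dml> dmls, int syncBatchSize) {
        return totalRows(dmls) <= syncBatchSize;
    }
}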
8. adapter.sync() is declared in the interface com/alibaba/otter/canal/client/adapter/OuterAdapter.java. This interface also has several implementations, including ESAdapter, HbaseAdapter and RdbAdapter. These implementations are the concrete adapters (plugins); this article focuses on the RdbAdapter adapter.
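To make the plugin idea concrete, here is a hypothetical sketch of what a custom adapter boils down to: a class that receives batches of Dml objects. Only the sync(List<Dml>) contract shown in this walkthrough is assumed; the real OuterAdapter interface also declares lifecycle and ETL methods, and a real plugin has to be registered through canal's SPI descriptor so the loader can discover it.

import java.util.List;

import com.alibaba.otter.canal.client.adapter.support.Dml;

// Hypothetical adapter sketch: just logs each incoming change instead of writing
// it to an external store.
public class LoggingAdapterSketch {

    // mirrors the sync(List<Dml>) contract of OuterAdapter
    public void sync(List<Dml> dmls) {
        for (Dml dml : dmls) {
            int rows = dml.getData() == null ? 0 : dml.getData().size();
            System.out.println(dml.getDatabase() + "." + dml.getTable()
                + " -> " + dml.getType() + ", rows=" + rows);
        }
    }
}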
9. com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java is a concrete implementation of OuterAdapter. Its RdbAdapter.sync() method delegates the core rdb synchronization logic to rdbSyncService.sync(mappingConfigCache, dmls, envProperties).
@Override
public void sync(List<Dml> dmls) {
    if (dmls == null || dmls.isEmpty()) {
        return;
    }
    try {
        rdbSyncService.sync(mappingConfigCache, dmls, envProperties);
        rdbMirrorDbSyncService.sync(dmls);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
10. com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java. The RdbSyncService.sync() method first checks whether the statement is a DDL statement; if so, it returns false directly (only the column type cache is invalidated), in other words schema changes are not replicated to the target. For DML statements, it takes every Dml in the List<Dml> and, via SingleDml.dml2SingleDmls(), expands all the rows in dml.getData() into a List<SingleDml>. It then uses concurrent=true/false from the mapping configuration to decide whether concurrent synchronization is needed. If concurrent processing is enabled, the pkHash() method computes a hash from the primary-key values, and the resulting hash code determines which index of List<SyncItem>[] dmlsPartition each single-row SyncItem is placed in. This partitioning step is in fact a performance bottleneck.
public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
    sync(dmls, dml -> {
        if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
            // DDL
            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
            return false;
        } else {
            // DML
            String destination = StringUtils.trimToEmpty(dml.getDestination());
            String groupId = StringUtils.trimToEmpty(dml.getGroupId());
            String database = dml.getDatabase();
            String table = dml.getTable();
            Map<String, MappingConfig> configMap;
            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
                configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
            } else {
                configMap = mappingConfig.get(destination + "_" + database + "-" + table);
            }
            if (configMap == null) {
                return false;
            }
            if (configMap.values().isEmpty()) {
                return false;
            }
            for (MappingConfig config : configMap.values()) {
                if (config.getConcurrent()) {
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
                    singleDmls.forEach(singleDml -> {
                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                } else {
                    int hash = 0;
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
                    singleDmls.forEach(singleDml -> {
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                }
            }
            return true;
        }
    });
}
public int pkHash(DbMapping dbMapping, Map<String, Object> d) {
    return pkHash(dbMapping, d, null);
}

public int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
    int hash = 0;
    // iterate over the primary-key columns
    for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
        String targetColumnName = entry.getKey();
        String srcColumnName = entry.getValue();
        if (srcColumnName == null) {
            srcColumnName = Util.cleanColumn(targetColumnName);
        }
        Object value = null;
        if (o != null && o.containsKey(srcColumnName)) {
            value = o.get(srcColumnName);
        } else if (d != null) {
            value = d.get(srcColumnName);
        }
        if (value != null) {
            hash += value.hashCode();
        }
    }
    hash = Math.abs(hash) % threads;
    return Math.abs(hash);
}
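A simplified worked example of this partitioning idea (hypothetical single-column primary key; the real pkHash() sums the hash codes of all mapped primary-key columns):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Rows are assigned to one of `threads` partitions by the hash of their primary-key
// value, so all changes to the same row stay on the same thread and keep their order.
public class PkHashExample {

    public static void main(String[] args) {
        int threads = 3; // same default as RdbSyncService
        Map<String, Object> row1 = new LinkedHashMap<>();
        row1.put("id", 1001L);
        Map<String, Object> row2 = new LinkedHashMap<>();
        row2.put("id", 1001L); // a second change to the same row
        Map<String, Object> row3 = new LinkedHashMap<>();
        row3.put("id", 1002L);

        for (Map<String, Object> row : Arrays.asList(row1, row2, row3)) {
            int partition = Math.abs(row.get("id").hashCode()) % threads;
            System.out.println("id=" + row.get("id") + " -> partition " + partition);
        }
        // row1 and row2 always land in the same partition; row3 may land elsewhere.
    }
}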
11. Also in com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java, another RdbSyncService.sync() overload processes the data in dmlsPartition asynchronously. By default it uses 3 executor threads (hard-coded via the threads field), iterates over dmlsPartition, and processes each partition through sync(batchExecutors[j], syncItem.config, syncItem.singleDml).
private int threads = 3;

public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
    try {
        boolean toExecute = false;
        for (Dml dml : dmls) {
            if (!toExecute) {
                toExecute = function.apply(dml);
            } else {
                function.apply(dml);
            }
        }
        if (toExecute) {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                int j = i;
                if (dmlsPartition[j].isEmpty()) {
                    // bypass
                    continue;
                }
                futures.add(executorThreads[i].submit(() -> {
                    try {
                        dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
                        dmlsPartition[j].clear();
                        batchExecutors[j].commit();
                        return true;
                    } catch (Throwable e) {
                        batchExecutors[j].rollback();
                        throw new RuntimeException(e);
                    }
                }));
            }
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (ExecutionException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    } finally {
        for (BatchExecutor batchExecutor : batchExecutors) {
            if (batchExecutor != null) {
                batchExecutor.close();
            }
        }
    }
}
12. Also in com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java, a further RdbSyncService.sync() overload applies a streaming ETL filtering step, streamEtlHitProcess(etlCondition, dml), which filters the incoming data against the etlCondition configured in the mapping file. It then dispatches to insert, update, delete or truncate according to the DML type, and the resulting statement is executed through the batchExecutor.
public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
    if (config != null) {
        try {
            String etlCondition = config.getDbMapping().getEtlCondition();
            if (!streamEtlHitProcess(etlCondition, dml)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("etl filter {} success: {}", etlCondition,
                        JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                }
                return;
            }
            String type = dml.getType();
            if (type != null && type.equalsIgnoreCase("INSERT")) {
                insert(batchExecutor, config, dml);
            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
                update(batchExecutor, config, dml);
            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                delete(batchExecutor, config, dml);
            } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
                truncate(batchExecutor, config);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
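For context, getConcurrent(), getTargetPk() and getEtlCondition() come from the per-table rdb mapping file. Based on the format documented for the rdb adapter, a mapping looks roughly like this (database, table and condition values are hypothetical):

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: mytest
  table: user
  targetTable: mytest2.user
  targetPk:
    id: id
  mapAll: true
  etlCondition: "where c_time>={}"
  commitBatch: 3000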
13. In com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java, taking RdbSyncService.insert() as an example, the method builds a parameterized INSERT statement by string concatenation and uses it to complete the data synchronization. The other operations work the same way: they all synchronize data into the target database by assembling SQL statements.
private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
    Map<String, Object> data = dml.getData();
    if (data == null || data.isEmpty()) {
        return;
    }
    DbMapping dbMapping = config.getDbMapping();
    Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
    StringBuilder insertSql = new StringBuilder();
    insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
    columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
    int len = insertSql.length();
    insertSql.delete(len - 1, len).append(") VALUES (");
    int mapLen = columnsMap.size();
    for (int i = 0; i < mapLen; i++) {
        insertSql.append("?,");
    }
    len = insertSql.length();
    insertSql.delete(len - 1, len).append(")");
    Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
    List<Map<String, ?>> values = new ArrayList<>();
    for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
        String targetColumnName = entry.getKey();
        String srcColumnName = entry.getValue();
        if (srcColumnName == null) {
            srcColumnName = Util.cleanColumn(targetColumnName);
        }
        Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
        if (type == null) {
            // throw new RuntimeException("Target Database : " + database + "Table" + table + "Target column: " + targetColumnName + " not matched");
            throw new RuntimeException(String.format("Target database:{%s} table:{%s} target column:{%s} not matched",
                dml.getDatabase(), dml.getTable(), targetColumnName));
        }
        Object value = data.get(srcColumnName);
        BatchExecutor.setValue(values, type, value);
    }
    try {
        batchExecutor.execute(insertSql.toString(), values);
    } catch (SQLException e) {
        if (skipDupException
            && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001:"))) {
            // ignore
            // TODO add duplicate-key error codes for more relational databases
        } else {
            throw e;
        }
    }
    if (logger.isTraceEnabled()) {
        logger.trace("Insert into target table, sql: {}", insertSql);
    }
}
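As a concrete illustration (table and column names are hypothetical), for a mapped target table mytest2.user with columns id and name, the loop above produces a parameterized statement equivalent to this plain JDBC call:

import java.sql.Connection;
import java.sql.PreparedStatement;

// Simplified JDBC equivalent of what batchExecutor.execute(insertSql, values)
// ends up running for one inserted row.
public class InsertSketch {

    static void insertRow(Connection conn, long id, String name) throws Exception {
        String sql = "INSERT INTO mytest2.user (id,name) VALUES (?,?)"; // as built by RdbSyncService.insert()
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, id); // values are bound according to the target column's JDBC type
            ps.setString(2, name);
            ps.executeUpdate();
        }
    }
}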
Corrections are welcome.