赞
踩
需求:实现FlinkSQL sink到ArangoDB图数据库
分析:自定义Flink Table & SQL connector 支持flink-connector-arangodb,只需要实现sink部分
官网支持user-defined sources&sinks,对Table SQL的source/sink定义提供了解释

Metadata:对表的声明,封装为Catalog,定义外部存储系统的元数据
Planning:Factory实例由Java SPI机制创建,将外部表元数据配置封装为参数化实例(DynamicTableSource/Sink)
Runtime:读取/写入核心逻辑,实现InputFormat/OutputFormat或SourceFunction/SinkFunction接口,构建与外部存储系统的连接和实现读取和写入逻辑。
需要我们扩展的地方:
Dynamic Table Factory
自定义工场类实现org.apache.flink.table.factories.DynamicTableSinkFactory(我这里仅要支持sink,如果要支持source,需要实现org.apache.flink.table.factories.DynamicTableSourceFactory)
DDL语句中的‘connector’配置项作为标识符用来发现对应的工厂类实例
Factory工场类是由Java SPI来实例化的,我们需要在自定义connector模块的resource下添加文件
META-INF/services/org.apache.flink.table.factories.Factory
文件中指定工厂类的全路径
Factory工厂类主要构建DynamicTableSource/DynamicTableSink,这是个参数化实例对象,定义connector配置参数。在DynamicTableSource/Sink中数据的传递要使用Flink内部数据结构org.apache.flink.table.data.RowData,这里获取到数据需要对value做一下转换,value数据提取封装为RowData,RowData接口的实现类也比较多,可根据情况选择合适的实现类。
Sink接口实现:
有三个接口的实现会影响DML语句的执行
| 接口 | 描述 |
| SupportsOverwrite | 实现此接口可以使用INSERT OVERWRITE语句覆盖现有的表或分区数据 |
| SupportsPartitioning | 允许写入分区数据 |
| SupportsWritingMetadata | 保存持久化DDL中定义的列和类型 |
Runtime Provider
这里官网并没有对实际读取写入的Runtime实现作详细解释。说一下个人的理解
Dynamic Table Source/Sink 提供了获取RuntimeProvider实例函数getSinkRuntimeProvider(Context context),这个函数需要我们自定义逻辑去声明InputFormat/OutputFormat或者source/sinkFunction实例化对象
两种方式运行Provider
1.OutputFormatProvider.of(InputFormat/OutputFormat)
2.SinkFunctionProvider.of(source/sinkFunction)
使用lambda表达式,执行return () -> xxxFormat/xxxFunction;构建静态provider
关于InputFormat/OutputFormat或者source/sinkFunction
关键方法
- 1.xxxxFunction<T>
-
- // 建立连接
- open();
- // 执行读取/写入逻辑
- invoke(T value,Context context);
- // 关闭连接
- close();
-
- 2.xxxxFormat<In>
-
- // 建立连接
- open(int taskNumber, int numTasks);
- // 执行读取/写入逻辑
- writeRecord(In record)
- // 关闭连接
- close();

Encoding / Decoding Formats(待完善)
阅读flink-connector模块,对比几个connector的源码,分析后得出简单的connector主体架构:
- // 1.工厂类构造source和sink,java SPI创建实例
- ArangoDBDynamicTableFactory imp DynamicTableSinkFactory (如果支持source,需imp DynamicTableSourceFactory)
- - createDynamicTableSink // 创建连接器的参数化实例 -> 封装connector的参数
- - optionalOptions
- - requiredOptions
- - factoryIdentifier
-
- // 2.arangodbsink引出sinkfunction
- ArangodbDynamicTableSink
-
- // 3.sinkFunction/outputFormat建立连接执行写入
- - open() // 建立连接
- - invoke() // arangodb API
- - writeRecord
- - ArangoCollection.insertDocuments(values)
- - ArangoCollection.updateDocument(key,value)
- - close() // 关闭连接
-
- // 4.掺杂着其他的辅助类
- - convert
- rowData -> document
- serialize/deserialize

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。