赞
踩
目前 Flink 1.11.0 自带的 Kafka SQL 连接器还不支持一张表同时订阅多个 topic,要实现这个功能需要自定义 source。这里基于已有的 Kafka connector 进行扩展,需要先引入如下依赖:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.11</artifactId>
- <version>1.11.0</version>
- </dependency>
Flink 1.11 相比之前的版本重构了自定义 connector 的方式,引入了 DynamicTable(DynamicTableSource / DynamicTableSink),详情见官网文档。
- public class TopicsKafkaTableConnecterFactory extends KafkaDynamicTableFactoryBase {
- public static final String IDENTIFIER = "topicsKafka";
-
- public TopicsKafkaTableConnecterFactory() {}
-
- @Override
- protected KafkaDynamicSourceBase createKafkaTableSource(DataType producedDataType, String topic, Properties properties, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis) {
- return new TopicsKafkaTableSource(producedDataType, topic, properties, decodingFormat, startupMode, specificStartupOffsets, startupTimestampMillis);
- }
-
- @Override
- protected KafkaDynamicSinkBase createKafkaTableSink(DataType consumedDataType, String topic, Properties properties, Optional<FlinkKafkaPartitioner<RowData>> partitioner, EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
- return new KafkaDynamicSink(consumedDataType, topic, properties, partitioner, encodingFormat);
- }
-
- @Override
- public String factoryIdentifier() {
- return "topicsKafka";
- }
- }

- public class TopicsKafkaTableSource extends KafkaDynamicSourceBase {
-
- protected TopicsKafkaTableSource(DataType outputDataType, String topic, Properties properties, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis) {
- super(outputDataType, topic, properties, decodingFormat, startupMode, specificStartupOffsets, startupTimestampMillis);
- }
-
-
- @Override
- public DynamicTableSource copy() {
- return new KafkaDynamicSource(this.outputDataType, this.topic, this.properties, this.decodingFormat, this.startupMode, this.specificStartupOffsets, this.startupTimestampMillis);
- }
-
- @Override
- public String asSummaryString() {
- return "Kafka";
- }
-
- @Override
- protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(String s, Properties properties, DeserializationSchema<RowData> deserializationSchema) {
- //逗号切割topic名字
- List<String> topics = Arrays.asList(s.split(","));
- return new FlinkKafkaConsumer(topics, deserializationSchema, properties);
- }
-
- }

在项目资源文件夹下创建文件
META-INF/services/org.apache.flink.table.factories.Factory

文件内容:
填写上面创建的 TopicsKafkaTableConnecterFactory 的全限定类名(包名 + 类名),单独占一行。

- -- Source table read through the custom 'topicsKafka' connector.
- -- The connector splits the 'topic' value on commas, so several topics can be listed.
- CREATE TABLE mySource (
-     a BIGINT,
-     b BIGINT
- ) WITH (
-     'connector' = 'topicsKafka',
-     'topic' = 'mytesttopic',
-     'properties.bootstrap.servers' = '172.17.0.2:9092',
-     'properties.group.id' = 'flink-test-cxy',
-     'scan.startup.mode' = 'latest-offset',
-     'format' = 'json'
- );
-
- -- Sink table; it uses the 'print' connector, with JDBC options kept below for reference.
- CREATE TABLE mysqlsink (
-     id BIGINT,
-     b VARCHAR
- ) WITH (
-     'connector' = 'print'
-     /*'connector.type' = 'jdbc',
-     'connector.url' = 'jdbc:mysql://...' ,
-     'connector.username' = 'root' ,
-     'connector.password' = 'root',
-     'connector.table' = 'mysqlsink' ,
-     'connector.driver' = 'com.mysql.cj.jdbc.Driver' ,
-     'connector.write.flush.interval' = '5s',
-     'connector.write.flush.max-rows' = '1'*/
- );
-
- -- Copy rows from the Kafka source into the sink, casting b to a string.
- INSERT INTO mysqlsink
- SELECT a, CAST(b AS VARCHAR) AS b
- FROM mySource;

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。