
Flink SQL 1.11.0: a custom Kafka source that supports multiple topics


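As of Flink 1.11.0, the Kafka SQL connector does not support reading from multiple topics in a single table. To get this behavior you have to define a custom source. The one shown here is built on top of the existing Kafka connector: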

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>

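Compared with earlier releases, Flink 1.11 reworked the way custom connectors are written and introduced the DynamicTable interfaces. See the official documentation for details.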

1 Create the connector factory by extending KafkaDynamicTableFactoryBase

    public class TopicsKafkaTableConnecterFactory extends KafkaDynamicTableFactoryBase {

        // Value used for 'connector' in the WITH clause of CREATE TABLE
        public static final String IDENTIFIER = "topicsKafka";

        public TopicsKafkaTableConnecterFactory() {}

        // Return the custom source so that 'topic' may hold several comma-separated names
        @Override
        protected KafkaDynamicSourceBase createKafkaTableSource(DataType producedDataType, String topic, Properties properties, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis) {
            return new TopicsKafkaTableSource(producedDataType, topic, properties, decodingFormat, startupMode, specificStartupOffsets, startupTimestampMillis);
        }

        // The sink side is unchanged: reuse the stock Kafka sink
        @Override
        protected KafkaDynamicSinkBase createKafkaTableSink(DataType consumedDataType, String topic, Properties properties, Optional<FlinkKafkaPartitioner<RowData>> partitioner, EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
            return new KafkaDynamicSink(consumedDataType, topic, properties, partitioner, encodingFormat);
        }

        @Override
        public String factoryIdentifier() {
            return IDENTIFIER;
        }
    }

2 Create TopicsKafkaTableSource by extending KafkaDynamicSourceBase

    public class TopicsKafkaTableSource extends KafkaDynamicSourceBase {

        // Protected constructor: the factory above must live in the same package
        protected TopicsKafkaTableSource(DataType outputDataType, String topic, Properties properties, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis) {
            super(outputDataType, topic, properties, decodingFormat, startupMode, specificStartupOffsets, startupTimestampMillis);
        }

        // Copy as TopicsKafkaTableSource (not KafkaDynamicSource) so the copy keeps the multi-topic behavior
        @Override
        public DynamicTableSource copy() {
            return new TopicsKafkaTableSource(this.outputDataType, this.topic, this.properties, this.decodingFormat, this.startupMode, this.specificStartupOffsets, this.startupTimestampMillis);
        }

        @Override
        public String asSummaryString() {
            return "Kafka";
        }

        @Override
        protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<RowData> deserializationSchema) {
            // Split the 'topic' option on commas to get the individual topic names
            List<String> topics = Arrays.asList(topic.split(","));
            return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
        }
    }
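The plain split(",") above keeps any whitespace around the names, so a value such as 'a, b' would produce a topic called " b". The following is a minimal sketch of a more tolerant parser. The class name TopicList, the method parse, and the topic names in main are hypothetical and are not part of the original connector or of Flink. It only illustrates one way the string could be cleaned up before it is handed to FlinkKafkaConsumer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: parses the value of the 'topic' option.
final class TopicList {

    private TopicList() {}

    /** Splits a comma-separated topic string, trimming whitespace and skipping empty entries. */
    static List<String> parse(String topicOption) {
        List<String> topics = new ArrayList<>();
        for (String name : topicOption.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                topics.add(trimmed);
            }
        }
        // Fail early instead of creating a consumer with nothing to subscribe to
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("No topic names found in: '" + topicOption + "'");
        }
        return topics;
    }

    public static void main(String[] args) {
        // Made-up topic names; prints [orders, payments, refunds]
        System.out.println(parse("orders, payments ,,refunds"));
    }
}
```

Under these assumptions, createKafkaConsumer could call TopicList.parse(topic) in place of Arrays.asList(topic.split(",")).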

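3 Configuration

Create the following file under the project's resources folder:

META-INF/services/org.apache.flink.table.factories.Factory

File contents:

 The fully qualified class name of the TopicsKafkaTableConnecterFactory created above.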
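As an illustration only: the original does not say which package the factory lives in, so the package com.example.flink.connector below is an assumption. With that package, the service file would hold a single line:

```text
com.example.flink.connector.TopicsKafkaTableConnecterFactory
```

Replace the package with the one your class is actually declared in.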

4 Using it in Flink SQL

    CREATE TABLE mySource (
        a bigint,
        b bigint
    ) WITH (
        'connector' = 'topicsKafka',
        -- for several topics, separate the names with commas
        'topic' = 'mytesttopic',
        'properties.bootstrap.servers' = '172.17.0.2:9092',
        'properties.group.id' = 'flink-test-cxy',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    );

    CREATE TABLE mysqlsink (
        id bigint,
        b varchar
    )
    with (
        'connector' = 'print'
        /*'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://...' ,
        'connector.username' = 'root' ,
        'connector.password' = 'root',
        'connector.table' = 'mysqlsink' ,
        'connector.driver' = 'com.mysql.cj.jdbc.Driver' ,
        'connector.write.flush.interval' = '5s',
        'connector.write.flush.max-rows' = '1'*/
    );

    insert into mysqlsink select a , cast(b as varchar) b from mySource;

 
