当前位置:   article > 正文

Flink Kafka source神操作之Flink Kafka connector

flink kafka source

引言

Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 机制,可提供 exactly-once 的处理语义。为此,Flink 并不完全依赖于跟踪 Kafka 消费组的偏移量,而是在内部跟踪和检查偏移量。

当我们在使用Spark Streaming、Flink等计算框架进行数据实时处理时,使用Kafka作为一款发布与订阅的消息系统成为了标配。Spark Streaming与Flink都提供了相对应的Kafka Consumer,使用起来非常的方便,只需要设置一下Kafka的参数,然后添加kafka的source就万事大吉了。如果你真的觉得事情就是如此的so easy,感觉妈妈再也不用担心你的学习了,那就真的是too young too simple sometimes naive了。本文以Flink 的Kafka Source为讨论对象,首先从基本的使用入手,然后深入源码逐一剖析,一并为你拨开Flink Kafka connector的神秘面纱。

值得注意的是,本文假定读者具备了Kafka的相关知识,关于Kafka的相关细节问题,不在本文的讨论范围之内。

Flink Kafka Consumer介绍

Flink Kafka Connector有很多个版本,可以根据你的kafka和Flink的版本选择相应的包(maven artifact id)和类名。本文所涉及的Flink版本为1.10,Kafka的版本为2.3.4。Flink所提供的Maven依赖于类名如下表所示:

å¾ç

 

Demo示例

添加Maven依赖

  1. <!--本文使用的是通用型的connector-->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-connector-kafka_2.11</artifactId>
  5. <version>1.10.0</version>
  6. </dependency>

简单代码案例

  1. public class KafkaConnector {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
  4. // 开启checkpoint,时间间隔为毫秒
  5. senv.enableCheckpointing(5000L);
  6. // 选择状态后端
  7. senv.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
  8. //senv.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
  9. Properties props = new Properties();
  10. // kafka broker地址
  11. props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
  12. // 仅kafka0.8版本需要配置
  13. props.put("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");
  14. // 消费者组
  15. props.put("group.id", "test");
  16. // 自动偏移量提交
  17. props.put("enable.auto.commit", true);
  18. // 偏移量提交的时间间隔,毫秒
  19. props.put("auto.commit.interval.ms", 5000);
  20. // kafka 消息的key序列化器
  21. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  22. // kafka 消息的value序列化器
  23. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  24. // 指定kafka的消费者从哪里开始消费数据
  25. // 共有三种方式,
  26. // #earliest
  27. // 当各分区下有已提交的offset时,从提交的offset开始消费;
  28. // 无提交的offset时,从头开始消费
  29. // #latest
  30. // 当各分区下有已提交的offset时,从提交的offset开始消费;
  31. // 无提交的offset时,消费新产生的该分区下的数据
  32. // #none
  33. // topic各分区都存在已提交的offset时,
  34. // 从offset后开始消费;
  35. // 只要有一个分区不存在已提交的offset,则抛出异常
  36. props.put("auto.offset.reset", "latest");
  37. FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
  38. "qfbap_ods.code_city",
  39. new SimpleStringSchema(),
  40. props);
  41. //设置checkpoint后在提交offset,即oncheckpoint模式
  42. // 该值默认为true
  43. consumer.setCommitOffsetsOnCheckpoints(true);
  44. // 最早的数据开始消费
  45. // 该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。
  46. //consumer.setStartFromEarliest();
  47. // 消费者组最近一次提交的偏移量,默认。
  48. // 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置
  49. //consumer.setStartFromGroupOffsets();
  50. // 最新的数据开始消费
  51. // 该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。
  52. //consumer.setStartFromLatest();
  53. // 指定具体的偏移量时间戳,毫秒
  54. // 对于每个分区,其时间戳大于或等于指定时间戳的记录将用作起始位置。
  55. // 如果一个分区的最新记录早于指定的时间戳,则只从最新记录读取该分区数据。
  56. // 在这种模式下,Kafka 中的已提交 offset 将被忽略,不会用作起始位置。
  57. //consumer.setStartFromTimestamp(1585047859000L);
  58. // 为每个分区指定偏移量
  59. /*Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
  60. specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 0), 23L);
  61. specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 1), 31L);
  62. specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 2), 43L);
  63. consumer1.setStartFromSpecificOffsets(specificStartOffsets);*/
  64. /**
  65. *
  66. * 请注意:当 Job 从故障中自动恢复或使用 savepoint 手动恢复时,
  67. * 这些起始位置配置方法不会影响消费的起始位置。
  68. * 在恢复时,每个 Kafka 分区的起始位置由存储在 savepoint 或 checkpoint 中的 offset 确定
  69. *
  70. */
  71. DataStreamSource<String> source = senv.addSource(consumer);
  72. // TODO
  73. source.print();
  74. senv.execute("test kafka connector");
  75. }
  76. }

参数配置解读

在Demo示例中,给出了详细的配置信息,下面将对上面的参数配置进行逐一分析。

kakfa的properties参数配置

  • bootstrap.servers:kafka broker地址
  • zookeeper.connect:仅kafka0.8版本需要配置
  • group.id:消费者组
  • enable.auto.commit:自动偏移量提交,该值的配置不是最终的偏移量提交模式,需要考虑用户是否开启了checkpoint,在下面的源码分析中会进行解读
  • auto.commit.interval.ms:偏移量提交的时间间隔,毫秒
  • key.deserializer:kafka 消息的key序列化器,如果不指定会使用ByteArrayDeserializer序列化器
  • value.deserializer

kafka 消息的value序列化器,如果不指定会使用ByteArrayDeserializer序列化器

  • auto.offset.reset:指定kafka的消费者从哪里开始消费数据,共有三种方式,
  • 第一种:earliest
    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • 第二种:latest
    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • 第三种:none
    topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常注意:上面的指定消费模式并不是最终的消费模式,取决于用户在Flink程序中配置的消费模式

Flink程序用户配置的参数

  • consumer.setCommitOffsetsOnCheckpoints(true)

解释:设置checkpoint后在提交offset,即oncheckpoint模式,该值默认为true,该参数会影响偏移量的提交方式,下面的源码中会进行分析

  • consumer.setStartFromEarliest()解释:最早的数据开始消费 ,该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。该方法为继承父类FlinkKafkaConsumerBase的方法。
  • consumer.setStartFromGroupOffsets()解释:消费者组最近一次提交的偏移量,默认。如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置,该方法为继承父类FlinkKafkaConsumerBase的方法。
  • consumer.setStartFromLatest()解释:最新的数据开始消费,该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。该方法为继承父类FlinkKafkaConsumerBase的方法。
  • consumer.setStartFromTimestamp(1585047859000L)解释:指定具体的偏移量时间戳,毫秒。对于每个分区,其时间戳大于或等于指定时间戳的记录将用作起始位置。如果一个分区的最新记录早于指定的时间戳,则只从最新记录读取该分区数据。在这种模式下,Kafka 中的已提交 offset 将被忽略,不会用作起始位置。
  • consumer.setStartFromSpecificOffsets(specificStartOffsets)

解释:为每个分区指定偏移量,该方法为继承父类FlinkKafkaConsumerBase的方法。

请注意:当 Job 从故障中自动恢复或使用 savepoint 手动恢复时,这些起始位置配置方法不会影响消费的起始位置。在恢复时,每个 Kafka 分区的起始位置由存储在 savepoint 或

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/43899?site
推荐阅读
相关标签
  

闽ICP备14008679号