当前位置:   article > 正文

Flink DataStream API (十二)Flink 输出到 Kafka_add_sink( flinkkafkaproducer

add_sink( flinkkafkaproducer

文章目录

输出到Kafka

Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。如果仅仅是支持读写,那还说明不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是,Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。

现在我们要将数据输出到 Kafka,整个数据处理的闭环已经形成,所以可以完整测试如下:
(1)添加 Kafka 连接器依赖
由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动 Kafka 集群
(3)编写输出到 Kafka 的示例代码

kafka中的clicks主题作为生产者,events主题作为消费者,也就是数据从clicks中来,到events中去

//查看kafka中的topic
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list

//创建events主题
bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka  \
--create --replication-factor 1 --partitions 1 --topic events

//创建clicks主题
bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka  \
--create --replication-factor 1 --partitions 1 --topic events

//开启生产者
bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic clicks

//开启消费者
bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic events
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

代码如下:Flink输出到Kafka

addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/43893
推荐阅读
相关标签
  

闽ICP备14008679号