当前位置:   article > 正文

Flink--KafkaSink实现Excatly-once的过程_flink kafkasinkfunction

flink kafkasinkfunction

 FlinkKafkaProducer继承了TwoPhaseCommitSinkFunction(分为两个阶段提交的Sink),TwoPhaseCommitSinkFunction实现了两个接口CheckpointedFunctionCheckpointListener

实现CheckpointedFunction接口要实现initializeState方法和snapshotState方法

实现CheckpointListener接口要实现notifyCheckpointComplete和notifyCheckpointAborted方法

1.程序启动,在TwoPhaseCommitSinkFunction的initializeState方法中初始化一个ListState(为了缓存写入的数据)

2.当有数据进入到Sink中,会调用invoke方法,在该方法中,将数据使用KafkaProducer的连接将数据send(数据没有立即写入到Kafka的Broker,而是缓存到客户端),然后再有数据进入到Sink中,再调用invoke方法,将数据使用Kafka的Producer发送,只要没有flush或数据达到一定的大小,数据就会缓存在客户端

3.随着时间的推移,在checkpoint时,会调用snapshotState方法,在该方法中,调用TwoPhaseCommitSinkFuntcion的sanpshotState,在该方法中调用preCommit,在preCommit将KafkaProducer这种的数据flush到broker中,但是没有提交事务,(写入的Kafka Broker中的数据是unCommited 状态),然后将缓存在客户端的数据持久化到stateBackend中

4.如果所有的subtask都完成了snapshotState,就会向jobManager进行ACK应答,当jobManager接收到该job这次checkpoint所有subtask的ack后,标记这次checkpoint成功,并且会向实现类CheckpointListener接口的subtask发送checkpoint成功的消息,在subtask中会调用notifyCheckpointComplete方法,在该方法中提交事务,这样写入到Kafka中的数据才算真正写入成功!

5.如果Checkpoint失败,所有subtask重启,然后从上一次的状态恢复数据,会接着以前的偏移量继续读,继续处理

6.如果checkpoint成功了,但是提交事务失败了,所有subtasks重启,然后再initalizeState方法中恢复状态中的数据,在将数据写入到Kafka的Broker中,并提交事务

分两阶段提交,都分为哪两个阶段提交

preCommit,即在checkpoint时,将客户端缓存的数据flush到broker(没有提交事务)

commit在这次checkpoint成功后,jobManager通知实现了checkpointListener接口的subtask(Sink),提交事务

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/44124
推荐阅读
相关标签
  

闽ICP备14008679号