当前位置:   article > 正文

Kafka源码分析(二六)——Broker:集群管理——Topic创建_kafka topic不存在时,自动创建topic 源码

kafka topic不存在时,自动创建topic 源码

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

学习必须往深处挖,挖的越深,基础越扎实!

阶段1、深入多线程

阶段2、深入多线程设计模式

阶段3、深入juc源码解析


阶段4、深入jdk其余源码解析


阶段5、深入jvm源码解析

码哥源码部分

码哥讲源码-原理源码篇【2024年最新大厂关于线程池使用的场景题】

码哥讲源码【炸雷啦!炸雷啦!黄光头他终于跑路啦!】

码哥讲源码-【jvm课程前置知识及c/c++调试环境搭建】

​​​​​​码哥讲源码-原理源码篇【揭秘join方法的唤醒本质上决定于jvm的底层析构函数】

码哥源码-原理源码篇【Doug Lea为什么要将成员变量赋值给局部变量后再操作?】

码哥讲源码【你水不是你的错,但是你胡说八道就是你不对了!】

码哥讲源码【谁再说Spring不支持多线程事务,你给我抽他!】

终结B站没人能讲清楚红黑树的历史,不服等你来踢馆!

打脸系列【020-3小时讲解MESI协议和volatile之间的关系,那些将x86下的验证结果当作最终结果的水货们请闭嘴】

了解了Broker集群的选举以及整体的集群管理机制,我们来看下Kafka创建Topic,以及对分区副本进行管理的流程。通常来说,我们会通过Kafka自带的kafka-topics.sh脚本来创建Topic。那么,当我们指定了一个Topic的分区数、每个分区的副本数之后,Controller(Leader Broker)是如何选择Leader副本?又是如何分配在Broker集群中分配这些副本的呢?

本章,我就对Topic的分区副本分配原理进行讲解。

如果Producer发送消息时指定了一个不存在的Topic,也会默认创建(分区1,副本1),可以通过Broker端的参数auto.create.topics.enable禁止默认创建的行为,生产环境建议禁止掉。

一、创建Topic

通过上一章的讲解,我们应该已经明白, 集群中的每个Broker都知道整个集群的元数据信息 。所谓元数据信息就是:集群中的每个Broker上有哪些Topic分区,每个Topic的分区信息,这些分区的Leader副本在哪,Follower副本在哪……

1.1 脚本使用

所以,Controller需要对这些Topic的分区进行管理,我以一个Topic的创建作为示例进行讲解,便于大家理解。首先,我们来看Topic的创建流程:

创建Topic通过脚本kafka-topics.sh

  1. # kafka-topics.sh
  2. exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

本质是执行了 TopicCommand 命令:

  1. // TopicCommand.scala
  2. def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  3. // Topic名称
  4. val topic = opts.options.valueOf(opts.topicOpt)
  5. // 配置
  6. val configs = parseTopicConfigsToBeAdded(opts)
  7. val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
  8. try {
  9. // 1.手动指定分区
  10. if (opts.options.has(opts.replicaAssignmentOpt)) {
  11. val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
  12. AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
  13. } else {
  14. // 2.自动分配分区
  15. CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
  16. // 配置的分区数
  17. val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
  18. // 配置的副本数
  19. val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
  20. val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
  21. else RackAwareMode.Enforced
  22. // 创建主题
  23. AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
  24. }
  25. println("Created topic \"%s\".".format(topic))
  26. } catch {
  27. case e: TopicExistsException => if (!ifNotExists) throw e
  28. }
  29. }

我们重点看它的自动分配分区分支,调用了AdminUtils.createTopic()来创建Topic并对分区副本进行分配:

  1. // AdminUtils.scala
  2. def createTopic(zkUtils: ZkUtils,
  3. topic: String,
  4. partitions: Int,
  5. replicationFactor: Int,
  6. topicConfig: Properties = new Properties,
  7. rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  8. // 1.从Zookeeper中获取Broker集群的元数据信息
  9. val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
  10. // 2.基于一定的算法,将分区副本分配给各个Broker
  11. val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas,
  12. partitions, replicationFactor)’
  13. // 3.将分配好最终策略,直接写入Zookeeper中的/brokers/topics/[Topic名称]节点中
  14. AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
  15. }

可以看到, 创建Topic的本质就是根据Topic的分区数、每个分区的副本数,基于一定的算法把它们分配给各个Broker,然后把分配策略写入到Zookeeper中 。

所谓分区副本分配策略,我这里简单解释下,假设有个Topic设置3个分区,每个分区2个副本,那么分配结果可能就是下面这个样子:

  1. partition1 -> [0,1] #分区1的Leader副本分配在Broker0,Follower副本分配在Broker1
  2. partition2 -> [2,0] #分区2的Leader副本分配在Broker2,Follower副本分配在Broker0
  3. partition3 -> [1,2] #分区3的Leader副本分配在Broker1,Follower副本分配在Broker2

至于具体的分区副本分配算法,我就不赘述了,读者可以自己去AdminUtils.assignReplicasToBrokers方法里瞅一瞅,无非就是类似负载均衡之类的策略,我重点关注分区副本分配的整体流程。

二、副本管理

创建Topic只是将 分区副本分配策略 写入到了Zookeeper的/brokers/topics/[Topic名称]节点中,那么接下来Controller如何根据策略来进行执行副本分配?如何对副本进行管理呢?

2.1 监听Topic创建

显然,Controller是可以感知到新Topic的创建的,也就是说它会去监听/brokers/topics节点的变化,整个监听的过程我用下面这张图来表示:

我们来看下底层的源码:

  1. // KafkaController.scala
  2. // Broker选举成为Leader后,会调用该方法
  3. def onControllerFailover() {
  4. if(isRunning) {
  5. info("Broker %d starting become controller state transition".format(config.brokerId))
  6. readControllerEpochFromZookeeper()
  7. incrementControllerEpoch(zkUtils.zkClient)
  8. registerReassignedPartitionsListener()
  9. registerIsrChangeNotificationListener()
  10. registerPreferredReplicaElectionListener()
  11. // 关键看这里
  12. partitionStateMachine.registerListeners()
  13. //...
  14. }
  15. else
  16. info("Controller has been shut down, aborting startup/failover")
  17. }

onControllerFailover方法中调用了PartitionStateMachine.registerListeners(),它会去监听/brokers/topics/节点的变化:

  1. // PartitionStateMachine.scala
  2. def registerListeners() {
  3. registerTopicChangeListener()
  4. registerDeleteTopicListener()
  5. }
  6. private def registerTopicChangeListener() = {
  7. // 监听“/brokers/topics”节点的变化
  8. zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
  9. }
  10. private def registerDeleteTopicListener() = {
  11. // 监听“/admin/delete_topics”节点的变化
  12. zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
  13. }

我们来看下TopicChangeListener这个监听器,它的内部就 根据“/brokers/topics”节点下的子节点变化,筛选出新增的Topic, 然后按照分区维度维护成一个Map[TopicAndPartition, Seq[Int]]

  1. // PartitionStateMachine.scalaSS
  2. class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {
  3. protected def logName = "TopicChangeListener"
  4. // 当“/brokers/topics”节点下的子节点发生变化时,会触发Controller调用该方法
  5. def doHandleChildChange(parentPath: String, children: Seq[String]) {
  6. inLock(controllerContext.controllerLock) {
  7. if (hasStarted.get) {
  8. try {
  9. val currentChildren = {
  10. debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
  11. children.toSet
  12. }
  13. // 新创建的分区
  14. val newTopics = currentChildren -- controllerContext.allTopics
  15. // 删除的分区
  16. val deletedTopics = controllerContext.allTopics -- currentChildren
  17. controllerContext.allTopics = currentChildren
  18. // 获取分区副本分配策略
  19. val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
  20. controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
  21. !deletedTopics.contains(p._1.topic))
  22. controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
  23. info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
  24. deletedTopics, addedPartitionReplicaAssignment))
  25. if (newTopics.nonEmpty)
  26. // 关键看这里,
  27. controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
  28. } catch {
  29. case e: Throwable => error("Error while handling new topic", e)
  30. }
  31. }
  32. }
  33. }
  34. }

最后,上述代码会调用KafkaController的onNewTopicCreation方法,发送Topic的元数据信息给各个Broker,这个Broker就可以进行一些初始化操作,比如新建分区日志段,准备接受Producer发送过来的消息等等:

  1. // KafkaController.scala
  2. def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
  3. info("New topic creation callback for %s".format(newPartitions.mkString(",")))
  4. // subscribe to partition changes
  5. topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  6. // 按照分配策略,发送Topic的元数据信息给各个Broker
  7. onNewPartitionCreation(newPartitions)
  8. }

三、总结

本章,我对Topic创建的整体流程和底层原理进行了讲解,Controller会监听新Topic的创建,同时对分区副本进行管理,向新的元数据信息发送给集群中的其它Broker。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/895686
推荐阅读
相关标签
  

闽ICP备14008679号