
Kafka Principles and Code Analysis


Table of Contents

1.Log Storage

1.LogSegment

2.Log

2.Request Handling

1.SocketServer

1.Acceptor

2.Processor

2.KafkaRequestHandlerPool

3.KafkaApis

3.Controller

1.ControllerContext

2.RequestSendThread

3.ControllerChannelManager

4.Controller Event Management

1.ControllerEventManager

2.ControllerEventThread

3.ControllerEventProcessor

4.Replica Management

1.ReplicaFetcherThread

2.ReplicaManager

5.Consumer Group Management

1.MemberMetadata

2.GroupMetadata

3.GroupMetadataManager

4.GroupCoordinator


Kafka version 3.0.0

1.Log Storage

1.LogSegment

        Manages a single log segment: reading, writing, and recovering log data.

append():

Parameters:

largestOffset: Long:

        The largest offset of the records being appended.

largestTimestamp: Long:

        The largest timestamp of the records being appended.

shallowOffsetOfMaxTimestamp: Long:

        The offset of the message carrying the largest timestamp.

records: MemoryRecords:

        The message set to write.

Flow:

        1. Check whether the log segment is empty; if it is, record largestTimestamp.

        2. Validate the given largest offset via ensureOffsetInRange.

        3. Call FileRecords.append to write the records.

        4. Update the segment's max timestamp and the offset of the message carrying it.

        5. Update the index and the count of bytes written since the last index entry:

  // append an entry to the index (if needed)
  if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    offsetIndex.append(largestOffset, physicalPosition)
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
    bytesSinceLastIndexEntry = 0
  }
  bytesSinceLastIndexEntry += records.sizeInBytes
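Because an index entry is only appended every indexIntervalBytes, the offset index is sparse. The following self-contained sketch (illustrative types, not Kafka's actual OffsetIndex class) shows how a lookup on such a sparse index works: find the last indexed offset at or below the target, then scan the log file forward from its physical position.

  object SparseIndexDemo {
    // (relativeOffset, physicalPosition) pairs, appended only every indexIntervalBytes
    final case class IndexEntry(offset: Long, position: Int)

    // Find the last entry whose offset is <= target; the .log file is then scanned from that position.
    // (A real implementation would binary-search instead of filtering.)
    def lookup(entries: IndexedSeq[IndexEntry], target: Long): IndexEntry = {
      val candidates = entries.filter(_.offset <= target)
      if (candidates.isEmpty) IndexEntry(0L, 0) else candidates.maxBy(_.offset)
    }

    def main(args: Array[String]): Unit = {
      val index = Vector(IndexEntry(0, 0), IndexEntry(40, 4096), IndexEntry(85, 8192))
      println(lookup(index, 70)) // IndexEntry(40,4096): scan the .log file from byte 4096
    }
  }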

read():

Parameters:

startOffset: Long:

        Offset of the first message to read.

maxSize: Int:

        Maximum number of bytes to read.

maxPosition: Long = size:

        Maximum physical position in the log that may be read.

minOneMessage: Boolean = false:

        Whether to return at least one message even if it is larger than maxSize.

Flow:

        1. Use translateOffset to turn startOffset into the starting file position to read from.

        2. Compute how many bytes to read:

  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize
  if (adjustedMaxSize == 0)
    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
  val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

          3. Use FileRecords.slice to read the requested number of bytes starting at that position.

recover():

Parameters:

producerStateManager: ProducerStateManager:
leaderEpochCache: Option[LeaderEpochFileCache]:

Flow:

        1. Reset all indexes:

  offsetIndex.reset()
  timeIndex.reset()
  txnIndex.reset()

        2. Iterate over every record batch in the segment:

                1. Validate the batch's offsets.

                2. Update the segment's max timestamp and the offset of the message carrying it.

                3. Update the index entries.

                4. Accumulate the total number of valid bytes.

                5. Update the producerStateManager and leaderEpochCache state.

        3. Truncate: any bytes beyond the total valid size found during the scan are cut off.
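A rough, self-contained model of this recovery pass (simplified batches instead of real RecordBatch objects; producer-state and epoch-cache handling omitted), just to show how the max timestamp and the truncation size fall out of one scan:

  object RecoverDemo {
    final case class Batch(lastOffset: Long, maxTimestamp: Long, sizeInBytes: Int)

    // Walk the segment's batches, track the max timestamp and the number of valid bytes,
    // then report how many trailing bytes would be truncated.
    def recover(batches: Seq[Batch], segmentSizeInBytes: Int): (Long, Int) = {
      var validBytes = 0
      var maxTimestampSoFar = -1L
      batches.foreach { b =>
        if (b.maxTimestamp > maxTimestampSoFar) maxTimestampSoFar = b.maxTimestamp
        validBytes += b.sizeInBytes
      }
      (maxTimestampSoFar, segmentSizeInBytes - validBytes)
    }

    def main(args: Array[String]): Unit = {
      val (maxTs, truncated) = recover(Seq(Batch(9, 1000L, 300), Batch(19, 1200L, 280)), 700)
      println(s"maxTimestamp=$maxTs, bytes to truncate=$truncated") // 1200, 120
    }
  }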

2.Log

Fields:

        Defines all the file and directory suffixes used by the log:

  /** a log file */
  val LogFileSuffix = ".log"
  /** an index file */
  val IndexFileSuffix = ".index"
  /** a time index file */
  val TimeIndexFileSuffix = ".timeindex"
  val ProducerSnapshotFileSuffix = ".snapshot"
  /** an (aborted) txn index */
  val TxnIndexFileSuffix = ".txnindex"
  /** a file that is scheduled to be deleted */
  val DeletedFileSuffix = ".deleted"
  /** A temporary file that is being used for log cleaning */
  val CleanedFileSuffix = ".cleaned"
  /** A temporary file used when swapping files into the log */
  val SwapFileSuffix = ".swap"
  /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
   * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
   * avoided by passing in the recovery point, however finding the correct position to do this
   * requires accessing the offset index which may not be safe in an unclean shutdown.
   * For more information see the discussion in PR#2104
   */
  val CleanShutdownFile = ".kafka_cleanshutdown"
  /** a directory that is scheduled to be deleted */
  val DeleteDirSuffix = "-delete"
  /** a directory that is used for future partition */
  val FutureDirSuffix = "-future"

Initialization:

  locally {
    initializePartitionMetadata()
    updateLogStartOffset(logStartOffset)
    maybeIncrementFirstUnstableOffset()
    initializeTopicId()
  }

initializePartitionMetadata():
        Creates the partition log path.
updateLogStartOffset(logStartOffset):
        Sets the high watermark and the recovery point.
maybeIncrementFirstUnstableOffset():
        Part of the transaction machinery: maintains the first unstable offset.
initializeTopicId():
        Initializes the topic id.

Methods:

appendAsFollower():

        Appends records to the log as a follower.

appendAsLeader():

        Appends records to the log as the leader (a sketch of the difference follows).
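A minimal, self-contained model of that difference (illustrative classes, not the real Log API): the leader path assigns offsets starting at the current log end offset, while the follower path keeps the offsets already assigned by the leader.

  object AppendDemo {
    final case class Record(offset: Long, value: String)

    final class MiniLog {
      private var log = Vector.empty[Record]
      def logEndOffset: Long = log.lastOption.map(_.offset + 1).getOrElse(0L)

      // Leader append: offsets are assigned here, continuing from the log end offset.
      def appendAsLeader(values: Seq[String]): Unit = {
        val base = logEndOffset
        log ++= values.zipWithIndex.map { case (v, i) => Record(base + i, v) }
      }

      // Follower append: offsets come from the leader's records unchanged.
      def appendAsFollower(records: Seq[Record]): Unit = log ++= records

      def dump(): Unit = log.foreach(println)
    }

    def main(args: Array[String]): Unit = {
      val leader = new MiniLog
      leader.appendAsLeader(Seq("a", "b"))   // Record(0,a), Record(1,b)
      leader.dump()
    }
  }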

2.Request Handling

        

  // In KafkaServer.startup():
  // create the SocketServer
  socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
  socketServer.startup(startProcessingRequests = false)
  // create the data-plane KafkaRequestHandlerPool
  dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
    config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
  // create the control-plane KafkaRequestHandlerPool
  controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
    1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)

1.SocketServer

Methods:

startup():
Starts the socket server:
1. Create the control-plane acceptor and processor threads.
2. Create the data-plane acceptor and processor threads.
3. Start the data-plane and control-plane threads.

1.Acceptor

Role:

        Listens for new incoming connections and, once a connection is accepted, hands it to one of the processor threads, chosen round-robin (currentProcessorIndex = currentProcessorIndex % processors.length).

2.Processor

Role:

        Takes over the connections assigned by the acceptor and reads request data from them; received requests are then passed through the RequestChannel to the KafkaRequestHandler threads for processing.

Members:

newConnections: ArrayBlockingQueue

        Buffers the new connections assigned by the acceptor until the processor gets to them, so that a slow processor does not block the acceptor from distributing new connections. A sketch of this handoff follows.
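A self-contained sketch of the handoff (class and field names are illustrative, not the real SocketServer internals): the acceptor round-robins connections into each processor's bounded newConnections queue, and each processor drains only its own queue.

  import java.util.concurrent.ArrayBlockingQueue

  object HandoffDemo {
    final class MiniProcessor(val id: Int) {
      val newConnections = new ArrayBlockingQueue[String](20)
      def drainOne(): Option[String] = Option(newConnections.poll())
    }

    def main(args: Array[String]): Unit = {
      val processors = Vector.tabulate(3)(new MiniProcessor(_))
      var currentProcessorIndex = 0

      // Acceptor side: assign each "connection" round-robin across the processors.
      for (conn <- Seq("conn-1", "conn-2", "conn-3", "conn-4")) {
        currentProcessorIndex = currentProcessorIndex % processors.length
        processors(currentProcessorIndex).newConnections.put(conn)
        currentProcessorIndex += 1
      }

      // Processor side: each processor polls only its own queue.
      processors.foreach(p => println(s"processor ${p.id} got ${p.drainOne()}"))
    }
  }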

2.KafkaRequestHandlerPool

Role:

        Manages the pool of request-handler threads. Each thread runs KafkaRequestHandler, which picks up requests and delegates the actual handling to KafkaApis.handle().

Methods:

KafkaRequestHandlerPool(): creates numThreads KafkaRequestHandler threads.
The data plane has numThreads request-handling threads; the control plane has a single one. A simplified model of the pool is sketched below.
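A simplified, self-contained model of the pool (illustrative names; the real handler polls a RequestChannel and hands the request to KafkaApis.handle): numThreads handler threads all take from one shared request queue.

  import java.util.concurrent.LinkedBlockingQueue

  object HandlerPoolDemo {
    sealed trait Request
    final case class ApiRequest(api: String) extends Request
    case object Shutdown extends Request

    def main(args: Array[String]): Unit = {
      val requestChannel = new LinkedBlockingQueue[Request]()
      val numThreads = 2

      // Each handler thread loops: take a request and hand it to the "apis" layer.
      val handlers = (0 until numThreads).map { id =>
        new Thread(() => {
          var running = true
          while (running) {
            requestChannel.take() match {
              case ApiRequest(api) => println(s"handler-$id handling $api")
              case Shutdown        => running = false
            }
          }
        }, s"kafka-request-handler-$id")
      }
      handlers.foreach(_.start())

      Seq("PRODUCE", "FETCH", "METADATA").foreach(a => requestChannel.put(ApiRequest(a)))
      (0 until numThreads).foreach(_ => requestChannel.put(Shutdown))
      handlers.foreach(_.join())
    }
  }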

3.KafkaApis

Role:

        Handles the actual content of each request.

All request types it dispatches:

  case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
  case ApiKeys.FETCH => handleFetchRequest(request)
  case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
  case ApiKeys.METADATA => handleTopicMetadataRequest(request)
  case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
  case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
  case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
  case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
  case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
  case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
  case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
  case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
  case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
  case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
  case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
  case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
  case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
  case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
  case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
  case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
  case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
  case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
  case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
  case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
  case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request, requestLocal)
  case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal)
  case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal)
  case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
  case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal)
  case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
  case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
  case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
  case ApiKeys.ALTER_CONFIGS => maybeForwardToController(request, handleAlterConfigsRequest)
  case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
  case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
  case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
  case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
  case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
  case ApiKeys.CREATE_DELEGATION_TOKEN => maybeForwardToController(request, handleCreateTokenRequest)
  case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
  case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
  case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
  case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
  case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
  case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
  case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
  case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
  case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
  case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
  case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
  case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
  case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
  case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
  case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
  case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
  case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
  case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
  case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
  case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
  case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
  case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)

3.Controller

1.ControllerContext

Role:

        Holds the controller's view of the cluster metadata.

Members:

  val stats = new ControllerStats // controller statistics
  var offlinePartitionCount = 0 // number of offline partitions
  var preferredReplicaImbalanceCount = 0 // number of partitions whose leader is not the preferred replica
  val shuttingDownBrokerIds = mutable.Set.empty[Int] // ids of brokers that are shutting down
  private val liveBrokers = mutable.Set.empty[Broker] // currently live broker objects
  private val liveBrokerEpochs = mutable.Map.empty[Int, Long] // epochs of the live brokers
  var epoch: Int = KafkaController.InitialControllerEpoch // current controller epoch
  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion // zookeeper version of the controller epoch znode
  val allTopics = mutable.Set.empty[String] // all topics in the cluster
  var topicIds = mutable.Map.empty[String, Uuid]
  var topicNames = mutable.Map.empty[Uuid, String]
  val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]] // replica assignment of each topic partition
  private val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch] // leader/ISR info of each topic partition
  val partitionsBeingReassigned = mutable.Set.empty[TopicPartition] // partitions currently undergoing replica reassignment
  val partitionStates = mutable.Map.empty[TopicPartition, PartitionState] // partition states
  val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState] // replica states
  val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicPartition]] // replicas living on offline log directories
  val topicsToBeDeleted = mutable.Set.empty[String] // topics waiting to be deleted
  val topicsWithDeletionStarted = mutable.Set.empty[String] // topics whose deletion has started
  val topicsIneligibleForDeletion = mutable.Set.empty[String] // topics that cannot be deleted for now

2.RequestSendThread

Role:

        Sends the controller's requests to the other brokers. Each RequestSendThread serves exactly one broker.

Requests it sends:

LeaderAndIsrRequest: tells a broker which broker hosts the leader replica of each partition of the relevant topics, and which brokers host the ISR replicas.
StopReplicaRequest: tells the target broker to stop the replicas it hosts.
UpdateMetadataRequest: tells a broker to update its cluster metadata.

3.ControllerChannelManager

Role:

        Manages the connections between the controller and the brokers in the cluster, creating one RequestSendThread per broker for sending requests.

Methods:

startup(): starts the send threads for all brokers.
shutdown(): removes all brokers and shuts down their send threads and network connections.

sendRequest(): puts a request for a broker into that broker's RequestSendThread queue.

addBroker(): adds a new broker; addNewBroker() creates the connection and the send thread, startRequestSendThread() starts the thread.

removeBroker(): removes a broker and shuts down its send thread and network connection. The one-queue-per-broker pattern is sketched below.
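A rough model of the one-queue-plus-one-thread-per-broker design (illustrative names, not the real ControllerChannelManager classes): sendRequest only enqueues, and the per-broker thread owns the blocking network I/O.

  import java.util.concurrent.LinkedBlockingQueue
  import scala.collection.mutable

  object ControllerChannelDemo {
    final case class QueueItem(apiKey: String, payload: String)

    final class BrokerState(val brokerId: Int) {
      val queue = new LinkedBlockingQueue[QueueItem]()
      val sendThread = new Thread(() => {
        while (true) {
          val item = queue.take() // blocks until the controller enqueues something
          println(s"broker $brokerId <- ${item.apiKey}: ${item.payload}") // stand-in for the blocking send/receive
        }
      }, s"Controller-to-broker-$brokerId-send-thread")
      sendThread.setDaemon(true)
    }

    def main(args: Array[String]): Unit = {
      val brokerStateInfo = mutable.Map[Int, BrokerState]()
      def addBroker(id: Int): Unit = { val s = new BrokerState(id); brokerStateInfo(id) = s; s.sendThread.start() }
      def sendRequest(id: Int, api: String, payload: String): Unit = brokerStateInfo(id).queue.put(QueueItem(api, payload))

      addBroker(1); addBroker(2)
      sendRequest(1, "LeaderAndIsr", "topicA-0 leader=1")
      sendRequest(2, "UpdateMetadata", "cluster metadata v3")
      Thread.sleep(200) // give the daemon threads time to drain before the JVM exits
    }
  }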

4.Controller Event Management

1.ControllerEventManager

Role:

        Manages the controller's events.

2.ControllerEventThread

Role:

        The thread that processes controller events. Every event is handled by this single thread, so no locking is required; see the sketch below.
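A sketch of why a single consumer thread removes the need for locks (illustrative, not the real ControllerEventManager API): every piece of controller state is touched only by the one thread that drains the event queue, while other threads merely enqueue events.

  import java.util.concurrent.LinkedBlockingQueue

  object EventManagerDemo {
    sealed trait ControllerEvent
    final case class BrokerChange(ids: Set[Int]) extends ControllerEvent
    case object ShutdownEventThread extends ControllerEvent

    def main(args: Array[String]): Unit = {
      val queue = new LinkedBlockingQueue[ControllerEvent]()
      var liveBrokers = Set.empty[Int] // only ever touched by the event thread, so no lock is needed

      val eventThread = new Thread(() => {
        var running = true
        while (running) {
          queue.take() match {
            case BrokerChange(ids)   => liveBrokers = ids; println(s"live brokers: $liveBrokers")
            case ShutdownEventThread => running = false
          }
        }
      }, "controller-event-thread")
      eventThread.start()

      // Watchers and API handlers from any thread just enqueue events.
      queue.put(BrokerChange(Set(1, 2, 3)))
      queue.put(BrokerChange(Set(1, 3)))
      queue.put(ShutdownEventThread)
      eventThread.join()
    }
  }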

3.ControllerEventProcessor

Role:

        The event-processing interface implemented by KafkaController.

Event handling implemented by KafkaController:

  event match {
    case event: MockEvent =>
      // Used only in test cases
      event.process()
    case ShutdownEventThread =>
      error("Received a ShutdownEventThread event. This type of event is supposed to be handle by ControllerEventThread")
    case AutoPreferredReplicaLeaderElection =>
      processAutoPreferredReplicaLeaderElection()
    case ReplicaLeaderElection(partitions, electionType, electionTrigger, callback) =>
      processReplicaLeaderElection(partitions, electionType, electionTrigger, callback)
    case UncleanLeaderElectionEnable =>
      processUncleanLeaderElectionEnable()
    case TopicUncleanLeaderElectionEnable(topic) =>
      processTopicUncleanLeaderElectionEnable(topic)
    case ControlledShutdown(id, brokerEpoch, callback) =>
      processControlledShutdown(id, brokerEpoch, callback)
    case LeaderAndIsrResponseReceived(response, brokerId) =>
      processLeaderAndIsrResponseReceived(response, brokerId)
    case UpdateMetadataResponseReceived(response, brokerId) =>
      processUpdateMetadataResponseReceived(response, brokerId)
    case TopicDeletionStopReplicaResponseReceived(replicaId, requestError, partitionErrors) =>
      processTopicDeletionStopReplicaResponseReceived(replicaId, requestError, partitionErrors)
    case BrokerChange =>
      processBrokerChange()
    case BrokerModifications(brokerId) =>
      processBrokerModification(brokerId)
    case ControllerChange =>
      processControllerChange()
    case Reelect =>
      processReelect()
    case RegisterBrokerAndReelect =>
      processRegisterBrokerAndReelect()
    case Expire =>
      processExpire()
    case TopicChange =>
      processTopicChange()
    case LogDirEventNotification =>
      processLogDirEventNotification()
    case PartitionModifications(topic) =>
      processPartitionModifications(topic)
    case TopicDeletion =>
      processTopicDeletion()
    case ApiPartitionReassignment(reassignments, callback) =>
      processApiPartitionReassignment(reassignments, callback)
    case ZkPartitionReassignment =>
      processZkPartitionReassignment()
    case ListPartitionReassignments(partitions, callback) =>
      processListPartitionReassignments(partitions, callback)
    case UpdateFeatures(request, callback) =>
      processFeatureUpdates(request, callback)
    case PartitionReassignmentIsrChange(partition) =>
      processPartitionReassignmentIsrChange(partition)
    case IsrChangeNotification =>
      processIsrChangeNotification()
    case AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback) =>
      processAlterIsr(brokerId, brokerEpoch, isrsToAlter, callback)
    case AllocateProducerIds(brokerId, brokerEpoch, callback) =>
      processAllocateProducerIds(brokerId, brokerEpoch, callback)
    case Startup =>
      processStartup()
  }

4.Replica Management

1.ReplicaFetcherThread

Parent class:

        AbstractFetcherThread.

Role:

        Fetches messages from the leader for the follower replicas on this broker.

Methods:

doWork():

  override def doWork(): Unit = {
    maybeTruncate()
    maybeFetch()
  }

        Performs replica truncation if needed, then fetches messages.

processPartitionData():

        Processes the records fetched from the leader and writes them to the log via LogSegment.append().

truncate():

        Truncates the log of the given partition.

buildFetch():

        Builds the Fetch request sent to the broker hosting the leader replica.

2.ReplicaManager

Members:
allPartitions:
        Holds the data of every partition hosted on the current broker.

replicaFetcherManager:
        Creates and manages the ReplicaFetcherThreads that let follower replicas fetch messages from their leaders and write them to the replica logs.

appendRecords():
        Writes records to the log:
        Case 1: return success as soon as the leader has written to its local log.
        Case 2: return success only after the leader has written locally and the other followers have also written successfully.

        Used by:
        1. Producers writing messages to leader replicas.
        2. Consumer group metadata/offset writes.
        3. The transaction manager writing transaction records.

fetchMessages():
        Handles fetch requests. Data is returned to the requester only once enough data has accumulated; see the sketch below.
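A hedged sketch of the "wait until enough data" decision (the real implementation parks the request as a DelayedFetch operation in a purgatory; this only models the min-bytes / max-wait check that decides when to respond):

  object DelayedFetchDemo {
    // Decide whether a fetch can be answered now: either enough bytes are ready or the wait expired.
    def canComplete(accumulatedBytes: Int, minBytes: Int, waitedMs: Long, maxWaitMs: Long): Boolean =
      accumulatedBytes >= minBytes || waitedMs >= maxWaitMs

    def main(args: Array[String]): Unit = {
      println(canComplete(accumulatedBytes = 10,   minBytes = 1024, waitedMs = 100, maxWaitMs = 500)) // false: keep waiting
      println(canComplete(accumulatedBytes = 2048, minBytes = 1024, waitedMs = 100, maxWaitMs = 500)) // true: respond now
      println(canComplete(accumulatedBytes = 10,   minBytes = 1024, waitedMs = 600, maxWaitMs = 500)) // true: wait timed out
    }
  }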

5.Consumer Group Management

1.MemberMetadata

Members:

summary, summaryNoMetadata:
        Build and return a MemberSummary instance.
        Fields of MemberSummary:
                memberId:
                        Consumer group member id, of the form consumer-<group id>-<sequence>-
                groupInstanceId:
                        Static member id of the consumer group member.
                clientId:
                        The client.id configured by the member.
                clientHost:
                        Host running the consumer program.
                metadata:
                        Byte array describing the member's partition assignment strategies.
                assignment:
                        The subscribed partitions assigned to this member.

rebalanceTimeoutMs:
        Time within which a rebalance must complete.
sessionTimeoutMs:
        Session timeout; if no heartbeat arrives within it, the member is considered dead and a rebalance is triggered.
protocolType:
        Protocol type.
supportedProtocols:
        The partition assignment strategies the member supports.
awaitingJoinCallback:
        Whether the member is waiting to join the group.
awaitingSyncCallback:
        Whether the member is waiting for the GroupCoordinator to send the assignment.
isNew:
        Whether the member is new to the group.
heartbeatSatisfied:
        Set to false when the heartbeat deadline expires and back to true when a heartbeat is received.

2.GroupMetadata

Members:

GroupState:

        Defines the consumer group states: PreparingRebalance, CompletingRebalance, Stable, Dead and Empty (five in total).

currentStateTimestamp:
        Time of the most recent state change.
generationId:
        Incremented by 1 on every rebalance.
leaderId:
        Member id of the group's leader member.
members:
        Metadata of every member in the group.
offsets:
        HashMap of committed offsets keyed by topic partition; the value is a CommitRecordMetadataAndOffset, which holds both the offset of the commit record itself in the offsets topic and the consumer group's committed offset stored in that record.
subscribedTopics:
        The topics the group subscribes to, used to filter the offsets map down to subscribed topic partitions.
supportedProtocols:
        Vote counts for the supported partition assignment strategies.
transitionTo():
        Moves the group to the given state and updates currentStateTimestamp.
canRebalance():
        Checks whether a rebalance can be started from the current state; a rough sketch of the state machine follows.
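A rough sketch of the state machine behind transitionTo()/canRebalance() (the transition table below follows the usual GroupMetadata rules but should be treated as illustrative, not as the exact source):

  object GroupStateDemo {
    sealed trait GroupState
    case object Empty extends GroupState
    case object PreparingRebalance extends GroupState
    case object CompletingRebalance extends GroupState
    case object Stable extends GroupState
    case object Dead extends GroupState

    // Rough transition table: for each target state, the states it may be entered from.
    val validPreviousStates: Map[GroupState, Set[GroupState]] = Map(
      PreparingRebalance  -> Set(Stable, CompletingRebalance, Empty),
      CompletingRebalance -> Set(PreparingRebalance),
      Stable              -> Set(CompletingRebalance),
      Empty               -> Set(PreparingRebalance),
      Dead                -> Set(Empty, PreparingRebalance, CompletingRebalance, Stable, Dead)
    )

    def canTransitionTo(current: GroupState, target: GroupState): Boolean =
      validPreviousStates(target).contains(current)

    // canRebalance: the group may enter PreparingRebalance from its current state.
    def canRebalance(current: GroupState): Boolean = canTransitionTo(current, PreparingRebalance)

    def main(args: Array[String]): Unit = {
      println(canRebalance(Stable))           // true
      println(canTransitionTo(Empty, Stable)) // false: a rebalance has to happen first
    }
  }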
add():

        Adds a member:
  def add(member: MemberMetadata, callback: JoinCallback = null): Unit = {
    member.groupInstanceId.foreach { instanceId =>
      if (staticMembers.contains(instanceId))
        throw new IllegalStateException(s"Static member with groupInstanceId=$instanceId " +
          s"cannot be added to group $groupId since it is already a member")
      staticMembers.put(instanceId, member.memberId)
    }
    // if the group has no other members yet
    if (members.isEmpty)
      // adopt this member's protocolType as the group's protocolType
      this.protocolType = Some(member.protocolType)
    // the protocol types must match
    assert(this.protocolType.orNull == member.protocolType)
    // the member's assignment strategies must be compatible with the group's
    assert(supportsProtocols(member.protocolType, MemberMetadata.plainProtocolSet(member.supportedProtocols)))
    // if there is no leader member yet
    if (leaderId.isEmpty)
      // make this member the leader
      leaderId = Some(member.memberId)
    // add the member to members
    members.put(member.memberId, member)
    // update the vote counts of the supported assignment strategies
    incSupportedProtocols(member)
    // register the callback to invoke once the member has joined
    member.awaitingJoinCallback = callback
    // update the count of members that have joined
    if (member.isAwaitingJoin)
      numMembersAwaitingJoin += 1
    pendingMembers.remove(member.memberId)
  }

remove():

        Removes a member:

  def remove(memberId: String): Unit = {
    // remove the member from members
    members.remove(memberId).foreach { member =>
      // update the vote counts of the supported assignment strategies
      decSupportedProtocols(member)
      if (member.isAwaitingJoin)
        // one fewer member has joined
        numMembersAwaitingJoin -= 1
      member.groupInstanceId.foreach(staticMembers.remove)
    }
    // if the removed member was the leader
    if (isLeader(memberId))
      // pick the first remaining member as the new leader
      leaderId = members.keys.headOption
    pendingMembers.remove(memberId)
    pendingSyncMembers.remove(memberId)
  }
initializeOffsets():
        Installs the group's committed offsets (used when loading the group).
onOffsetCommitAppend():
        Records a committed offset once its commit record has been written.
completePendingTxnOffsetCommit():
        Completes a pending transactional offset commit.
getExpiredOffsets():
        Returns the committed offsets that have expired:

  def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long,
                        subscribedTopics: Set[String] = Set.empty): Map[TopicPartition, OffsetAndMetadata] = {
    // an offset can be removed only when all three conditions below hold
    offsets.filter {
      case (topicPartition, commitRecordMetadataAndOffset) =>
        // the partition's topic is not in the subscribed topic list
        !subscribedTopics.contains(topicPartition.topic()) &&
        // the partition has no in-flight commit, i.e. it is not in pendingOffsetCommits
        !pendingOffsetCommits.contains(topicPartition) && {
          // the commit record in the offsets topic is older than the retention threshold
          commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
            case None =>
              // current version with no per partition retention
              currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
            case Some(expireTimestamp) =>
              // older versions with explicit expire_timestamp field => old expiration semantics is used
              currentTimestamp >= expireTimestamp
          }
        }
    }.map {
      case (topicPartition, commitRecordOffsetAndMetadata) =>
        (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
    }.toMap
  }
removeExpiredOffsets():
        Removes the expired offsets.
selectProtocol():
        Chooses the group's partition assignment strategy:

  def selectProtocol: String = {
    if (members.isEmpty)
      throw new IllegalStateException("Cannot select protocol for empty group")
    // select the protocol for this group which is supported by all members
    val candidates = candidateProtocols
    // let each member vote for one of the protocols and choose the one with the most votes
    val (protocol, _) = allMemberMetadata
      .map(_.vote(candidates))
      .groupBy(identity)
      .maxBy { case (_, votes) => votes.size }
    protocol
  }

3.GroupMetadataManager

Manages consumer group offsets and group metadata.

 

Members:

brokerId:
        Id of the broker this manager runs on.
interBrokerProtocolVersion:
        Protocol version used for inter-broker communication.
config:
        OffsetConfig, the parameters that govern offset management.
replicaManager:
        The replica manager.
CompressionType:
        Compression type used for offsets-topic records.
groupMetadataCache:
        Pool[String, GroupMetadata]; the key is the group name and the value is the group's GroupMetadata. Holds the metadata of the consumer groups managed by the GroupCoordinator on this broker.
loadingPartitions:
        Offsets-topic partitions that are currently being loaded.
ownedPartitions:
        Offsets-topic partitions whose loading has completed.
groupMetadataTopicPartitionCount:
        Number of partitions of the offsets topic.
getGroup():
        Returns a consumer group's metadata.
getOrMaybeCreateGroup():
        Returns the consumer group's metadata, creating and registering the group if it does not exist.
removeGroupsForPartition():
        Removes group information; schedules an asynchronous task via scheduler.schedule() that calls removeGroupsAndOffsets() to drop both the group metadata and the offsets:
  private [group] def removeGroupsAndOffsets(topicPartition: TopicPartition,
                                             coordinatorEpoch: Option[Int],
                                             onGroupUnloaded: GroupMetadata => Unit): Unit = {
    val offsetsPartition = topicPartition.partition
    if (maybeUpdateCoordinatorEpoch(offsetsPartition, coordinatorEpoch)) {
      var numOffsetsRemoved = 0
      var numGroupsRemoved = 0
      debug(s"Started unloading offsets and group metadata for $topicPartition for " +
        s"coordinator epoch $coordinatorEpoch")
      inLock(partitionLock) {
        // we need to guard the group removal in cache in the loading partition lock
        // to prevent coordinator's check-and-get-group race condition
        // remove this offsets-topic partition from ownedPartitions and loadingPartitions
        ownedPartitions.remove(offsetsPartition)
        loadingPartitions.remove(offsetsPartition)
        // iterate over all cached group metadata
        for (group <- groupMetadataCache.values) {
          // if the group's metadata is stored in this offsets-topic partition
          if (partitionFor(group.groupId) == offsetsPartition) {
            // unload the group
            onGroupUnloaded(group)
            // remove it from the metadata cache
            groupMetadataCache.remove(group.groupId, group)
            // remove the group from the producer-to-groups mapping
            removeGroupFromAllProducers(group.groupId)
            // bump the removed-groups counter
            numGroupsRemoved += 1
            // bump the removed-offsets counter
            numOffsetsRemoved += group.numOffsets
          }
        }
      }
      info(s"Finished unloading $topicPartition for coordinator epoch $coordinatorEpoch. " +
        s"Removed $numOffsetsRemoved cached offsets and $numGroupsRemoved cached groups.")
    } else {
      info(s"Not removing offsets and group metadata for $topicPartition " +
        s"in epoch $coordinatorEpoch since current epoch is ${epochForPartitionId.get(topicPartition.partition)}")
    }
  }
addGroup():
        Adds a group's metadata to the cache.
loadGroup():
        Loads a group's metadata:
  private def loadGroup(group: GroupMetadata, offsets: Map[TopicPartition, CommitRecordMetadataAndOffset],
                        pendingTransactionalOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]): Unit = {
    // offsets are initialized prior to loading the group into the cache to ensure that clients see a consistent
    // view of the group's offsets
    trace(s"Initialized offsets $offsets for group ${group.groupId}")
    // initialize the group's committed offsets
    group.initializeOffsets(offsets, pendingTransactionalOffsets.toMap)
    // register the group via addGroup
    val currentGroup = addGroup(group)
    if (group != currentGroup)
      debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
        s"because there is already a cached group with generation ${currentGroup.generationId}")
  }
storeOffsets():
        Persists a group's committed offsets:
  def storeOffsets(group: GroupMetadata,
                   consumerId: String,
                   offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                   responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
                   producerId: Long = RecordBatch.NO_PRODUCER_ID,
                   producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
                   requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
    // first filter out partitions with offset metadata size exceeding limit
    val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
      // the offset metadata must be smaller than maxMetadataSize
      validateOffsetMetadataLength(offsetAndMetadata.metadata)
    }
    group.inLock {
      if (!group.hasReceivedConsistentOffsetCommits)
        warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " +
          s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
          s"should be avoided.")
    }
    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
    // construct the message set to append
    // if every offset's metadata was too large, return an error straight away
    if (filteredOffsetMetadata.isEmpty) {
      // compute the final error codes for the commit response
      val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
      responseCallback(commitStatus)
    } else {
      // check whether the current broker is the coordinator of this group
      getMagic(partitionFor(group.groupId)) match {
        case Some(magicValue) =>
          // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
          val timestampType = TimestampType.CREATE_TIME
          val timestamp = time.milliseconds()
          // build the offset commit records for the offsets topic
          val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
            val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
            val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
            new SimpleRecord(timestamp, key, value)
          }
          val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
          // allocate a buffer the offset records will be written into
          val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
          if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
            throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue)
          val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
            producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
          records.foreach(builder.append)
          val entries = Map(offsetTopicPartition -> builder.build())
          // set the callback function to insert offsets into cache after log append completed
          def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
            // the append response should only contain the topics partition
            // make sure the records went to the expected offsets-topic partition
            if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
              throw new IllegalStateException("Append status %s should only have one partition %s"
                .format(responseStatus, offsetTopicPartition))
            // construct the commit response status and insert
            // the offset and metadata to cache if the append status has no error
            val status = responseStatus(offsetTopicPartition)
            val responseError = group.inLock {
              // the append succeeded
              if (status.error == Errors.NONE) {
                // if the group is not Dead
                if (!group.is(Dead)) {
                  filteredOffsetMetadata.forKeyValue { (topicPartition, offsetAndMetadata) =>
                    if (isTxnOffsetCommit)
                      group.onTxnOffsetCommitAppend(producerId, topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
                    else
                      // fill in the offsets in GroupMetadata
                      group.onOffsetCommitAppend(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
                  }
                }
                // Record the number of offsets committed to the log
                offsetCommitsSensor.record(records.size)
                Errors.NONE
              } else {
                if (!group.is(Dead)) {
                  if (!group.hasPendingOffsetCommitsFromProducer(producerId))
                    removeProducerGroup(producerId, group.groupId)
                  filteredOffsetMetadata.forKeyValue { (topicPartition, offsetAndMetadata) =>
                    if (isTxnOffsetCommit)
                      group.failPendingTxnOffsetCommit(producerId, topicPartition)
                    else
                      // cancel the pending offset write
                      group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
                  }
                }
                debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
                  s"with generation ${group.generationId} failed when appending to log due to ${status.error.exceptionName}")
                // transform the log append error code to the corresponding the commit status error code
                status.error match {
                  case Errors.UNKNOWN_TOPIC_OR_PARTITION
                       | Errors.NOT_ENOUGH_REPLICAS
                       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
                    Errors.COORDINATOR_NOT_AVAILABLE
                  case Errors.NOT_LEADER_OR_FOLLOWER
                       | Errors.KAFKA_STORAGE_ERROR =>
                    Errors.NOT_COORDINATOR
                  case Errors.MESSAGE_TOO_LARGE
                       | Errors.RECORD_LIST_TOO_LARGE
                       | Errors.INVALID_FETCH_SIZE =>
                    Errors.INVALID_COMMIT_OFFSET_SIZE
                  case other => other
                }
              }
            }
            // compute the final error codes for the commit response
            val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
              if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
                (topicPartition, responseError)
              else
                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
            }
            // finally trigger the callback logic passed from the API layer
            responseCallback(commitStatus)
          }
          if (isTxnOffsetCommit) {
            group.inLock {
              addProducerGroup(producerId, group.groupId)
              group.prepareTxnOffsetCommit(producerId, offsetMetadata)
            }
          } else {
            group.inLock {
              group.prepareOffsetCommit(offsetMetadata)
            }
          }
          // append the records to the offsets topic; putCacheCallback runs afterwards to update the cached group metadata
          appendForGroup(group, entries, requestLocal, putCacheCallback)
        case None =>
          val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
            (topicPartition, Errors.NOT_COORDINATOR)
          }
          responseCallback(commitStatus)
      }
    }
  }

The above is the path that commits partition offsets.

getOffsets():

Looks up a consumer group's committed offsets, reading them from the corresponding GroupMetadata.

groupMetadataKey(): builds the key of a group registration record.
groupMetadataValue(): builds the value of a group registration record. A registration record whose value is null is a tombstone: the group's metadata can be deleted from the offsets topic.
  def groupMetadataValue(groupMetadata: GroupMetadata,
                         assignment: Map[String, Array[Byte]],
                         apiVersion: ApiVersion): Array[Byte] = {
    val version =
      if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
      else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
      else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
      else 3.toShort
    MessageUtil.toVersionPrefixedBytes(version, new GroupMetadataValue()
      // set the protocol type
      .setProtocolType(groupMetadata.protocolType.getOrElse(""))
      // set the generation id
      .setGeneration(groupMetadata.generationId)
      // set the assignment protocol
      .setProtocol(groupMetadata.protocolName.orNull)
      // set the leader id
      .setLeader(groupMetadata.leaderOrNull)
      // set the timestamp of the latest state change
      .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)
      // set the metadata of all members
      .setMembers(groupMetadata.allMemberMetadata.map { memberMetadata =>
        new GroupMetadataValue.MemberMetadata()
          .setMemberId(memberMetadata.memberId)
          .setClientId(memberMetadata.clientId)
          .setClientHost(memberMetadata.clientHost)
          .setSessionTimeout(memberMetadata.sessionTimeoutMs)
          .setRebalanceTimeout(memberMetadata.rebalanceTimeoutMs)
          .setGroupInstanceId(memberMetadata.groupInstanceId.orNull)
          // The group is non-empty, so the current protocol must be defined
          .setSubscription(groupMetadata.protocolName.map(memberMetadata.metadata)
            .getOrElse(throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol.")))
          .setAssignment(assignment.getOrElse(memberMetadata.memberId,
            throw new IllegalStateException(s"Attempted to write member ${memberMetadata.memberId} of group ${groupMetadata.groupId} with no assignment.")))
      }.asJava))
  }
offsetCommitKey(): builds the key of an offset commit record: group name + topic + partition.
offsetCommitValue(): builds the value of an offset commit record. A commit record whose value is null is a tombstone: the group's committed offset for that topic partition can be deleted.
  def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
                        apiVersion: ApiVersion): Array[Byte] = {
    val version =
      if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
      else if (apiVersion < KAFKA_2_1_IV1) 2.toShort
      else 3.toShort
    MessageUtil.toVersionPrefixedBytes(version, new OffsetCommitValue()
      // set the offset
      .setOffset(offsetAndMetadata.offset)
      // metadata
      .setMetadata(offsetAndMetadata.metadata)
      // set the commit timestamp
      .setCommitTimestamp(offsetAndMetadata.commitTimestamp)
      // set the leader epoch
      .setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
      // version 1 has a non empty expireTimestamp field
      .setExpireTimestamp(offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
    )
  }
storeGroup():
        Registers a consumer group with the coordinator; putCacheCallback then stores the result in memory. The coordinator is the broker that leads the offsets-topic partition the group maps to:
  def storeGroup(group: GroupMetadata,
                 groupAssignment: Map[String, Array[Byte]],
                 responseCallback: Errors => Unit,
                 requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
    // check whether the current broker is this group's coordinator
    getMagic(partitionFor(group.groupId)) match {
      case Some(magicValue) =>
        // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
        val timestampType = TimestampType.CREATE_TIME
        val timestamp = time.milliseconds()
        // build the key and value of the group registration record
        val key = GroupMetadataManager.groupMetadataKey(group.groupId)
        val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
        val records = {
          val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
            Seq(new SimpleRecord(timestamp, key, value)).asJava))
          val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
          builder.append(timestamp, key, value)
          builder.build()
        }
        // compute the target offsets-topic partition
        val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
        val groupMetadataRecords = Map(groupMetadataPartition -> records)
        val generationId = group.generationId
        // set the callback function to insert the created group into cache after log append completed
        def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
          // the append response should only contain the topics partition
          if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition))
            throw new IllegalStateException("Append status %s should only have one partition %s"
              .format(responseStatus, groupMetadataPartition))
          // construct the error status in the propagated assignment response in the cache
          val status = responseStatus(groupMetadataPartition)
          val responseError = if (status.error == Errors.NONE) {
            Errors.NONE
          } else {
            debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
              s"due to ${status.error.exceptionName}")
            // transform the log append error code to the corresponding the commit status error code
            status.error match {
              case Errors.UNKNOWN_TOPIC_OR_PARTITION
                   | Errors.NOT_ENOUGH_REPLICAS
                   | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
                Errors.COORDINATOR_NOT_AVAILABLE
              case Errors.NOT_LEADER_OR_FOLLOWER
                   | Errors.KAFKA_STORAGE_ERROR =>
                Errors.NOT_COORDINATOR
              case Errors.REQUEST_TIMED_OUT =>
                Errors.REBALANCE_IN_PROGRESS
              case Errors.MESSAGE_TOO_LARGE
                   | Errors.RECORD_LIST_TOO_LARGE
                   | Errors.INVALID_FETCH_SIZE =>
                error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
                  s"${status.error.exceptionName}, returning UNKNOWN error code to the client")
                Errors.UNKNOWN_SERVER_ERROR
              case other =>
                error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
                  s"due to unexpected error: ${status.error.exceptionName}")
                other
            }
          }
          responseCallback(responseError)
        }
        // append the registration record to the offsets topic
        appendForGroup(group, groupMetadataRecords, requestLocal, putCacheCallback)
      case None =>
        responseCallback(Errors.NOT_COORDINATOR)
        None
    }
  }
The broker that acts as a group's coordinator is the leader of the offsets-topic partition used by that group; a small sketch of this mapping follows.
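A minimal sketch of that mapping (the modulo-of-hash scheme mirrors what GroupMetadataManager.partitionFor does, but treat the exact details as an assumption):

  object CoordinatorLookupDemo {
    // __consumer_offsets partition that owns this group; its leader broker is the group's coordinator.
    def partitionFor(groupId: String, groupMetadataTopicPartitionCount: Int): Int =
      Math.floorMod(groupId.hashCode, groupMetadataTopicPartitionCount)

    def main(args: Array[String]): Unit = {
      val partition = partitionFor("my-consumer-group", groupMetadataTopicPartitionCount = 50)
      println(s"group is managed by __consumer_offsets-$partition; the leader of that partition is the coordinator")
    }
  }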

loadGroupsAndOffsets():
doLoadGroupsAndOffsets():
        Load consumer groups and their committed offsets from the offsets topic into memory. Runs when the current broker becomes the leader replica of an offsets-topic partition:
  private def doLoadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
    def logEndOffset: Long = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
    // look up the partition's log object in the replica manager
    replicaManager.getLog(topicPartition) match {
      case None =>
        warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
      case Some(log) =>
        // offsets that have finished loading
        val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
        // pending transactional offsets, keyed by producer id
        val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
        // groups whose metadata has been loaded
        val loadedGroups = mutable.Map[String, GroupMetadata]()
        // groups that are to be removed
        val removedGroups = mutable.Set[String]()
        // buffer may not be needed if records are read from memory
        var buffer = ByteBuffer.allocate(0)
        // loop breaks if leader changes at any time during the load, since logEndOffset is -1
        // start from the log start offset of the offsets-topic partition
        var currOffset = log.logStartOffset
        // loop breaks if no records have been read, since the end of the log has been reached
        var readAtLeastOneRecord = true
        // keep reading while below the LEO, something was read last time, and the manager is not shutting down
        while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) {
          // read data from the offsets-topic partition
          val fetchDataInfo = log.read(currOffset,
            maxLength = config.loadBufferSize,
            isolation = FetchLogEnd,
            minOneMessage = true)
          readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
          // build the record set
          val memRecords = (fetchDataInfo.records: @unchecked) match {
            case records: MemoryRecords => records
            // convert FileRecords into MemoryRecords
            case fileRecords: FileRecords =>
              val sizeInBytes = fileRecords.sizeInBytes
              val bytesNeeded = Math.max(config.loadBufferSize, sizeInBytes)
              // minOneMessage = true in the above log.read means that the buffer may need to be grown to ensure progress can be made
              if (buffer.capacity < bytesNeeded) {
                if (config.loadBufferSize < bytesNeeded)
                  warn(s"Loaded offsets and group metadata from $topicPartition with buffer larger ($bytesNeeded bytes) than " +
                    s"configured offsets.load.buffer.size (${config.loadBufferSize} bytes)")
                buffer = ByteBuffer.allocate(bytesNeeded)
              } else {
                buffer.clear()
              }
              fileRecords.readInto(buffer, 0)
              MemoryRecords.readableRecords(buffer)
          }
          memRecords.batches.forEach { batch =>
            val isTxnOffsetCommit = batch.isTransactional
            // is this a control batch?
            if (batch.isControlBatch) {
              val recordIterator = batch.iterator
              if (recordIterator.hasNext) {
                val record = recordIterator.next()
                val controlRecord = ControlRecordType.parse(record.key)
                if (controlRecord == ControlRecordType.COMMIT) {
                  pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
                    .foreach {
                      case (groupTopicPartition, commitRecordMetadataAndOffset) =>
                        if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
                          loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
                    }
                }
                pendingOffsets.remove(batch.producerId)
              }
            } else {
              var batchBaseOffset: Option[Long] = None
              for (record <- batch.asScala) {
                // every record must have a key
                require(record.hasKey, "Group metadata/offset entry key should not be null")
                if (batchBaseOffset.isEmpty)
                  // remember the offset of the first record in the batch
                  batchBaseOffset = Some(record.offset)
                GroupMetadataManager.readMessageKey(record.key) match {
                  // an offset commit record
                  case offsetKey: OffsetKey =>
                    if (isTxnOffsetCommit && !pendingOffsets.contains(batch.producerId))
                      pendingOffsets.put(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
                    // load offset
                    val groupTopicPartition = offsetKey.key
                    // tombstone (no value)?
                    if (!record.hasValue) {
                      // no value: drop the offset from the maps
                      if (isTxnOffsetCommit)
                        pendingOffsets(batch.producerId).remove(groupTopicPartition)
                      else
                        loadedOffsets.remove(groupTopicPartition)
                    } else {
                      // has a value: record the offset
                      val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
                      if (isTxnOffsetCommit)
                        pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                      else
                        loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                    }
                  // a group registration record
                  case groupMetadataKey: GroupMetadataKey =>
                    // load group metadata
                    val groupId = groupMetadataKey.key
                    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
                    if (groupMetadata != null) {
                      removedGroups.remove(groupId)
                      loadedGroups.put(groupId, groupMetadata)
                    } else {
                      loadedGroups.remove(groupId)
                      removedGroups.add(groupId)
                    }
                  case unknownKey =>
                    throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
                }
              }
            }
            // advance the read position past this batch
            currOffset = batch.nextOffset
          }
        }
        val (groupOffsets, emptyGroupOffsets) = loadedOffsets
          .groupBy(_._1.group)
          .map { case (k, v) =>
            k -> v.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) }
          }.partition { case (group, _) => loadedGroups.contains(group) }
        val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
        pendingOffsets.forKeyValue { (producerId, producerOffsets) =>
          producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
          producerOffsets
            .groupBy(_._1.group)
            .forKeyValue { (group, offsets) =>
              val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
              val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
              groupProducerOffsets ++= offsets.map { case (groupTopicPartition, offset) =>
                (groupTopicPartition.topicPartition, offset)
              }
            }
        }
        val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup
          .partition { case (group, _) => loadedGroups.contains(group)}
        // groups whose metadata has been loaded
        loadedGroups.values.foreach { group =>
          val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
          val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
          debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
          // add the group metadata to the cache
          loadGroup(group, offsets, pendingOffsets)
          // callback supplied by the caller; sets up heartbeat handling for the group's members
          onGroupLoaded(group)
        }
        // load groups which store offsets in kafka, but which have no active members and thus no group
        // metadata stored in the log
        // offsets whose groups have no stored group metadata
        (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { groupId =>
          // create brand-new group metadata
          val group = new GroupMetadata(groupId, Empty, time)
          val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
          val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
          debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
          loadGroup(group, offsets, pendingOffsets)
          onGroupLoaded(group)
        }
        // verify that none of the removed groups is still in the cache
        removedGroups.foreach { groupId =>
          // if the cache already contains a group which should be removed, raise an error. Note that it
          // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
          // offset storage (i.e. by "simple" consumers)
          if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
            throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
              s"loading partition $topicPartition")
        }
    }
  }

4.GroupCoordinator

 

Methods:

handleJoinGroup():
        Handles a member joining a consumer group:
  def handleJoinGroup(groupId: String,                  // consumer group id
                      memberId: String,                 // member id
                      groupInstanceId: Option[String],  // static member instance id
                      requireKnownMemberId: Boolean,    // whether a known member id is required
                      clientId: String,
                      clientHost: String,               // consumer host
                      rebalanceTimeoutMs: Int,          // rebalance timeout
                      sessionTimeoutMs: Int,            // session timeout
                      protocolType: String,             // protocol type
                      protocols: List[(String, Array[Byte])],
                      responseCallback: JoinCallback,   // response callback
                      requestLocal: RequestLocal = RequestLocal.NoCaching

  // if a member id is supplied but the group does not exist, fail;
  // otherwise return the existing group, or create a new one
  groupManager.getOrMaybeCreateGroup(groupId, isUnknownMember) match {
    case None =>
      responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
    case Some(group) =>
      group.inLock {
        // check whether the group is already full and cannot accept another member
        if (!acceptJoiningMember(group, memberId)) {
          group.remove(memberId)
          responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
        } else if (isUnknownMember) {
          // a member with an empty id joins the group
          doNewMemberJoinGroup(
            group,
            groupInstanceId,
            requireKnownMemberId,
            clientId,
            clientHost,
            rebalanceTimeoutMs,
            sessionTimeoutMs,
            protocolType,
            protocols,
            responseCallback,
            requestLocal
          )
        } else {
          // a member with a known id joins the group
          doCurrentMemberJoinGroup(
            group,
            memberId,
            groupInstanceId,
            clientId,
            clientHost,
            rebalanceTimeoutMs,
            sessionTimeoutMs,
            protocolType,
            protocols,
            responseCallback
          )
        }
        // attempt to complete JoinGroup
        // if the group is in the PreparingRebalance state
        if (group.is(PreparingRebalance)) {
          // check the delayed-rebalance purgatory; the join may now be completable
          rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
        }
      }
  }

handleSyncGroup():
        Handles a SyncGroup request; validateSyncGroup first checks the request:

  private def validateSyncGroup(
    group: GroupMetadata,
    generationId: Int,
    memberId: String,
    protocolType: Option[String],
    protocolName: Option[String],
    groupInstanceId: Option[String],
  ): Option[Errors] = {
    // the group is in the Dead state
    if (group.is(Dead)) {
      Some(Errors.COORDINATOR_NOT_AVAILABLE)
    } else {
      // check that the member belongs to the group
      validateCurrentMember(
        group,
        memberId,
        groupInstanceId,
        operation = "sync-group"
      ).orElse {
        // the generationId must match
        if (generationId != group.generationId) {
          Some(Errors.ILLEGAL_GENERATION)
        // the protocol type must match
        } else if (protocolType.isDefined && !group.protocolType.contains(protocolType.get)) {
          Some(Errors.INCONSISTENT_GROUP_PROTOCOL)
        // the partition assignment protocol must match
        } else if (protocolName.isDefined && !group.protocolName.contains(protocolName.get)) {
          Some(Errors.INCONSISTENT_GROUP_PROTOCOL)
        } else {
          None
        }
      }
    }
  }
  case None => group.currentState match {
    // unknown member id
    case Empty =>
      responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
    // a rebalance is still being prepared
    case PreparingRebalance =>
      responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
    case CompletingRebalance =>
      group.get(memberId).awaitingSyncCallback = responseCallback
      removePendingSyncMember(group, memberId)
      // only the leader's SyncGroup carries the assignment
      if (group.isLeader(memberId)) {
        info(s"Assignment received from leader $memberId for group ${group.groupId} for generation ${group.generationId}. " +
          s"The group has ${group.size} members, ${group.allStaticMembers.size} of which are static.")
        // members without an assignment get an empty one
        val missing = group.allMembers.diff(groupAssignment.keySet)
        val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
        if (missing.nonEmpty) {
          warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
        }
        // persist the group metadata (including the assignment) to the internal offsets topic
        groupManager.storeGroup(group, assignment, (error: Errors) => {
          group.inLock {
            // still CompletingRebalance and the generationId is unchanged
            if (group.is(CompletingRebalance) && generationId == group.generationId) {
              if (error != Errors.NONE) {
                // clear the assignment and propagate the error to all members
                resetAndPropagateAssignmentError(group, error)
                // prepare a new rebalance
                maybePrepareRebalance(group, s"Error when storing group assignment during SyncGroup (member: $memberId)")
              } else {
                // store the assignment in the group metadata and propagate it
                setAndPropagateAssignment(group, assignment)
                // move the group to the Stable state
                group.transitionTo(Stable)
              }
            }
          }
        }, requestLocal)
        groupCompletedRebalanceSensor.record()
      }
    case Stable =>
      // remove the member from the pending-sync set
      removePendingSyncMember(group, memberId)
      // if the group is stable, we just return the current assignment
      // fetch the member's metadata
      val memberMetadata = group.get(memberId)
      // return the protocol type, protocol name and assignment
      responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
      completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
    case Dead =>
      throw new IllegalStateException(s"Reached unexpected condition for Dead group ${group.groupId}")
  }

rebalance:

Consumer group members send heartbeats to the coordinator; when a rebalance is needed, the coordinator notifies the members in the heartbeat responses (see the sketch below).
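A sketch of the client-side behaviour this implies (illustrative; the real logic lives in the consumer's heartbeat handling): a REBALANCE_IN_PROGRESS error in a heartbeat response makes the member stop fetching and rejoin the group.

  object HeartbeatDemo {
    sealed trait HeartbeatError
    case object NoError extends HeartbeatError
    case object RebalanceInProgress extends HeartbeatError

    // What a member does with each heartbeat response from the coordinator.
    def onHeartbeatResponse(error: HeartbeatError): String = error match {
      case NoError             => "stay in the group, keep fetching"
      case RebalanceInProgress => "stop fetching and send JoinGroup again"
    }

    def main(args: Array[String]): Unit = {
      Seq(NoError, NoError, RebalanceInProgress).foreach(e => println(onHeartbeatResponse(e)))
    }
  }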
