
Kafka RPC Protocol (Community Trunk) -- ListOffsetRequest


1. RPC Function and Use Cases

  1. Function: used by a consumer or a follower to obtain a starting offset for consumption. The target timestamp is usually one of the special values EARLIEST_TIMESTAMP (earliest) or LATEST_TIMESTAMP (latest), though a concrete timestamp can also be searched for (see the server-side handling in section 4). A consumer-API sketch follows this list.
  2. Use cases

         2.1 consumer: on first startup, a consumer must determine a starting offset before it can consume. If the consumer's group has consumed from Kafka before and its committed offset has not yet expired (commits have a retention time, since the underlying Kafka data is itself expired and deleted), a different RPC (OffsetFetchRequest) is used instead; it will be covered in a later article.

         2.2 follower: when a follower fetches from the leader and falls too far behind (or hits some other abnormal condition), the fetch fails with OFFSET_OUT_OF_RANGE; the follower must then obtain a new offset from the leader before it can continue fetching.
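
For orientation, here is a minimal sketch of the public consumer APIs that are ultimately served by this RPC; the bootstrap address, topic and partition below are placeholders:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class ListOffsetsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder
            // EARLIEST_TIMESTAMP lookup:
            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Collections.singleton(tp));
            // LATEST_TIMESTAMP lookup:
            Map<TopicPartition, Long> latest = consumer.endOffsets(Collections.singleton(tp));
            // Lookup by a concrete timestamp (epoch millis):
            Map<TopicPartition, OffsetAndTimestamp> byTime =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, 1500000000000L));
            System.out.println(earliest + " / " + latest + " / " + byTime);
        }
    }
}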

2. RPC Fields

  private final int replicaId;
  private final IsolationLevel isolationLevel;
  private final Map<TopicPartition, PartitionData> partitionTimestamps;
  private final Set<TopicPartition> duplicatePartitions;
  • replicaId: lets the server tell whether a request comes from a consumer or a follower (a value below 0 identifies a consumer; see the sketch after this list)
  • isolationLevel: transactional isolation for consumption; it controls how far into the log the caller is allowed to read
  • partitionTimestamps: the topic-partition information carried by the RPC
  • duplicatePartitions: if one RPC contains the same topic-partition more than once, the duplicates are collected here and the server ignores them while handling the request (in practice this rarely happens)
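
A minimal sketch of the replicaId convention, assuming the trunk-era constants ListOffsetRequest.CONSUMER_REPLICA_ID (-1) and DEBUGGING_REPLICA_ID (-2):

import org.apache.kafka.common.requests.ListOffsetRequest;

public class ReplicaIdConvention {
    static boolean isConsumerRequest(int replicaId) {
        // Anything below zero is treated as a non-broker caller.
        return replicaId < 0;
    }

    public static void main(String[] args) {
        System.out.println(isConsumerRequest(ListOffsetRequest.CONSUMER_REPLICA_ID)); // true
        System.out.println(isConsumerRequest(3)); // false: broker 3's follower fetcher
    }
}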

3. Client-side Handling

      3.1 consumer 

          The consumer's entry point for the LISTOFFSET call is the following method:

private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node,
                                                              final Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch,
                                                              boolean requireTimestamp) {
    ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
            .forConsumer(requireTimestamp, isolationLevel)
            .setTargetTimes(timestampsToSearch);
    log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
    return client.send(node, builder)
            .compose(new RequestFutureAdapter<ClientResponse, ListOffsetResult>() {
                @Override
                public void onSuccess(ClientResponse response, RequestFuture<ListOffsetResult> future) {
                    ListOffsetResponse lor = (ListOffsetResponse) response.responseBody();
                    log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
                    handleListOffsetResponse(timestampsToSearch, lor, future);
                }
            });
}
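
As a usage illustration, the timestampsToSearch argument maps each partition to the target timestamp to look up. A hypothetical example, assuming the trunk-era ListOffsetRequest.PartitionData(timestamp, currentLeaderEpoch) constructor:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.ListOffsetRequest;

public class TargetTimesDemo {
    public static void main(String[] args) {
        // One entry per partition; here we ask for the log end offset.
        Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(
                new TopicPartition("my-topic", 0), // placeholder
                new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, Optional.empty()));
        System.out.println(timestampsToSearch);
    }
}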

     3.2 follower

          (1) During makeFollower, if the initial fetch offset is below 0, the follower needs to truncate its own log and fetch a starting offset from the leader:

def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]) {
  partitionMapLock.lockInterruptibly()
  try {
    initialFetchStates.foreach { case (tp, initialFetchState) =>
      // We can skip the truncation step iff the leader epoch matches the existing epoch
      val currentState = partitionStates.stateValue(tp)
      val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
        currentState
      } else {
        val initialFetchOffset = if (initialFetchState.offset < 0)
          fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch)
        else
          initialFetchState.offset
        PartitionFetchState(initialFetchOffset, initialFetchState.leaderEpoch, state = Truncating)
      }
      partitionStates.updateAndMoveToEnd(tp, updatedState)
    }
    partitionMapCond.signalAll()
  } finally partitionMapLock.unlock()
}
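
One design note on the epoch check above: if a partition is re-added with a leader epoch equal to the one already recorded, the follower has already gone through truncation for that epoch, so the existing fetch state is reused and the Truncating step is skipped; truncation only needs to happen once per leader-epoch change.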

   (2) When fetching from the leader returns OUT_OF_RANGE:

private def handleOutOfRangeError(topicPartition: TopicPartition,
                                  fetchState: PartitionFetchState): Boolean = {
  try {
    val newOffset = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
    val newFetchState = PartitionFetchState(newOffset, fetchState.currentLeaderEpoch, state = Fetching)
    partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
    info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
      s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
    true
  } catch {
    // (the more specific error-handling cases are elided in this excerpt)
    case e: Throwable =>
      error(s"Error getting offset for partition $topicPartition", e)
      false
  }
}

       Here the follower first asks the leader for its latest LEO (log end offset). If the leader's LEO is smaller than the follower's LEO, the follower truncates the out-of-range portion of its own log and resumes fetching from the leader's LEO.

       If instead the leader's LEO is larger than the follower's LEO, it then checks whether the leader's log start offset is larger than the follower's LEO; if it is, every local offset has already expired on the leader, so the follower resumes fetching from the leader's log start offset. A sketch of this decision follows.
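
A minimal sketch of that decision, not the actual fetchOffsetAndTruncate code; the two leader offsets would be obtained via ListOffsetRequest with LATEST_TIMESTAMP and EARLIEST_TIMESTAMP:

public class OutOfRangeDecision {
    // Returns the offset the follower should fetch from next; truncation to
    // this offset (or a full truncation) happens alongside, as described above.
    static long resolveFetchOffset(long followerLogEndOffset,
                                   long leaderLogEndOffset,
                                   long leaderLogStartOffset) {
        if (leaderLogEndOffset < followerLogEndOffset) {
            // Follower is ahead of the leader: truncate back to the leader's LEO.
            return leaderLogEndOffset;
        }
        // Follower is behind; if even the leader's log start offset has moved
        // past the follower's LEO, all local data is stale and fetching
        // restarts from the leader's log start offset.
        return Math.max(leaderLogStartOffset, followerLogEndOffset);
    }

    public static void main(String[] args) {
        System.out.println(resolveFetchOffset(120L, 100L, 10L)); // 100: truncate back
        System.out.println(resolveFetchOffset(50L, 100L, 80L));  // 80: local data expired
    }
}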

4. Server-side Handling

 

def handleListOffsetRequest(request: RequestChannel.Request) {
  val version = request.header.apiVersion()
  val mergedResponseMap = if (version == 0)
    handleListOffsetRequestV0(request)
  else
    handleListOffsetRequestV1AndAbove(request)
  sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
}
  1. After receiving the request, the server dispatches on the API version.
  2. The biggest difference between handleListOffsetRequestV0 and handleListOffsetRequestV1AndAbove is whether the response may return only the offsets before a specified target offset (that parameter was later marked deprecated by the community, so the difference has become small); the analysis here focuses on the V1-and-above path.
  3. The core handling logic lives in the following function:
def fetchOffsetByTimestamp(targetTimestamp: Long): Option[TimestampAndOffset] = {
  maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
    debug(s"Searching offset for timestamp $targetTimestamp")

    if (config.messageFormatVersion < KAFKA_0_10_0_IV0 &&
        targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP &&
        targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP)
      throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
        s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
        s"required version $KAFKA_0_10_0_IV0")

    // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
    // constant time access while being safe to use with concurrent collections unlike `toArray`.
    val segmentsCopy = logSegments.toBuffer
    // For the earliest and latest, we do not need to return the timestamp.
    if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) {
      // The first cached epoch usually corresponds to the log start offset, but we have to verify this since
      // it may not be true following a message format version bump as the epoch will not be available for
      // log entries written in the older format.
      val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry)
      val epochOpt = earliestEpochEntry match {
        case Some(entry) if entry.startOffset <= logStartOffset => Optional.of[Integer](entry.epoch)
        case _ => Optional.empty[Integer]()
      }
      return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
    } else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
      val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
      val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
      return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
    }

    val targetSeg = {
      // Get all the segments whose largest timestamp is smaller than target timestamp
      val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
      // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
      if (earlierSegs.length < segmentsCopy.length)
        Some(segmentsCopy(earlierSegs.length))
      else
        None
    }
    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
  }
}

    Note the fast paths here: for EARLIEST_TIMESTAMP and LATEST_TIMESTAMP the leader answers directly from logStartOffset / logEndOffset, consulting the leaderEpochCache it maintains only to attach the corresponding leader epoch. Only a lookup for a concrete timestamp has to walk the segments, and since each segment tracks its largest timestamp, most segments are skipped without touching their indexes. A toy illustration of the segment selection:
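
// A self-contained toy mirror of `segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)`:
public class SegmentSelectionDemo {
    public static void main(String[] args) {
        // Largest timestamp per segment, in log order (made-up values).
        long[] largestTimestamps = {100L, 200L, 300L};
        long target = 250L;

        // Skip every segment whose largest timestamp is below the target.
        int i = 0;
        while (i < largestTimestamps.length && largestTimestamps[i] < target)
            i++;

        if (i < largestTimestamps.length)
            System.out.println("search segment index " + i); // prints 2: first segment that can hold the target
        else
            System.out.println("no segment can contain the target timestamp");
    }
}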
