2.1 consumer: when a consumer starts for the first time, it must determine a starting offset before it can consume anything. If the consumer's group has consumed from Kafka before and its committed offset has not expired (committed offsets have their own retention, since Kafka data is also deleted after expiration), the client sends a different RPC instead (OffsetFetchRequest), which will be covered in a later section. A consumer-side sketch of both paths follows this list.
2.2 follower: when a follower fetches data from the leader, fetching too slowly or other abnormal conditions can trigger an OUT_OF_RANGE error; the follower then needs to obtain a new offset from the leader before it can continue fetching.
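A minimal sketch of case 2.1, assuming a local broker and a hypothetical topic `test` / group `demo-group`: whether the client falls back to a ListOffsets lookup or to an OffsetFetchRequest depends on whether a valid committed offset exists.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ResetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // With no (or an expired) committed offset, this policy makes the client
        // look up the earliest offset via a ListOffsets request; "latest" would
        // look up the log end offset instead.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // If the group still has a valid committed offset, poll() fetches it
            // with an OffsetFetchRequest and no ListOffsets lookup is needed.
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}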
// Key fields of the client-side ListOffsetRequest
private final int replicaId;
private final IsolationLevel isolationLevel;
private final Map<TopicPartition, PartitionData> partitionTimestamps;
private final Set<TopicPartition> duplicatePartitions;
The entry point where the consumer issues the LIST_OFFSETS request is this method in the consumer's Fetcher:
private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node,
                                                              final Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch,
                                                              boolean requireTimestamp) {
    ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
            .forConsumer(requireTimestamp, isolationLevel)
            .setTargetTimes(timestampsToSearch);

    log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
    return client.send(node, builder)
            .compose(new RequestFutureAdapter<ClientResponse, ListOffsetResult>() {
                @Override
                public void onSuccess(ClientResponse response, RequestFuture<ListOffsetResult> future) {
                    ListOffsetResponse lor = (ListOffsetResponse) response.responseBody();
                    log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
                    handleListOffsetResponse(timestampsToSearch, lor, future);
                }
            });
}
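The public consumer APIs that funnel into sendListOffsetRequest include offsetsForTimes, beginningOffsets and endOffsets. A short usage sketch (the topic name and the one-hour-ago timestamp are made up for illustration):

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimesDemo {
    public static void demo(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("test", 0);
        consumer.assign(Collections.singletonList(tp));
        // Ask the broker for the first offset whose timestamp is >= the target;
        // internally this becomes a ListOffsetRequest carrying a real timestamp
        // rather than the EARLIEST/LATEST sentinel values.
        Map<TopicPartition, Long> query = new HashMap<>();
        query.put(tp, System.currentTimeMillis() - 3600_000L); // one hour ago
        Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(query, Duration.ofSeconds(5));
        OffsetAndTimestamp oat = result.get(tp);
        if (oat != null)
            consumer.seek(tp, oat.offset()); // rewind and re-consume from there
    }
}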
(1) During makeFollower, if the initial fetch offset is less than 0, the replica must truncate its own log and obtain a starting offset from the leader. The leader-epoch comparison below lets the fetcher skip re-truncation when the partition's leader has not actually changed:
def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]) {
  partitionMapLock.lockInterruptibly()
  try {
    initialFetchStates.foreach { case (tp, initialFetchState) =>
      // We can skip the truncation step iff the leader epoch matches the existing epoch
      val currentState = partitionStates.stateValue(tp)
      val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
        currentState
      } else {
        val initialFetchOffset = if (initialFetchState.offset < 0)
          fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch)
        else
          initialFetchState.offset
        PartitionFetchState(initialFetchOffset, initialFetchState.leaderEpoch, state = Truncating)
      }
      partitionStates.updateAndMoveToEnd(tp, updatedState)
    }

    partitionMapCond.signalAll()
  } finally partitionMapLock.unlock()
}
(2) When a fetch from the leader returns OUT_OF_RANGE:
private def handleOutOfRangeError(topicPartition: TopicPartition,
                                  fetchState: PartitionFetchState): Boolean = {
  try {
    val newOffset = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
    val newFetchState = PartitionFetchState(newOffset, fetchState.currentLeaderEpoch, state = Fetching)
    partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
    info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
      s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
    true
  } catch {
    // The excerpt was cut off here; the full method also distinguishes
    // fenced-epoch and retriable errors before giving up.
    case e: Throwable =>
      error(s"Error getting offset for partition $topicPartition", e)
      false
  }
}
fetchOffsetAndTruncate first fetches the leader's latest LEO. If the leader's LEO is smaller than the follower's LEO, the follower truncates the out-of-range suffix and resumes fetching from the leader's LEO. If the leader's LEO is larger, it then checks whether the leader's log start offset is greater than the follower's LEO; if so, the follower starts recovering data from the leader's log start offset.
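A compact sketch of that decision logic, with hypothetical helpers standing in for the real RPCs to the leader and the real log truncation calls:

// Hypothetical stand-ins for the fetcher's view of the local replica.
interface ReplicaView {
    long logEndOffset();                        // follower LEO
    void truncateTo(long offset);               // drop records at/after offset
    void truncateFullyAndStartAt(long offset);  // discard all local data
}

final class OutOfRangeRecovery {
    /** Returns the offset the follower should fetch from next. */
    static long resolveFetchOffset(ReplicaView replica,
                                   long leaderLogEndOffset,
                                   long leaderLogStartOffset) {
        if (leaderLogEndOffset < replica.logEndOffset()) {
            // Follower is ahead of the leader: cut off the divergent suffix
            // and resume from the leader's LEO.
            replica.truncateTo(leaderLogEndOffset);
            return leaderLogEndOffset;
        }
        if (leaderLogStartOffset > replica.logEndOffset()) {
            // Follower fell behind retention: discard everything and
            // restart from the leader's log start offset.
            replica.truncateFullyAndStartAt(leaderLogStartOffset);
            return leaderLogStartOffset;
        }
        // Otherwise the follower's own LEO is still inside the leader's log.
        return Math.max(leaderLogStartOffset, replica.logEndOffset());
    }
}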
On the broker side, KafkaApis dispatches the request by API version (V0 is the legacy form that can return multiple candidate offsets per partition; V1 and above return a single offset):

def handleListOffsetRequest(request: RequestChannel.Request) {
  val version = request.header.apiVersion()

  val mergedResponseMap = if (version == 0)
    handleListOffsetRequestV0(request)
  else
    handleListOffsetRequestV1AndAbove(request)

  sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
}

For V1 and above, the lookup eventually reaches Log.fetchOffsetByTimestamp:
def fetchOffsetByTimestamp(targetTimestamp: Long): Option[TimestampAndOffset] = {
  maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
    debug(s"Searching offset for timestamp $targetTimestamp")

    if (config.messageFormatVersion < KAFKA_0_10_0_IV0 &&
        targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP &&
        targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP)
      throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
        s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
        s"required version $KAFKA_0_10_0_IV0")

    // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
    // constant time access while being safe to use with concurrent collections unlike `toArray`.
    val segmentsCopy = logSegments.toBuffer
    // For the earliest and latest, we do not need to return the timestamp.
    if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) {
      // The first cached epoch usually corresponds to the log start offset, but we have to verify this since
      // it may not be true following a message format version bump as the epoch will not be available for
      // log entries written in the older format.
      val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry)
      val epochOpt = earliestEpochEntry match {
        case Some(entry) if entry.startOffset <= logStartOffset => Optional.of[Integer](entry.epoch)
        case _ => Optional.empty[Integer]()
      }
      return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
    } else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
      val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
      val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
      return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
    }

    val targetSeg = {
      // Get all the segments whose largest timestamp is smaller than target timestamp
      val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
      // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
      if (earlierSegs.length < segmentsCopy.length)
        Some(segmentsCopy(earlierSegs.length))
      else
        None
    }

    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
  }
}
An optimization worth noting here: the leaderEpochCache maintained for the leader's log lets the EARLIEST/LATEST cases attach the correct leader epoch without touching the data, and each segment tracks its largestTimestamp, so the timestamp search can skip whole segments before consulting a segment's time index. A sketch of that two-level search follows.
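An illustrative model of the search (Segment and its fields are stand-ins, not Kafka's real classes):

import java.util.List;

final class TimestampSearch {
    static final class Segment {
        long baseOffset;
        long largestTimestamp; // max timestamp seen in this segment
        long[][] timeIndex;    // sparse {timestamp, offset} pairs, ascending
    }

    /** Find a lower-bound offset for the first entry with timestamp >= target, or -1. */
    static long findOffsetForTimestamp(List<Segment> segments, long target) {
        for (Segment seg : segments) {
            // Segment-level skip: every record here is older than the target.
            if (seg.largestTimestamp < target)
                continue;
            // In-segment: binary-search the sparse time index for the last
            // entry <= target; a real implementation then scans the log
            // forward from that offset to the exact matching record.
            long candidate = seg.baseOffset;
            int lo = 0, hi = seg.timeIndex.length - 1;
            while (lo <= hi) {
                int mid = (lo + hi) >>> 1;
                if (seg.timeIndex[mid][0] <= target) {
                    candidate = seg.timeIndex[mid][1];
                    lo = mid + 1;
                } else {
                    hi = mid - 1;
                }
            }
            return candidate; // starting point for the forward scan
        }
        return -1L; // target is newer than everything in the log
    }
}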