当前位置:   article > 正文

java errors_Java Errors.exception方法代码示例

received unknown topic or partition error in produce request on partition di

import org.apache.kafka.common.protocol.Errors; //导入方法依赖的package包/类

/**
 * Complete or retry the given batch of records.
 *
 * <p>The outcome depends on the error carried by the partition response:
 * <ul>
 *   <li>{@code MESSAGE_TOO_LARGE} on a batch with more than one record that is either magic v2+ or
 *       compressed: the batch is split and the pieces are re-enqueued.</li>
 *   <li>Any other error for which {@code canRetry} returns true: the batch is re-enqueued, unless a
 *       transaction manager is present and no longer holds the batch's producer id and epoch, in which
 *       case the batch is failed with an {@link OutOfOrderSequenceException}.</li>
 *   <li>Any other error that cannot be retried: the batch is failed with a matching exception.</li>
 *   <li>{@code NONE}: the batch is completed normally.</li>
 * </ul>
 * For every error outside the split case, a metadata update is requested when the error maps to an
 * {@link InvalidMetadataException}. Finally, if message ordering is guaranteed, the batch's partition
 * is unmuted.
 *
 * @param batch The record batch
 * @param response The produce response
 * @param correlationId The correlation id for the request
 * @param now The current POSIX timestamp in milliseconds
 */
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now) {
    Errors error = response.error;

    if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
            (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
        // If the batch is too large, we split the batch and send the split batches again. We do not decrement
        // the retry attempts in this case.
        log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
                 correlationId,
                 batch.topicPartition,
                 this.retries - batch.attempts(),
                 error);
        this.accumulator.splitAndReenqueue(batch);
        this.accumulator.deallocate(batch);
        this.sensors.recordBatchSplit();
    } else if (error != Errors.NONE) {
        if (canRetry(batch, error)) {
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                     correlationId,
                     batch.topicPartition,
                     this.retries - batch.attempts() - 1,
                     error);
            if (transactionManager == null) {
                reenqueueBatch(batch, now);
            } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                // If idempotence is enabled only retry the request if the current producer id is the same as
                // the producer id of the batch.
                log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
                          transactionManager.sequenceNumber(batch.topicPartition));
                reenqueueBatch(batch, now);
            } else {
                failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                        "batch but the producer id changed from " + batch.producerId() + " to " +
                        transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."));
                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
            }
        } else {
            // The batch cannot be retried: fail it with an exception describing the error.
            final RuntimeException exception;
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                exception = new TopicAuthorizationException(batch.topicPartition.topic());
            } else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
                exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
            } else {
                exception = error.exception();
            }
            // tell the user the result of their request
            failBatch(batch, response, exception);
            this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
        }

        if (error.exception() instanceof InvalidMetadataException) {
            // Only the warning is specific to unknown topic/partition errors; the metadata update below
            // is requested for every InvalidMetadataException.
            if (error.exception() instanceof UnknownTopicOrPartitionException) {
                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                        "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
            }
            metadata.requestUpdate();
        }
    } else {
        completeBatch(batch, response);
    }

    // Unmute the completed partition.
    if (guaranteeMessageOrder) {
        this.accumulator.unmutePartition(batch.topicPartition);
    }
}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/839292?site
推荐阅读
相关标签
  

闽ICP备14008679号