赞
踩
这里主要还是之前看到的Sender.java中的completeBatch方法
private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long timestamp, long correlationId, long now) { //说明响应里面带有异常 并且这个请求是可以重试的 if (error != Errors.NONE && canRetry(batch, error)) { // retry log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", correlationId, batch.topicPartition, this.retries - batch.attempts - 1, error); //重新把发送失败的批次重新加入到队列里面。 this.accumulator.reenqueue(batch, now); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); } else { //其余的情况会走这里,这里可能是原本就不可以重试,还可能是重试次数超了 RuntimeException exception; //如果响应里面带有没有权限的异常 if (error == Errors.TOPIC_AUTHORIZATION_FAILED) //自己封装一个异常信息(自定义异常) exception = new TopicAuthorizationException(batch.topicPartition.topic()); else exception = error.exception(); // tell the user the result of their request //TODO 核心代码 把异常信息也带过去了 //回调函数调用完之后 //说明一个完整的消息发送流程就结束了 batch.done(baseOffset, timestamp, exception); this.accumulator.deallocate(batch); if (error != Errors.NONE) this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); }
我们可以简单的看一下reenqueue做了什么处理
*/
public void reenqueue(RecordBatch batch, long now) {
//重试次数累加
batch.attempts++;
//上一次重试的时间
batch.lastAttemptMs = now;
batch.lastAppendTime = now;
batch.setRetry();
Deque<RecordBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
//重新放入到队列里面
//放入到队头
deque.addFirst(batch);
}
}
最后如果真的是有一些消息带有异常导致无法正常发送,我们也不能放任不管,这样就丢数据了,通常的做法就是在生产者代码的回调函数中捕获到这样的消息,将它存储到备用链路里面,防止丢数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。