How does KafkaTemplate send messages?
To send messages to Kafka with Spring's KafkaTemplate, the bean has to be instantiated first. The configuration looks like this:

```xml
<!-- Producer configuration parameters -->
<bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
            <entry key="group.id" value="0" />
            <entry key="retries" value="2" />
            <entry key="batch.size" value="16384" />
            <entry key="linger.ms" value="1" />
            <entry key="buffer.memory" value="33554432" />
            <entry key="max.request.size" value="10000000" />
            <entry key="send.buffer.bytes" value="10000000" />
            <entry key="key.serializer"
                   value="org.apache.kafka.common.serialization.StringSerializer" />
            <entry key="value.serializer"
                   value="org.apache.kafka.common.serialization.StringSerializer" />
        </map>
    </constructor-arg>
</bean>

<!-- ProducerFactory bean that the KafkaTemplate is built from -->
<bean id="producerFactory"
      class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
        <ref bean="producerProperties" />
    </constructor-arg>
</bean>

<!-- KafkaTemplate bean; inject it wherever you need it and call its send methods -->
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory" />
    <constructor-arg name="autoFlush" value="true" />
    <property name="defaultTopic" value="mhb-test" />
</bean>
```
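For comparison, here is the same wiring as Java configuration. This is a minimal sketch assuming a recent spring-kafka on the classpath; the broker address stands in for the `${kafka.bootstrap.servers}` placeholder, and the `group.id` entry from the XML is dropped because it is a consumer setting that a producer does not use.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // stand-in for ${kafka.bootstrap.servers}
        props.put(ProducerConfig.RETRIES_CONFIG, 2);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10000000);
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, 10000000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // autoFlush = true mirrors the XML; the template flushes after every send
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory(), true);
        template.setDefaultTopic("mhb-test");
        return template;
    }
}
```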

At the call site, simply inject the template and use it:

```java
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
```

```java
ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send(topic, key, JSON.toJSONString(obj));
future.get();
```
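Note that blocking on `future.get()` turns the asynchronous send into a synchronous one. In practice you would usually attach a callback instead; a sketch assuming spring-kafka 2.x (where `send` returns a `ListenableFuture`) and an SLF4J `log` field:

```java
ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send(topic, key, JSON.toJSONString(obj));
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        RecordMetadata meta = result.getRecordMetadata();
        log.info("Sent to partition {} at offset {}", meta.partition(), meta.offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        log.error("Send failed", ex);
    }
});
```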

That is the usage through Spring's wrapper. Spring adds a ProducerFactory for creating Producer instances, bolts transaction support onto the producer, wraps the arguments into a ProducerRecord, and finally calls the send method of the Producer class in the kafka-clients package.
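For context, the public `send(topic, key, data)` overload does little more than build the record before delegating (paraphrased from spring-kafka 2.x; details vary by version):

```java
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
    return doSend(producerRecord);
}
```

`doSend` is where the interesting work happens: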

```java
protected ListenableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord) {
    if (this.transactional) {
        Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
    }
    // Transaction support: obtain a producer from the factory
    Producer<K, V> producer = this.getTheProducer();
    this.logger.trace(() -> {
        return "Sending: " + producerRecord;
    });
    SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
    // Send to Kafka, wrapping the result into the future
    producer.send(producerRecord, this.buildCallback(producerRecord, producer, future));
    if (this.autoFlush) {
        this.flush();
    }
    this.logger.trace(() -> {
        return "Sent: " + producerRecord;
    });
    return future;
}
```
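The transactional branch at the top explains the long assertion message: with a transactional producer factory (one that has a `transactionIdPrefix` set), sends must run inside a transaction. A minimal sketch using `executeInTransaction`, assuming such a factory:

```java
kafkaTemplate.executeInTransaction(template -> {
    template.send("mhb-test", "key1", "value1");
    template.send("mhb-test", "key2", "value2");
    return true; // the transaction commits when the callback returns normally
});
```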

From here on we are inside the kafka-clients package. KafkaProducer implements the Producer interface, and before sending it runs any registered ProducerInterceptor, which can intercept and even mutate the record. The official description:

A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster.
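To make that concrete, a minimal interceptor might look like the sketch below (illustrative only; it is registered through the `interceptor.classes` producer property):

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimestampPrefixInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs before serialization and partitioning; may mutate or replace the record
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                System.currentTimeMillis() + "|" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acks the record, or when the send fails
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
```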

Once the interceptors have run, the record is actually sent to Kafka via org.apache.kafka.clients.producer.KafkaProducer#doSend. The (decompiled) source:

```java
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 0. Sanity checks and setup; compute how long we may block,
        //    since Kafka sends in batches
        this.throwIfProducerClosed();
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
        } catch (KafkaException var19) {
            if (this.metadata.isClosed()) {
                throw new KafkaException("Producer closed while send in progress", var19);
            }
            throw var19;
        }
        long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try { // 1. Serialize the key
            serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException var18) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var18);
        }
        byte[] serializedValue;
        try { // 2. Serialize the value
            serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException var17) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var17);
        }
        // 3. Compute the target partition from the key
        int partition = this.partition(record, serializedKey, serializedValue, cluster);
        // Assemble the TopicPartition
        tp = new TopicPartition(record.topic(), partition);
        this.setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
        this.ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
        this.log.trace("Sending record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
        // Assemble the Callback
        Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.maybeAddPartitionToTransaction(tp);
        }
        // 4. Append to the RecordAccumulator to wait for sending
        RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        // 5. Return the result future
        return result.future;
    } catch (ApiException var20) {
        this.log.debug("Exception occurred during message send:", var20);
        if (callback != null) {
            callback.onCompletion((RecordMetadata)null, var20);
        }
        this.errors.record();
        this.interceptors.onSendError(record, tp, var20);
        return new KafkaProducer.FutureFailure(var20);
    } catch (InterruptedException var21) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var21);
        throw new InterruptException(var21);
    } catch (BufferExhaustedException var22) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        this.interceptors.onSendError(record, tp, var22);
        throw var22;
    } catch (KafkaException var23) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var23);
        throw var23;
    } catch (Exception var24) {
        this.interceptors.onSendError(record, tp, var24);
        throw var24;
    }
}
```

This is the core logic of producing a Kafka message, and the two pieces worth studying closely are the partition selection and the batching.

By default, the partition is chosen by hashing the serialized key (a 32-bit murmur2 hash) and taking the result modulo the topic's partition count.
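In code, the keyed path of the default partitioner boils down to the following (paraphrased from `DefaultPartitioner`; `murmur2` and `toPositive` live in `org.apache.kafka.common.utils.Utils`):

```java
// Keyed record: 32-bit murmur2 hash of the serialized key, modulo the partition count
int numPartitions = cluster.partitionsForTopic(record.topic()).size();
int partition = Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
```

Records without a key are spread across partitions instead (round-robin in older clients, sticky in newer ones).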

Sending is batched: records are first accumulated on the client and only shipped to the broker once a batch fills up or the linger time expires. The key method is org.apache.kafka.clients.producer.internals.RecordAccumulator#append, shown below; its internals will be covered in the next post.

```java
public RecordAccumulator.RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException {
    this.appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) {
        headers = Record.EMPTY_HEADERS;
    }
    RecordAccumulator.RecordAppendResult var16;
    try {
        // 1. Look up the deque of batches for this topic-partition, creating it if absent
        Deque<ProducerBatch> dq = this.getOrCreateDeque(tp);
        synchronized(dq) {
            if (this.closed) {
                throw new KafkaException("Producer closed while send in progress");
            }
            // Try appending to an existing batch; returns null if the deque has no usable batch
            RecordAccumulator.RecordAppendResult appendResult = this.tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                RecordAccumulator.RecordAppendResult var14 = appendResult;
                return var14;
            }
        }
        byte maxUsableMagic = this.apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, this.compression, key, value, headers));
        this.log.trace("Allocating a new {} byte message buffer for topic {} partition {}", new Object[]{size, tp.topic(), tp.partition()});
        buffer = this.free.allocate(size, maxTimeToBlock);
        synchronized(dq) {
            if (this.closed) {
                throw new KafkaException("Producer closed while send in progress");
            }
            RecordAccumulator.RecordAppendResult appendResult = this.tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult == null) {
                // 2. Write the record into the freshly allocated memory buffer
                MemoryRecordsBuilder recordsBuilder = this.recordsBuilder(buffer, maxUsableMagic);
                // Create a new batch backed by that buffer
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, this.time.milliseconds());
                // Append the record to the new batch
                FutureRecordMetadata future = (FutureRecordMetadata)Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, this.time.milliseconds()));
                // Add the batch to the tail of the deque
                dq.addLast(batch);
                this.incomplete.add(batch);
                buffer = null;
                RecordAccumulator.RecordAppendResult var19 = new RecordAccumulator.RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
                return var19;
            }
            var16 = appendResult;
        }
    } finally {
        if (buffer != null) {
            this.free.deallocate(buffer);
        }
        this.appendsInProgress.decrementAndGet();
    }
    return var16;
}
```

 
