当前位置:   article > 正文

SpringBoot 集成Kafka操作详解_springboot kafka消息时间戳

springboot kafka消息时间戳

目录[-]


参考信息:

环境说明:

  • Kafka 版本:2.3.0
  • Zookeeper 版本:3.4.14
  • SpringBoot 版本:2.1.7.RELEASE
  • Spring For Apache Kafka 版本:2.2.8

一、概念知识

什么是消息中间件

       消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

什么是 Kafka

       Apache Kafka 是一个分布式高吞吐量的流消息系统,Kafka 建立在 ZooKeeper 同步服务之上。它与 Apache Storm 和 Spark 完美集成,用于实时流数据分析,与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,数据副本和高度容错功能,因此非常适合大型消息处理应用场景。

Kafka 特性

  • 高并发: 支持数千个客户端同时读写。
  • 可扩展性: kafka集群支持热扩展。
  • 容错性: 允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
  • 持久性、可靠性: 消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
  • 高吞吐量、低延迟: Kafka每秒可以处理几十万消息,延迟最低只有几毫秒,每个消息主题topic可以分多个区,消费者组(consumer group)对消息分区(partition)进行消费。

使用场景

  • 日志收集: 可以用 kafka 收集各种服务的日志,通过kafka以统一接口服务的方式开放给各种消费者,如 hadoop,Hbase,Solr 等。
  • 消息系统: 解耦生产者和消费者、缓存消息等。
  • 用户活动跟踪: Kafka 经常被用来记录web用户或者app用户的各种活动,如浏览网页,搜索,点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标: Kafka也经常用来记录运营监控数据,包括收集各种分布式应用的数据,比如报警和报告等。
  • 流式处理: 比如 spark streaming 和 storm。

基本概念

  • Broker: 消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 可以组成一个 Kafka 集群。
  • Topic: Kafka 的消息通过 Topic 主题来分类,Topic类似于关系型数据库中的表,每个 Topic 包含一个或多(Partition)分区。
  • Partition: 多个分区会分布在Kafka集群的不同服务节点上,消息以追加的方式写入一个或多个分区中。
  • LogSegment: 每个分区又被划分为多个日志分段 LogSegment 组成,日志段是 Kafka 日志对象分片的最小单位。LogSegment 算是一个逻辑概念,对应一个具体的日志文件(”.log” 的数据文件)和两个索引文件(”.index” 和 “.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成。
  • Offset: 每个分区中都由一系列有序的、不可变的消息组成,这些消息被顺序地追加到 Partition 中,每个消息都有一个连续的序列号称之为 Offset 偏移量,用于在 Partition 内唯一标识消息。
  • Message: 消息是 Kafka 中存储的最小最基本的单位,即为一个 commit log,由一个固定长度的消息头和一个可变长度的消息体组成。
  • Producer: 消息的生产者,负责发布消息到 Kafka Broker,生产者在默认情况下把消息均衡地分布到主题的所有分区上,用户也可以自定义分区器来实现消息的分区路由。
  • Consumer: 消息的消费者,从 Kafka Broker 读取消息的客户端,消费者把每个分区最后读取的消息的 Offset 偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。
  • Consumer Group: 每个 Consumer 属于一个特定的 Consumer Group(若不指定 Group Name则属于默认的 group),一个或多个 Consumer 组成的群组可以共同消费一个 Topic 中的消息,但每个分区只能被群组中的一个消费者操作。

生产者 ACKS 机制

       ACKS 参数指定了必须要有多少个分区副本接收到消息,生产者才会认为消息写入是发送消息成功的,这个参数对消息丢失的可能性会产生重要影响,主参数有如下选项:

  • acks=0: 把消息发送到kafka就认为发送成功。
  • acks=1: 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功。
  • acks=all: 把消息发送到 Kafka Leader 分区,并且 Leader 分区的副本 Follower 对消息进行了同步就认为发送成功。

消费者更新 Offset 偏移量两种方式

详情可以查看参考的一篇文章:https://www.jianshu.com/p/d5cd34e429a2

       消费者把每个分区最后读取的悄息偏移量提交保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失,KafkaConsumer API 提供了很多种方式来提交偏移量,但是不同的提交方式会产生不同的数据影响。

  • 自动提交:

       如果 enable.auto.commit 被设置为 true,那么消费者会自动提交当前处理到的偏移量存入 Zookeeper,自动提交的时间间隔为5s,通过 atuo.commit.interval.ms 属性设置,自动提交是非常方便,但是自动提交会出现消息被重复消费的风险,可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复悄息的时间窗,不过这种情况是无也完全避免的。

  • 手动提交:

       鉴于 Kafka 自动提交 Offset 的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交 Offset 策略,将 auto.commit.offset 自动提交参数设置为 false 来关闭自动提交开启手动模式,手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。

二、SpringBoot 操作 Kafka 示例

1、Maven 引入 Kafka 相关组件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.1.7.RELEASE</version>
  9. </parent>
  10. <groupId>club.mydlq</groupId>
  11. <artifactId>springboot-kafka-demo</artifactId>
  12. <version>0.0.1-SNAPSHOT</version>
  13. <name>springboot-kafka-demo</name>
  14. <properties>
  15. <java.version>1.8</java.version>
  16. </properties>
  17. <dependencies>
  18. <dependency>
  19. <groupId>org.springframework.boot</groupId>
  20. <artifactId>spring-boot-starter-web</artifactId>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.springframework.kafka</groupId>
  24. <artifactId>spring-kafka</artifactId>
  25. </dependency>
  26. </dependencies>
  27. <build>
  28. <plugins>
  29. <plugin>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-maven-plugin</artifactId>
  32. </plugin>
  33. </plugins>
  34. </build>
  35. </project>

2、Topic 配置

配置 Topic,每次程序启动时检测 Kafka 中是否存在已经配置的 Topic,如果不存在就创建。

  1. import org.apache.kafka.clients.admin.AdminClientConfig;
  2. import org.apache.kafka.clients.admin.NewTopic;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.kafka.core.KafkaAdmin;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. @Configuration
  9. public class KafkaTopicConfig {
  10. /**
  11. * 定义一个KafkaAdmin的bean,可以自动检测集群中是否存在topic,不存在则创建
  12. */
  13. @Bean
  14. public KafkaAdmin kafkaAdmin() {
  15. Map<String, Object> configs = new HashMap<>();
  16. // 指定多个kafka集群多个地址,例如:192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092
  17. configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
  18. return new KafkaAdmin(configs);
  19. }
  20. /**
  21. * 创建 Topic
  22. */
  23. @Bean
  24. public NewTopic topicinfo() {
  25. // 创建topic,需要指定创建的topic的"名称""分区数""副本数量(副本数数目设置要小于Broker数量)"
  26. return new NewTopic("test", 3, (short) 0);
  27. }
  28. }

3、Producer 配置

(1)、创建 Producer 配置类

创建 Producer 配置类,对 Kafka 生产者进行配置,在配置中需要设置三个 Bean 分别为:

  • kafkaTemplate:kafka template 实例,用于 Spring 中的其它对象引入该 Bean,通过其向 Kafka 发送消息。
  • producerFactory:producer 工厂,用于对 kafka producer 进行配置。
  • producerConfigs:对 kafka producer 参数进行配置。
  1. import org.apache.kafka.clients.producer.ProducerConfig;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.kafka.annotation.EnableKafka;
  6. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import org.springframework.kafka.core.ProducerFactory;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. // 设置@Configuration、@EnableKafka两个注解,声明Config并且打开KafkaTemplate能力。
  12. @Configuration
  13. @EnableKafka
  14. public class KafkaProducerConfig {
  15. /**
  16. * Producer Template 配置
  17. */
  18. @Bean(name="kafkaTemplate")
  19. public KafkaTemplate<String, String> kafkaTemplate() {
  20. return new KafkaTemplate<>(producerFactory());
  21. }
  22. /**
  23. * Producer 工厂配置
  24. */
  25. public ProducerFactory<String, String> producerFactory() {
  26. return new DefaultKafkaProducerFactory<>(producerConfigs());
  27. }
  28. /**
  29. * Producer 参数配置
  30. */
  31. public Map<String, Object> producerConfigs() {
  32. Map<String, Object> props = new HashMap<>();
  33. // 指定多个kafka集群多个地址
  34. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  35. // 重试次数,0为不启用重试机制
  36. props.put(ProducerConfig.RETRIES_CONFIG, 0);
  37. // acks=0 把消息发送到kafka就认为发送成功
  38. // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
  39. // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
  40. props.put(ProducerConfig.ACKS_CONFIG,"1");
  41. // 生产者空间不足时,send()被阻塞的时间,默认60s
  42. props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
  43. // 控制批处理大小,单位为字节
  44. props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
  45. // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
  46. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  47. // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
  48. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
  49. // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
  50. props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
  51. // 键的序列化方式
  52. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  53. // 值的序列化方式
  54. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  55. // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
  56. // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
  57. props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
  58. return props;
  59. }
  60. }

(2)、创建 Producer Service 向 kafka 发送数据

创建 Producer Service 引入 KafkaTemplate 对象,再创建 sendMessageSyncsendMessageAsync 两个方法,分别利用“同步/异步”两种方法向 kafka 发送消息。

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.kafka.support.SendResult;
  4. import org.springframework.stereotype.Service;
  5. import org.springframework.util.concurrent.ListenableFuture;
  6. import org.springframework.util.concurrent.ListenableFutureCallback;
  7. import java.util.concurrent.ExecutionException;
  8. import java.util.concurrent.TimeUnit;
  9. import java.util.concurrent.TimeoutException;
  10. @Service
  11. public class KafkaProducerService {
  12. @Autowired
  13. private KafkaTemplate kafkaTemplate;
  14. /**
  15. * producer 同步方式发送数据
  16. * @param topic topic名称
  17. * @param message producer发送的数据
  18. */
  19. public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
  20. kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
  21. }
  22. /**
  23. * producer 异步方式发送数据
  24. * @param topic topic名称
  25. * @param message producer发送的数据
  26. */
  27. public void sendMessageAsync(String topic, String message) {
  28. ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, message);
  29. future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
  30. @Override
  31. public void onSuccess(SendResult<Integer, String> result) {
  32. System.out.println("success");
  33. }
  34. @Override
  35. public void onFailure(Throwable ex) {
  36. System.out.println("failure");
  37. }
  38. });
  39. }
  40. }

(3)、创建 Producer Controller 调用 Producer Service 产生数据

Spring Controller 类,用于调用 Producer Service 中的方法向 kafka 发送消息。

  1. import club.mydlq.springbootkafkademo.service.ProducerService;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. import java.util.concurrent.ExecutionException;
  6. import java.util.concurrent.TimeoutException;
  7. @RestController
  8. public class KafkaProducerController {
  9. @Autowired
  10. private KafkaProducerService producerService;
  11. @GetMapping("/sync")
  12. public void sendMessageSync() throws InterruptedException, ExecutionException, TimeoutException {
  13. producerService.sendMessageSync("test","同步发送消息测试");
  14. }
  15. @GetMapping("/async")
  16. public void sendMessageAsync(){
  17. producerService.sendMessageAsync("test","异步发送消息测试");
  18. }
  19. }

4、Consumer 配置

(1)、创建 Consumer 配置类

创建 Consumer 配置类,对 Kafka 消费者进行配置,在配置中需要设置三个 Bean 分别为:

  • kafkaListenerContainerFactory:kafka container 工厂,负责创 建container,当使用@KafkaListener时需要提供。
  • consumerFactory:consumer 工厂,用于对 kafka consumer 进行配置。
  • consumerConfigs:对 kafka consumer 参数进行配置。
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.kafka.annotation.EnableKafka;
  6. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  7. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  8. import org.springframework.kafka.core.ConsumerFactory;
  9. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  10. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. @Configuration
  14. @EnableKafka
  15. public class KafkaConsumerConfig {
  16. @Bean
  17. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  18. ConcurrentKafkaListenerContainerFactory<Integer, String>
  19. factory = new ConcurrentKafkaListenerContainerFactory<>();
  20. // 设置消费者工厂
  21. factory.setConsumerFactory(consumerFactory());
  22. // 消费者组中线程数量
  23. factory.setConcurrency(3);
  24. // 拉取超时时间
  25. factory.getContainerProperties().setPollTimeout(3000);
  26. return factory;
  27. }
  28. @Bean
  29. public ConsumerFactory<Integer, String> consumerFactory() {
  30. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  31. }
  32. @Bean
  33. public Map<String, Object> consumerConfigs() {
  34. Map<String, Object> propsMap = new HashMap<>();
  35. // Kafka地址
  36. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  37. // 是否自动提交offset偏移量(默认true)
  38. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  39. // 自动提交的频率(ms)
  40. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  41. // Session超时设置
  42. propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  43. // 键的反序列化方式
  44. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  45. // 值的反序列化方式
  46. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  47. // offset偏移量规则设置:
  48. // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  49. // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  50. // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  51. propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  52. return propsMap;
  53. }
  54. }

(2)、创建 Consumer Service 监听 Kafka 数据

  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class KafkaConsumerService {
  5. @KafkaListener(topics = {"test"},groupId = "group1", containerFactory="kafkaListenerContainerFactory")
  6. public void kafkaListener(String message){
  7. System.out.println(message);
  8. }
  9. }

三、SpringBoot 操作 Kafka 详解

1、Producer Template 发送消息几种方法

KafkaTemplate 类提供了非常方便的方法将数据发送到 kafka 的 Topic,以下清单显示了该类的提供的相关方法,详情可以查看 KafkaTemplate 类方法文档

  1. // 设定data,向kafka发送消息
  2. ListenableFuture<SendResult<K, V>> sendDefault(V data);
  3. // 设定keydata,向kafka发送消息
  4. ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
  5. // 设定partition、keydata,向kafka发送消息
  6. ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
  7. // 设定partition、timestamp、keydata,向kafka发送消息
  8. ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
  9. // 设定topic、data,向kafka发送消息
  10. ListenableFuture<SendResult<K, V>> send(String topic, V data);
  11. // 设定topic、keydata,向kafka发送消息
  12. ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
  13. // 设定topic、partition、keydata,向kafka发送消息
  14. ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
  15. // 设定topic、partition、timestamp、 keydata,向kafka发送消息
  16. ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
  17. // 创建ProducerRecord对象,在ProducerRecord中设置好topic、partion、keyvalue等信息,然后向kafka发送消息
  18. ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
  19. // 创建Spring的Message对象,然后向kafka发送消息
  20. ListenableFuture<SendResult<K, V>> send(Message<?> message);
  21. // 获取指标信息
  22. Map<MetricName, ? extends Metric> metrics();
  23. // 显示Topic分区信息
  24. List<PartitionInfo> partitionsFor(String topic);
  25. //在生产者上执行一些任意操作并返回结果。
  26. <T> T execute(ProducerCallback<K, V, T> callback);
  27. // 生产者刷新消息
  28. void flush();
  29. // 用于执行生产者方法后异步回调
  30. interface ProducerCallback<K, V, T> {
  31. T doInKafka(Producer<K, V> producer);
  32. }

下面将写个使用示例,这里改下上面向 kafka service 发送数据的例子,通过不同的方法向 kafka 发送消息,具体代码如下:

  1. import org.apache.kafka.clients.producer.ProducerRecord;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.kafka.support.KafkaHeaders;
  5. import org.springframework.kafka.support.SendResult;
  6. import org.springframework.messaging.Message;
  7. import org.springframework.messaging.support.MessageBuilder;
  8. import org.springframework.stereotype.Service;
  9. import org.springframework.util.concurrent.ListenableFuture;
  10. import org.springframework.util.concurrent.ListenableFutureCallback;
  11. import java.util.Date;
  12. import java.util.concurrent.ExecutionException;
  13. import java.util.concurrent.TimeUnit;
  14. import java.util.concurrent.TimeoutException;
  15. @Service
  16. public class ProducerService {
  17. @Autowired
  18. private KafkaTemplate kafkaTemplate;
  19. /**
  20. * producer 同步方式发送数据
  21. *
  22. * @param topic topic名称
  23. * @param message producer发送的数据
  24. */
  25. public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
  26. //------- 方法:send(String topic, @Nullable V data)
  27. kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
  28. //------- 方法:send(String topic, K key, @Nullable V data)
  29. kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
  30. //------- 方法:send(String topic, K key, @Nullable V data)
  31. kafkaTemplate.send(topic, 0, message).get(10, TimeUnit.SECONDS);
  32. //------- 方法:send(String topic, Integer partition, K key, @Nullable V data)
  33. kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS);
  34. //------- 方法:send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)
  35. kafkaTemplate.send(topic, 0, new Date().getTime(),key, message).get(10, TimeUnit.SECONDS);
  36. //------- 方法:send(Message<?> message)
  37. Message msg = MessageBuilder.withPayload("Send Message(payload,headers) Test")
  38. .setHeader(KafkaHeaders.MESSAGE_KEY, key)
  39. .setHeader(KafkaHeaders.TOPIC, topic)
  40. .setHeader(KafkaHeaders.PREFIX,"kafka_")
  41. .build();
  42. kafkaTemplate.send(msg).get(10, TimeUnit.SECONDS);
  43. //------- 方法:send(ProducerRecord<K, V> record)
  44. ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("test", "Send ProducerRecord(topic,value) Test");
  45. ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("test", "", "Send ProducerRecord(topic,key,value) Test");
  46. kafkaTemplate.send(producerRecord1).get(10, TimeUnit.SECONDS);
  47. kafkaTemplate.send(producerRecord2).get(10, TimeUnit.SECONDS);
  48. }
  49. /**
  50. * producer 异步方式发送数据
  51. *
  52. * @param topic topic名称
  53. * @param message producer发送的数据
  54. */
  55. public void sendMessageAsync(String topic, String key, String message) {
  56. //------- 方法:send(String topic, @Nullable V data)
  57. ListenableFuture<SendResult<Integer, String>> future1 = kafkaTemplate.send(topic, message);
  58. //------- 方法:send(String topic, K key, @Nullable V data)
  59. ListenableFuture<SendResult<Integer, String>> future2 = kafkaTemplate.send(topic, key, message);
  60. //------- 方法:send(String topic, K key, @Nullable V data)
  61. ListenableFuture<SendResult<Integer, String>> future3 = kafkaTemplate.send(topic, 0, message);
  62. //------- 方法:send(String topic, Integer partition, K key, @Nullable V data)
  63. ListenableFuture<SendResult<Integer, String>> future4 = kafkaTemplate.send(topic, 0, key, message);
  64. //------- 方法:send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)
  65. ListenableFuture<SendResult<Integer, String>> future5 = kafkaTemplate.send(topic, 0, new Date().getTime(),key, message);
  66. //------- 方法:send(Message<?> message)
  67. Message msg = MessageBuilder.withPayload("Send Message(payload,headers) Test")
  68. .setHeader(KafkaHeaders.MESSAGE_KEY, key)
  69. .setHeader(KafkaHeaders.TOPIC, topic)
  70. .setHeader(KafkaHeaders.PREFIX,"kafka_")
  71. .build();
  72. ListenableFuture<SendResult<Integer, String>> future6 = kafkaTemplate.send(msg);
  73. //------- 方法:send(ProducerRecord<K, V> record)
  74. ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("test", "Send ProducerRecord(topic,value) Test");
  75. ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("test", "", "Send ProducerRecord(topic,key,value) Test");
  76. ListenableFuture<SendResult<Integer, String>> future7 = kafkaTemplate.send(producerRecord1);
  77. ListenableFuture<SendResult<Integer, String>> future8 = kafkaTemplate.send(producerRecord2);
  78. // 设置异步发送消息获取发送结果后执行的动作
  79. ListenableFutureCallback listenableFutureCallback = new ListenableFutureCallback<SendResult<Integer, String>>() {
  80. @Override
  81. public void onSuccess(SendResult<Integer, String> result) {
  82. System.out.println("success");
  83. }
  84. @Override
  85. public void onFailure(Throwable ex) {
  86. System.out.println("failure");
  87. }
  88. };
  89. // 将listenableFutureCallback与异步发送消息对象绑定
  90. future1.addCallback(listenableFutureCallback);
  91. future2.addCallback(listenableFutureCallback);
  92. future3.addCallback(listenableFutureCallback);
  93. future4.addCallback(listenableFutureCallback);
  94. future5.addCallback(listenableFutureCallback);
  95. future6.addCallback(listenableFutureCallback);
  96. future7.addCallback(listenableFutureCallback);
  97. future8.addCallback(listenableFutureCallback);
  98. }
  99. }

2、Kafka Consumer 监听 Kafka 消息

当我们需要接收 kafka 中的消息时需要使用消息监听器,Spring For Kafka 提供了八种消息监听器接口,接口如下:

  1. /**
  2. * 当使用"自动提交"或"ontainer-managed"中一个提交方法提交offset偏移量时,
  3. * 使用此接口处理Kafka consumer poll()操作接收到的各个ConsumerRecord实例。
  4. */
  5. public interface MessageListener<K, V> {
  6. void onMessage(ConsumerRecord<K, V> data);
  7. }
  8. /**
  9. * 当使用手动提交offset偏移量时,使用此接口处理从Kafka consumer poll()操作接收到的各个ConsumerRecord实例。
  10. */
  11. public interface AcknowledgingMessageListener<K, V> {
  12. void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
  13. }
  14. /**
  15. * 当使用"自动提交"或"ontainer-managed"中一个提交方法提交offset偏移量时,
  16. * 使用此接口处理Kafka consumer poll()操作接收到的各个ConsumerRecord
  17. * 实例。并提供可访问的consumer对象。
  18. */
  19. public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
  20. void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
  21. }
  22. /**
  23. * 当使用手动提交offset偏移量时,使用此接口处理从Kafka consumer poll()操作
  24. * 接收到的各个ConsumerRecord实例。并提供可访问的consumer对象。
  25. */
  26. public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
  27. void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
  28. }
  29. /**
  30. * 当使用"自动提交"或"ontainer-managed"中一个提交方法提交offset偏移量时,
  31. * 使用此接口处理从Kafka consumer poll()操作接收到的所有ConsumerRecord实例。
  32. *
  33. * 注意:使用此接口时不支持ACK的AckMode.RECORD模式,因为监听器已获得完整的批处理。
  34. */
  35. public interface BatchMessageListener<K, V> {
  36. void onMessage(List<ConsumerRecord<K, V>> data);
  37. }
  38. /**
  39. * 当使用手动提交offset偏移量时,使用此接口处理从Kafka consumer poll()操作接收到的所有ConsumerRecord实例。
  40. */
  41. public interface BatchAcknowledgingMessageListener<K, V> {
  42. void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
  43. }
  44. /**
  45. * 当使用"自动提交"或"ontainer-managed"中一个提交方法提交offset偏移量时,
  46. * 使用此接口处理从Kafka consumer poll()操作接收到的所有ConsumerRecord实例。
  47. * 并提供可访问的consumer对象。
  48. *
  49. * 注意:使用此接口时不支持ACK的AckMode.RECORD模式,因为监听器已获得完整的批处理。
  50. */
  51. public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
  52. void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
  53. }
  54. /**
  55. * 当使用手动提交offset偏移量时,使用此接口处理从Kafka consumer poll()操作接收到的
  56. * 所有ConsumerRecord实例。并提供可访问的consumer对象。
  57. */
  58. public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
  59. void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
  60. }

上面接口中的方法归总就是:

序号消费方式自动提交Offset偏移量提供Consumer对象
1单条
2单条
3单条
4单条
5批量
6批量
7批量
8批量

Spring For Kafka 提供了消息监听器接口的两种实现类,分别是:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 利用单个线程来接收全部主题中全部分区上的所有消息。
ConcurrentMessageListenerContainer 代理的一个或多个 KafkaMessageListenerContainer 实例,来实现多个线程消费。

下面将创建一个 KafkaMessageListenerContainer 实例来监听 Kafka 消息:

  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  9. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. /**
  19. * 创建 KafkaMessageListenerContainer 实例监听 kafka 消息
  20. */
  21. @Bean
  22. public KafkaMessageListenerContainer demoListenerContainer() {
  23. // 创建container配置参数,并指定要监听的 topic 名称
  24. ContainerProperties properties = new ContainerProperties("test");
  25. // 设置消费者组名称
  26. properties.setGroupId("group2");
  27. // 设置监听器监听 kafka 消息
  28. properties.setMessageListener(new MessageListener<Integer,String>() {
  29. @Override
  30. public void onMessage(ConsumerRecord<Integer, String> record) {
  31. System.out.println("消息:" + record);
  32. }
  33. });
  34. return new KafkaMessageListenerContainer(consumerFactory(), properties);
  35. }
  36. }

       上面示例启动后将监听 topic 名称为 “test” 的 kafka 消息,不过这样启动只是单线程消费,如果想多线程消费就得创建多个实例来监控该 topic 不同的分区。但是这样操作来完成消费者多线程消费比较麻烦,所以一般使用 Spring For Kafka 组件时都会创建 KafkaListenerContainerFactory Bean 来代理多个 KafkaMessageListenerContainer 完成消费者多线程消费。

3、使用 @KafkaListener 注解监听 Kafka 消息

       为了使创建 kafka 监听器更加简单,Spring For Kafka 提供了 @KafkaListener 注解,该 @KafkaListener 注解配置方法上,凡是带上此注解的方法就会被标记为是 Kafka 消息监听器,所以可以用 @KafkaListener 注解快速创建消息监听器。

下面写几个例子来简单描述下使用方法:

(1)、监听单个 Topic 示例

这里先写一个简单使用 @KafkaListener 完成消息监听的示例。

  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  9. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. @Bean
  19. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  20. ConcurrentKafkaListenerContainerFactory<Integer, String>
  21. factory = new ConcurrentKafkaListenerContainerFactory<>();
  22. factory.setConsumerFactory(consumerFactory());
  23. // 创建3个线程并发消费
  24. factory.setConcurrency(3);
  25. // 设置拉取数据超时时间
  26. factory.getContainerProperties().setPollTimeout(3000);
  27. return factory;
  28. }
  29. /**
  30. * ---使用@KafkaListener注解来标记此方法为kafka消息监听器,创建消费组group1监听test topic
  31. */
  32. @KafkaListener(topics = {"test"},groupId = "group1")
  33. public void kafkaListener(String message){
  34. System.out.println("消息:"+message);
  35. }
  36. }

(2)、监听多个 Topic 示例

使用 @KafkaListener 也可以监控多个 topic 的消息,示例如下:

  1. @KafkaListener(topics = {"test1", "test2"}, groupId = "group1")
  2. public void kafkaListener(String message){
  3. System.out.println("消息:"+message);
  4. }

(3)、监听某个 Topic 的某个分区示例

单独监听某个分区息,示例如下:

  1. @KafkaListener(id = "id0", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "0" }) })
  2. public void kafkaListener1(String message) {
  3. System.out.println("消息:"+message);
  4. }
  5. @KafkaListener(id = "id1", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "1", "2" }) })
  6. public void kafkaListener2(String message) {
  7. System.out.println("消息:"+message);
  8. }

(4)、监听多个 Topic 的分区示例

同时监听多个 topic 的分区,示例如下:

  1. @KafkaListener(id = "test", group = "group1", topicPartitions = {
  2. @TopicPartition(topic = "test1", partitions = {"0"}),
  3. @TopicPartition(topic = "test2", partitions = {"0", "1"})
  4. })
  5. public void kafkaListener(String message) {
  6. System.out.print(message);
  7. }

(5)、获取监听的 topic 消息头中的元数据

可以从消息头中获取有关消息的元数据,例如:

  1. @KafkaListener(topics = "test", groupId = "group1")
  2. public void kafkaListener(@Payload String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
  3. @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
  4. System.out.println("主题:" + topic);
  5. System.out.println("键key:" + key);
  6. System.out.println("消息:" + message);
  7. }

(6)、监听 topic 进行批量消费

如果参数配置中设置为批量消费,则 @KafkaListener 注解的方法的参数要使用 List 来接收,例如:

  1. @KafkaListener(topics = "test", groupId = "group1")
  2. public void kafkaListener(List<String> messages) {
  3. for(String msg:messages){
  4. System.out.println(msg);
  5. }
  6. }

(7)、监听 topic 并手动提交 Offset 偏移量

如果设置为手动提交 Offset 偏移量,并且设置 Ack 模式为 MANUAL 或 MANUAL_IMMEDIATE,则需要在方法参数中引入 Acknowledgment 对象,并执行它的 acknowledge() 方法来提交偏移量。

  1. @KafkaListener(topics = "test",groupId = "group5")
  2. public void kafkaListener(List<String> messages, Acknowledgment acknowledgment) {
  3. for(String msg:messages){
  4. System.out.println(msg);
  5. }
  6. // 触发提交offset偏移量
  7. acknowledgment.acknowledge();
  8. }

4、使用 @KafkaListener 模糊匹配多个 Topic

使用 @KafkaListener 注解时,可以添加参数 topicPattern ,输入通配符来对多个 topic 进行监听,例如这里使用 “test.*” 将监听所有以 test 开头的 topic 的消息。

  1. @KafkaListener(topicPattern = "test.*",groupId = "group6")
  2. public void annoListener2(String messages) {
  3. System.err.println(messages);
  4. }

5、使用 @SendTo 注解转发消息

       在平时处理业务逻辑时候,经常需要接收 kafka 中某个 topic 的消息,进行一系列处理来完成业务逻辑,然后再进行转发到一个新的 topic 中,由于这种业务需求,Spring For Kafka 提供了 @SendTo 注解,只要在 @KafkaListener 与 @SendTo 注解在同一个方法上,并且该方法存在返回值,那么就能将监听的数据在方法内进行处理后 return,然后转发到 @SendTo 注解内设置的 topic 中。

完成上面操作需要几个步骤:

  1. 配置 Producer 参数,并创建 kafkaTemplate Bean。
  2. 配置KafkaListenerContainerFactory的ReplyTemplate,将 kafkaTemplate 对象添加到其中。
  3. 创建消息监听器方法,设置该方法拥有返回值,并添加 @KafkaListener 与 @SendTo 两个注解,并在 @SendTo 注解中输入消息转发的 Topic。

(1)、配置 Producer 参数,并创建 kafkaTemplate Bean

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaProducerConfig {
  4. /**
  5. * kafkaTemplate Bean
  6. */
  7. @Bean(name="kafkaTemplate")
  8. public KafkaTemplate<String, String> kafkaTemplate() {
  9. return new KafkaTemplate<>(producerFactory());
  10. }
  11. public ProducerFactory<String, String> producerFactory() {
  12. return new DefaultKafkaProducerFactory<>(producerConfigs());
  13. }
  14. public Map<String, Object> producerConfigs() {
  15. Map<String, Object> props = new HashMap<>();
  16. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
  17. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  18. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  19. return props;
  20. }
  21. }

(2)、配置KafkaListenerContainerFactory的ReplyTemplate,将 kafkaTemplate 对象添加到其中

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConsumerConfig {
  4. @Autowired
  5. private KafkaTemplate kafkaTemplate;
  6. @Bean
  7. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  8. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  9. factory.setConsumerFactory(consumerFactory());
  10. factory.setConcurrency(3);
  11. factory.getContainerProperties().setPollTimeout(3000);
  12. // ---设置ReplyTemplate参数,将kafkaTemplate对象加入
  13. factory.setReplyTemplate(kafkaTemplate);
  14. return factory;
  15. }
  16. @Bean
  17. public ConsumerFactory<Integer, String> consumerFactory() {
  18. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  19. }
  20. @Bean
  21. public Map<String, Object> consumerConfigs() {
  22. Map<String, Object> propsMap = new HashMap<>();
  23. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  24. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  25. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  26. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  27. return propsMap;
  28. }

(3)、创建消息监听器方法,设置该方法拥有返回值,并添加 @KafkaListener 与 @SendTo 两个注解,并在 @SendTo 注解中输入消息转发的 Topic。

  1. @Service
  2. public class KafkaConsumerMessage {
  3. /**
  4. * 监听test1 topic,设置返回值为string类型,并添加@SendTo注解,将消息转发到 test2
  5. */
  6. @KafkaListener(topics = "test1",groupId = "group1")
  7. @SendTo("test2")
  8. public String kafkaListener1(String messages) {
  9. System.out.println(messages);
  10. String newMsg = messages + "消息转发测试";
  11. // 将处理后的消息返回
  12. return newMsg;
  13. }
  14. /**
  15. * 监听 test2 topic
  16. */
  17. @KafkaListener(topics = "test2",groupId = "group2")
  18. public void kafkaListener2(String messages) {
  19. System.err.println(messages);
  20. }
  21. }

6、Kafka Consumer 并发批量消费消息

(1)、设置并发数与开启批量

  • kafkaListenerContainerFactory 设置 factory.setConcurrency(3) 设置并发,这个值不能超过topic分区数目
  • kafkaListenerContainerFactory 设置 factory.setBatchListener(true) 开启批量
  • consumerConfigs 配置 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 值,来设置批量消费每次最多消费多少条消息记录
  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo1 {
  4. @Bean
  5. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  6. ConcurrentKafkaListenerContainerFactory<Integer, String>
  7. factory = new ConcurrentKafkaListenerContainerFactory<>();
  8. factory.setConsumerFactory(consumerFactory());
  9. // 消费者组中线程数量,例如topic有3个分区,为了加快消费将并发设置为3
  10. factory.setConcurrency(3);
  11. // 拉取超时时间
  12. factory.getContainerProperties().setPollTimeout(3000);
  13. // 当使用批量监听器时需要设置为true
  14. factory.setBatchListener(true);
  15. return factory;
  16. }
  17. @Bean
  18. public ConsumerFactory<Integer, String> consumerFactory() {
  19. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  20. }
  21. @Bean
  22. public Map<String, Object> consumerConfigs() {
  23. Map<String, Object> propsMap = new HashMap<>();
  24. // Kafka地址
  25. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  26. // 是否自动提交
  27. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  28. // 自动提交的频率
  29. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  30. // Session超时设置
  31. propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  32. // 键的反序列化方式
  33. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  34. // 值的反序列化方式
  35. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  36. // 批量消费每次最多消费多少条消息记录
  37. propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
  38. return propsMap;
  39. }
  40. }

(2)、设置分区消费

有多个分区的 Topic,可以设置多个注解单独监听 Topic 各个分区以提高效率。

  1. @Component
  2. public class ConsumerMessage {
  3. @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = "test2", partitions = { "0" }) })
  4. public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
  5. System.out.println("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
  6. System.out.println("Id0 records size " + records.size());
  7. for (ConsumerRecord<?, ?> record : records) {
  8. Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  9. System.out.println("Received: " + record);
  10. if (kafkaMessage.isPresent()) {
  11. Object message = record.value();
  12. String topic = record.topic();
  13. System.out.printf(topic + " p0 Received message=" + message);
  14. }
  15. }
  16. }
  17. @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = "test2", partitions = { "1" }) })
  18. public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
  19. System.out.println("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
  20. System.out.println("Id1 records size " + records.size());
  21. for (ConsumerRecord<?, ?> record : records) {
  22. Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  23. System.out.println("Received: " + record);
  24. if (kafkaMessage.isPresent()) {
  25. Object message = record.value();
  26. String topic = record.topic();
  27. System.out.printf(topic + " p1 Received message=" + message);
  28. }
  29. }
  30. }
  31. @KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = "test2", partitions = { "2" }) })
  32. public void listenPartition2(List<ConsumerRecord<?, ?>> records) {
  33. System.out.println("Id2 Listener, Thread ID: " + Thread.currentThread().getId());
  34. System.out.println("Id2 records size " + records.size());
  35. for (ConsumerRecord<?, ?> record : records) {
  36. Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  37. System.out.println("Received: " + record);
  38. if (kafkaMessage.isPresent()) {
  39. Object message = record.value();
  40. String topic = record.topic();
  41. System.out.printf(topic + " p2 Received message=" + message);
  42. }
  43. }
  44. }
  45. }

7、暂停和恢复 Listener Containers

Spring For Kafka 提供 start()pause() 和 resume() 方法来操作监听容器的启动、暂停和恢复。

  • start():启动监听容器。
  • pause():暂停监听容器。
  • resume():恢复监听容器。

       这些方法一般可以灵活操作 kafka 的消费,例如进行服务进行升级,暂停消费者进行消费;例如在白天高峰期不进行服务消费,等到晚上再进行,这时候可以设置定时任务,白天关闭消费者消费到晚上开启;考虑到这些情况,利用 start()、pause()、resume() 这些方法能很好控制消费者进行消费。这里写一个简单例子,通过 cotroller 操作暂停、恢复消费者监听容器。

  1. @RestController
  2. public class KafkaController {
  3. @Autowired
  4. private KafkaListenerEndpointRegistry registry;
  5. /**
  6. * 暂停监听容器
  7. */
  8. @GetMapping("/pause")
  9. public void pause(){
  10. registry.getListenerContainer("pause.resume").pause();
  11. }
  12. /**
  13. * 恢复监听容器
  14. */
  15. @GetMapping("/resume")
  16. public void resume(){
  17. //判断监听容器是否启动,未启动则将其启动,否则进行恢复监听容器
  18. if (!registry.getListenerContainer("pause.resume").isRunning()) {
  19. registry.getListenerContainer("pause.resume").start();
  20. }
  21. registry.getListenerContainer("pause.resume").resume();
  22. }
  23. }

在上面例子中,调用 /pause 接口可以暂停消费者监听容器,调用 /resume 接口可以恢复消费者监听容器。

8、过滤监听器中的消息

在接收消息时候可以创建一个过滤器来过滤接收的消息,这样方便我们不必处理全部消息,只接收我们需要的消息进行处理。

在 kafkaListenerContainerFactory 中配置一个过滤器 RecordFilterStrategy 对象过滤消息,这里演示下如何操作:

  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  9. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. @Bean
  19. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  20. ConcurrentKafkaListenerContainerFactory<Integer, String>
  21. factory = new ConcurrentKafkaListenerContainerFactory<>();
  22. factory.setConsumerFactory(consumerFactory());
  23. factory.setConcurrency(3);
  24. factory.getContainerProperties().setPollTimeout(3000);
  25. // 设置过滤器,只接收消息内容中包含 "test" 的消息
  26. RecordFilterStrategy recordFilterStrategy = new RecordFilterStrategy() {
  27. @Override
  28. public boolean filter(ConsumerRecord consumerRecord) {
  29. String value = consumerRecord.value().toString();
  30. if (value !=null && value.contains("test")) {
  31. System.err.println(consumerRecord.value());
  32. // 返回 false 则接收消息
  33. return false;
  34. }
  35. // 返回 true 则抛弃消息
  36. return true;
  37. }
  38. };
  39. // 将过滤器添添加到参数中
  40. factory.setRecordFilterStrategy(recordFilterStrategy);
  41. return factory;
  42. }
  43. /**
  44. * 监听消息,接收过滤器过滤后的消息
  45. */
  46. @KafkaListener(topics = {"test"},groupId = "group1")
  47. public void kafkaListener(String message){
  48. System.out.println("消息:"+message);
  49. }
  50. }

9、监听器异常处理

(1)、单消息消费异常处理器

  1. @Service
  2. public class ConsumerService {
  3. /**
  4. * 消息监听器
  5. */
  6. @KafkaListener( topics = {"test"},groupId = "group1",errorHandler = "listenErrorHandler")
  7. public void listen(String message) {
  8. System.out.println(message);
  9. // 创建异常,触发异常处理器
  10. throw new NullPointerException("测试错误处理器");
  11. }
  12. /**
  13. * 异常处理器
  14. */
  15. @Bean
  16. public ConsumerAwareListenerErrorHandler listenErrorHandler() {
  17. return new ConsumerAwareListenerErrorHandler() {
  18. @Override
  19. public Object handleError(Message<?> message,
  20. ListenerExecutionFailedException e,
  21. Consumer<?, ?> consumer) {
  22. System.out.println("message:" + message.getPayload());
  23. System.out.println("exception:" + e.getMessage());
  24. consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class),
  25. message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
  26. message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
  27. return null;
  28. }
  29. };
  30. }
  31. }

(2)、批量消费异常处理器

  1. @Service
  2. public class ConsumerService {
  3. /**
  4. * 消息监听器
  5. */
  6. @KafkaListener( topics = {"test"},groupId = "group1",errorHandler = "listenErrorHandler")
  7. public void listen(List<String> messages) {
  8. for(String msg:messages){
  9. System.out.println(msg);
  10. }
  11. // 创建异常,触发异常处理器
  12. throw new NullPointerException("测试错误处理器");
  13. }
  14. /**
  15. * 异常处理器
  16. */
  17. @Bean
  18. public ConsumerAwareListenerErrorHandler listenErrorHandler() {
  19. return new ConsumerAwareListenerErrorHandler() {
  20. @Override
  21. public Object handleError(Message<?> message,
  22. ListenerExecutionFailedException e,
  23. Consumer<?, ?> consumer) {
  24. System.out.println("message:" + message.getPayload());
  25. System.out.println("exception:" + e.getMessage());
  26. consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class),
  27. message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
  28. message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
  29. return null;
  30. }
  31. };
  32. }
  33. }

(3)、全局异常处理

将异常处理器添加到 kafkaListenerContainerFactory 中来设置全局异常处理。

  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  9. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. @Bean
  19. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  20. ConcurrentKafkaListenerContainerFactory<Integer, String>
  21. factory = new ConcurrentKafkaListenerContainerFactory<>();
  22. factory.setConsumerFactory(consumerFactory());
  23. factory.setConcurrency(3);
  24. factory.getContainerProperties().setPollTimeout(3000);
  25. // 将单条消息异常处理器添加到参数中
  26. factory.setErrorHandler(errorHandler);
  27. // 将批量消息异常处理器添加到参数中
  28. //factory.setErrorHandler(errorHandler);
  29. return factory;
  30. }
  31. /**
  32. * 单消息消费异常处理器
  33. */
  34. @Bean
  35. public ConsumerAwareListenerErrorHandler listenErrorHandler() {
  36. return new ConsumerAwareListenerErrorHandler() {
  37. @Override
  38. public Object handleError(Message<?> message,
  39. ListenerExecutionFailedException e,
  40. Consumer<?, ?> consumer) {
  41. System.out.println("message:" + message.getPayload());
  42. System.out.println("exception:" + e.getMessage());
  43. consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class),
  44. message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
  45. message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
  46. return null;
  47. }
  48. };
  49. }
  50. /**
  51. * 批量息消费异常处理器
  52. */
  53. @Bean
  54. public ConsumerAwareListenerErrorHandler listenErrorHandler() {
  55. return new ConsumerAwareListenerErrorHandler() {
  56. @Override
  57. public Object handleError(Message<?> message,
  58. ListenerExecutionFailedException e,
  59. Consumer<?, ?> consumer) {
  60. System.out.println("message:" + message.getPayload());
  61. System.out.println("exception:" + e.getMessage());
  62. consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class),
  63. message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
  64. message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
  65. return null;
  66. }
  67. };
  68. }
  69. /**
  70. * 监听消息,接收过滤器过滤后的消息
  71. */
  72. @KafkaListener(topics = {"test"},groupId = "group1")
  73. public void kafkaListener(String message){
  74. System.out.println("消息:"+message);
  75. }
  76. }

10、Kafka Consumer 手动/自动提交 Offset

       在kafka的消费者中有一个非常关键的机制,那就是 offset 机制。它使得 Kafka 在消费的过程中即使挂了或者引发再均衡问题重新分配 Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。

       Kafka中偏移量的自动提交是由参数 enable_auto_commit 和 auto_commit_interval_ms 控制的,当 enable_auto_commit=true 时,Kafka在消费的过程中会以频率为 auto_commit_interval_ms 向 Kafka 自带的 topic(__consumer_offsets) 进行偏移量提交,具体提交到哪个 Partation 是以算法:”partation=hash(group_id)%50” 来计算的。

在 Spring 中对 Kafka 设置手动或者自动提交Offset如下:

(1)、自动提交

自动提交需要配置下面两个参数:

  • auto.commit.enable=true:是否将offset维护交给kafka自动提交到zookeeper中维护,设置为true。
  • auto.commit.interval.ms=10000:自动提交时间间隔。

配置示例如下:

  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. // ---设置自动提交Offset为true
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  9. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. @Bean
  19. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  20. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  21. factory.setConsumerFactory(consumerFactory());
  22. // 消费者线程数
  23. factory.setConcurrency(3);
  24. // 拉取超时时间
  25. factory.getContainerProperties().setPollTimeout(3000);
  26. return factory;
  27. }
  28. /**
  29. * -------------接收消息-------------
  30. */
  31. @KafkaListener(topics = {"test"}, groupId = "group1")
  32. public void kafkaListener(String message){
  33. System.out.println("消息:"+message);
  34. }

(2)、手动提交

手动提交需要配置下面一个参数:

  • auto.commit.enable=false:是否将offset维护交给kafka自动提交到zookeeper中维护,设置为false。

然后需要在程序中设置ack模式,从而进行手动提交维护offset。

  1. @Bean
  2. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  3. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  4. factory.setConsumerFactory(consumerFactory());
  5. factory.setConcurrency(3);
  6. factory.getContainerProperties().setPollTimeout(3000);
  7. 设置ACK模式(手动提交模式,这里有七种)
  8. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
  9. return factory;
  10. }

在 kafkaListenerContainerFactory 配置中设置 AckMode,它有七种模式分别为:

  • RECORD: 每处理完一条记录后提交。
  • BATCH(默认): 每次poll一批数据后提交一次,频率取决于每次poll的调用频率。
  • TIME: 每次间隔ackTime的时间提交。
  • COUNT: 处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount就提交。
  • COUNT_TIME: TIME和COUNT中任意一条满足即提交。
  • MANUAL: 手动调用Acknowledgment.acknowledge()后,并且处理完poll的这批数据后提交。
  • MANUAL_IMMEDIATE: 手动调用Acknowledgment.acknowledge()后立即提交。

注意:如果设置 AckMode 模式为 MANUAL 或者 MANUAL_IMMEDIATE,则需要对监听消息的方法中,引入 Acknowledgment 对象参数,并调用 acknowledge() 方法进行手动提交

手动提交下这里将列出七种ACK模式示例,如下:

  • ACK 模式: RECORD
  • 描述: 每处理完一条记录后提交。
  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. // ---设置自动提交Offset为false
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  9. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. @Bean
  19. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  20. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  21. factory.setConsumerFactory(consumerFactory());
  22. factory.setConcurrency(3);
  23. factory.getContainerProperties().setPollTimeout(3000);
  24. // 设置ACK模式为RECORD
  25. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
  26. return factory;
  27. }
  28. /**
  29. * -------------接收消息-------------
  30. */
  31. @KafkaListener(topics = {"test"}, groupId = "group1")
  32. public void kafkaListener(String message){
  33. System.out.println("消息:"+message);
  34. }
  • ACK 模式: BATCH
  • 描述: 每次poll一批数据后提交一次,频率取决于每次poll的调用频率。
  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. // ---设置自动提交Offset为false
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  9. // 设置每次批量消费数目,例如生产者生成10条数据,设置此值为4,那么需要三次批消费(三次中每次消费数目为:4,4,2)才能完成
  10. propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
  11. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  12. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  13. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  14. return propsMap;
  15. }
  16. @Bean
  17. public ConsumerFactory<Integer, String> consumerFactory() {
  18. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  19. }
  20. @Bean
  21. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  22. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  23. factory.setConsumerFactory(consumerFactory());
  24. factory.setConcurrency(3);
  25. factory.getContainerProperties().setPollTimeout(3000);
  26. // 开启批量消费监听器
  27. factory.setBatchListener(true);
  28. // 设置ACK模式为BATCH
  29. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
  30. return factory;
  31. }
  32. /**
  33. * -------------接收消息-------------
  34. * 批量消费时,设置参数为List来接收数据
  35. */
  36. @KafkaListener(topics = {"test"}, groupId = "group1")
  37. public void kafkaListener(List<String> message){
  38. System.out.println("消息:"+message);
  39. }
  • ACK 模式: COUNT
  • 描述: 处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount值就提交。
  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. // ---设置自动提交Offset为false
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  9. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. @Bean
  19. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  20. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  21. factory.setConsumerFactory(consumerFactory());
  22. factory.setConcurrency(3);
  23. factory.getContainerProperties().setPollTimeout(3000);
  24. // 设置ACK模式为COUNT
  25. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.COUNT);
  26. // 设置AckCount数目,每接收AckCount条记录数就提交Offset偏移量
  27. factory.getContainerProperties().setAckCount(10);
  28. return factory;
  29. }
  30. /**
  31. * -------------接收消息-------------
  32. */
  33. @KafkaListener(topics = {"test"}, groupId = "group1")
  34. public void kafkaListener(String message){
  35. System.out.println("消息:"+message);
  36. }
  • ACK 模式: TIME
  • 描述: 每次间隔ackTime的时间提交。
  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. // ---设置自动提交Offset为false
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  9. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. @Bean
  19. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  20. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  21. factory.setConsumerFactory(consumerFactory());
  22. factory.setConcurrency(3);
  23. factory.getContainerProperties().setPollTimeout(3000);
  24. // 设置ACK模式为TIME
  25. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.TIME);
  26. // 设置提交Ack的时间间隔,单位(ms)
  27. factory.getContainerProperties().setAckTime(1000);
  28. return factory;
  29. }
  30. /**
  31. * -------------接收消息-------------
  32. */
  33. @KafkaListener(topics = {"test"}, groupId = "group1")
  34. public void kafkaListener(String message){
  35. System.out.println("消息:"+message);
  36. }
  • ACK 模式: COUNT_TIME。
  • 描述: 每次间隔ackTime的时间或处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount值,任意一条满足即提交。
  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. // ---设置自动提交Offset为false
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  9. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. @Bean
  19. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  20. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  21. factory.setConsumerFactory(consumerFactory());
  22. factory.setConcurrency(3);
  23. factory.getContainerProperties().setPollTimeout(3000);
  24. // 设置ACK模式为COUNT_TIME
  25. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.COUNT_TIME);
  26. // 设置提交Ack的时间间隔,单位(ms)
  27. factory.getContainerProperties().setAckTime(1000);
  28. // 设置AckCount数目,每接收AckCount条记录数就提交Offset偏移量
  29. factory.getContainerProperties().setAckCount(10);
  30. return factory;
  31. }
  32. /**
  33. * -------------接收消息-------------
  34. */
  35. @KafkaListener(topics = {"test"}, groupId = "group1")
  36. public void kafkaListener(String message){
  37. System.out.println("消息:"+message);
  38. }
  • ACK 模式: MANUAL
  • 描述: 手动调用Acknowledgment.acknowledge()后,并且处理完poll的这批数据后提交。
  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. // ---设置自动提交Offset为false
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  9. // 设置每次批量消费数目,例如生产者生成10条数据,设置此值为4,那么需要三次批消费(三次中每次消费数目为:4,4,2)才能完成
  10. propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
  11. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  12. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  13. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  14. return propsMap;
  15. }
  16. @Bean
  17. public ConsumerFactory<Integer, String> consumerFactory() {
  18. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  19. }
  20. @Bean
  21. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  22. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  23. factory.setConsumerFactory(consumerFactory());
  24. factory.setConcurrency(3);
  25. factory.getContainerProperties().setPollTimeout(3000);
  26. // 开启批量消费监听器
  27. factory.setBatchListener(true);
  28. // 设置ACK模式为MANUAL
  29. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  30. return factory;
  31. }
  32. /**
  33. * -------------接收消息-------------
  34. * 批量消费时,设置参数为List来接收数据,并且因为ack模式为MANUAL,所以需要手动调用acknowledge()方法提交
  35. */
  36. @KafkaListener(topics = {"test"}, groupId = "group1")
  37. public void kafkaListener(List<String> message, Acknowledgment acknowledgment){
  38. System.out.println("消息:"+message);
  39. // 手动执行acknowledge()提交offset偏移量
  40. acknowledgment.acknowledge();
  41. }
  • ACK 模式: MANUAL_IMMEDIATE
  • 描述: 手动调用Acknowledgment.acknowledge()后立即提交。
  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. // ---设置自动提交Offset为false
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  9. // 设置每次批量消费数目,例如生产者生成10条数据,设置此值为4,那么需要三次批消费(三次中每次消费数目为:4,4,2)才能完成
  10. propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
  11. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  12. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  13. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  14. return propsMap;
  15. }
  16. @Bean
  17. public ConsumerFactory<Integer, String> consumerFactory() {
  18. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  19. }
  20. @Bean
  21. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  22. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  23. factory.setConsumerFactory(consumerFactory());
  24. factory.setConcurrency(3);
  25. factory.getContainerProperties().setPollTimeout(3000);
  26. // 开启批量消费监听器
  27. factory.setBatchListener(true);
  28. // 设置ACK模式为MANUAL_IMMEDIATE
  29. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  30. return factory;
  31. }
  32. /**
  33. * -------------接收消息-------------
  34. * 批量消费时,设置参数为List来接收数据,并且因为ack模式为MANUAL,所以需要手动调用acknowledge()方法提交
  35. */
  36. @KafkaListener(topics = {"test"}, groupId = "group1")
  37. public void kafkaListener(List<String> message, Acknowledgment acknowledgment){
  38. System.out.println("消息:"+message);
  39. // 手动执行acknowledge()提交offset偏移量
  40. acknowledgment.acknowledge();
  41. }
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号