The project directory structure is as follows (screenshot not reproduced here). The key Maven dependencies in pom.xml are:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.2</version>
</dependency>
package com.example.springbootkafka.config;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrapServers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.topics}")
    private List<String> topics;
    @Value("${spring.kafka.consumer.groupId}")
    private String groupId;
    @Value("${spring.kafka.consumer.sessionTimeOut}")
    private String sessionTimeOut;
    @Value("${spring.kafka.consumer.enableAutoCommit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.consumer.autoCommitInterval}")
    private String autoCommitInterval;
    @Value("${spring.kafka.consumer.maxPollRecords}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.maxPollInterval}")
    private String maxPollInterval;
    @Value("${spring.kafka.consumer.heartbeatInterval}")
    private String heartbeatInterval;
    @Value("${spring.kafka.consumer.keyDeserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.valueDeserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.consumer.autoOffsetReset}")
    private String autoOffsetReset;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Concurrency (consumer threads); multiple service instances split the partitions evenly
        factory.setConcurrency(3);
        // Deliver records to the listener in batches
        factory.setBatchListener(true);
        ContainerProperties containerProperties = factory.getContainerProperties();
        // Commit offsets manually, immediately on each acknowledge()
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("Consumer configuration: {}", JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Broker addresses
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Whether offsets are committed automatically
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // Auto-commit interval
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        // Session timeout
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOut);
        // Key deserializer
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        // Value deserializer
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        // Heartbeat interval
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
        // Consumer group id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Offset reset policy
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Max records per poll
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // Max interval between polls
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        return propsMap;
    }
}
package com.example.springbootkafka.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class Consumer {

    // Earlier version (treats the whole comma-separated property as a single topic name):
    // @KafkaListener(topics = {"${spring.kafka.consumer.topics}"},
    //         groupId = "${spring.kafka.consumer.groupId}",
    //         containerFactory = "kafkaListenerContainerFactory",
    //         properties = {"${spring.kafka.consumer.autoOffsetReset}"})

    // Note: the properties entry below resolves to just "latest", not "auto.offset.reset=latest";
    // the offset reset policy actually takes effect via consumerConfigs() above.
    @KafkaListener(topics = {"#{T(java.util.Arrays).asList('${spring.kafka.consumer.topics}'.split(','))}"},
            groupId = "${spring.kafka.consumer.groupId}",
            containerFactory = "kafkaListenerContainerFactory",
            concurrency = "1",
            properties = {"${spring.kafka.consumer.autoOffsetReset}"})
    public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic_test consumed: Topic:" + record.topic() + ", Message:" + record.value());
            // Commit the offset manually
            ack.acknowledge();
        }
    }
}
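One subtlety worth knowing: because this is a batch listener, the Acknowledgment covers the whole batch, so the acknowledge() inside the loop above already commits the offsets of every record in the batch on its first call. A minimal sketch of a variant that makes this explicit by acknowledging once after all records are processed (meant to live in the same Consumer class with the same @KafkaListener attributes; the method name is illustrative):

    public void topicTestBatchAck(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic_test consumed: Topic:" + record.topic() + ", Message:" + record.value());
        }
        // One manual commit for the whole batch; if processing throws before this line,
        // nothing is committed and the batch is redelivered.
        ack.acknowledge();
    }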
package com.example.springbootkafka.service;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.messaging.Message;

import java.util.concurrent.ExecutionException;

public interface ProducerService {

    /**
     * Send a message synchronously.
     */
    void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException;

    /**
     * Send a plain message.
     */
    void sendMessage(String topic, String data);

    /**
     * Send a message with additional metadata.
     */
    void sendMessage(ProducerRecord<String, String> record);

    /**
     * Send a Message.
     */
    void sendMessage(Message<String> message);

    /**
     * Send a keyed message.
     */
    void sendMessage(String topic, String key, String data);

    /**
     * Send a keyed message to a specific partition.
     */
    void sendMessage(String topic, Integer partition, String key, String data);

    /**
     * Send a keyed message to a specific partition with a timestamp.
     */
    void sendMessage(String topic, Integer partition, Long timestamp, String key, String data);
}
package com.example.springbootkafka.service.impl;

import com.example.springbootkafka.service.ProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;

@Component
@Slf4j
@EnableAsync
public class ProducerServiceImpl implements ProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a message synchronously: get() blocks until the broker acknowledges.
     */
    @Override
    public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
        SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        log.debug("sendSyncMessage sent synchronously, topic: {}", recordMetadata.topic());
    }

    /**
     * Send a plain message asynchronously, with success/failure callbacks.
     */
    @Override
    public void sendMessage(String topic, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(
                success -> log.info("sendMessage to topic={} succeeded", topic),
                failure -> log.error("sendMessage failed: {}", failure.getMessage())
        );
    }

    /**
     * Send a message with additional metadata.
     */
    @Override
    public void sendMessage(ProducerRecord<String, String> record) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("Failed to send message: {}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                log.debug("Message sent, topic: {}, partition: {}", metadata.topic(), metadata.partition());
            }
        });
    }

    /**
     * Send a Message.
     */
    @Override
    public void sendMessage(Message<String> message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("Failed to send message: {}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                log.debug("Message sent, topic: {}, partition: {}", metadata.topic(), metadata.partition());
            }
        });
    }

    /**
     * Send a keyed message.
     */
    @Override
    public void sendMessage(String topic, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
        log.info("Sending to {} with payload: {}", topic, data);
        future.addCallback(
                success -> log.debug("Message sent successfully"),
                failure -> log.error("Failed to send message: {}", failure.getMessage())
        );
    }

    @Override
    public void sendMessage(String topic, Integer partition, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("Failed to send message: {}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                log.debug("Message sent, topic: {}, partition: {}", metadata.topic(), metadata.partition());
            }
        });
    }

    @Override
    public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("Failed to send message: {}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                log.debug("Message sent, topic: {}, partition: {}", metadata.topic(), metadata.partition());
            }
        });
    }
}
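In spring-kafka 2.8.x, KafkaTemplate.send() returns a ListenableFuture, as used above (from spring-kafka 3.0 it returns a CompletableFuture instead, so the callback style differs). The synchronous variant blocks indefinitely on get(); a small sketch with a bounded wait, assuming a three-second timeout is acceptable for your use case (add java.util.concurrent.TimeUnit and java.util.concurrent.TimeoutException to the imports; the method name is illustrative):

    public void sendSyncMessageWithTimeout(String topic, String data)
            throws ExecutionException, InterruptedException, TimeoutException {
        // get(timeout, unit) bounds the wait; throws TimeoutException
        // if the broker does not acknowledge within 3 seconds
        SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get(3, TimeUnit.SECONDS);
        log.debug("Broker acknowledged, partition: {}", sendResult.getRecordMetadata().partition());
    }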
server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 192.168.80.251:9092
    producer:
      batch-size: 16384 # batch size, default 16KB
      # ack level: how many replicas must persist the record before a write counts as successful.
      # Default 1 (leader only); 0 waits for no broker response; -1/all waits for all ISR replicas
      acks: all
      retries: 3 # number of retries
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      buffer-memory: 33554432 # buffer size, default 32MB
      client-id: abcdefg # client ID
      compression-type: none # compression: none (default), gzip, snappy, or lz4
      properties:
        retry.backoff.ms: 100 # retry backoff, default 100 ms
        linger.ms: 0 # default 0: how long to wait for more records to join a batch before sending
        max.request.size: 1048576 # default 1MB: maximum size of a request
        connections.max.idle.ms: 540000 # default 9 minutes: close idle connections after this long
        receive.buffer.bytes: 32768 # default 32KB: socket receive buffer; -1 uses the OS default
        send.buffer.bytes: 131072 # default 128KB: socket send buffer; -1 uses the OS default
        request.timeout.ms: 30000 # default 30000 ms: max time to wait for a response to a request
    consumer:
      # The keys below are custom properties read via @Value in KafkaConsumerConfig,
      # not standard Spring Boot auto-configuration keys
      bootstrapServers: 192.168.80.251:9092
      topics: testTopic1,testTopic2
      groupId: test
      # the background heartbeat thread must send a heartbeat within 30s, otherwise a rebalance is triggered
      sessionTimeOut: 30000
      # turn off Kafka's auto commit; even so, the Spring container commits for us per the ack mode
      enableAutoCommit: false
      # auto-commit interval
      autoCommitInterval: 1000
      maxPollRecords: 50
      # 300s poll interval; if processing takes longer than 300s between polls, an error is raised
      maxPollInterval: 300000
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      autoOffsetReset: latest
      # heartbeat interval
      heartbeatInterval: 10000
package com.example.springbootkafka.controller;

import com.example.springbootkafka.service.ProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.connector.Response;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/test")
public class Controller {

    @Autowired
    ProducerService producerService;

    @GetMapping("/sendMsg")
    public Integer sendMsg(@RequestParam("msg") String msg) {
        producerService.sendMessage("testTopic1", "key", msg);
        return Response.SC_OK;
    }
}
Call the endpoint directly: 127.0.0.1:8080/test/sendMsg?msg=11231231231
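The same call can be made from a terminal, for example with curl:

curl "http://127.0.0.1:8080/test/sendMsg?msg=11231231231"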
If the console prints output like the following, the message was sent and consumed successfully (log screenshot not reproduced here).
The docker-compose.yaml file is shown below. After creating the file, change 192.168.80.251 to your own VM's IP, then bring the stack up with docker-compose up -d. ZooKeeper is generally started first and Kafka second; if Kafka fails to start, try restarting it with docker restart kafka.
version: "3.3" services: zookeeper: container_name: zookeeper image: wurstmeister/zookeeper restart: always networks: - zkp-kafka ports: - "2181:2181" deploy: replicas: 1 update_config: parallelism: 2 delay: 10s restart_policy: condition: on-failure kafka: image: wurstmeister/kafka container_name: kafka restart: always environment: KAFKA_BROKER_ID: 0 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.80.251:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 ports: - 9092:9092 networks: - zkp-kafka depends_on: - zookeeper deploy: replicas: 1 update_config: parallelism: 2 delay: 10s restart_policy: condition: on-failure networks: zkp-kafka:
Repository: https://gitee.com/wangyunchao6/spring-boot-kafka.git
When enableAutoCommit is set to true, offsets are committed automatically, and the auto-commit interval (autoCommitInterval) defaults to 5s; if the consumption code takes longer than that to run, Kafka will raise errors. And if autoCommitInterval is raised to 10s, the offset is not committed until the 10-second mark even when processing finishes sooner, and only then is the next record consumed. For these reasons, manual commit is recommended.
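For contrast, this is roughly what the auto-commit setup warned about above would look like in the consumerConfigs() bean (the values shown are the defaults under discussion, not this project's settings):

    // Auto-commit: a background commit every auto.commit.interval.ms (default 5000 ms),
    // independent of whether the polled records have finished processing
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
    // Note: spring-kafka rejects auto-commit combined with AckMode.MANUAL_IMMEDIATE,
    // so the ack mode in KafkaConsumerConfig would also have to change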