当前位置:   article > 正文

Spring Boot、kafka、spring-kafka 生产者消费者实践(从搭建kafka集群开始)_auto.create.topics.enable springboot

auto.create.topics.enable springboot

一、搭建kafka集群

参考文档:http://kafka.apache.org/quickstart 官方文档讲的很详细,而且没坑,照着做很快就可以搭好

注意点 or 建议:

1、在Linux下,启动的kafka集群经常无故退出,看日志也没有报错,就是启动了关闭流程,正常关闭。

      解决方案:用守护进程启动,参考:https://blog.csdn.net/xiaoyu_bd/article/details/52268659

bin/kafka-server-start.sh  -daemon  config/server.properties > k0.log &

2、kafka各项配置以及默认值说明:http://kafka.apache.org/documentation/#configuration  还是官方文档

3、kafka是Java进程,因此可以用 jps 命令方便的查看对应的端口。

4、以下配置为 监听地址,默认是 localhost,这样的话无法远程连接,需要配置为 特定的IP地址,然后用配置的IP来连接。

listeners=PLAINTEXT://172.17.10.89:9092

如上配置,本地连接也需要 使用 172.17.10.89:9092这个地址,而不是 localhost。

二、kafka介绍

参考文档:http://kafka.apache.org/intro 官方文档  

                  https://www.jianshu.com/p/d3e963ff8b70  网友的中文版本,很详细,但要注意有些配置在新版本中发生了变化,比如:                              auto.create.topics.auto 变成了  auto.create.topics.enable  并且默认值是 true topic不存在时,按照默认配置创建topic

                  https://www.jianshu.com/p/4e00dff97f39  关闭 offset自动提交,让  spring-kafka 来提交offset

                  https://blog.csdn.net/lishuangzhe7047/article/details/74530417  kafka auto.offset.reset值详解,offset缺失情况下的消费者消                      费策略

                  网上文档很多,不要犹豫该看哪些,而是都看下,对照着看。

三、Spring Boot 集成 kafka 

pom spring-kafka 的版本要稍微注意下 我用的 kafka_2.11-2.0.0 配合 spring-kafka-1.1.3.RELEASE 可以正常使用,但是用  spring-kafka-2.xxx的时候 项目无法启动,会报错:not found

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>1.1.3.RELEASE</version>
  5. </dependency>
  6. <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
  7. <dependency>
  8. <groupId>org.projectlombok</groupId>
  9. <artifactId>lombok</artifactId>
  10. <version>1.18.0</version>
  11. <scope>provided</scope>
  12. </dependency>

yml配置:

  1. spring:
  2. kafka:
  3. bootstrap-servers: 172.17.10.89:9092,172.17.10.89:9093,172.17.10.89:9094
  4. producer:
  5. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  6. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. batch-size: 65536
  8. buffer-memory: 524288
  9. consumer:
  10. group-id: default-group #默认组id 后面会配置多个消费者组
  11. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  13. auto-offset-reset: latest
  14. enable-auto-commit: false #关闭自动提交 改由spring-kafka提交
  15. auto-commit-interval: 100
  16. max-poll-records: 20 #批量消费 一次接收的最大数量

bootstrap-servers 这边配置的地址 可以事先看下 是否可以访问到对应的端口

配置读取类:

  1. import lombok.Data;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.context.annotation.Configuration;
  4. /**
  5. * @author fandong
  6. * @create 2018/11/1
  7. */
  8. @Configuration
  9. @Data
  10. public class KafkaConsumerProps {
  11. @Value("${spring.kafka.bootstrap-servers}")
  12. private String bootstrapServers;
  13. @Value("${spring.kafka.consumer.group-id}")
  14. private String defaultGroupId;
  15. @Value("${spring.kafka.consumer.auto-offset-reset}")
  16. private String autoOffsetReset;
  17. @Value("${spring.kafka.consumer.enable-auto-commit}")
  18. private String enableAutoCommit;
  19. @Value("${spring.kafka.consumer.auto-commit-interval}")
  20. private String autoCommitInterval;
  21. @Value("${spring.kafka.consumer.max-poll-records}")
  22. private String maxPollRecords;
  23. public KafkaConsumerProps() {
  24. }
  25. }

消费者配置类,配置多个消费者组、批量消费、并发数

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  7. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  8. import org.springframework.kafka.core.ConsumerFactory;
  9. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  10. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. /**
  14. * @author fandong
  15. * @create 2018/11/1
  16. */
  17. @Configuration
  18. public class KafkaConsumerConfig {
  19. @Autowired
  20. private KafkaConsumerProps kafkaConsumerProps;
  21. private static final String GROUP0_ID = "group0";
  22. private static final String GROUP1_ID = "group1";
  23. @Bean
  24. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory0() {
  25. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  26. factory.setConsumerFactory(consumerFactory0());
  27. //对应topic分区数
  28. factory.setConcurrency(3);
  29. //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
  30. factory.setBatchListener(true);
  31. factory.getContainerProperties().setPollTimeout(3000);
  32. return factory;
  33. }
  34. public ConsumerFactory<String, String> consumerFactory0() {
  35. Map<String, Object> map = consumerConfigs();
  36. map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP0_ID);
  37. return new DefaultKafkaConsumerFactory<>(map);
  38. }
  39. @Bean
  40. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
  41. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  42. factory.setConsumerFactory(consumerFactory1());
  43. //对应topic分区数
  44. factory.setConcurrency(3);
  45. //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
  46. factory.setBatchListener(true);
  47. factory.getContainerProperties().setPollTimeout(3000);
  48. return factory;
  49. }
  50. public ConsumerFactory<String, String> consumerFactory1() {
  51. Map<String, Object> map = consumerConfigs();
  52. map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP1_ID);
  53. return new DefaultKafkaConsumerFactory<>(map);
  54. }
  55. public Map<String, Object> consumerConfigs() {
  56. Map<String, Object> propsMap = new HashMap<>(16);
  57. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProps.getBootstrapServers());
  58. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConsumerProps.getEnableAutoCommit());
  59. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConsumerProps.getAutoCommitInterval());
  60. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  61. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  62. propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProps.getDefaultGroupId());
  63. propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerProps.getAutoOffsetReset());
  64. propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerProps.getMaxPollRecords());
  65. return propsMap;
  66. }
  67. }

注意:

1、factory.setConcurrency(3);  此处设置的目的在于:假设 topic test 下有 0、1、2三个 partition,Spring Boot中只有一个 @KafkaListener() 消费者订阅此 topic,此处设置并发为3,启动后 会有三个不同的消费者分别订阅 p0、p1、p2,本地实际有三个消费者线程。而 factory.setConcurrency(1); 的话 本地只有一个消费者线程, p0、p1、p2被同一个消费者订阅。由于 一个partition只能被同一个消费者组下的一个消费者订阅,对于只有一个 partition的topic,即使设置 并发为3,也只会有一个消费者,多余的消费者没有 partition可以订阅。

2、factory.setBatchListener(true); 设置批量消费 ,每个批次数量在Kafka配置参数ConsumerConfig.MAX_POLL_RECORDS_CONFIG中配置,限制的是 一次批量接收的最大条数,而不是 等到达到最大条数才接收,这点容易被误解。实际测试时,接收是实时的,当生产者大量写入时,一次批量接收的消息数量为 配置的最大条数。

生产者我们借助自动配置,在yml文件中加入生产者配置之后,直接注入 KafkaTemplate 即可使用。

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.kafka.annotation.KafkaListener;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. import org.springframework.kafka.support.SendResult;
  8. import org.springframework.stereotype.Service;
  9. import org.springframework.util.concurrent.ListenableFuture;
  10. import java.util.List;
  11. /**
  12. * @author fandong
  13. * @create 2018/11/1
  14. */
  15. @Service
  16. public class KafkaServiceImpl implements KafkaService {
  17. private final KafkaTemplate<String, String> kafkaTemplate;
  18. private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
  19. @Autowired
  20. public KafkaServiceImpl(KafkaTemplate kafkaTemplate) {
  21. this.kafkaTemplate = (KafkaTemplate<String, String>) kafkaTemplate;
  22. }
  23. @Override
  24. public void send(String topic, String value) {
  25. ListenableFuture<SendResult<String, String>> resultListenableFuture = kafkaTemplate.send(topic, value);
  26. resultListenableFuture.addCallback(
  27. successCallback -> logger.info("发送成功:topic= " + topic + " value= " + value),
  28. failureCallback -> logger.info("发送失败:topic= " + topic + " value= " + value));
  29. }
  30. @Override
  31. @KafkaListener(topics = {"test"}, containerFactory = "kafkaListenerContainerFactory0")
  32. public void kafkaConsumerTest(String msg) {
  33. logger.info("接收到消息--" + msg);
  34. }
  35. @Override
  36. @KafkaListener(topics = {"3-test"}, containerFactory = "kafkaListenerContainerFactory0")
  37. public void listenPartition0(List<ConsumerRecord<String, String>> records) {
  38. System.out.println(records.size());
  39. for (ConsumerRecord<String, String> consumerRecord : records){
  40. String value = consumerRecord.value();
  41. logger.info("a 消息:partition " + consumerRecord.partition() + " value " + consumerRecord.value() );
  42. }
  43. }
  44. @Override
  45. @KafkaListener(topics = {"3-test"}, containerFactory = "kafkaListenerContainerFactory1")
  46. public void listenPartition2(List<ConsumerRecord<String, String>> records) {
  47. System.out.println(records.size());
  48. for (ConsumerRecord<String, String> consumerRecord : records){
  49. String value = consumerRecord.value();
  50. try {
  51. Thread.sleep(10000);
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. logger.info("c 消息:partition " + consumerRecord.partition() + " value " + consumerRecord.value() + " thread id " + Thread.currentThread().getName());
  56. }
  57. }
  58. }

指定  containerFactory = "kafkaListenerContainerFactory1" 参数给消费者分组,值为 之前定义的 KafkaListenerContainerFactory的 Bean 名称,不指定的情况下 默认是 方法名称。

注:topic  3-test下有 3个partition,由于之前配置了 factory.setConcurrency(3); 项目启动之后,本地会有三个消费者线程。

使用如下命令可以查看各个消费者组的情况  以下为查看  group0消费组,可以看到每个partition由不同的消费者订阅。

  1. bin/kafka-consumer-groups.sh --bootstrap-server 172.17.10.89:9092 --describe --group group0
  2. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  3. test 0 12 12 0 consumer-1-9419e037-1501-4f33-85c9-25df75e3a5a9 /172.17.10.33 consumer-1
  4. 3-test 1 1400 1400 0 consumer-8-1a520c8e-c412-4178-a76f-77c68e7472b7 /172.17.10.33 consumer-8
  5. 3-test 0 1398 1398 0 consumer-7-ede99b63-ea10-48dd-be62-61139360e39c /172.17.10.33 consumer-7
  6. 3-test 2 1397 1397 0 consumer-9-4bacc023-7de7-4e98-aefd-bb39d0bf6547 /172.17.10.33 consumer-9

四、关于提高消费者消费能力的思考

kafka写具有很好的性能,而消费者在消费时往往会有相对耗时的操作,所以经常出现 消费者性能跟不上的情况。

思路:

1、在topic下适当创建多个 partition,然后使用多个消费者来消费多个partition

2、使用批量消费,一次接收 多条消息,相比一个一个接收,(猜)可以减少IO次数,提高速度

3、消费者再使用 线程池配合适当长度的阻塞队列,进一步提高处理能力(需要分析任务类型以及考虑处理器的能力)。

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/879681
推荐阅读
相关标签
  

闽ICP备14008679号