当前位置:   article > 正文

SpringBoot 3.1.7 集成Kafka 3.5.0_spring-kafka 版本

spring-kafka 版本

一、背景

写这边篇文章的目的,是记录我在集成kafka客户端遇到的一些问题,文章会记录整个接入的过程,其中会遇到几个坑,如果需要最终版本,直接看最后一节就行了,感觉Spring-Kafka的文档太少了,如果采用SpringBoot集成的方式接入,一不小可能就会踩坑

二、操作步骤

1 添加依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

2 添加配置文件

  1. spring:
  2. profiles:
  3. active: dev
  4. application:
  5. name: goods-center
  6. kafka:
  7. bootstrap-servers: 192.168.31.114:9092
  8. producer:
  9. acks: all
  10. timeout.ms: 5000
  11. # 值序列化:使用Json
  12. value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  13. key-serializer: org.apache.kafka.common.serialization.LongSerializer
  14. enable:
  15. idempotence: true # 默认为True
  16. consumer:
  17. group-id: goods-center
  18. value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  19. key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
  20. enable-auto-commit: false # 取消自动提交

3 生产者代码

  1. package com.ychen.goodscenter.fafka;
  2. import com.ychen.goodscenter.vo.req.SubmitOrderReq;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class MessageProducer {
  10. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  11. @Autowired
  12. private KafkaTemplate<Long, SubmitOrderReq> kafkaTemplate;
  13. public void sendOrderMessage(SubmitOrderReq msg) {
  14. kafkaTemplate.send(TopicConstants.ORDER_MESSAGE_TOPIC, msg.getOrderId(), msg);
  15. logger.info("order-message-topic message sent, orderId: {}", msg.getOrderId());
  16. }
  17. }

4 消费者代码

  1. package com.ychen.goodscenter.fafka;
  2. import com.ychen.goodscenter.service.OrderService;
  3. import com.ychen.goodscenter.vo.req.SubmitOrderReq;
  4. import org.apache.kafka.clients.consumer.Consumer;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.dao.DuplicateKeyException;
  9. import org.springframework.kafka.annotation.KafkaListener;
  10. import org.springframework.stereotype.Component;
  11. @Component
  12. public class MessageListener {
  13. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  14. @Autowired
  15. private OrderService orderService;
  16. @KafkaListener(topics = "order-message-topic")
  17. public void processMessage(Consumer<Long, SubmitOrderReq> consumer, SubmitOrderReq submitOrderReq) {
  18. try {
  19. logger.info("order-message-topic message received, orderId: {}", submitOrderReq.getOrderId());
  20. orderService.submitOrder(submitOrderReq);
  21. // 同步提交
  22. consumer.commitSync();
  23. logger.info("order-message-topic message acked: orderId: {}", submitOrderReq.getOrderId());
  24. } catch (DuplicateKeyException dupe) {
  25. // 处理异常情况
  26. logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);
  27. // 重复数据,忽略掉,同步提交
  28. consumer.commitSync();
  29. } catch (Exception e) {
  30. // 处理异常情况
  31. logger.error("order-message-topic message processMessage error", e);
  32. }
  33. }
  34. }

三、开始踩坑了

1 添加信任自己包

  1. Caused by: java.lang.IllegalArgumentException: The class 'com.ychen.goodscenter.vo.req.SubmitOrderReq' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
  2. at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
  3. at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
  4. at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572)
  5. at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
  6. ... 15 common frames omitted

原因: 因为我在消费消息时用了json序列化,需要给这个序列化,添加信任自己包,不加json序列号会报错

解决方法:添加配置

  1. spring:
  2. kafka:
  3. consumer:
  4. properties:
  5. spring.json.trusted.packages: "com.ychen.**"

解决途径:百度

2 consumer.commitSync(); 无效

问题发现:当我正在批量消费消息时,强制重启应用进程,发现有部分消息丢失了,没有处理

我发了5000个样本请求,最后只生成了4912 个订单(中途强制重启了2次)

问题分析:有2中可能

第一种:之前配置的enable-auto-commit: false  是无效的。

第二种: consumer.commitSync(); 一次将批量拉取的offset提交了

问题排查:

通过在 consumer.commitSync(); 代码之前和之后分别打一个断点,然后发送一批数据

consumer.commitSync(); 之前:

consumer.commitSync(); 之后

结果发生了突变,说明是consumer.commitSync();执行之后引发的offset突变

翻阅源码:

总体而言,通过官方文档和源代码,我们可以确定 commitSync() 提交的是已经成功拉取到的消息的最大 offset,而不是当前正在处理的消息的 offset。

3 缺少AckMode 配置

既然consumer.commitSync();无法在批量处理消息的环境保证消息不丢失,那么需要寻找新的解决方案:

在org.springframework.kafka.annotation.KafkaListener 类的注释上面有写到可以使用org.springframework.kafka.support.Acknowledgment

然后我们消费者的代码改造后为:

  1. package com.ychen.goodscenter.fafka;
  2. import com.ychen.goodscenter.service.OrderService;
  3. import com.ychen.goodscenter.vo.req.SubmitOrderReq;
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.dao.DuplicateKeyException;
  9. import org.springframework.kafka.annotation.KafkaListener;
  10. import org.springframework.kafka.support.Acknowledgment;
  11. import org.springframework.stereotype.Component;
  12. @Component
  13. public class MessageListener {
  14. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  15. @Autowired
  16. private OrderService orderService;
  17. @KafkaListener(topics = "order-message-topic")
  18. public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
  19. try {
  20. logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
  21. orderService.submitOrder(record.value());
  22. // 同步提交
  23. acknowledgment.acknowledge();
  24. logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
  25. } catch (DuplicateKeyException dupe) {
  26. // 处理异常情况
  27. logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);
  28. // 重复数据,忽略掉,同步提交
  29. acknowledgment.acknowledge();
  30. } catch (Exception e) {
  31. // 处理异常情况
  32. logger.error("order-message-topic message processMessage error", e);
  33. }
  34. }
  35. }
Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

这里我也有点不懂了,明明已经配置自动提交了,还需要配置 ack-mode: MANUAL,既然他说要那就配置吧

在application.yml 增加配置

  1. spring:
  2. kafka:
  3. listener:
  4. ack-mode: MANUAL

现在准备2000个样本,然后让消费者实例强制重启2次,看看数据库的订单数量是否为2000条

现在正确了,支持系统宕机仍然不丢失消息了

四、最终的配置文件和消费者代码

1 配置文件

  1. spring:
  2. profiles:
  3. active: dev
  4. application:
  5. name: goods-center
  6. kafka:
  7. bootstrap-servers: 192.168.31.114:9092
  8. listener:
  9. ack-mode: MANUAL
  10. producer:
  11. acks: all
  12. timeout.ms: 5000
  13. # 值序列化:使用Json
  14. value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  15. key-serializer: org.apache.kafka.common.serialization.LongSerializer
  16. enable:
  17. idempotence: true # 默认为True
  18. consumer:
  19. properties:
  20. spring.json.trusted.packages: "com.ychen.**" # 信任自己包,不加json序列号会报错
  21. group-id: goods-center
  22. value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  23. key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
  24. enable-auto-commit: false # 取消自动提交

2 消费者代码

  1. package com.ychen.goodscenter.fafka;
  2. import com.ychen.goodscenter.service.OrderService;
  3. import com.ychen.goodscenter.vo.req.SubmitOrderReq;
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.dao.DuplicateKeyException;
  9. import org.springframework.kafka.annotation.KafkaListener;
  10. import org.springframework.kafka.support.Acknowledgment;
  11. import org.springframework.stereotype.Component;
  12. @Component
  13. public class MessageListener {
  14. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  15. @Autowired
  16. private OrderService orderService;
  17. @KafkaListener(topics = "order-message-topic")
  18. public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
  19. try {
  20. logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
  21. orderService.submitOrder(record.value());
  22. // 同步提交
  23. acknowledgment.acknowledge();
  24. logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
  25. } catch (DuplicateKeyException dupe) {
  26. // 处理异常情况
  27. logger.error("order-message-topic message error DuplicateKeyException", dupe);
  28. // 重复数据,忽略掉,同步提交
  29. acknowledgment.acknowledge();
  30. } catch (Exception e) {
  31. // 处理异常情况
  32. logger.error("order-message-topic message error unknown ", e);
  33. }
  34. }
  35. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/985220
推荐阅读
相关标签
  

闽ICP备14008679号