赞
踩
写这边篇文章的目的,是记录我在集成kafka客户端遇到的一些问题,文章会记录整个接入的过程,其中会遇到几个坑,如果需要最终版本,直接看最后一节就行了,感觉Spring-Kafka的文档太少了,如果采用SpringBoot集成的方式接入,一不小可能就会踩坑
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- spring:
- profiles:
- active: dev
- application:
- name: goods-center
- kafka:
- bootstrap-servers: 192.168.31.114:9092
- producer:
- acks: all
- timeout.ms: 5000
- # 值序列化:使用Json
- value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
- key-serializer: org.apache.kafka.common.serialization.LongSerializer
- enable:
- idempotence: true # 默认为True
- consumer:
- group-id: goods-center
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
- key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
- enable-auto-commit: false # 取消自动提交

- package com.ychen.goodscenter.fafka;
-
-
- import com.ychen.goodscenter.vo.req.SubmitOrderReq;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
-
- @Component
- public class MessageProducer {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- @Autowired
- private KafkaTemplate<Long, SubmitOrderReq> kafkaTemplate;
-
- public void sendOrderMessage(SubmitOrderReq msg) {
- kafkaTemplate.send(TopicConstants.ORDER_MESSAGE_TOPIC, msg.getOrderId(), msg);
- logger.info("order-message-topic message sent, orderId: {}", msg.getOrderId());
- }
- }

- package com.ychen.goodscenter.fafka;
-
- import com.ychen.goodscenter.service.OrderService;
- import com.ychen.goodscenter.vo.req.SubmitOrderReq;
- import org.apache.kafka.clients.consumer.Consumer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.dao.DuplicateKeyException;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class MessageListener {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- @Autowired
- private OrderService orderService;
-
- @KafkaListener(topics = "order-message-topic")
- public void processMessage(Consumer<Long, SubmitOrderReq> consumer, SubmitOrderReq submitOrderReq) {
- try {
- logger.info("order-message-topic message received, orderId: {}", submitOrderReq.getOrderId());
- orderService.submitOrder(submitOrderReq);
- // 同步提交
- consumer.commitSync();
- logger.info("order-message-topic message acked: orderId: {}", submitOrderReq.getOrderId());
- } catch (DuplicateKeyException dupe) {
- // 处理异常情况
- logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);
- // 重复数据,忽略掉,同步提交
- consumer.commitSync();
- } catch (Exception e) {
- // 处理异常情况
- logger.error("order-message-topic message processMessage error", e);
- }
- }
- }

- Caused by: java.lang.IllegalArgumentException: The class 'com.ychen.goodscenter.vo.req.SubmitOrderReq' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
- at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
- at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
- at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572)
- at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
- ... 15 common frames omitted
原因: 因为我在消费消息时用了json序列化,需要给这个序列化,添加信任自己包,不加json序列号会报错
解决方法:添加配置
- spring:
- kafka:
- consumer:
- properties:
- spring.json.trusted.packages: "com.ychen.**"
解决途径:百度
问题发现:当我正在批量消费消息时,强制重启应用进程,发现有部分消息丢失了,没有处理
我发了5000个样本请求,最后只生成了4912 个订单(中途强制重启了2次)
问题分析:有2中可能
第一种:之前配置的enable-auto-commit: false 是无效的。
第二种: consumer.commitSync(); 一次将批量拉取的offset提交了
问题排查:
通过在 consumer.commitSync(); 代码之前和之后分别打一个断点,然后发送一批数据
consumer.commitSync(); 之前:
consumer.commitSync(); 之后
结果发生了突变,说明是consumer.commitSync();执行之后引发的offset突变
翻阅源码:
总体而言,通过官方文档和源代码,我们可以确定 commitSync()
提交的是已经成功拉取到的消息的最大 offset,而不是当前正在处理的消息的 offset。
既然consumer.commitSync();无法在批量处理消息的环境保证消息不丢失,那么需要寻找新的解决方案:
在org.springframework.kafka.annotation.KafkaListener 类的注释上面有写到可以使用org.springframework.kafka.support.Acknowledgment
然后我们消费者的代码改造后为:
- package com.ychen.goodscenter.fafka;
-
- import com.ychen.goodscenter.service.OrderService;
- import com.ychen.goodscenter.vo.req.SubmitOrderReq;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.dao.DuplicateKeyException;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.support.Acknowledgment;
- import org.springframework.stereotype.Component;
-
- @Component
- public class MessageListener {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- @Autowired
- private OrderService orderService;
-
- @KafkaListener(topics = "order-message-topic")
- public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
- try {
- logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
- orderService.submitOrder(record.value());
- // 同步提交
- acknowledgment.acknowledge();
- logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
- } catch (DuplicateKeyException dupe) {
- // 处理异常情况
- logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);
- // 重复数据,忽略掉,同步提交
- acknowledgment.acknowledge();
- } catch (Exception e) {
- // 处理异常情况
- logger.error("order-message-topic message processMessage error", e);
- }
- }
- }

Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
这里我也有点不懂了,明明已经配置自动提交了,还需要配置 ack-mode: MANUAL,既然他说要那就配置吧
在application.yml 增加配置
- spring:
- kafka:
- listener:
- ack-mode: MANUAL
现在准备2000个样本,然后让消费者实例强制重启2次,看看数据库的订单数量是否为2000条
现在正确了,支持系统宕机仍然不丢失消息了
- spring:
- profiles:
- active: dev
- application:
- name: goods-center
- kafka:
- bootstrap-servers: 192.168.31.114:9092
- listener:
- ack-mode: MANUAL
- producer:
- acks: all
- timeout.ms: 5000
- # 值序列化:使用Json
- value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
- key-serializer: org.apache.kafka.common.serialization.LongSerializer
- enable:
- idempotence: true # 默认为True
- consumer:
- properties:
- spring.json.trusted.packages: "com.ychen.**" # 信任自己包,不加json序列号会报错
- group-id: goods-center
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
- key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
- enable-auto-commit: false # 取消自动提交

- package com.ychen.goodscenter.fafka;
-
- import com.ychen.goodscenter.service.OrderService;
- import com.ychen.goodscenter.vo.req.SubmitOrderReq;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.dao.DuplicateKeyException;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.support.Acknowledgment;
- import org.springframework.stereotype.Component;
-
- @Component
- public class MessageListener {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- @Autowired
- private OrderService orderService;
-
- @KafkaListener(topics = "order-message-topic")
- public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
- try {
- logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
- orderService.submitOrder(record.value());
- // 同步提交
- acknowledgment.acknowledge();
- logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
- } catch (DuplicateKeyException dupe) {
- // 处理异常情况
- logger.error("order-message-topic message error DuplicateKeyException", dupe);
- // 重复数据,忽略掉,同步提交
- acknowledgment.acknowledge();
- } catch (Exception e) {
- // 处理异常情况
- logger.error("order-message-topic message error unknown ", e);
- }
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。