赞
踩
Kafka 特点
Kafka 最早是由 LinkedIn 公司开发一种分布式的基于发布/订阅的消息系统,之后成为 Apache 的顶级项目。主要特点如下:
同时为发布和订阅提供高吞吐量
Kafka 的设计目标是以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对TB 级以上数据也能保证常数时间的访问性能。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输。
消息持久化
将消息持久化到磁盘,因此可用于批量消费,例如 ETL 以及实时应用程序。通过将数据持久化到硬盘以及 replication 防止数据丢失。
分布式
支持 Server 间的消息分区及分布式消费,同时保证每个 partition 内的消息顺序传输。这样易于向外扩展,所有的producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。
消费消息采用 pull 模式
消息被处理的状态是在 consumer 端维护,而不是由 server 端维护,broker 无状态,consumer 自己保存 offset。
支持 online 和 offline 的场景。
同时支持离线数据处理和实时数据处理
Broker
Kafka 集群中的一台或多台服务器统称为 Broker
Topic
每条发布到 Kafka 的消息都有一个类别,这个类别被称为 Topic 。(物理上不同 Topic 的消息分开存储。逻辑上一个 Topic 的消息虽然保存于一个或多个broker上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
Partition
Topic 物理上的分组,一个 Topic 可以分为多个 Partition ,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)
Producer
消息和数据的生产者,可以理解为往 Kafka 发消息的客户端
Consumer
消息和数据的消费者,可以理解为从 Kafka 取消息的客户端
Consumer Group
每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定Group Name,若不指定 Group Name 则属于默认的 Group)。 这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer )和单播(发给任意一个 Consumer )的手段。一个 Topic 可以有多个 Consumer Group。Topic 的消息会复制(不是真的复制,是概念上的)到所有的 Consumer Group,但每个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 Consumer。如果要实现广播,只要每个 Consumer 有一个独立的 Consumer Group 就可以了。如果要实现单播只要所有的 Consumer 在同一个 Consumer Group 。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic 。
group概念可以类似的必做rabbit里的queue。一个group里的消息只能被一个消费者消费。
topic的消息会发给所有订阅的用户组。我们可以自由的根据业务定制我们的实现。
导入pom依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml配置
spring: jackson: serialization: FAIL_ON_EMPTY_BEANS: false # Kafka 配置项,对应 KafkaProperties 配置类 kafka: bootstrap-servers: 47.114.161.233:9991 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 # Kafka Producer 配置项 producer: acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。 retries: 3 # 发送失败时,重试发送的次数 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化 value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化 logging: level: org: springframework: kafka: debug # spring-kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别 apache: kafka: debug # kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别 server: port: 9991
创建日志消息类,模拟发送日志对象到kafka
@Data @NoArgsConstructor public class Log { /** * 日志消息 */ String msg; /** * 日志类型 */ String type = "手动操作"; /** * 日志时间 */ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") Date date; }
创建发送消息的controller,便于调试
@RestController public class Provide1 { @Resource private KafkaTemplate<Object, Object> kafkaTemplate; /** * 同步发送消息 * @param msg * @return * @throws ExecutionException * @throws InterruptedException */ @GetMapping("syncSend") public Object syncSend(String msg) throws ExecutionException, InterruptedException { // 创建log 消息 Log log = new Log(); log.setMsg(msg); log.setDate(new Date()); // 同步发送消息 SendResult<Object, Object> logTopic = kafkaTemplate.send("logTopic", log).get(); return logTopic; } /** * 异步发送消息不会等待结果 * @param msg * @return */ @GetMapping("asyncSend") public ListenableFuture<SendResult<Object, Object>> asyncSend(String msg) { // 创建log 消息 Log log = new Log(); log.setMsg(msg); log.setDate(new Date()); // 异步发送消息 return kafkaTemplate.send("logTopic", log); } }
导入pom依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
消费者application.yml配置
spring: jackson: serialization: FAIL_ON_EMPTY_BEANS: false # Kafka 配置项,对应 KafkaProperties 配置类 kafka: bootstrap-servers: 47.114.161.233:9991 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 # Kafka Consumer 配置项 consumer: auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Kafka Consumer Listener 监听器配置 listener: missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错 logging: level: org: springframework: kafka: error # spring-kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别 apache: kafka: error # kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别 server: port: 9001
建立一个消费者监听类
@Component
public class Customer1 {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = "logTopic",
groupId = "consumer-group-logTopic-1")
public void onMessage(String message, ConsumerRecord record) {
logger.info("(正常)[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
这里我们时候用的cnsumer-group为 consumer-group-logTopic-1
根据概念,我们的消息会被接受并消费一次。

调用发送消息的接口。
可以观察到,消费者已经接收并消费了。
2020-09-16 13:26:53.753 INFO 30752 --- [ntainer#0-0-C-1] com.yu.customer_1.customer.Customer1 : (正常)[onMessage][线程编号:150 消息内容:{"msg":"demoData","type":"手动操作","date":"2020-09-16 13:26:53"}]
如果再增加一个消费者2,topic想同,用户组不同
@Component
public class Customer2 {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = "logTopic",
groupId = "consumer-group-logTopic-2")
public void onMessage(String message, ConsumerRecord record) {
logger.info("[onMessage2][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
按照我们consumer-group的定义,应该两个用户组下的消费者都可以收到。
2020-09-16 16:42:02.796 INFO 40795 --- [ntainer#0-0-C-1] com.yu.customer_1.customer.Customer1 : (正常)[onMessage][线程编号:149 消息内容:{"msg":"demoData","type":"手动操作","date":"2020-09-16 16:42:02"}]
2020-09-16 16:42:02.799 INFO 40795 --- [ntainer#2-0-C-1] com.yu.customer_1.customer.Customer2 : [onMessage2][线程编号:147 消息内容:{"msg":"demoData","type":"手动操作","date":"2020-09-16 16:42:02"}]
Spring-Kafka 的消费重试功能,通过实现自定义的 SeekToCurrentErrorHandler ,在 Consumer 消费消息异常的时候,进行拦截处理:
在重试小于最大次数时,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。例如说,我们测试的 Topic 是 “logTopic” ,则其对应的死信队列的 Topic 就是 “logTopic.DLT” ,即在原有 Topic 加上 .DLT 后缀,就是其死信队列的 Topic 。
Spring-Kafka 提供消费重试的机制。在消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,Spring-Kafka 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,Spring-Kafka 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,Spring-Kafka 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
Spring-Kafka 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。
@Configuration
public class KafkaConfig {
@Bean
@Primary
public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
// <1> 创建 DeadLetterPublishingRecoverer 对象 设置死信队列 默认为 toopic.DLT
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// <2> 创建 FixedBackOff 对象 设置重试间隔 10秒 次数为 3次
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
// <3> 创建 SeekToCurrentErrorHandler 对象
return new SeekToCurrentErrorHandler(recoverer, backOff);
}
}
DeadLetterPublishingRecoverer对象,它负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。FixedBackOff 对象。这里,我们配置了重试 3 次,每次固定间隔 30 秒。当然,胖友可以选择 BackOff 的另一个子类 ExponentialBackOff 实现,提供指数递增的间隔时间。SeekToCurrentErrorHandler对象,负责处理异常,串联整个消费重试的整个过程。这里,我们来简单说说 SeekToCurrentErrorHandler 是怎么提供消费重试的功能的。
在消息消费失败时,SeekToCurrentErrorHandler会将 调用 Kafka Consumer 的 #seek(TopicPartition partition, long offset) 方法,将 Consumer 对于该消息对应的 TopicPartition 分区的本地进度设置成该消息的位置。这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息,并且是第一条。
同时,Spring-Kafka 使用 FailedRecordTracker对每个 Topic 的每个 TopicPartition 消费失败次数进行计数,这样相当于对该 TopicPartition 的第一条消费失败的消息的消费失败次数进行计数。 本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/article/detail/43926
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。