当前位置:   article > 正文

SpringBoot操作Kafka_springboot kafkaadmin

springboot kafkaadmin

一 SpringBoot简单操作Kafka

1.1 引入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>
  6. </dependencies>

1.2 Kafka相关配置

1.2.1 topic

  1. @Configuration
  2. public class KafkaTopicConfig {
  3. /**
  4. * 定义一个KafkaAdmin的bean,可以自动检测集群中是否存在topic,不存在则创建
  5. * @return
  6. */
  7. @Bean
  8. public KafkaAdmin kafkaAdmin(){
  9. Map<String, Object> configs = new HashMap<>();
  10. configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
  11. return new KafkaAdmin(configs);
  12. }
  13. @Bean
  14. public NewTopic newTopic(){
  15. // 创建一个主题参数: ①主题名称 ②分区数 ③副本数量(副本数量要小于Broker数量)
  16. return new NewTopic("test",3,(short) 0);
  17. }
  18. }

1.2.2 producer生产者

这里注意,我们需要添加@EnableKafka注解。这样才可以使用KafkaTemplate

  1. @Configuration
  2. @EnableKafka // 打开应用中KafkaTemplate能力
  3. public class KafkaProducerConfig {
  4. @Bean
  5. public KafkaTemplate kafkaTemplate(){
  6. return new KafkaTemplate(new DefaultKafkaProducerFactory(configs()));
  7. }
  8. public Map<String, Object> configs(){
  9. Map<String, Object> props = new HashMap<>();
  10. // 指定多个kafka集群多个地址
  11. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  12. // 重试次数,0为不启用重试机制
  13. props.put(ProducerConfig.RETRIES_CONFIG, 0);
  14. // acks=0 把消息发送到kafka就认为发送成功
  15. // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
  16. // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
  17. props.put(ProducerConfig.ACKS_CONFIG,"1");
  18. // 生产者空间不足时,send()被阻塞的时间,默认60s
  19. props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
  20. // 控制批处理大小,单位为字节
  21. props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
  22. // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
  23. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  24. // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
  25. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
  26. // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
  27. props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
  28. // 键的序列化方式
  29. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  30. // 值的序列化方式
  31. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  32. // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
  33. // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
  34. props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
  35. return props;
  36. }
  37. }

1.2.3 Consumer消费者

  1. @Configuration
  2. public class KafkaConsumerConfig {
  3. @Bean
  4. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory(){
  5. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  6. // 设置消费者工厂
  7. factory.setConsumerFactory(consumerFactory());
  8. // 消费者组中线程数量
  9. factory.setConcurrency(3);
  10. // 拉取超时时间
  11. factory.getContainerProperties().setPollTimeout(3000);
  12. return factory;
  13. }
  14. public ConsumerFactory<Integer, String> consumerFactory(){
  15. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  16. }
  17. public Map<String, Object> consumerConfigs() {
  18. Map<String, Object> propsMap = new HashMap<>();
  19. // Kafka地址
  20. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  21. // 是否自动提交offset偏移量(默认true)
  22. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  23. // 自动提交的频率(ms)
  24. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  25. // Session超时设置
  26. propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  27. // 键的反序列化方式
  28. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  29. // 值的反序列化方式
  30. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  31. // offset偏移量规则设置:
  32. // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  33. // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  34. // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  35. propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  36. return propsMap;
  37. }
  38. }

1.3 生产者发送消息

  1. @Service
  2. @Slf4j
  3. public class KafkaProducerService {
  4. @Autowired
  5. private KafkaTemplate kafkaTemplate;
  6. /**
  7. * 生产者同步向Kafka发送消息
  8. * @param topic 指定的主题
  9. * @param message 发送的消息内容
  10. */
  11. public void sendMessageBySync(String topic, String message) throws ExecutionException, InterruptedException, TimeoutException {
  12. kafkaTemplate.send(topic,message).get(10, TimeUnit.SECONDS);
  13. }
  14. /**
  15. * 生产者异步向Kafka发送消息
  16. * @param topic 指定的主题
  17. * @param message 发送的消息内容
  18. */
  19. public void sendMessageByAsync(String topic, String message){
  20. ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, message);
  21. future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
  22. @Override
  23. public void onFailure(Throwable ex) {
  24. log.info("消息发送失败,具体消息为:{}",message);
  25. }
  26. @Override
  27. public void onSuccess(SendResult<Integer, String> result) {
  28. log.info("消息发送成功,具体消息为:{}",message);
  29. }
  30. });
  31. }
  32. }

1.4 消费者消费消息

@KafkaListener注解代表此方法监听指定topic

  1. @Service
  2. @Slf4j
  3. public class KafkaConsumerService {
  4. @KafkaListener(topics = "test", groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
  5. public void consumerMessage(String message){
  6. log.info("消费者消费消息{}",message);
  7. }
  8. }

二 SpringBoot操作Kafka详解

2.1 KafkaTemplate 发送消息的几种方法

  1. // 设定data,向kafka发送消息
  2. ListenableFuture<SendResult<K, V>> sendDefault(V data);
  3. // 设定key、data,向kafka发送消息
  4. ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
  5. // 设定partition、key、data,向kafka发送消息
  6. ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
  7. // 设定partition、timestamp、key、data,向kafka发送消息
  8. ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
  9. // 设定topic、data,向kafka发送消息
  10. ListenableFuture<SendResult<K, V>> send(String topic, V data);
  11. // 设定topic、key、data,向kafka发送消息
  12. ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
  13. // 设定topic、partition、key、data,向kafka发送消息
  14. ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
  15. // 设定topic、partition、timestamp、 key、data,向kafka发送消息
  16. ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
  17. // 创建ProducerRecord对象,在ProducerRecord中设置好topic、partion、key、value等信息,然后向kafka发送消息
  18. ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
  19. // 创建Spring的Message对象,然后向kafka发送消息
  20. ListenableFuture<SendResult<K, V>> send(Message<?> message);
  21. // 获取指标信息
  22. Map<MetricName, ? extends Metric> metrics();
  23. // 显示Topic分区信息
  24. List<PartitionInfo> partitionsFor(String topic);
  25. //在生产者上执行一些任意操作并返回结果。
  26. <T> T execute(ProducerCallback<K, V, T> callback);
  27. // 生产者刷新消息
  28. void flush();
  29. // 用于执行生产者方法后异步回调
  30. interface ProducerCallback<K, V, T> {
  31. T doInKafka(Producer<K, V> producer);
  32. }

2.2 监听Kafka消息

Spring提供了监听消息的两个实现类,分别是:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 利用单个线程来接收全部主题中全部分区上的所有消息。
ConcurrentMessageListenerContainer 代理的一个或多个 KafkaMessageListenerContainer 实例,来实现多个线程消费。

  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  9. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. /**
  19. * 创建 KafkaMessageListenerContainer 实例监听 kafka 消息
  20. */
  21. @Bean
  22. public KafkaMessageListenerContainer demoListenerContainer() {
  23. // 创建container配置参数,并指定要监听的 topic 名称
  24. ContainerProperties properties = new ContainerProperties("test");
  25. // 设置消费者组名称
  26. properties.setGroupId("group2");
  27. // 设置监听器监听 kafka 消息
  28. properties.setMessageListener(new MessageListener<Integer,String>() {
  29. @Override
  30. public void onMessage(ConsumerRecord<Integer, String> record) {
  31. System.out.println("消息:" + record);
  32. }
  33. });
  34. return new KafkaMessageListenerContainer(consumerFactory(), properties);
  35. }
  36. }

2.2.1 使用@KafkaListener注解监听Kafka消息

(1)监听单个Topic

  1. @Configuration
  2. @EnableKafka
  3. public class ConsumerConfigDemo {
  4. @Bean
  5. public Map<String, Object> consumerConfigs() {
  6. Map<String, Object> propsMap = new HashMap<>();
  7. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  8. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  9. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  10. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. return propsMap;
  13. }
  14. @Bean
  15. public ConsumerFactory<Integer, String> consumerFactory() {
  16. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  17. }
  18. @Bean
  19. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  20. ConcurrentKafkaListenerContainerFactory<Integer, String>
  21. factory = new ConcurrentKafkaListenerContainerFactory<>();
  22. factory.setConsumerFactory(consumerFactory());
  23. // 创建3个线程并发消费
  24. factory.setConcurrency(3);
  25. // 设置拉取数据超时时间
  26. factory.getContainerProperties().setPollTimeout(3000);
  27. return factory;
  28. }
  29. /**
  30. * ---使用@KafkaListener注解来标记此方法为kafka消息监听器,创建消费组group1监听test topic
  31. */
  32. @KafkaListener(topics = {"test"},groupId = "group1")
  33. public void kafkaListener(String message){
  34. System.out.println("消息:"+message);
  35. }
  36. }

(2)监听多个Topic

  1. @KafkaListener(topics = {"test1", "test2"}, groupId = "group1")
  2. public void kafkaListener(String message){
  3. System.out.println("消息:"+message);
  4. }

(3)监听某个Topic中的某个分区

  1. @KafkaListener(id = "id0", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "0" }) })
  2. public void kafkaListener1(String message) {
  3. System.out.println("消息:"+message);
  4. }
  5. @KafkaListener(id = "id1", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "1", "2" }) })
  6. public void kafkaListener2(String message) {
  7. System.out.println("消息:"+message);
  8. }

(4)监听多个Topic分区

  1. @KafkaListener(id = "test", group = "group1", topicPartitions = {
  2. @TopicPartition(topic = "test1", partitions = {"0"}),
  3. @TopicPartition(topic = "test2", partitions = {"0", "1"})
  4. })
  5. public void kafkaListener(String message) {
  6. System.out.print(message);
  7. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/761274
推荐阅读
相关标签
  

闽ICP备14008679号