当前位置:   article > 正文

Springboot-2.13 整合 Kafka 基本操作,创建、删除主题。发送消息、消费消息。_springboot kafka 主题能否动态新增主题和删除主题

springboot kafka 主题能否动态新增主题和删除主题

版本介绍

SpringBoot: 2.13

Zookeeper:  cdh-3.4.5

Kafka: 2.12-2.1.0

依赖 pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.1.3.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.dennis</groupId>
  12. <artifactId>base-kafka</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>base-kafka</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.kafka</groupId>
  26. <artifactId>spring-kafka</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-test</artifactId>
  31. <scope>test</scope>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.springframework.kafka</groupId>
  35. <artifactId>spring-kafka-test</artifactId>
  36. <scope>test</scope>
  37. </dependency>
  38. </dependencies>
  39. </project>

配置 application.yml

  1. ######################## SpringBoot 2.13 | Kafka-2.10 ##########################
  2. spring:
  3. kafka:
  4. bootstrap-servers: IP:9092 # 集群 , 分隔
  5. producer:
  6. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  8. consumer:
  9. group-id: test
  10. enable-auto-commit: true
  11. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  12. value-serializer: org.apache.kafka.common.serialization.StringSerializer

创建主题

  1. Properties prop = new Properties();
  2. prop.put("bootstrap.servers","ip:9092");
  3. AdminClient admin = AdminClient.create(prop);
  4. ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
  5. // 创建主题 参数:主题名称、分区数、副本数
  6. NewTopic newTopic = new NewTopic(topic, 1, (short)1);
  7. topics.add(newTopic);
  8. CreateTopicsResult result = admin.createTopics(topics);
  9. try {
  10. result.all().get();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. } catch (ExecutionException e) {
  14. e.printStackTrace();
  15. }

删除主题

  1. Properties prop = new Properties();
  2. prop.put("bootstrap.servers","ip:9092");
  3. AdminClient client = AdminClient.create(prop);
  4. ArrayList<String> topics = new ArrayList<>();
  5. topics.add(topic);
  6. DeleteTopicsResult result = client.deleteTopics(topics);
  7. try {
  8. result.all().get();
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. } catch (ExecutionException e) {
  12. e.printStackTrace();
  13. }

主题列表

  1. Properties prop = new Properties();
  2. prop.put("bootstrap.servers","ip:9092");
  3. AdminClient admin = AdminClient.create(prop);
  4. ListTopicsResult result = admin.listTopics();
  5. KafkaFuture<Set<String>> future = result.names();
  6. try {
  7. System.out.println("==================Kafka Topics====================");
  8. future.get().forEach(n -> System.out.println(n));
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. } catch (ExecutionException e) {
  12. e.printStackTrace();
  13. }

生产者发送消息

  1. @Autowired
  2. private KafkaTemplate<String,String> kafkaTemplate;
  3. /**
  4. * 发送消息
  5. * @param msg
  6. * @return
  7. */
  8. @RequestMapping(value = "/send.action")
  9. public String send(String msg){
  10. // 主题、消息内容
  11. kafkaTemplate.send("flume", msg);
  12. return "success";
  13. }

消费者消费消息

  1. /**
  2. * Created by Dennis on 2019/3/25.
  3. */
  4. @Component
  5. public class KafkaConsumer {
  6. @KafkaListener(topics = "flume")
  7. public void listener(ConsumerRecord<?,?> record){
  8. System.out.println("==================Kafka Consumer====================");
  9. System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
  10. }
  11. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/846947
推荐阅读
相关标签