赞
踩
SpringBoot: 2.1.3
Zookeeper: cdh-3.4.5
Kafka: 2.12-2.1.0
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <!-- Parent POM: Spring Boot 2.1.3.RELEASE. The dependencies below declare no version of their own. -->
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.1.3.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.dennis</groupId>
- <artifactId>base-kafka</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>base-kafka</name>
- <description>Demo project for Spring Boot</description>
-
- <!-- Build with Java 8. -->
- <properties>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
-
- <!-- Spring Boot web starter. -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <!-- Spring for Apache Kafka; no version element is given here, presumably it is resolved through the parent POM. -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
-
- <!-- Test-scoped dependencies: Spring Boot test starter and the spring-kafka test artifact. -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- </project>

- ######################## SpringBoot 2.1.3 | Kafka 2.1.0 ##########################
- spring:
-   kafka:
-     bootstrap-servers: IP:9092 # for a cluster, list the brokers separated by commas
-     producer:
-       key-serializer: org.apache.kafka.common.serialization.StringSerializer
-       value-serializer: org.apache.kafka.common.serialization.StringSerializer
-     consumer:
-       group-id: test
-       enable-auto-commit: true
-       # The consumer side is configured with deserializers (not serializers).
-       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-
- // Connection settings for the Kafka admin client.
- Properties prop = new Properties();
- prop.put("bootstrap.servers","ip:9092");
-
- ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
- // Topic to create. Arguments: topic name, partition count, replication factor.
- topics.add(new NewTopic(topic, 1, (short)1));
-
- // try-with-resources closes the admin client even when topic creation fails.
- try (AdminClient admin = AdminClient.create(prop)) {
-     CreateTopicsResult result = admin.createTopics(topics);
-     // Block until the creation request has completed.
-     result.all().get();
- } catch (InterruptedException e) {
-     // Restore the interrupt flag so the calling thread can still observe the interruption.
-     Thread.currentThread().interrupt();
-     e.printStackTrace();
- } catch (ExecutionException e) {
-     e.printStackTrace();
- }

- // Connection settings for the Kafka admin client.
- Properties prop = new Properties();
- prop.put("bootstrap.servers","ip:9092");
-
- // Names of the topics to delete.
- ArrayList<String> topics = new ArrayList<>();
- topics.add(topic);
-
- // try-with-resources closes the admin client even when the deletion fails.
- try (AdminClient client = AdminClient.create(prop)) {
-     DeleteTopicsResult result = client.deleteTopics(topics);
-     // Block until the deletion request has completed.
-     result.all().get();
- } catch (InterruptedException e) {
-     // Restore the interrupt flag so the calling thread can still observe the interruption.
-     Thread.currentThread().interrupt();
-     e.printStackTrace();
- } catch (ExecutionException e) {
-     e.printStackTrace();
- }

- // Connection settings for the Kafka admin client.
- Properties prop = new Properties();
- prop.put("bootstrap.servers","ip:9092");
-
- // try-with-resources closes the admin client once the topic names have been printed (or on failure).
- try (AdminClient admin = AdminClient.create(prop)) {
-     ListTopicsResult result = admin.listTopics();
-     KafkaFuture<Set<String>> future = result.names();
-
-     System.out.println("==================Kafka Topics====================");
-     // get() blocks until the topic names are available; each name is printed on its own line.
-     future.get().forEach(n -> System.out.println(n));
- } catch (InterruptedException e) {
-     // Restore the interrupt flag so the calling thread can still observe the interruption.
-     Thread.currentThread().interrupt();
-     e.printStackTrace();
- } catch (ExecutionException e) {
-     e.printStackTrace();
- }
-
- // Template used to publish String key / String value messages.
- @Autowired
- private KafkaTemplate<String,String> kafkaTemplate;
-
- /**
-  * Publishes the given text to the "flume" topic.
-  *
-  * @param msg message payload to send
-  * @return the literal string "success"; the outcome of the send is not inspected
-  */
- @RequestMapping("/send.action")
- public String send(String msg){
-     // First argument is the topic, second is the message content.
-     final String topic = "flume";
-     kafkaTemplate.send(topic, msg);
-     return "success";
- }

- /**
-  * Listener component that prints every record received on the "flume" topic.
-  *
-  * Created by Dennis on 2019/3/25.
-  */
- @Component
- public class KafkaConsumer {
-
-     /**
-      * Prints a banner line followed by the topic, offset and value of the record.
-      *
-      * @param record the received record
-      */
-     @KafkaListener(topics = "flume")
-     public void listener(ConsumerRecord<?,?> record){
-         System.out.println("==================Kafka Consumer====================");
-         // Format first, then print without an extra line break: the template already ends in "\n".
-         final String line = String.format("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
-         System.out.print(line);
-     }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。