赞
踩
使用kafka的API,实现通过kafka收发消息。
1.pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
2.producer
import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class kafkaproducer { public static void main(String[] args){ Properties properties = new Properties(); System.setProperty("java.security.auth.login.config","E:\\JAVA_IDEA\\comjietaokafka\\src\\main\\java\\kafka_jaas.conf"); properties.put("bootstrap.servers", "0.0.0.0:9092,0.0.0.0:9092,0.0.0.0:9092"); //properties.put("acks", "1"); //properties.put("retries", 2); properties.put("batch.size", 60000); properties.put("linger.ms", 1); //properties.put("acks", "all"); // 此参数 在高可用中很重要 1 代表 leader写入即可 , all 代表所有node写入 //properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("sasl.mechanism", "PLAIN"); Producer<String, String> producer = null; try { producer = new KafkaProducer<String, String>(properties); System.out.println(producer); for (int i = 0; i < 100; i++) { System.out.println("start send "+i); String msg = "{\"message\":\"Message\",\"id\":"+i+"}"; producer.send(new ProducerRecord<String, String>("-data-6482357557125611488-datakeyae9fa1f81b2011e9ae2f0a58c0a803ea", msg), (recordMetadata, e) -> { System.out.println(e); if (e != null){ System.err.println("--消息发送失败----"+msg); }else{ System.out.println("--消息发送成功"); } }); System.out.println("Sent:" + msg); TimeUnit.SECONDS.sleep(1); } } catch (Exception e) { System.out.println(e.getMessage()); e.printStackTrace(); } finally { producer.close(); } } }
3.consumer
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class kafkaConsumer { public static void main(String[] args){ System.setProperty("java.security.auth.login.config","E:\\JAVA_IDEA\\comjietaokafka\\src\\main\\java\\kafka_jaas.conf"); Properties properties = new Properties(); properties.put("bootstrap.servers", "0.0.0.0:9092,0.0.0.0:9092,0.0.0.0:9092"); properties.put("group.id", "default-group"); //properties.put("enable.auto.commit", "true"); //properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "earliest"); //properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("sasl.mechanism", "PLAIN"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Arrays.asList("-data-6482357557125611488-datakeyae9fa1f81b2011e9ae2f0a58c0a803ea")); //kafkaConsumer.subscribe(Arrays.asList("-data-")); System.out.println(kafkaConsumer.listTopics()); while (true) { System.out.println("start this cycle ... "); ConsumerRecords<String, String> records = kafkaConsumer.poll(100); System.out.println(records.count()); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。