赞
踩
ads:
关注以下公众号查看更多文章
gradle配置如下:
- implementation "org.apache.kafka:kafka-clients:$kafkaVersion"
- implementation "com.fasterxml.jackson.core:jackson-databind:2.11.2"
- compile "org.slf4j:slf4j-simple:1.7.25"
生产者代码:
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
- properties.setProperty("key.serializer", StringSerializer.class.getName());
- properties.setProperty("value.serializer", StringSerializer.class.getName());
- properties.setProperty("acks", "1");
- properties.setProperty("retries", "3");
- properties.setProperty("linger.ms", "6");
-
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
- for (int i=1; i<10; i++) {
- ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", i+"", "message"+i);
- producer.send(record);
- }
-
- // producer.flush();
-
- producer.close();
- }

消费者代码:
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
- properties.setProperty("key.deserializer", StringDeserializer.class.getName());
- properties.setProperty("value.deserializer", StringDeserializer.class.getName());
- properties.setProperty("group.id", "test");
- // properties.setProperty("enable.auto.commit", "false");
- properties.setProperty("auto.commit.interval.ms", "1000");
- properties.setProperty("auto.offset.reset", "earliest");
-
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
- consumer.subscribe(Arrays.asList("first_topic"));
-
- while (true){
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
- for (ConsumerRecord<String, String> record : records){
- String partion = record.partition() + "";
- String offset = record.offset() + "";
- String key = record.key();
- String value = record.value();
- System.out.println("partion:"+partion+",offset:"+offset+",key:"+key+",value:"+value);
- }
- // consumer.commitAsync();
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。