
Kafka producer and consumer usage example

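The Gradle dependencies are as follows (kafkaVersion must be defined elsewhere in the build, for example in gradle.properties):

    implementation "org.apache.kafka:kafka-clients:$kafkaVersion"
    implementation "com.fasterxml.jackson.core:jackson-databind:2.11.2"
    implementation "org.slf4j:slf4j-simple:1.7.25"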

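Producer code:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    // The class name is illustrative; the original shows only the main method.
    public class ProducerDemo {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
            properties.setProperty("key.serializer", StringSerializer.class.getName());
            properties.setProperty("value.serializer", StringSerializer.class.getName());
            // Wait for the partition leader only; retry failed sends up to 3 times.
            properties.setProperty("acks", "1");
            properties.setProperty("retries", "3");
            // Wait up to 6 ms so that records can be batched together.
            properties.setProperty("linger.ms", "6");
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            // Send nine records with keys "1" to "9".
            for (int i = 1; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", i + "", "message" + i);
                producer.send(record);
            }
            // close() waits for pending sends to complete, so an explicit flush() is optional here.
            // producer.flush();
            producer.close();
        }
    }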
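
The producer settings above are plain string key/value pairs, so they can also be kept outside the code. The following is a minimal sketch of that idea using only java.util.Properties. The class name ProducerConfigSketch, the load helper, and the inline CONFIG_TEXT are assumptions made for illustration; in practice the text would usually live in a file. The serializer is written as its fully qualified class name so that the sketch runs without the Kafka client on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper class, not part of the original article.
public class ProducerConfigSketch {
    // Same settings as the producer code, written in .properties syntax.
    static final String CONFIG_TEXT = String.join("\n",
            "bootstrap.servers=127.0.0.1:9092",
            "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "acks=1",
            "retries=3",
            "linger.ms=6");

    // Parses .properties-style text into a Properties object.
    static Properties load(String text) {
        Properties properties = new Properties();
        try (StringReader reader = new StringReader(text)) {
            properties.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = load(CONFIG_TEXT);
        // The result could be passed to new KafkaProducer<>(properties).
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The same approach should work for the consumer settings, with the deserializer class names and group.id in place of the serializer entries.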

Consumer code:

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // The class name is illustrative; the original shows only the main method.
    public class ConsumerDemo {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
            properties.setProperty("key.deserializer", StringDeserializer.class.getName());
            properties.setProperty("value.deserializer", StringDeserializer.class.getName());
            properties.setProperty("group.id", "test");
            // Auto commit is enabled by default; uncomment to commit offsets manually instead.
            // properties.setProperty("enable.auto.commit", "false");
            properties.setProperty("auto.commit.interval.ms", "1000");
            // Start from the earliest offset when the group has no committed offset.
            properties.setProperty("auto.offset.reset", "earliest");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Arrays.asList("first_topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    String partition = record.partition() + "";
                    String offset = record.offset() + "";
                    String key = record.key();
                    String value = record.value();
                    System.out.println("partition:" + partition + ",offset:" + offset + ",key:" + key + ",value:" + value);
                }
                // Needed only when enable.auto.commit is set to false.
                // consumer.commitAsync();
            }
        }
    }
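
The consumer loop above runs until the process is killed, so the consumer is never closed. One common way to make such a loop stoppable is a flag that is checked on every iteration, with cleanup in a finally block. The sketch below shows only that control flow and uses no Kafka classes: StoppableLoopSketch, pollOnce, and onClose are hypothetical names, with pollOnce standing in for one consumer.poll(...) round and onClose standing in for consumer.close().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of the original article.
public class StoppableLoopSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Runnable pollOnce; // stand-in for one poll-and-process round
    private final Runnable onClose;  // stand-in for closing the consumer

    StoppableLoopSketch(Runnable pollOnce, Runnable onClose) {
        this.pollOnce = pollOnce;
        this.onClose = onClose;
    }

    // Repeats pollOnce until stop() is called, then always runs onClose.
    void run() {
        try {
            while (running.get()) {
                pollOnce.run();
            }
        } finally {
            onClose.run();
        }
    }

    // Safe to call from another thread; the loop exits after the current round.
    void stop() {
        running.set(false);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger polls = new AtomicInteger();
        StoppableLoopSketch loop = new StoppableLoopSketch(
                polls::incrementAndGet,
                () -> System.out.println("closed after " + polls.get() + " rounds"));
        Thread worker = new Thread(loop::run);
        worker.start();
        Thread.sleep(50);   // let the loop run briefly
        loop.stop();
        worker.join();
    }
}
```

With a real KafkaConsumer, a poll that is currently blocking can additionally be interrupted by calling consumer.wakeup() from another thread, which makes poll throw a WakeupException that the loop can catch before closing.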
