当前位置:   article > 正文

在 Java 中使用 Kafka

java kafka使用


前言

本文使用 Kafka 的 Java 客户端实现一个基于生产者—消费者模型的简单示例:生产者线程向指定主题发送消息,消费者线程订阅该主题并打印收到的消息。


一、生产者

@Slf4j
public class KafkaProducerTest implements Runnable {
	//生产者
    private final Producer<Integer, String> producer;
    //主题
    private final String topic;
    //服务列表
    private static final String BROKER_LIST = "172.16.10.129:9092";

	//创建KafkaProducer
    public KafkaProducerTest(String topic) {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
        this.topic = topic;
    }


 @Override
    public void run() {
        int msgNo = 1;
        while (msgNo < 100) {
            String msg = "msg_" + msgNo;
             //构建ProducerRecord,并发送
            producer.send(new ProducerRecord<>(topic, msg), (recordMetadata, e) -> {
                if (null != e) {
                    log.info("error" + e.getMessage());
                } else {
                    System.out.println(String.format("offset:%s,partition:%s", recordMetadata.offset(), recordMetadata.partition()));
                }
            });
            msgNo++;
            System.out.println("send:" + msg);
            try {
                TimeUnit.MILLISECONDS.sleep(3000);
            } catch (InterruptedException e) {
                log.error("send to topic: {},error:{}", topic, e.getMessage());
            }
        }
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

二、消费者

/**
 * Demo Kafka consumer: when run, subscribes to a single topic and prints every record it
 * receives until the poll loop fails with an exception.
 *
 * <p>The key type is declared as {@code String} so that it agrees with the configured
 * {@code StringDeserializer}.
 */
public class KafkaConsumerTest implements Runnable {
    /** Bootstrap broker address used by this demo. */
    private static final String BROKER_LIST = "172.16.10.129:9092";

    /** Underlying Kafka consumer; closed when {@link #run()} exits. */
    private final KafkaConsumer<String, String> consumer;
    /** Topic this consumer subscribes to. */
    private final String topic;

    /**
     * Creates the underlying KafkaConsumer in group "0" with auto-commit enabled,
     * offset reset set to "earliest", and String key/value deserializers.
     *
     * @param topic topic that {@link #run()} subscribes to
     */
    public KafkaConsumerTest(String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    /**
     * Subscribes to the topic and polls in an endless loop, printing offset, key and value
     * of each record. Any exception ends the loop; its stack trace is printed and the
     * consumer is closed.
     */
    @Override
    public void run() {
        try {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                // Fetch the next batch, waiting up to 10 seconds for records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset =" + record.offset() + ", key = " + record.key() + ", value = " + record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The loop only ends via an exception; close the consumer so it is not left open.
            consumer.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

三、测试

/**
 * Entry point of the demo: runs one producer and one consumer, each on its own thread,
 * against the same topic.
 */
public class KafkaStart {
    /** Topic shared by the producer and the consumer. */
    public static String topic = "test";

    /**
     * Starts the producer thread first, then the consumer thread.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Producer side.
        Thread producerThread = new Thread(new KafkaProducerTest(topic));
        producerThread.start();

        // Consumer side.
        Thread consumerThread = new Thread(new KafkaConsumerTest(topic));
        consumerThread.start();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

运行 KafkaStart 的 main 方法后,即可在控制台看到生产者线程和消费者线程各自打印的日志信息。


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/985127
推荐阅读
相关标签
  

闽ICP备14008679号