赞
踩
下面使用 Kafka 实现一个基于生产者-消费者模型的示例。
@Slf4j public class KafkaProducerTest implements Runnable { //生产者 private final Producer<Integer, String> producer; //主题 private final String topic; //服务列表 private static final String BROKER_LIST = "172.16.10.129:9092"; //创建KafkaProducer public KafkaProducerTest(String topic) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<>(props); this.topic = topic; } @Override public void run() { int msgNo = 1; while (msgNo < 100) { String msg = "msg_" + msgNo; //构建ProducerRecord,并发送 producer.send(new ProducerRecord<>(topic, msg), (recordMetadata, e) -> { if (null != e) { log.info("error" + e.getMessage()); } else { System.out.println(String.format("offset:%s,partition:%s", recordMetadata.offset(), recordMetadata.partition())); } }); msgNo++; System.out.println("send:" + msg); try { TimeUnit.MILLISECONDS.sleep(3000); } catch (InterruptedException e) { log.error("send to topic: {},error:{}", topic, e.getMessage()); } } producer.close(); } }
public class KafkaConsumerTest implements Runnable { //消费者 private final KafkaConsumer<Integer, String> consumer; //主题 private final String topic; //服务列表 private static final String BROKER_LIST = "172.16.10.129:9092"; //创建KafkaConsumer public KafkaConsumerTest(String topic) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST); props.put(ConsumerConfig.GROUP_ID_CONFIG, "0"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); this.topic = topic; } @Override public void run() { try{ //订阅主题 consumer.subscribe(Arrays.asList(topic)); while (true) { //获取消息 ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10)); for (ConsumerRecord<Integer, String> record : records) System.out.println("offset =" + record.offset() + ", key = " + record.key() + ", value = " + record.value()); } } catch (Exception e) { e.printStackTrace(); } } }
/**
 * Entry point of the demo: runs one {@code KafkaProducerTest} and one
 * {@code KafkaConsumerTest} against the same topic, each on its own thread.
 */
public class KafkaStart {

    /** Topic shared by the producer and the consumer. */
    public static String topic = "test";

    /**
     * Creates and starts the producer thread first, then the consumer thread.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Producer side.
        Thread producerThread = new Thread(new KafkaProducerTest(topic));
        producerThread.start();

        // Consumer side.
        Thread consumerThread = new Thread(new KafkaConsumerTest(topic));
        consumerThread.start();
    }
}
运行 KafkaStart 之后，就能在控制台看到生产者线程和消费者线程打印的日志信息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。