赞
踩
1.在springboot内添加相关依赖
org.springframework.kafka
spring-kafka
2.5.0.RELEASE
2.填写相关配置(相当于xml配置)
3.相关代码实现(可替代.yml配置)代码与.yml二选一
/**
*/
@Configuration
public class KafkaConfig {
@Value(“${spring.kafka.bootstrap-servers}”)
private String bootstrapServers;
/**
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
//kafka地址
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//k序列化
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//v的序列化
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
/**
/**
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
//从哪个消费者拿东西
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//最多拉取的数据
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// 并发数就是一个消费者实例起几个线程
factory.setConcurrency(3);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
4.定义KafKa生产者接受消息 KafkaProducer类
/**
*/
@Slf4j
@Component
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
topic 消息队列 不能为空
value 消息 不能为空
key 区分队列分区 可以为空
*/
public void sendMessage(String key, String value, String topic) {
if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
throw new IllegalArgumentException(“value or topic is null or empty”);
}
ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?
kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);
// 异步回调的方式获取通知
future.addCallback(
success -> {
assert null != success && null != success.getRecordMetadata();
// 发送到 kafka 的 topic
String _topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的 offset
long offset = success.getRecordMetadata().offset();
log.info("send kafka message success: [{}], [{}], [{}]",
_topic, partition, offset);
}, failure -> {
log.error("send kafka message failure: [{}], [{}], [{}]",
key, value, topic);
}
);
// 同步等待的方式获取通知
try {
// SendResult<String, String> sendResult = future.get();
//5秒不返回信息 通知异常
SendResult<String, String> sendResult = future.get(5, TimeUnit.SECONDS);
// 发送到 kafka 的 topic
String _topic = sendResult.getRecordMetadata().topic();
// 消息发送到的分区
int partition = sendResult.getRecordMetadata().partition();
// 消息在分区内的 offset
long offset = sendResult.getRecordMetadata().offset();
log.info("send kafka message success: [{}], [{}], [{}]",
_topic, partition, offset);
} catch (Exception ex) {
log.error(“send kafka message failure: [{}], [{}], [{}]”,
key, value, topic);
}
}
}
5.创建kafka消费者 KafkaConsumer 类
/**
*/
@Slf4j
@Component
public class KafkaConsumer {
private final ObjectMapper mapper;
public KafkaConsumer(ObjectMapper mapper) {
this.mapper = mapper;
}
/**
*/
@KafkaListener(topics = {“qinyi-springboot”}, groupId = “qinyi-springboot-kafka”)
public void listener01(ConsumerRecord<String, String> record) throws Exception {
String key = record.key();
String value = record.value();
QinyiMessage kafkaMessage = mapper.readValue(value, QinyiMessage.class);
log.info(“in listener01 consume kafka message: [{}], [{}]”,
key, mapper.writeValueAsString(kafkaMessage));
}
/**
*/
@KafkaListener(topics = {“qinyi-springboot”}, groupId = “qinyi-springboot-kafka-1”)
public void listener02(ConsumerRecord<?, ?> record) throws Exception {
Optional<?> _kafkaMessage = Optional.ofNullable(record.value());
if (_kafkaMessage.isPresent()) {
Object message = _kafkaMessage.get();
QinyiMessage kafkaMessage = mapper.readValue(message.toString(),
QinyiMessage.class);
log.info(“in listener02 consume kafka message: [{}]”,
mapper.writeValueAsString(kafkaMessage));
}
}
}
6.编写调用层类 KafkaController
/**
*/
@Slf4j
@RestController
@RequestMapping(“/kafka”)
public class KafkaController {
private final ObjectMapper mapper;
private final KafkaProducer kafkaProducer;
public KafkaController(ObjectMapper mapper, KafkaProducer kafkaProducer) {
this.mapper = mapper;
this.kafkaProducer = kafkaProducer;
}
/**
*/
@GetMapping(“/send-message”)
public void sendMessage(@RequestParam(required = false) String key,
@RequestParam String topic) throws Exception {
QinyiMessage message = new QinyiMessage(
1,
“Imooc-Study-Ecommerce”
);
//writeValueAsString 反序列化
kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic);
}
}
7.编写测试用例(提前要启动zk(默认端口:2181)+kafka(默认端口:9092))
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/kafka/send-message?key=qinyi&topic=qinyi-springboot
Content-Type: application/json
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/kafka/send-message?topic=qinyi-springboot
Content-Type: application/json
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。