Reference: https://blog.csdn.net/zhizhi120/article/details/127727810
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Kafka consumer cluster addresses
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    // whether auto-commit is enabled
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    // auto-commit interval
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    // max records per poll
    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    // offset reset policy
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    // poll timeout
    @Value("${spring.kafka.listener.poll-timeout}")
    private Long pollTimeout;

    // time the coordinator needs to detect a crashed consumer; default 10 s
    @Value("${spring.kafka.consumer.session-timeout}")
    private String sessionTimeout;

    // whether to consume in batches
    @Value("${spring.kafka.listener.batch-listener}")
    private Boolean batchListener;

    // maximum concurrency
    @Value("${spring.kafka.listener.concurrency:3}")
    private Integer concurrency;

    // maximum interval between two polls
    @Value("${spring.kafka.consumer.max-poll-interval}")
    private Integer maxPollInterval;

    // maximum bytes fetched per partition in one request
    @Value("${spring.kafka.consumer.max-partition-fetch-bytes}")
    private Integer maxPartitionFetchBytes;

    // partition assignment strategy
    @Value("${spring.kafka.consumer.partition-assignment-strategy:}")
    private String partitionAssignmentStrategy;

    /**
     * Listener container factory.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // batch consumption switch
        factory.setBatchListener(batchListener);
        // number of concurrent consumers
        factory.setConcurrency(concurrency);
        // poll timeout
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        // manual offset commit
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        Properties consumerProperties = initKafkaConsumerProperties();
        if (!consumerProperties.isEmpty()) {
            factory.getContainerProperties().setKafkaConsumerProperties(consumerProperties);
        }
        return factory;
    }

    /**
     * Kafka consumer factory.
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Kafka consumer configuration.
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = init();
        // key deserializer
        props.put("key.deserializer", StringDeserializer.class);
        // value deserializer
        props.put("value.deserializer", StringDeserializer.class);
        return props;
    }

    // extra consumer properties passed to the listener container
    private Properties initKafkaConsumerProperties() {
        Properties properties = new Properties();
        // set the partition assignment strategy only if configured
        if (!partitionAssignmentStrategy.isEmpty()) {
            properties.setProperty("partition.assignment.strategy", partitionAssignmentStrategy);
        }
        return properties;
    }

    private Map<String, Object> init() {
        Map<String, Object> props = new HashMap<>();
        props.put("auto.commit.interval.ms", autoCommitInterval);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("enable.auto.commit", autoCommit);
        props.put("max.poll.records", maxPollRecords);
        props.put("auto.offset.reset", autoOffsetReset);
        props.put("session.timeout.ms", sessionTimeout);
        props.put("max.poll.interval.ms", maxPollInterval);
        props.put("max.partition.fetch.bytes", maxPartitionFetchBytes);
        return props;
    }
}
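Because this class binds configuration through @Value rather than Spring Boot's Kafka auto-configuration, every placeholder above needs a matching entry in the application config, including non-standard keys such as spring.kafka.listener.batch-listener and spring.kafka.consumer.session-timeout. A minimal application.yml sketch with assumed values:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092      # assumed broker address
      enable-auto-commit: false              # offsets are committed manually
      auto-commit-interval: 1000
      max-poll-records: 50
      auto-offset-reset: latest
      session-timeout: 10000
      max-poll-interval: 300000
      max-partition-fetch-bytes: 1048576
    listener:
      poll-timeout: 3000
      batch-listener: true                   # the listener below receives a List
      concurrency: 3

The concurrency and partition-assignment-strategy placeholders declare defaults in their @Value expressions, so those two keys may be omitted.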
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    // Kafka cluster addresses
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    // batch size
    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    // number of retries
    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    // buffer memory
    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    // linger time before a batch is sent
    @Value("${spring.kafka.producer.linger}")
    private Integer linger;

    // base producer configuration
    private Map<String, Object> init() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("retries", retries);
        props.put("batch.size", batchSize);
        props.put("linger.ms", linger);
        props.put("buffer.memory", bufferMemory);
        return props;
    }

    // producer configuration with serializers
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = init();
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        return props;
    }

    // producer factory
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    // create the KafkaTemplate and register it with Spring
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
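The producer placeholders likewise need explicit entries, since they are read via @Value. A sketch with assumed values (batch.size and buffer.memory match the Kafka client defaults):

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092   # assumed broker address
      batch-size: 16384
      retries: 3
      buffer-memory: 33554432
      linger: 1                           # milliseconds, mapped to linger.ms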
/**
 * Thin sending facade over KafkaTemplate.
 */
public interface IKafkaService {

    /** Send a plain message to the given topic, logging the result via a callback. */
    void send(String topic, String message);

    /** Send using the full context (topic, partition, timestamp, key, message). */
    void send(KafkaContext kafkaContext);

    /** Fire-and-forget keyed send with no result callback. */
    void sendWithoutCallback(String topic, String key, String value);
}
import javax.annotation.Resource;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

// registered as a Spring bean so it can be injected where messages are sent
@Service
public class KafkaService implements IKafkaService {

    private static final Logger log = LoggerFactory.getLogger(KafkaService.class);

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Async
    @Override
    public void send(String topic, String message) {
        try {
            KafkaContext kafkaContext = new KafkaContext(topic, message);
            this.send(kafkaContext);
        } catch (Exception e) {
            log.error("topic {} -- message {}", topic, message);
            log.error("exception while sending message", e);
        }
    }

    @Async
    @Override
    public void send(final KafkaContext kafkaContext) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(
                kafkaContext.getTopic(), kafkaContext.getPartition(),
                kafkaContext.getTimestamp(), kafkaContext.getKey(), kafkaContext.getMessage());
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                RecordMetadata metaData = result.getRecordMetadata();
                log.debug("topic {}, partition: {}, offset: {}, value: {}",
                        metaData.topic(), metaData.partition(), metaData.offset(), kafkaContext.getMessage());
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("failed to send message: {}", kafkaContext.getMessage(), throwable);
            }
        });
    }

    @Override
    public void sendWithoutCallback(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value);
    }
}
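Both send overloads are marked @Async, which Spring ignores unless async processing is enabled somewhere in the application. A minimal sketch, assuming no such configuration already exists:

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

// Without this, @Async methods run synchronously on the caller's thread.
@Configuration
@EnableAsync
public class AsyncConfig {
}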
/**
 * Carries the full set of parameters for one Kafka send: topic, optional
 * partition, optional timestamp, optional key, and the message payload.
 */
public class KafkaContext {

    private Long timestamp;
    private String topic;
    private String key;
    private String message;
    private Integer partition;

    public KafkaContext() {
    }

    public KafkaContext(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(final Long timestamp) {
        this.timestamp = timestamp;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(final String topic) {
        this.topic = topic;
    }

    public String getKey() {
        return key;
    }

    public void setKey(final String key) {
        this.key = key;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(final String message) {
        this.message = message;
    }

    public Integer getPartition() {
        return partition;
    }

    public void setPartition(final Integer partition) {
        this.partition = partition;
    }
}
// The original post masks some identifiers with "**"; the names below are
// reconstructed from the listener example that follows.
KafkaContext kafkaContext = new KafkaContext();
kafkaContext.setTopic(LeaseMqTopicAndGroup.LEASE_OVERDUE_UPDATE_BRANCH_DATA_TOPIC);
kafkaContext.setKey(branchLeaseStatusInfo.getBranchId());
kafkaContext.setMessage(JSONObject.toJSONString(branchLeaseStatusInfo));
kafkaService.send(kafkaContext);
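Callers that only have a topic and payload can use the simpler overloads; a brief usage sketch (topic name and payload assumed):

// fire-and-forget with per-record logging via the callback
kafkaService.send("demo-topic", "{\"hello\":\"kafka\"}");

// keyed send with no callback, when result logging is not needed
kafkaService.sendWithoutCallback("demo-topic", "key-1", "{\"hello\":\"kafka\"}");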
/**
 * Batch consumer with manual offset commit.
 *
 * @param records batch of records from one poll
 * @param ack     manual acknowledgment handle
 */
@KafkaListener(containerFactory = "kafkaListenerContainerFactory",
        id = LeaseMqTopicAndGroup.LEASE_OVERDUE_UPDATE_BRANCH_DATA_TOPIC,
        topics = LeaseMqTopicAndGroup.LEASE_OVERDUE_UPDATE_BRANCH_DATA_TOPIC,
        concurrency = "10")
public void onMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    try {
        for (ConsumerRecord<?, ?> record : records) {
            BranchLeaseStatusInfo branchLeaseStatusInfo =
                    JSON.parseObject(record.value().toString(), BranchLeaseStatusInfo.class);
            log.debug("consumed: {}", JSONObject.toJSONString(branchLeaseStatusInfo));
            leaseService.updateLBStatus(branchLeaseStatusInfo);
        }
    } catch (Exception e) {
        log.error("failed to process lease overdue message", e);
    } finally {
        // commit the offset manually
        ack.acknowledge();
    }
}
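The LeaseMqTopicAndGroup constants class referenced by both the producer snippet and the listener is not shown in the post; a hypothetical sketch, with the topic string assumed:

// Hypothetical: only the one constant used above is known from the snippets.
public final class LeaseMqTopicAndGroup {

    public static final String LEASE_OVERDUE_UPDATE_BRANCH_DATA_TOPIC =
            "lease-overdue-update-branch-data";   // assumed topic name

    private LeaseMqTopicAndGroup() {
    }
}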