赞
踩
- <dependencies>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
- @Configuration
- public class KafkaTopicConfig {
-
-
- /**
- * 定义一个KafkaAdmin的bean,可以自动检测集群中是否存在topic,不存在则创建
- * @return
- */
- @Bean
- public KafkaAdmin kafkaAdmin(){
- Map<String, Object> configs = new HashMap<>();
- configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
-
- return new KafkaAdmin(configs);
- }
-
- @Bean
- public NewTopic newTopic(){
- // 创建一个主题参数: ①主题名称 ②分区数 ③副本数量(副本数量要小于Broker数量)
- return new NewTopic("test",3,(short) 0);
- }
-
- }

这里注意,我们需要添加@EnableKafka注解。这样才可以使用KafkaTemplate
- @Configuration
- @EnableKafka // 打开应用中KafkaTemplate能力
- public class KafkaProducerConfig {
-
- @Bean
- public KafkaTemplate kafkaTemplate(){
- return new KafkaTemplate(new DefaultKafkaProducerFactory(configs()));
- }
-
- public Map<String, Object> configs(){
- Map<String, Object> props = new HashMap<>();
- // 指定多个kafka集群多个地址
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
- // 重试次数,0为不启用重试机制
- props.put(ProducerConfig.RETRIES_CONFIG, 0);
- // acks=0 把消息发送到kafka就认为发送成功
- // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
- // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
- props.put(ProducerConfig.ACKS_CONFIG,"1");
- // 生产者空间不足时,send()被阻塞的时间,默认60s
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
- // 控制批处理大小,单位为字节
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
- // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
- props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
- // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
- props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
- // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
- props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
- // 键的序列化方式
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- // 值的序列化方式
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
- // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
- props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
- return props;
- }
- }

- @Configuration
- public class KafkaConsumerConfig {
-
- @Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory(){
- ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- // 设置消费者工厂
- factory.setConsumerFactory(consumerFactory());
- // 消费者组中线程数量
- factory.setConcurrency(3);
- // 拉取超时时间
- factory.getContainerProperties().setPollTimeout(3000);
- return factory;
- }
-
- public ConsumerFactory<Integer, String> consumerFactory(){
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- public Map<String, Object> consumerConfigs() {
- Map<String, Object> propsMap = new HashMap<>();
- // Kafka地址
- propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
- // 是否自动提交offset偏移量(默认true)
- propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- // 自动提交的频率(ms)
- propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
- // Session超时设置
- propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
- // 键的反序列化方式
- propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- // 值的反序列化方式
- propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- // offset偏移量规则设置:
- // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
- propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- return propsMap;
- }
- }

- @Service
- @Slf4j
- public class KafkaProducerService {
-
- @Autowired
- private KafkaTemplate kafkaTemplate;
-
- /**
- * 生产者同步向Kafka发送消息
- * @param topic 指定的主题
- * @param message 发送的消息内容
- */
- public void sendMessageBySync(String topic, String message) throws ExecutionException, InterruptedException, TimeoutException {
- kafkaTemplate.send(topic,message).get(10, TimeUnit.SECONDS);
- }
-
- /**
- * 生产者异步向Kafka发送消息
- * @param topic 指定的主题
- * @param message 发送的消息内容
- */
- public void sendMessageByAsync(String topic, String message){
- ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, message);
-
- future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
- @Override
- public void onFailure(Throwable ex) {
- log.info("消息发送失败,具体消息为:{}",message);
- }
-
- @Override
- public void onSuccess(SendResult<Integer, String> result) {
- log.info("消息发送成功,具体消息为:{}",message);
- }
- });
- }
- }

@KafkaListener注解代表此方法监听指定topic
- @Service
- @Slf4j
- public class KafkaConsumerService {
-
- @KafkaListener(topics = "test", groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
- public void consumerMessage(String message){
- log.info("消费者消费消息{}",message);
- }
- }
- // 设定data,向kafka发送消息
- ListenableFuture<SendResult<K, V>> sendDefault(V data);
- // 设定key、data,向kafka发送消息
- ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
- // 设定partition、key、data,向kafka发送消息
- ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
- // 设定partition、timestamp、key、data,向kafka发送消息
- ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
- // 设定topic、data,向kafka发送消息
- ListenableFuture<SendResult<K, V>> send(String topic, V data);
- // 设定topic、key、data,向kafka发送消息
- ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
- // 设定topic、partition、key、data,向kafka发送消息
- ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
- // 设定topic、partition、timestamp、 key、data,向kafka发送消息
- ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
- // 创建ProducerRecord对象,在ProducerRecord中设置好topic、partion、key、value等信息,然后向kafka发送消息
- ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
- // 创建Spring的Message对象,然后向kafka发送消息
- ListenableFuture<SendResult<K, V>> send(Message<?> message);
- // 获取指标信息
- Map<MetricName, ? extends Metric> metrics();
- // 显示Topic分区信息
- List<PartitionInfo> partitionsFor(String topic);
- //在生产者上执行一些任意操作并返回结果。
- <T> T execute(ProducerCallback<K, V, T> callback);
- // 生产者刷新消息
- void flush();
-
- // 用于执行生产者方法后异步回调
- interface ProducerCallback<K, V, T> {
- T doInKafka(Producer<K, V> producer);
- }

Spring提供了监听消息的两个实现类,分别是:
KafkaMessageListenerContainer 利用单个线程来接收全部主题中全部分区上的所有消息。
ConcurrentMessageListenerContainer 代理的一个或多个 KafkaMessageListenerContainer 实例,来实现多个线程消费。
- @Configuration
- @EnableKafka
- public class ConsumerConfigDemo {
- @Bean
- public Map<String, Object> consumerConfigs() {
- Map<String, Object> propsMap = new HashMap<>();
- propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
- propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
- propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return propsMap;
- }
-
- @Bean
- public ConsumerFactory<Integer, String> consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- /**
- * 创建 KafkaMessageListenerContainer 实例监听 kafka 消息
- */
- @Bean
- public KafkaMessageListenerContainer demoListenerContainer() {
- // 创建container配置参数,并指定要监听的 topic 名称
- ContainerProperties properties = new ContainerProperties("test");
- // 设置消费者组名称
- properties.setGroupId("group2");
- // 设置监听器监听 kafka 消息
- properties.setMessageListener(new MessageListener<Integer,String>() {
- @Override
- public void onMessage(ConsumerRecord<Integer, String> record) {
- System.out.println("消息:" + record);
- }
- });
- return new KafkaMessageListenerContainer(consumerFactory(), properties);
- }
-
- }

(1)监听单个Topic
- @Configuration
- @EnableKafka
- public class ConsumerConfigDemo {
- @Bean
- public Map<String, Object> consumerConfigs() {
- Map<String, Object> propsMap = new HashMap<>();
- propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
- propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
- propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return propsMap;
- }
-
- @Bean
- public ConsumerFactory<Integer, String> consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- @Bean
- KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<Integer, String>
- factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- // 创建3个线程并发消费
- factory.setConcurrency(3);
- // 设置拉取数据超时时间
- factory.getContainerProperties().setPollTimeout(3000);
- return factory;
- }
-
- /**
- * ---使用@KafkaListener注解来标记此方法为kafka消息监听器,创建消费组group1监听test topic
- */
- @KafkaListener(topics = {"test"},groupId = "group1")
- public void kafkaListener(String message){
- System.out.println("消息:"+message);
- }
-
- }

(2)监听多个Topic
-
- @KafkaListener(topics = {"test1", "test2"}, groupId = "group1")
- public void kafkaListener(String message){
- System.out.println("消息:"+message);
- }
(3)监听某个Topic中的某个分区
- @KafkaListener(id = "id0", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "0" }) })
- public void kafkaListener1(String message) {
- System.out.println("消息:"+message);
- }
-
- @KafkaListener(id = "id1", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "1", "2" }) })
- public void kafkaListener2(String message) {
- System.out.println("消息:"+message);
- }
(4)监听多个Topic分区
-
- @KafkaListener(id = "test", group = "group1", topicPartitions = {
- @TopicPartition(topic = "test1", partitions = {"0"}),
- @TopicPartition(topic = "test2", partitions = {"0", "1"})
- })
- public void kafkaListener(String message) {
- System.out.print(message);
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。