当前位置:   article > 正文

[springboot配置Kafka] springboot配置多个kafka,包含账号密码

springboot配置多个kafka

说明

本示例只配置了 Consumer,没有配置 Producer;如需配置 Producer,可参考"配置文件_1"中被注释掉的内容。

1.引入依赖

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
  • 1
  • 2
  • 3
  • 4

2.yml配置

spring:
  kafka:
    one:
      # First ("one") cluster — test environment, SASL/SCRAM authenticated.
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
      consumer:
        # Consumer group id: consumers sharing a group.id split the topic's
        # partitions between them, so each record is delivered to only one
        # member of the group.
        group-id: defaultName
        # Auto-commit is disabled; offsets are committed manually
        # (the container factory uses AckMode.MANUAL_IMMEDIATE).
        enable-auto-commit: false
        # Where to start when there is no committed offset for the group:
        # valid values are earliest / latest / none
        # (smallest/largest are the obsolete pre-0.9 consumer names).
        auto-offset-reset: latest
        # Key/value deserializers.
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
    two:
      # Second ("two") cluster — test environment, no authentication.
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic_two
      consumer:
        # Consumer group id for the second cluster (see note on "one" above).
        group-id: defaultName_two
        # Auto-commit disabled; offsets are committed manually.
        enable-auto-commit: false
        # Where to start when there is no committed offset:
        # valid values are earliest / latest / none.
        auto-offset-reset: latest
        # Key/value deserializers.
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

3.新建配置文件

3.1配置文件_1

@Configuration
@EnableKafka
public class K1kafkaConfiguration {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.one.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.one.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.one.properties.security.protocol}")
    private String securityprotocol;
    @Value("${spring.kafka.one.properties.sasl.mechanism}")
    private String mechanism;
    @Value("${spring.kafka.one.properties.sasl.jaas.config}")
    private String jaasconfig;


    //@Value("${spring.kafka.one.producer.linger-ms}")
    //private Integer lingerMs;
    //@Value("${spring.kafka.one.producer.max-request-size}")
    //private Integer maxRequestSize;
    //@Value("${spring.kafka.one.producer.batch-size}")
    //private Integer batchSize;
    //@Value("${spring.kafka.one.producer.buffer-memory}")
    //private Integer bufferMemory;


    //@Bean
    //public KafkaTemplate<String, String> kafkaOneTemplate() {
    //    return new KafkaTemplate<>(producerFactory());
    //}
    @Bean
    @Primary
//理解为默认优先选择当前容器下的消费者工厂
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        //并发数量
        factory.setConcurrency(1);
        //开启批量监听
         //factory.setBatchListener(type);
        // 被过滤的消息将被丢弃
        // factory.setAckDiscarded(true);
        factory.getContainerProperties().setPollTimeout(3000);
        //设置手动提交ackMode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        // 设置记录筛选策略
        //factory.setRecordFilterStrategy(new RecordFilterStrategy() {
        //    @Override
        //    public boolean filter(ConsumerRecord consumerRecord) {
        //        String msg = consumerRecord.value().toString();
        //        if(Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0){
        //            return false;
        //        }
        //        // 返回true消息将会被丢弃
        //        return true;
        //    }
        //});
        return factory;
    }

    //private ProducerFactory<String, String> producerFactory() {
    //    return new DefaultKafkaProducerFactory<>(producerConfigs());
    //}
    @Bean//第一个消费者工厂的bean
    public ConsumerFactory<Integer, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    //private Map<String, Object> producerConfigs() {
    //    Map<String, Object> props = new HashMap<>();
    //    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    //    props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);
    //    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
    //    props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
    //    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
    //    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    return props;
    //}


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol",securityprotocol);
        props.put("sasl.mechanism",mechanism);
        props.put("sasl.jaas.config",jaasconfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107

3.2配置文件_2

@Configuration
@EnableKafka
public class K2kafkaConfiguration {
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.two.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.two.consumer.max-poll-records}")
    private String maxPollRecords;


  

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        //并发数量
        factory.setConcurrency(1);
        //开启批量监听
        //factory.setBatchListener(type);
        // 被过滤的消息将被丢弃
        // factory.setAckDiscarded(true);
        factory.getContainerProperties().setPollTimeout(3000);
        //设置手动提交ackMode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        return factory;
    }


    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

 

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

4.设置消费

4.1 设置消费_1

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer {

    @KafkaListener(topics = "#{'${spring.kafka.one.topic}'}", groupId = "defaultName",containerFactory = "kafkaListenerContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer] 接收到kafka消息:{}",record.value());
        System.out.println(record);
        System.out.println(record.value());
        //手动提交offset
        //ack.acknowledge();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

4.2 设置消费_2

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer2 {

    @KafkaListener(topics = "#{'${spring.kafka.two.topic}'}", groupId = "defaultName_two",containerFactory = "kafkaTwoContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer2 ] 接收到kafka消息:{}",record.value());
        System.out.println(record);
        System.out.println(record.value());
        //手动提交offset
        //ack.acknowledge();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/951035
推荐阅读
相关标签
  

闽ICP备14008679号