当前位置:   article > 正文

Kafka使用教程_enablekafka

enablekafka

Kafka使用教程

参考:https://blog.csdn.net/zhizhi120/article/details/127727810

一、配置类

1.1、kafka消费者配置

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
	// kafka 消费者集群
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    // 开启自动提交
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;
    // 自动提交延迟
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;
    // 批量拉取个数
    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;
    // 重置消费者的offset
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    // 拉取超时时间
    @Value("${spring.kafka.listener.poll-timeout}")
    private Long pollTimeout;
    // coordinator感知consumer崩溃所需时间,默认10秒
    @Value("${spring.kafka.consumer.session-timeout}")
    private String sessionTimeout;
    // 是否批量拉取
    @Value("${spring.kafka.listener.batch-listener}")
    private Boolean batchListener;
    // 最多并发数
    @Value("${spring.kafka.listener.concurrency:3}")
    private Integer concurrency;
    // 两次poll的最大时间间隔
    @Value("${spring.kafka.consumer.max-poll-interval}")
    private Integer maxPollInterval;
    // 1个请求中每个分区可以获取的最大字节数
    @Value("${spring.kafka.consumer.max-partition-fetch-bytes}")
    private Integer maxPartitionFetchBytes;
    // kafka的分区分配策略
    @Value("${spring.kafka.consumer.partition-assignment-strategy:}")
    private String partitionAssignmentStrategy;

    public KafkaConsumerConfig() {
    }
	/**
	* 监听工厂
	*/
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(this.consumerFactory());
        // 批量获取开关
        factory.setBatchListener(this.batchListener);
        // 并发数量
        factory.setConcurrency(this.concurrency);
        // 设置拉取时间超时的时间间隔
        factory.getContainerProperties().setPollTimeout(this.pollTimeout);
        // 手动提交
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        Properties consumerProperties = this.initKafkaConsumerProperties();
        if (!consumerProperties.isEmpty()) {
            factory.getContainerProperties().setKafkaConsumerProperties(consumerProperties);
        }

        return factory;
    }
    
	/**
     * kafka消费者工厂
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory(this.consumerConfigs());
    }

	/**
     * kafka消费者配置
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = this.init();
        // key 序列化器选择
        props.put("key.deserializer", StringDeserializer.class);
        // value 序列化器选择
        props.put("value.deserializer", StringDeserializer.class);
        return props;
    }

	// 初始化kafka消费者配置
    private Properties initKafkaConsumerProperties() {
        Properties properties = new Properties();
        // 设置kafka的分区分配策略
        if (!this.partitionAssignmentStrategy.isEmpty()) {
            properties.setProperty("partition.assignment.strategy", this.partitionAssignmentStrategy);
        }

        return properties;
    }

    private Map<String, Object> init() {
        Map<String, Object> props = new HashMap();
        props.put("auto.commit.interval.ms", this.autoCommitInterval);
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("enable.auto.commit", this.autoCommit);
        props.put("max.poll.records", this.maxPollRecords);
        props.put("auto.offset.reset", this.autoOffsetReset);
        props.put("session.timeout.ms", this.sessionTimeout);
        props.put("max.poll.interval.ms", this.maxPollInterval);
        props.put("max.partition.fetch.bytes", this.maxPartitionFetchBytes);
        return props;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109

1.2、kafka生产者配置

@Configuration
@EnableKafka
public class KafkaProducerConfig {
	// kafka 集群
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
    // 批次大小
    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;
    // 重试次数
    @Value("${spring.kafka.producer.retries}")
    private Integer retries;
    // 缓冲区大小
    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;
    // 等待时间
    @Value("${spring.kafka.producer.linger}")
    private Integer linger;

    public KafkaProducerConfig() {
    }
	// 配置初始化
    private Map<String, Object> init() {
        Map<String, Object> props = new HashMap();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("retries", this.retries);
        props.put("batch.size", this.batchSize);
        props.put("linger.ms", this.linger);
        props.put("buffer.memory", this.bufferMemory);
        return props;
    }
	// 生产者配置
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = this.init();
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        return props;
    }
	// 生产者工厂
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory(this.producerConfigs());
    }
	// 创建kafkaTemplate模板,并注入spring中
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate(this.producerFactory());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

1.3、kafka服务接口类

public interface IKafkaService {
    void send(String topic, String message);

    void send(KafkaContext kafkaContext);

    void sendWithoutCallback(String topic, String key, String value);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

1.4、kafka服务实现类

public class KafkaService implements IKafkaService {
    private static final Logger log = LoggerFactory.getLogger(KafkaService.class);
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public KafkaService() {
    }

    @Async
    public void send(String topic, String message) {
        try {
            KafkaContext kafkaContext = new KafkaContext(topic, message);
            this.send(kafkaContext);
        } catch (Exception var4) {
            log.error("topic {} -- message {}", topic, message);
            log.error("消息发送异常", var4);
        }

    }

    @Async
    public void send(final KafkaContext kafkaContext) {
        ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(kafkaContext.getTopic(), kafkaContext.getPartition(), kafkaContext.getTimestamp(), kafkaContext.getKey(), kafkaContext.getMessage());
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            public void onSuccess(SendResult<String, String> result) {
                RecordMetadata metaData = result.getRecordMetadata();
                KafkaService.log.debug("topic {}, partition: {}, offset: {}  value: {}", new Object[]{metaData.topic(), metaData.partition(), metaData.offset(), kafkaContext.getMessage()});
            }

            public void onFailure(Throwable throwable) {
                KafkaService.log.error("发送消息失败: {} ", kafkaContext.getMessage(), throwable);
            }
        });
    }

    public void sendWithoutCallback(String topic, String key, String value) {
        this.kafkaTemplate.send(topic, key, value);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

1.5 kafka内容

public class KafkaContext {
    Long timestamp;
    private String topic;
    private String key;
    private String message;
    private Integer partition;

    public KafkaContext(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    public Long getTimestamp() {
        return this.timestamp;
    }

    public String getTopic() {
        return this.topic;
    }

    public String getKey() {
        return this.key;
    }

    public String getMessage() {
        return this.message;
    }

    public Integer getPartition() {
        return this.partition;
    }

    public void setTimestamp(final Long timestamp) {
        this.timestamp = timestamp;
    }

    public void setTopic(final String topic) {
        this.topic = topic;
    }

    public void setKey(final String key) {
        this.key = key;
    }

    public void setMessage(final String message) {
        this.message = message;
    }

    public void setPartition(final Integer partition) {
        this.partition = partition;
    }

    public KafkaContext() {
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

1.6 使用举例 - 生产者端

KafkaContext kafkaContext=new KafkaContext();
                kafkaContext.setTopic(LeaseMq**.LEASE_OVERDUE_UPDATE_BRANCH_DATA_TOPIC);
                kafkaContext.setKey(branch**.getBranchId());
                kafkaContext.setMessage(JSONObject.toJSONString(branch**));
                kafkaService.send(kafkaContext);
  • 1
  • 2
  • 3
  • 4
  • 5

1.7 使用举例 - 消费者端

/**
     * 消费
     *
     * @param records
     * @param ack
     */
    @KafkaListener(containerFactory = "kafkaListenerContainerFactory",
            id = LeaseMqTopicAndGroup.LEASE_OVERDUE_UPDATE_BRANCH_DATA_TOPIC,
            topics = LeaseMqTopicAndGroup.LEASE_OVERDUE_UPDATE_BRANCH_DATA_TOPIC, concurrency = "10")
    public void onMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        try {
            for (ConsumerRecord<?, ?> record : records) {
                BranchLeaseStatusInfo branchLeaseStatusInfo = JSON.parseObject(JSON.parse(record.value().toString()).toString(), Branch**.class);
JSONObject.toJSONString(branch**));
                leaseService.updateLBStatus(branch**);
            }
        } catch (Exception e) {
            log.error("", e);
        } finally {
            ack.acknowledge();//手动提交偏移量
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号