赞
踩
Kafka消费者默认对于未订阅的topic的offset的时候,也就是系统并没有存储该消费者的消费分区的记录信息,默认Kafka消费者的默认消费策略:latest
auto.offset.reset=latest
消费者在消费数据的时候默认会定期自动提交偏移量,用户可以通过以下两个参数配置: enable.auto.commit = true 默认
auto.commit.interval.ms = 5000 默认
如果用户需要自己管理offset的自动提交,可以关闭offset的自动提交,手动管理offset提交的偏移量,注意用户提交的offset偏移量永远都要比本次消费的偏移量+1,因为提交的offset是kafka消费者下一次抓取数据的位置。
Kafka生产者在发送完一个的消息之后,要求Broker在规定的额时间Ack应答,如果没有在规定时间未应答,Kafka生产者会尝试n次重新发送消息。
acks默认为1
如果生产者在规定的时间内,并没有得到Kafka的Leader的Ack应答,Kafka可以开启reties机制。 request.timeout.ms = 30000 默认 retries = 2147483647 默认
幂等性:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
Kafka的0.11.0.0版本新增了对幂等的支持。幂等生产者的新增特性,幂等保证了生产者发送的消息,不会丢失和重复。如何实现幂等的关键是服务端是否可以区分消息的重复。
区分请求是否重复的有两点:
kafka实现幂等,需要停止多次处理消息,将其持久化到Kafka Topic中仅一次。在生产者初始化期间,kafka会给每个生产者生成一个唯一的ID称为Producer ID或PID。 PID、序列号(sequenceNumeber,注意序列号粒度为TopicPartition维度,即细致化到分区粒度,每个分区的序列号是单调递增 )、消息绑定在一起发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID在当前TopicPartition对中最后提交的消息正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。 enable.idempotence= false 默认不开启,
max.in.flight.requests.per.connection=1。max.in.flight.requests.per.connection配置代表着一个Producer同时可以发送的未收到确认的消息数量。如果max.in.flight.requests.per.connection数量大于1,那么可能发送了message1后,在没有收到确认前就发送了message2,此时 message1发送失败后触发重试,而 message2直接发送成功,就造成了Broker上消息的乱序。max.in.flight.requests.per.connection的默认值为5
Kafka的幂等性,只能保证一条记录的在分区发送的原子性,但是如果要保证多条记录在多个分区的的原子性,这个时候就需要开启kafk的事务操作。 在Kafka0.11.0.0引入了事务。通常Kafka的事务分为生产者事务与消费者事务。消费者消费的消息的默认设置为read_uncommited,会导致读取到生产者发送的失败的事务,所有在开启生产者事务之后,需要用户设置消费者的事务隔离级别 isolation.level = read_committed。开启的生产者事务的时候,只需要指定transactional.id属性即可,一旦开启了事务,默认生产者就已经开启了幂等性。但是要求"transactional.id"的取值必须是唯一的,同一时刻只能有一个"transactional.id"存储在,其他的将会被关闭。
有时候消费者再接收到到信息后,需要生产者对消息再次处理,此时生产者作为消息的转发中介,代码如下
- //定义消费者
- public static KafkaConsumer<String,String> buildKafkaConsumer(String group){
- Properties props=new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"A:9092,B:9092,C:9092");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
- props.put(ConsumerConfig.GROUP_ID_CONFIG,group);
- //消费者关闭自动提交
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
- //开启消费者事务
- props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
-
- return new KafkaConsumer<String, String>(props);
- }
- //定义生产者
- public static KafkaProducer<String,String> buildKafkaProducer(){
- Properties props=new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"A:9092,B:9092,C:9092");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- //开启生产者事务
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction-id");
- return new KafkaProducer<String, String>(props);
- }
- public static void main(String[] args) {
-
- //1.生产者&消费者
- KafkaProducer<String,String> producer=buildKafkaProducer();
- KafkaConsumer<String, String> consumer = buildKafkaConsumer("group01");
- //订阅topic01的消息
- consumer.subscribe(Arrays.asList("topic01"));
- //初始化事务
- producer.initTransactions();
-
- try{
- while(true){
- ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
- Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
- //开启事务控制
- producer.beginTransaction();
- Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<TopicPartition, OffsetAndMetadata>();
- while (consumerRecordIterator.hasNext()){
- ConsumerRecord<String, String> record = consumerRecordIterator.next();
- //创建Record,送到topic02
- ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>("topic02",record.key(),record.value());
- producer.send(producerRecord);
- //缓冲的消息都被发送到Kafka服务器
- producer.flush();
- //记录元数据
- offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1));
- }
- //提交消费组的offset
- producer.sendOffsetsToTransaction(offsets,"group01");
- //提交事务
- producer.commitTransaction();
- }
- }catch (Exception e){
- producer.abortTransaction();//终止事务
- }finally {
- producer.close();
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。