当前位置:   article > 正文

Kafka 高级特性_kafka 最新特性

kafka 最新特性

 1、Offset自动控制

Kafka消费者默认对于未订阅的topic的offset的时候,也就是系统并没有存储该消费者的消费分区的记录信息,默认Kafka消费者的默认消费策略:latest

auto.offset.reset=latest

  • earliest - 将偏移量重置为最早的偏移量,即消费者会消费生产者所有的消息
  • latest - 将偏移量重置为最新的偏移量,消费者只消费自己启动后,生产者的最新的消息
  • none - 如果未找到消费者组的先前偏移量,则向消费者抛出异常

消费者在消费数据的时候默认会定期自动提交偏移量,用户可以通过以下两个参数配置: enable.auto.commit = true  默认

auto.commit.interval.ms = 5000 默认

如果用户需要自己管理offset的自动提交,可以关闭offset的自动提交,手动管理offset提交的偏移量,注意用户提交的offset偏移量永远都要比本次消费的偏移量+1,因为提交的offset是kafka消费者下一次抓取数据的位置。

2、Acks & Retries

Kafka生产者在发送完一个的消息之后,要求Broker在规定的额时间Ack应答,如果没有在规定时间未应答,Kafka生产者会尝试n次重新发送消息。

acks默认为1

  • acks=1  Leader会将Record写到其本地日志中,但会在不等待所有Follower的完全确认的情况下就会对生产者发出做出确认响应,生产者就会认为消息已经发送;在这种情况下,如果Leader在确认记录后立即失败,但在Follower复制记录之前失败,会造成记录将丢失。
  • acks=0  生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区中并视为已发送。在这种情况下,不能保证服务器已收到记录。
  • acks=all  Leader将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于acks = -1设置。

如果生产者在规定的时间内,并没有得到Kafka的Leader的Ack应答,Kafka可以开启reties机制。 request.timeout.ms = 30000  默认 retries = 2147483647 默认 

3、幂等性

幂等性:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

Kafka的0.11.0.0版本新增了对幂等的支持。幂等生产者的新增特性,幂等保证了生产者发送的消息,不会丢失和重复。如何实现幂等的关键是服务端是否可以区分消息的重复。

区分请求是否重复的有两点:

  • 唯一标识:区分请求是否重复,需要要求请求中有唯一标识。如订单支付,订单号就可以看作是唯一标识:
  • 记录已处理过的请求:仅有唯一标识还不够,还需记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的唯一标识和处理记录进行比较,如果处理记录中有相同的唯一标识,说明是重复记录,不会重复消费。

kafka实现幂等,需要停止多次处理消息,将其持久化到Kafka Topic中仅一次。在生产者初始化期间,kafka会给每个生产者生成一个唯一的ID称为Producer ID或PID。 PID、序列号(sequenceNumeber,注意序列号粒度为TopicPartition维度,即细致化到分区粒度,每个分区的序列号是单调递增 )、消息绑定在一起发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID在当前TopicPartition对中最后提交的消息正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。 enable.idempotence= false 默认不开启, 

  • 在使用幂等性的时候,要求必须开启acks=all 与retries机制
  • 由于允许重试机制,可能会造成消息乱序;因此可以将通过配置
    max.in.flight.requests.per.connection=1。max.in.flight.requests.per.connection配置代表着一个Producer同时可以发送的未收到确认的消息数量。如果max.in.flight.requests.per.connection数量大于1,那么可能发送了message1后,在没有收到确认前就发送了message2,此时 message1发送失败后触发重试,而 message2直接发送成功,就造成了Broker上消息的乱序。max.in.flight.requests.per.connection的默认值为5

4、事务控制 

Kafka的幂等性,只能保证一条记录的在分区发送的原子性,但是如果要保证多条记录在多个分区的的原子性,这个时候就需要开启kafk的事务操作。 在Kafka0.11.0.0引入了事务。通常Kafka的事务分为生产者事务与消费者事务。消费者消费的消息的默认设置为read_uncommited,会导致读取到生产者发送的失败的事务,所有在开启生产者事务之后,需要用户设置消费者的事务隔离级别 isolation.level    = read_committed。开启的生产者事务的时候,只需要指定transactional.id属性即可,一旦开启了事务,默认生产者就已经开启了幂等性。但是要求"transactional.id"的取值必须是唯一的,同一时刻只能有一个"transactional.id"存储在,其他的将会被关闭。

5、消费者与生产者

有时候消费者再接收到到信息后,需要生产者对消息再次处理,此时生产者作为消息的转发中介,代码如下

  1. //定义消费者
  2. public static KafkaConsumer<String,String> buildKafkaConsumer(String group){
  3. Properties props=new Properties();
  4. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"A:9092,B:9092,C:9092");
  5. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  6. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
  7. props.put(ConsumerConfig.GROUP_ID_CONFIG,group);
  8. //消费者关闭自动提交
  9. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
  10. //开启消费者事务
  11. props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
  12. return new KafkaConsumer<String, String>(props);
  13. }
  1. //定义生产者
  2. public static KafkaProducer<String,String> buildKafkaProducer(){
  3. Properties props=new Properties();
  4. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"A:9092,B:9092,C:9092");
  5. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  6. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  7. //开启生产者事务
  8. props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction-id");
  9. return new KafkaProducer<String, String>(props);
  10. }
  1. public static void main(String[] args) {
  2. //1.生产者&消费者
  3. KafkaProducer<String,String> producer=buildKafkaProducer();
  4. KafkaConsumer<String, String> consumer = buildKafkaConsumer("group01");
  5. //订阅topic01的消息
  6. consumer.subscribe(Arrays.asList("topic01"));
  7. //初始化事务
  8. producer.initTransactions();
  9. try{
  10. while(true){
  11. ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
  12. Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
  13. //开启事务控制
  14. producer.beginTransaction();
  15. Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<TopicPartition, OffsetAndMetadata>();
  16. while (consumerRecordIterator.hasNext()){
  17. ConsumerRecord<String, String> record = consumerRecordIterator.next();
  18. //创建Record,送到topic02
  19. ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>("topic02",record.key(),record.value());
  20. producer.send(producerRecord);
  21. //缓冲的消息都被发送到Kafka服务器
  22. producer.flush();
  23. //记录元数据
  24. offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1));
  25. }
  26. //提交消费组的offset
  27. producer.sendOffsetsToTransaction(offsets,"group01");
  28. //提交事务
  29. producer.commitTransaction();
  30. }
  31. }catch (Exception e){
  32. producer.abortTransaction();//终止事务
  33. }finally {
  34. producer.close();
  35. }
  36. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/942612
推荐阅读
相关标签
  

闽ICP备14008679号