赞
踩
!](https://img-blog.csdnimg.cn/85d5d31e5bab470ba5abdabd930f458b.png)
介绍:生产者发送过来的数据,不需要等数据落盘应答
数据可靠性分析:容易丢数据
丢失数据原因:生产者发送完成后,Leader没有接收到数据,但是生产者认为已经发送成功了
介绍:生产者发送过来的数据,Leader收到数据后应答
数据可靠性分析:容易丢数据
丢失数据原因:应答完成后,还没开始同步副本,Leader挂了,新的Leader不会收到同步的消息,因为生产者已经认为发送成功了
Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms
参数设定,默认30s。这样就不用等长期联系不上或者已经故障的节点
数据完全可靠条件=ACK级别设置为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerAck {
public static void main(String[] args) {
// 1.创建kafka配置对象
Properties properties = new Properties();
// 2.配置对应参数
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 设置ack
properties.put(ProducerConfig.ACKS_CONFIG,"all");
// 重试次数retries,默认是int最大值2147483647
properties.put(ProducerConfig.RETRIES_CONFIG,3);
// 3.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 4.调用send方法,发送消息
for(int i=0;i<5;i++){
kafkaProducer.send(new ProducerRecord<>("first","testMessage"+1));
}
// 5.关闭资源
kafkaProducer.close();
}
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerAck {
public static void main(String[] args) {
// 1.创建kafka配置对象
Properties properties = new Properties();
// 2.配置对应参数
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 设置ack
properties.put(ProducerConfig.ACKS_CONFIG,"all");
// 重试次数retries,默认是int最大值2147483647
properties.put(ProducerConfig.RETRIES_CONFIG,3);
// 3.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 4.调用send方法,发送消息
for(int i=0;i<5;i++){
kafkaProducer.send(new ProducerRecord<>("first","testMessage"+1));
}
// 5.关闭资源
kafkaProducer.close();
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。