赞
踩
查看Kafka源码,发现生产者这个配置(enable.idempotence)有一个说明:
When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires <code>max.in.flight.requests.per.connection</code> to be less than or equal to 5, <code>retries</code> to be greater than 0 and <code>acks</code> must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a <code>ConfigException</code> will be thrown.
查看Kafka官网,对改配置也有相同的说明:
总结一下,服务端要支持生产幂等性的话,需要保证以下几个配置:
以下是为什么需要这么配置的原因:
这些配置的组合可以确保在各种故障情况下,消息的顺序、可靠性和一致性都能得到保证。
因为集群开启了ACL认证,所以还需要开启幂等写的权限,执行以下命令进行开启:
./kafka-acls.sh --bootstrap-server kafka-m2wi5kig-headless.kafka-pro.svc.xadd.staff.xdf.cn:29092 --command-config m2wi5kig.properties --add --allow-principal
User:kafka-m2wi5kig.plain1 --topic 'cdata_flink_kafka_test' --producer --idempotent
–command-config 需要指定该集群的admin账号及密码,格式如下:
security.protocol = SASL_PLAINTEXT
sasl.mechanism = SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";
执行完之后,可以看到权限中已经包含IDEMPOTENT_WRITE了:
用户重试了,不在报错,问题解决。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。