赞
踩
这里是小奏,觉得文章不错可以关注公众号小奏技术
线上的kafka
集群要进行扩容,原先的2broker
,扩容之后变成了新增3个broker
,然后下掉了原先老的broker
。
新集群看着没问题,但是出现了一个问题。
消息发送都正常,但是消息消费异常
消费者启动后一直打印如下关键log
2024-06-14 09:54:23 DEBUG o.a.k.c.c.i.ConsumerCoordinator:887 - [Consumer clientId=consumer-gid_abaca-1, groupId=gid_abaca] Received FindCoordinator response ClientResponse(receivedTimeMs=1718330063896, latencyMs=285, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=consumer-gid_xiaozou-1, correlationId=61, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='gid_xiaozou', nodeId=-1, host='', port=-1, errorCode=15, errorMessage='')]))
2024-06-14 09:54:23 DEBUG o.a.k.c.c.i.ConsumerCoordinator:914 - [Consumer clientId=consumer-gid_abaca-1, groupId=gid_abaca] Group coordinator lookup failed:
2024-06-14 09:54:23 DEBUG o.a.k.c.c.i.ConsumerCoordinator:276 - [Consumer clientId=consumer-gid_abaca-1, groupId=gid_abaca] Coordinator discovery failed, refreshing metadata
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
可以看到关键的log是CoordinatorNotAvailableException: The coordinator is not available.
进入到kafka
进去bin
目录执行如下脚本
./kafka-consumer-groups.sh --bootstrap-server <bootstrap.servers> --describe --group <your-consumer-group-id>
输出如下信息
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Call(callName=describeGroups(api=FIND_COORDINATOR), deadlineMs=1718330591913, tries=42094, nextAllowedTryMs=1718330592017) timed out at 1718330591917 after 42094 attempt(s)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeGroups(api=FIND_COORDINATOR), deadlineMs=1718330591913, tries=42094, nextAllowedTryMs=1718330592017) timed out at 1718330591917 after 42094 attempt(s)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:543)
at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.map(JavaCollectionWrappers.scala:313)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:542)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:558)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:367)
at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:72)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeGroups(api=FIND_COORDINATOR), deadlineMs=1718330591913, tries=42094, nextAllowedTryMs=1718330592017) timed out at 1718330591917 after 42094 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled describeGroups(api=FIND_COORDINATOR) request with correlation id 42096 due to node 6 being disconnected
看着是查询不到comsumer group
的信息,然后报了TimeoutException
异常,没有什么有用的信息
网上有一种说法是如果broker
配置文件中的offsets.topic.replication.factor
必须小于等于broker
的数量,否则会出现The coordinator is not available
的问题。
比如broker
的数量是3,offsets.topic.replication.factor
是4,就会出现这个问题。
实际我这边是不会出现这个问题,因为我的broker
数量是3,offsets.topic.replication.factor
配置的是2
删除topic
也没用,还是会出现The coordinator is not available
的问题
kafka
的消费位点主要通过__consumer_offsets
这个topic来管理
由于本次为了迁移简单,同时线上的业务数据可以丢弃,所以并没有进行topic
分区迁移,而是直接删除了topic
,然后重建topic
。
然后我们查看__consumer_offsets
这个topic
的分区情况
点进topic
详情发现 partition 还是仅在老的broker
中,即已经下掉的topic。这就导致新的broker
没有__consumer_offsets
这个topic
删除掉__consumer_offsets
这个topic
,然后系统会自动重建,就可以解决了。
如果想更平滑的方式也可以考虑对__consumer_offsets
进行分区迁移,注意重建后使用原来的comsumer group
好像消费还是会有问题,如果使用原来的comsumer group
还是消费异常,换个comsumer group
就好了
如果kafka能正常发送消息,但是消费异常,一般是消费位点出现了问题,即管理消费位点__consumer_offsets
的这个toipc
目前来看新增了3个broker
kafka并没有自动对__consumer_offsets
进行分区迁移,需要手动进行迁移
所以后续出现消费相关的问题可以优先检查__consumer_offsets
这个topic
的情况,毕竟kafka
得消费位点都依赖于__consumer_offsets
这个topic
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。