赞
踩
副本的协同机制请移步:kafka 消息分发机制、分区和副本机制(三、分区的副本机制)
副本还有一个重要的机制,就是数据同步过程,它需要解决
深红色部分表示 test_replica 分区的 leader 副本,另外两个节点上浅色部分表示 follower 副本
Producer 在发布消息到某个 Partition 时:
get /brokers/topics/partitions/2/state
,然后无论该 Topic 的 Replication Factor 为多 少(也即该Partition 有多少个 Replica),Producer 只将该消息发送到该 Partition 的 Leader。LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下 一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外, leader LEO和follower LEO的更新是有区别的。
HW:即上面提到的水位值(Hight Water)。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的 HW更新是有区别的。
通过下面这幅图来表达LEO、HW的含义,随着 follower 副本不断和 leader 副本进行数据同步,follower 副本的 LEO 会主键后移并且追赶到 leader 副本,这个追赶上的判断标准是当前副本的 LEO 是否大于或者等于 leader 副本的 HW,这个追赶上也会使得被踢出的 follower 副本重新加入到 ISR 集合中。
假如说下图中的最右侧的 follower 副本被踢出 ISR 集合,也会导致这个分区的 HW 发生变化,变成了3。
leader 和 follower 的 HW 和 LEO 都是0,leader 副本会保存 remote LEO,表示所有 follower LEO,也会被初始化为0,这个时候,producer没有发送消息。follower会不断地个 leade r发送 FETCH 请求,但是因为没有数据,这个请求会被 leader 寄存,当在指定的时间之后会强制完成请求,这个时间配置是(replica.fetch.wait.max.ms),如果在指定时间内 producer 有消息发送过来,那么 kafka 会唤醒 fetch 请求,让 leader 继续处理
数据的同步处理会分两种情况,这两种情况下处理方式是不一样的
leader 处理完 producer 请求之后,follower 发送一个fetch请求过来 。状态图如下:
leader 副本收到请求以后,会做几件事情:
follower 发送 fetch 请求,leader 副本的处理逻辑是:
读取 log 数据、更新 remote LEO=0 ( follower 还没有写入这条消息,这个值是根据 follower 的 fetch 请求中的 offset 来确定的)
尝试更新 HW,因为这个时候 LEO 和 remoteLEO 还是不一致,所以仍然是 HW=0
把消息内容和当前分区的 HW 值发送给 follower 副本
follower副本收到response以后:
第一次交互结束以后,HW 仍然还是0,这个值会在下一次 follower 发起 fetch 请求时被更新
follower 发第二次 fetch 请求,leader 收到请求以后:
follower 副本收到 response 以后:
到目前为止,数据的同步就完成了,意味着消费端能够消费 offset=1 这条消息。
前面说过,由于 leader 副本暂时没有数据过来,所以 follower 的 fetch 会被阻塞,直到等待超时或者 leader 接收到新的数据。
当 leader 收到请求以后会唤醒处于阻塞的 fetch 请求。处理过程基本上和前面说的一致
kafka 使用 HW 和 LEO的方式来实现副本数据的同步,本身是一个好的设计,但是在这个地方会存在一个数据丢失的问题,当然这个丢失只出现在特定的背景下。我们回想一下,HW 的值是在新的一轮FETCH 中才会被更新。我们分析下这个过程为什么会出现数据丢失。
前提:min.insync.replicas=1
//设定 ISR
中 的最小副本数是多少,默认值为1
(在server.properties
中配 置), 并且acks
参数设置为-1
(表示需要所有副本确认)时,此参数才生效。
表达的含义是,至少需要多少个副本同步才能表示消息是提交的, 所以,当 min.insync.replicas=1 的时候,一旦消息被写入 leader 端 log 即被认为是“已提交”,而延迟一轮 FETCH RPC 更新 HW 值的设计使得 follower HW 值是异步延迟更新的,倘若在这个过程中 leader 发生变更,那么成为新 leader 的 follower的 HW 值就有可能是过期的,使得 clients 端认为是成功提交的消息被删除。
producer 的 ack:
acks 配置表示 producer 发送消息到 broker上 以后的确认值。有三个可选项
在 kafka0.11.0.0 版本之后,引入了一个 leader epoch 来解决这个问题,所谓的 leader epoch 实际上是 一对值(epoch,offset),epoch代表leader的版本号,从0开始递增,当 leader 发生过变更,epoch 就+1,而 offset 则是对应这个 epoch 版本的 leader 写入第一条消息的 offset。
比如 (0,0), (1,50) ,表示第一个leader从offset=0开始写消息,一共写了50条。第二个leader版本号是1,从 offset=50开始写,这个信息会持久化在对应的分区的本地磁盘上,文件名是 /tmp/kafka-log/topic/leader-epoch-checkpoint
。
leader broker 中会保存这样一个缓存,并且定期写入到 checkpoint 文件中
当 leader 写 log 时它会尝试更新整个缓存: 如果这个 leader 首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为 leader 时会查询这部分缓存,获取出对应 leader 版本的 offset。
我们基于同样的情况来分析,follower 宕机并且恢复之后,有两种情况,如果这个时候 leader 副本没有挂,也就是意味着没有发生 leader 选举,那么 follower 恢复之后并不会去截断自己的日志,而是先发送 一个 OffsetsForLeaderEpochRequest
请求给到 leader 副本,leader 副本收到请求之后返回当前的 LEO。
如果 follower 副本的 leaderEpoch 和 leader 副本的 epoch 相同, leader 的 LEO 只可能大于或者等于 follower副本的 LEO 值,所以这个时候不会发生截断。
如果 follower 副本和 leader 副本的 epoch 值不同,那么 leader 副本会查找 follower 副本传过来的 epoch+1 在本地文件中存储的 StartOffset 返回给 follower 副本,也就是新 leader 副本的 LEO。这样也避免了数据丢失的问题。
如果 leader 副本宕机了重新选举新的 leader,那么原本的 follower 副本就会变成 leader,意味着 epoch 从0变成1,使得原本 followe r副本中 LEO 的值的到了保留。
① 优先从 isr 列表中选出第一个作为 leader 副本,这个叫优先副本,理想情况下有限副本就是该分区的 leader 副本
② 如果 isr 列表为空,则查看该 topic 的unclean.leader.election.enable
配置。 unclean.leader.election.enable
:为 true 则代表允许选用非 isr 列表的副本作为 leader,那么此时就意味着数据可能丢失,为 false的话,则表示不允许,直接抛出 NoReplicaOnlineException 异常,造成 leader 副本选举失败。
able配置。
unclean.leader.election.enable`:为 true 则代表允许选用非 isr 列表的副本作为 leader,那么此时就意味着数据可能丢失,为 false的话,则表示不允许,直接抛出 NoReplicaOnlineException 异常,造成 leader 副本选举失败。
③ 如果上述配置为 true,则从其他副本中选出一个作为 leader 副本,并且 isr 列表只包含该 leader 副本。一旦选举成功,则将选举后的 leader 和 isr 和其他副本信息写入到该分区的对应的 zk 路径上。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。