赞
踩
消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 可以组成一个 Kafka 集群。
Kafka 的消息通过 Topic 主题来分类,Topic类似于关系型数据库中的表,每个 Topic 包含一个或多(Partition)分区。
多个分区会分布在Kafka集群的不同服务节点上,消息以追加的方式写入一个或多个分区中。
每个分区又被划分为多个日志分段 LogSegment 组成,日志段是 Kafka 日志对象分片的最小单位。LogSegment 算是一个逻辑概念,对应一个具体的日志文件(”.log” 的数据文件)和两个索引文件(”.index” 和 “.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成。
每个分区中都由一系列有序的、不可变的消息组成,这些消息被顺序地追加到 Partition 中,每个消息都有一个连续的序列号称之为 Offset 偏移量,用于在 Partition 内唯一标识消息。
消息是 Kafka 中存储的最小最基本的单位,即为一个 commit log,由一个固定长度的消息头和一个可变长度的消息体组成。
消息的生产者,负责发布消息到 Kafka Broker,生产者在默认情况下把消息均衡地分布到主题的所有分区上,用户也可以自定义分区器来实现消息的分区路由。
消息的消费者,从 Kafka Broker 读取消息的客户端,消费者把每个分区最后读取的消息的 Offset 偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。
每个 Consumer 属于一个特定的 Consumer Group(若不指定 Group Name则属于默认的 group),一个或多个 Consumer 组成的群组可以共同消费一个 Topic 中的消息,但每个分区只能被群组中的一个消费者操作。
__consumer_offsets 这个内部 Topic,专门用来存储 Consumer Group 消费的情况,默认情况下有 50 个 partition,每个 partition 默认三个副本
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
Topic是逻辑上的概念,而partition(分区)是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到哪个offset,以便出错恢复时,从上次的位置继续消费。
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引的机制,将每个partition分为多个segment。(由log.segment.bytes决定,控制每个segment的大小,也可通过log.segment.ms控制,指定多长时间后日志片段会被关闭)每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如:bing这个topic有3个分区,则其对应的文件夹为:bing-0、bing-1和bing-2。
索引文件和日志文件命名规则:每个 LogSegment 都有一个基准偏移量,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64位的长整形数,固定是20位数字,长度未达到,用 0 进行填补。如下图所示:
index和log文件以当前segment的第一条消息的offset命名。index文件记录的是数据文件的offset和对应的物理位置,正是有了这个index文件,才能对任一数据写入和查看拥有O(1)的复杂度,index文件的粒度可以通过参数log.index.interval.bytes来控制,默认是是每过4096字节记录一条index。下图为index文件和log文件的结构示意图:
查找message的流程(比如要查找offset为170417的message):
当日志片段大小达到log.segment.bytes指定的上限(默认是1GB)或者日志片段打开时长达到log.segment.ms时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。**当前正在写入的片段叫做活跃片段,活跃片段永远不会被删除,**所以如果你要保留数据1天,但是片段包含5天的数据,那么这些数据就会被保留5天,因为片段被关闭之前,这些数据无法被删除。
Kafka的复制机制和分区的多副本架构是kafka可靠性保证的核心。把消息写入多个副本可以使kafka在发生奔溃时仍能保证消息的持久性。
kafka的topic被分成多个分区,分区是基本的数据块。每个分区可以有多个副本,其中一个是首领。所有事件都是发给首领副本,或者直接从首领副本读取事件。其他副本只需要与首领副本保持同步,并及时复制最新的事件。
Leader维护了一个动态的in-sync replica set(ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据同步后,leader就会发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader不可用时,将会从ISR中选举新的leader。满足以下条件才能被认为是同步的:
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没有必要等ISR中的follower全部接收成功。所以Kafka提供了三种可靠性级别,用户可以根据对可靠性和延迟的要求进行权衡。acks:
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。
等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
leader发生故障后,会从ISR中选出一个新的leader,之后为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
Kafka 的producer 发送消息采用的是异步发送的方式。在消息发送过程中,涉及到了两个线程——main线程和sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
为了提高效率,消息被分批次写入kafka。批次就是一组消息,这些消息属于同一个主题和分区。(如果每一个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传输可以减少网络开销。不过要在时间延迟和吞吐量之间做出权衡:批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长)。批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。
相关参数
offset的维护
由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到哪个位置,以便故障恢复后继续消费。Kafka0.9版本之前,Consumer默认将offset保存在zookeeper中,从0.9版本开始,Consumer默认将offset保存在Kafka一个内置的名字叫_consumeroffsets的topic中。默认是无法读取的,可以通过设置consumer.properties中的exclude.internal.topics=false来读取。
注意:第一个步骤,之前版本:从zk的 /brokers/topics/saas-device-isapi-topic/partitions/1/state下取;
目前版本:根据实际遇到的问题,是从control去拿到patition的leader节点信息;(broker的节点映射错误后,会取不到leader,比如(host文件故意映射交换control的ip与普通的broker的ip))
第一步流程解释下:leader节点是存在zk的 get /brokers/topics/saas-device-isapi-topic/partitions/1/state 目录下 思考? 这个kafka会不会缓存在自己本地,减少与zk的交互?
这里可以看到 leader :1, 还有个isr
既然大家已经知道了Partiton的多副本同步数据的机制了,那么就可以来看看ISR是什么了。
ISR全称是“In-Sync Replicas”,也就是保持同步的副本,他的含义就是,跟Leader始终保持同步的Follower有哪些。
大家可以想一下 ,如果说某个Follower所在的Broker因为JVM FullGC之类的问题,导致自己卡顿了,无法及时从Leader拉取同步数据,那么是不是会导致Follower的数据比Leader要落后很多?
所以这个时候,就意味着Follower已经跟Leader不再处于同步的关系了。但是只要Follower一直及时从Leader同步数据,就可以保证他们是处于同步的关系的。
所以每个Partition都有一个ISR,这个ISR里一定会有Leader自己,因为Leader肯定数据是最新的,然后就是那些跟Leader保持同步的Follower,也会在ISR里。
1、配置同步到所有副本(即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。),且重试次数>1 ,具体根据业务来定
// 重试次数,0为不启用重试机制,幂等性的时候必须大于0 props.put(ProducerConfig.ACKS_CONFIG, "all"); |
2、acks=all 就可以代表数据一定不会丢失了吗?
当然不是,如果你的Partition只有一个副本,也就是一个Leader,任何Follower都没有,你认为acks=all有用吗?
当然没用了,因为ISR里就一个Leader,他接收完消息后宕机,也会导致数据丢失。
所以说,这个acks=all,必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以。
这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失。
指定对应的key |
@Override |
kafka中计算发送对应分组的源代码
这里使用了xmind 整理
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件)
consumer采用pull(拉)模式从broker中读取数据。
1、push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
2、 pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)
1、kafka中的幂等
//幂等性 |
2、业务上的幂等 (msg信息中需要有唯一标识)
1、使用数据库的唯一索引
2、使用redis
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller
1.1 、kafka 中存在一个__consumer_offsets topic 是专门维护所有topic的偏移量,这个topic下面有很多个分区(默认情况下__consumer_offsets有50个分区),每个topic下的分区节点维护在zk中
这个topic下面有50个分区
每个分区的leader不同,并不是只有一个leader维护这个topic,每个partion都有各自的leader
topic过多,导致分区过多,kafka中只要还是会受分区数量的影响;
如下链接是说明kafka与分区数量的关系影响
参考链接:Apache Kafka Supports 200K Partitions Per Cluster | Confluent
翻译链接:https://www.cnblogs.com/huxi2b/p/9984523.html
Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要退组呢?那就要来看看消费者端配置的几个参数:
kill -9快速关闭,不够优雅。bin目录下Kafka- server- stop.sh脚本工具,首先通过ps ax的方式找出正在运行Kafka的进程号pids,然后使用kill-s TERM $PIDS 或者kill -15
为什么这样关闭的方式会是优雅的?Kafka 服务入口程序中有一个名为“kafka-shutdown-hock”的关闭钩子,待 Kafka 进程捕获终止信号的时候会执行这个关闭钩子中的内容,其中除了正常关闭一些必要的资源,还会执行一个控制关闭(ControlledShutdown)的动作。使用ControlledShutdown的方式关闭Kafka有两个优点:一是可以让消息完全同步到磁盘上,在服务下次重新上线时不需要进行日志的恢复操作;二是 ControllerShutdown 在关闭服务之前,会对其上的leader副本进行迁移,这样就可以减少分区的不可用时间
该图说明了数据如何在生产者和消费者之间传输,以及零拷贝的含义。
2.1 数据从磁盘加载到OS缓存
2.2 数据从OS缓存复制到Kafka应用程序
2.3 Kafka应用程序将数据复制到socket缓冲区
2.4 将数据从socket buffer复制到网卡
2.5 网卡将数据发送给消费者
3.1:数据从磁盘加载到OS缓存 3.2 OS缓存通过sendfile()命令直接将数据复制到网卡 3.3 网卡将数据发送给消费者
零拷贝是在应用程序上下文和内核上下文之间保存多个数据副本的快捷方式。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。