赞
踩
1.zookepper kafka 通过 zk 来存储集群的 meta 数据。 2.broker kafka 集群中的单个服务器 3.producer 生产者,发布消息 4.consumer 消费者,消费消息 5.consumer group high-level consumer API 中,每个 consume r都属于一个 consumer group 每条消息只能被 consumer group 中的一个 consumer 消费,但可以被多个 consumer group 消费。 6.topic 每条发送到 kafka 的消息所属类别 7.partition topic 物理上分组的的概念,每个 topic 包括一个或者多个 partition,每个 partition 是一个有序的队列 8.replica partition 的副本,保证 partition 的高可用 9.leader replica 中的一个角色(leader 是副本),producer 和 consumer 只跟 leader 交互。 10.follower replica 中的一个角色(follower 是副本),从 leader 中 pull 数据 11.segment partition 物理上由多个 segment 组成,每个 segment 存着 message 信息
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 partition 中,属于顺序写入磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
- 指定了 patition ,则直接使用
- 未指定了 patition ,但指定 key ,通过对 key 的 value 进行 hash 选出一个 partition
- patition 和 key 都未指定,使用轮询选出一个 partition
producer 写入消息序列图如下所示:
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
一个消息如何算投递成功,Kafka提供了三种模式:
- At most once消息可能会丢,绝对不会重复传输
- At least once 消息绝对不会丢,但是可能会重复传输
- Exactly once每条信息肯定会被传输一次且仅传输一次
当 producer 向 broker 发送消息时,一旦这条消息被 commit,由于 replication 的存在,它就不会丢。但是如果 producer 发送数据给 broker 后,遇到网络问题而造成通信中断,那 Producer 就无法判断该条消息是否已经 commit。虽然 Kafka 无法确定网络故障期间发生了什么,但是 producer 可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了 Exactly once,但目前还并未实现。所以目前默认情况下一条消息从 producer 到 broker 是确保了 At least once,可通过设置 producer 异步发送实现At most once。
producers 可以异步的并行的向 kafka 发送消息,但是通常 producer 在发送完消息之后会得到一个 future 响应,返回的是 offset 值或者发送过程中遇到的错误。这其中有个非常重要的参数“acks”,这个参数决定了 producer 要求l eader partition 收到确认的副本个数
一般情况时 acks 设置为0比较合理,既保证了一定的可靠性,也保证了也相应的系统吞吐量。具体设置根据实际情况来定
Kafka 消息有一个定长的header和变长的字节数组组成。因为 kafka 消息支持字节数组,也就使得 kafka 可以支持任何用户自定义的序列号格式或者其它已有的格式如 Apache Avro、protobuf等。Kafka 没有限定单个消息的大小,但我们推荐消息大小不要超过1MB,通常一般消息大小都在1~10kB之前。
发布消息时,kafka client先构造一条消息,将消息加入到消息集set中(kafka支持批量发布,可以往消息集合中添加多条消息,一次行发布),send消息时,producer client需指定消息所属的topic。
Kafka提供了两套consumer api
- high-level api
- sample-api
High-level API封装了对集群中一系列 broker 的访问,可以透明的消费一个 topic。它自己维持了已消费消息的状态,即每次消费的都是下一个消息。
High-level API 还支持以组的形式消费 topic。
High level api是 consumer 读的 partition 的 offset 是存在zookeeper上。High level api 会启动另外一个线程去每隔一段时间,offset 自动同步到zookeeper上。换句话说,如果使用了High level api, 每个 message 只能被读一次,一旦读了这条 message 之后,无论我 consumer 的处理是否ok。High level api的另外一个线程会自动的把 offset + 1同步到 zookeeper 上。如果 consumer 读取数据出了问题,offset 也会在 zookeeper 上同步。因此,如果 consumer 处理失败了,会继续执行下一条。这往往是不对的行为。因此,Best Practice是一旦 consumer 处理失败,直接让整个 conusmer group 抛 Exception 终止,但是最后读的这一条数据是丢失了,因为在 zookeeper 里面的 offset 已经 +1 了。等再次启动conusmer group的时候,已经从下一条开始读取处理了。
Sample-api 是一个底层的 API,它维持了一个和单一 broker 的连接,并且这个 API 是完全无状态的,每次请求都需要指定 offset 值,因此,这套 API 也是最灵活的。
使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。
- 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
- 应用程序需要通过程序获知每个 partition 的 leader 是谁
- 需要处理 leader 的变更
在 kafka 中,当前读到哪条消息的 offset 值是由 consumer 来维护的,因此,consumer 可以自己决定如何读取 kafka 中的数据。比如,consume r可以通过重设 offset 值来重新消费已消费过的数据。不管有没有被消费,kafka 会保存数据一段时间,这个时间周期是可配置的,只有到了过期时间,kafka才会删除这些数据。(这一点与 AMQ 不一样,AMQ 的 message 一般来说都是持久化到 mysql 中的,消费完的 message 会被 delete 掉)
假设实验环境中 kafka 集群只有一个 broker,xxx/message-folder 为数据文件存储根目录,在 kafka broker 中 server.properties 文件配置(参数 log.dirs=xxx/message-folder),例如创建 2 个 topic 名称分别为 report_push、launch_info, partitions 数量都为 partitions=4
存储路径和目录规则为:
xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
同一个 topic 下有多个不同 partition ,每个 partition 为一个目录, partiton 命名规则为 topic名称+有序序号,第一个 partiton 序号从0开始,序号最大值为 partitions 数量减1。
topic 是以 partition 的形式存放的,每一个 topic 都可以设置它的 partition 数量, partition 的数量决定了组成 topic 的 message 的数量。producer 在生产数据时,会按照一定规则(这个规则是可以自定义的)把消息发布到 topic 的各个 partition 中。副本都是以 partition 为单位的,不过只有一个 partition 的副本会被选举成 leader 作为读写用。
partition是一个Queue的结构,每个partition中的消息都是有序的,生产的消息被不断追加到partition上,其中的每一个消息都被赋予了一个唯一的offset值。
把消息日志以partition的形式存放有多重考虑
1.方便在集群中扩展,每个partition可以通过调整以适应它所在的机器,而一个topic又可以有多个partition组成,因此整个集群就可以适应任意大小的数据了
2. 提高并发,因为可以以Partition为单位读写了
Kafka集群会保存所有的消息,不管消息有没有被消费;我们可以设定消息的过期时间,只有过期的数据才会被自动清除以释放磁盘空间。比如我们设置消息过期时间为2天,那么这2天内的所有消息都会被保存到集群中,数据只有超过了两天才会被清除。
kafka中的数据是持久化的并且能够容错的。Kafka允许用户为每个topic设置副本数量,副本数量决定了有几个broker来存放写入的数据。如果你的副本数量设置为3,那么一份数据就会被存放在3台不同的机器上,那么就允许有2个机器失败。一般推荐副本数量至少为2,这样就可以保证增减、重启机器时不会影响到数据消费。如果对数据持久化有更高的要求,可以把副本数量设置为3或者更多。
关于如何设置 partition 值需要考虑的因素。一个 partition 只能被一个消费者消费(一个消费者可以同时消费多个 partition ),因此,如果设置的 partition 的数量小于 consumer 的数量,就会有消费者消费不到数据。所以,推荐 partition 的数量一定要大于同时运行的 consumer 的数量。另外一方面,建议 partition 的数量大于集群 broker 的数量,这样 leader partition 就可以均匀的分布在各个 broker 中,最终使得集群负载均衡。在 Cloudera,每个 topic 都有上百个 partition。需要注意的是,kafka 需要为每个 partition 分配一些内存来缓存消息数据,如果 partition 数量越大,就要为 kafka 分配更大的 heap space。
- 每个 partion(目录) 相当于一个巨型文件被平均分配到多个大小相等 segment(段)数据文件中。但是每一个段segment file消息数量不一定相等,这种特性方便 old segment file 快速被删除
- 每个 partition只需要支持顺序读写就行,segment文件生命周期由服务端配置参数决定
这样子做的好处就是能够快速删除无用文件,有效提高磁盘利用率
producer 发 message 到某个 topic, message 会被均匀分布到多个 partition 上(随机或根据用户指定的回调函数进行分布),kafka broker 收到 message 往对应 partition 的最后一个segment上添加该消息,当某个 segment 上的消息条数达到配置值或消息发布时间超过阀值时,segment 上的消息会被 flush 到磁盘,只有 flush 到磁盘上的消息 consumer 才能消费,segment 达到一定的大小后将不会再往该 segment 写数据,broker 会创建新的 segment
每个 part 在内存中对应一个 index,记录每个 segment 中的第一条消息偏移
每个 segment 中存储很多条信息,消息 id 由其逻辑位置决定,即从消息 id 可直接定位到消息的存储位置,避免 id 到位置的额外映射。
下面文件列表是笔者在 kafka broker上做的一个实验,创建一个 topicXXX 包括1 partioion,设置每个 segment 大小为 500MB,并启动 producer 向 kafka broker 写入大量数据,如下图所示 segment 文件列表形象说明上诉两个原则:
上图是一对 segment file文件模型,segment 中 index <> data file 对应关系物理结构如下:
上图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中 message 的物理偏移地址。其中以索引文件中元数据 3,497 为例,依次在数据文件中表示第 3 个 message(在全局 partition 表示第 368772 个 message),以及该消息的物理偏移地址为 497。
segment data file 由 message 组成,message 的物理结构为:
关键字 | 解释说明 |
---|---|
8 byte offset | 在 parition(分区) 内的每条消息都有一个有序的 id 号,这个 id 号被称为偏移(offset),它可以唯一确定每条消息在 parition(分区) 内的位置。即 offset 表示 partiion 的第多少message |
4 byte message size | message 大小 |
4 byte CRC32 | 用 crc32 校验 message |
1 byte “magic" | 表示本次发布 kafka 服务程序协议版本号 |
1 byte “attributes" | 表示为独立版本、或标识压缩类型、或编码类型。 |
4 byte key length | 表示key的长度,当 key为-1 时,K byte key字段不填 |
K byte key | 可选 |
value bytes payload | 表示实际消息数据。 |
例如读取 offset = 368776 的 message,需要通过以下两步:
根据offset查找 segment file
上述图片为例,其中 00000000000000000000.index 表示最开始的文件,起始偏移量 (offset) 为0.第二个文件 00000000000000368769.index 的消息量起始偏移量为 368770 = 368769 + 1,同样,第三个文件 00000000000000737337.index 的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据 offset 二分查找文件列表,就可以快速定位到具体文件。
当 offset=368776 时定位到 00000000000000368769.index|log
根据segment file 查找 message
当 offset=368776 时,依次定位到 00000000000000368769.index 的元数据物理位置和 00000000000000368769.log 的物理偏移地址,然后再通过 00000000000000368769.log 顺序查找直到 offset=368776 为止。
segment index file 采用稀疏索引存储方式,它减少索引文件大小,通过 mmap 可以直接内存操作,稀疏索引为数据文件的每个对应 message 设置一个元数据指针,它比稠密索引节省了更多的存储空间,但是查询起来需要消耗更多的时间。
kafka高效文件存储设计特点
备份机制是 kafka0.8 版本的新特性,备份机制的出现大大提高了 kafka 集群的可靠性、稳定性。有了备份机制后,kafka 允许集群中的节点挂掉后而不影响整个集群工作。一个备份数量为 n 的集群允许 n-1 个节点失败。在所有备份节点中,有一个节点作为 lead 节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步
(1) 下面以一个Kafka集群中4个Broker举例,创建1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:
(2) 当集群中新增2节点,Partition增加到6个时分布情况如下:
负载低的情况下可以每个线程消费多个 partition。但负载高的情况下,consumer 线程数最好和 partition 数量保持一致。如果还是消费不过来,应该再开 consumer 进程,进程内线程数同样和分区树一致
消费消息时,kafka client 需指定 topic 以及 partition number(每个 partition 对应一个逻辑日志流,如 topic 代表某个产品线,partition 代表产品线的日志按天切分的结果),consumer client 订阅后,就可迭代读取消息,如果没有消息,consumer client 会阻塞直到有新消息发布。consumer 可以累积确认接收到的消息,当其确认了某个 offset 的消息,意味着之前的消息也都成功接收到了,此时 broker 会更新 zk 上的 offset registry。
消息系统的持久化队列可以构建在对一个文件的读和追加上,就像一般情况下的日志解决方案。它有一个优点,所有的操作都是常树时间,并且读写之间不会相互阻塞。这种设计具有极大的性能优势:最终系统性能和数据大小完全无关,服务器可以充分利用廉价的硬盘来提供高效的消息服务。
事实上还有一点,磁盘空间的无限增大而不影响性能这点,意味着我们可以提供一般消息系统无法提供的特效。比如说,消息被消费后不是立马删除,我们可以将这些消息保留一段相对比较长的时间。
Apache kafka官网
kafka学习笔记:知识点整理
Kafka史上最详细原理总结
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。