赞
踩
kafka 高性能主要体现在
Kafka 是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
解耦
消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
而基于消息发布订阅的机制,可以联动多个业务下游子系统,能够不侵入的情况下分步编排和开发,来保证数据一致性。
冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka 保证一个 Partition 内的消息的有序性。
缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
异步通讯
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
Kafka部分名词解释如下:
首先,从数据组织形式来说,kafka有三层形式,kafka有多个主题,每个主题有多个分区,每个分区又有多条消息。
而每个分区可以分布到不同的机器上,这样一来,从服务端来说,分区可以实现高伸缩性(集群的高可用 ),以及负载均衡,动态调节的能力。
假如不进行分区的话就如同 MySQL 单表存储一样,发消息就会被集中存储,这样会导致某台 Kafka 服务器存储 Topic 消息过多,如果在写消息压力很大的情况下,最终会导致这台 Kafka 服务器吞吐量出现瓶颈, 因此 Kafka 设计了分区的概念,同时也带来了「负载均衡」、「横向扩展」的能力,如下图所示:。
1)负载均衡:发送消息时可以根据分区的数量进行数据均匀分布,使其落在不同的分区上, 这样可以提高并发写性能;同时消费的时候多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力,提高读消息性能。
2)横向扩展:可以将一个 Topic 分成了多个 Partition,将不同的 Partition 尽可能的部署在不同的物理节点上,这样扩展起来非常方便,另外一个消费者可以消费多个分区中的数据,但是这样还是不能够充分的发挥横向扩展,这时候消费者组就出现了,我们用消费者组,来消费整个的 Topic,一个消费者消费 Topic 中的一个分区。
一、Kafka 如何合理设置分区数
首先我们要了解在 Partition 级别上达到负载均衡是实现高吞吐量的关键,合适的 Partition 数量可以达到并行读写和负载均衡的目的,需要根据每个分区的生产者和消费者的目标吞吐量进行估计。
此时我们可以遵循一定的步骤来计算确定分区数:
1)首先根据某个 Topic 当前接收的数据量等经验来确定分区的初始值。
2)然后针对这个 Topic,进行测试 Producer 端吞吐量和 Consumer 端的吞吐量。
3)测试的结果, 假设此时他们的值分别是 Tp「Producer 端吞吐量」、Tc「负Consumer 端吞吐量」,总的目标吞吐量是 Tt, 单位是 MB/s, 那么结果 numPartition = Tt / max (Tp, Tc)。
4)特殊说明:测试 Tp 通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到 Kafka 就好了。而测试 Tc 通常与应用消费消息后进行其他什么处理有关,相对复杂一些。
总之,通常情况下 Kafka 集群中越多的 Partition 会带来越高的吞吐量。但是,如果 Kafka 集群中 Partition 总量过大或者单个 Broker 节点 Partition 过多,都可能会对系统的可用性和消息延迟带来潜在的负面影响,需要引起我们的重视。
所谓分区写入策略,即是生产者将数据写入到kafka主题后,kafka如何将数据分配到不同分区中的策略。
常见的有三种策略,轮询策略,随机策略,和按键保存策略。其中轮询策略是默认的分区策略(均衡性)。
按键保存策略
按键保存策略,就是当生产者发送数据的时候,可以指定一个key,计算这个key的hashCode值,按照hashCode的值对不同消息进行存储。
至于要如何实现,只要让生产者发送的时候指定key就行。
另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力
综上所述,一个是增大副本高可用,另外增加并发;
假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder),例如创建2个topic名称分别为report_push、launch_info, partitions数量都为partitions=4 存储路径和目录规则为: xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。 如果是多broker分布情况,请参考kafka集群partition分布原理分析
下面以一个Kafka集群中4个Broker举例,创建1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:
(2)当集群中新增2节点,Partition增加到6个时分布情况如下:
https://tech.meituan.com/2015/01/13/kafka-fs-design-theory.html
https://blog.csdn.net/lizhitao/article/details/41778193
在kafka中,每个主题可以有多个分区,每个分区又可以有多个副本。这多个副本中,只有一个是leader,而其他的都是follower副本。仅有leader副本可以对外提供服务。
多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅速”转正“,开始对外提供服务。
这里通过问题来整理这部分内容。
Kafka中partition replication之间同步数据,从partition的leader复制数据到follower只需要一个线程(ReplicaFetcherThread),实际上复制是follower(一个follower相当于consumer)主动从leader批量拉取消息的,这极大提高了吞吐量,从中可以看出无处不显示Kafka高吞吐量设计思想。
kafka的副本都有哪些作用?
在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。
说说follower副本为什么不对外提供服务?
这个问题本质上是对性能和一致性的取舍。试想一下,如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。
而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。
而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。
Kafka中partition replication之间同步数据,从partition的leader复制数据到follower只需要一个线程(ReplicaFetcherThread),实际上复制是follower(扮演consumer角色)主动从leader批量拉取消息的,这极大提高了吞吐量,从中可以看出无处不显示Kafka高吞吐量设计思想。
优点
性能高,吞吐量大。
降低了系统和磁盘开销,Leader充分利用磁盘顺序读以及send file(zero copy)机制。
降低Leader与Follower之间网络开销和交互次数。
缺点
有可能会占用大量网络带宽(例如本来集群很大而且数据量很多,后来新增Broker节点需要迁移数据),甚至堵塞网络,需要有流控机制,否则会影响线上服务。
因为Follower是批量拉取Leader消息,如果设置为保证所有replicas commit,才返回Ack给生产者会存在抖动现象,Follow拉取Leader修改HW,当HW与当次生产者请求logEndOffset的offst一致时,客户端等待时间会拉长。
ISR 的引入主要是解决同步副本与异步复制两种方案各自的缺陷:
同步副本中如果有个副本宕机或者超时就会施慢该副本组的整体性
如果仅仅使用异步副本,当所有的副本消息均远落后于主副本时,一旦主副本宕机重新边那么就会存在消息丢失情况。
kafka 为了保证数据的一致性使用了isr 机制
isr 的全称是:In-Sync Replicas isr 是一个副本的列表,里面存储的都是能跟leader 数据一致的副本,确定一个副本在isr列表中,有2个判断条件
根据副本和leader 的交互时间差,如果大于某个时间差 就认定这个副本不行了,就把此副本从isr 中剔除,此时间差根据
配置参数rerplica.lag.time.max.ms=10000
也就是默认10s,isr中的follow没有向isr发送心跳包就会被移除
根据leader 和副本的信息条数差值决定是否从isr 中剔除此副本,此信息条数差值根据配置参数
rerplica.lag.max.messages=4000 决定 ,也就是默认消息差大于4000会被移除
注意点:kafka后续版本移除了第二个判断条件,只保留了第一个,以内极端情况下,如果producor一次性发来了10000条数据,而默认条数差立马会大于4000
首先我们知道kafka 的数据是多副本的,某个topic的replication-factor为N且N大于1时,每个Partition都会有N个副本(Replica)。kafka的replica包含leader与follower。每个topic 下的每个分区下都有一个leader 和(N-1)个follower,
每个follower 的数据都是同步leader的 这里需要注意 是follower 主动拉取leader 的数据
Replica的个数小于等于Broker的个数,也就是说,对于每个Partition而言,每个Broker上最多只会有一个Replica,因此可以使用Broker id 指定Partition的Replica
Kafka 在0.8以前的版本中,并不提供 HA 机制,一旦一个或多个 Broker 宕机,则宕机期间其上所有 Partition 都无法继续提供服务。若该 Broker 永远不能再恢复,亦或磁盘故障,则其上数据将丢失。
在 Kafka 在0.8以前的版本中,是没有 Replication 的,一旦某一个 Broker 宕机,则其上所有的 Partition 数据都不可被消费,这与 Kafka 数据持久性及 Delivery Guarantee 的设计目标相悖。同时 Producer 都不能再将数据存于这些 Partition 中。
如果 Producer 使用同步模式则 Producer 会在尝试重新发送 message.send.max.retries(默认值为3)次后抛出 Exception,用户可以选择停止发送后续数据也可选择继续选择发送。而前者会造成数据的阻塞,后者会造成本应发往该 Broker 的数据的丢失。
如果 Producer 使用异步模式,则 Producer 会尝试重新发送 message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现该问题。
由此可见,在没有 Replication 的情况下,一旦某机器宕机或者某个 Broker 停止工作则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言 Replication 机制的引入非常重要。
一次简单的消息从生产到消费过程,需要经过 2次网络IO 和 2次磁盘IO 。如果消息体过大,势必会增加IO的耗时,进而影响kafka生产和消费的速度。消费者速度太慢的结果,就会出现消息积压情况。
对于Produce请求:Server端的I/O线程统一将请求中的数据写入到操作系统的PageCache后立即返回,当消息条数到达一定阈值后,Kafka应用本身或操作系统内核会触发强制刷盘操作(如左侧流程图所示)。
对于Consume请求:主要利用了操作系统的ZeroCopy机制,当Kafka Broker接收到读数据请求时,会向操作系统发送sendfile系统调用,操作系统接收后,首先试图从PageCache中获取数据(如中间流程图所示);如果数据不存在,会触发缺页异常中断将数据从磁盘读入到临时缓冲区中(如右侧流程图所示),随后通过DMA操作直接将数据拷贝到网卡缓冲区中等待后续的TCP传输。
Producer 发送消息到 Broker 时,会根据 Paritition 机制选择将其存储到哪一个 Partition。如果 Partition 机制设置合理,所有消息可以均匀分布到不同的 Partition里,这样就实现了负载均衡。
写message
读message
磁盘缓存:BufferIo
DirectIo:bypass kernel 绕过内核,直接读底层硬件存储,不经过pagecache
free -m
buff: 写 IO 缓存
cache: 读 IO 缓存
读机械硬盘:滑动机械臂,寻某一个位置还需要等磁盘旋转,有寻道延迟和旋转延迟,读取延迟,影响磁盘性能
随即IO:要等机械臂旋转、浪费时间
向kafka的某个topic中写消息,实际上就是向内部的某个partition写消息,读取消息也是从某个partition上读取消息。
一个topic的实际物理存储实际上对应着一个个的文件夹,文件夹名字是按照topic-num进行目录划分的,topic就是topic名字,num就是partition编号,从0开始。
partition文件夹下面也不仅仅是一个日志文件,而是有多个文件,这些多个文件可以从逻辑上将文件名一致的文件集合就称为一个 LogSegment日志段文件组。
partion相当于一个巨型文件,被平均分配到多个大小相等的LogSegment数据文件中,每个LogSegment内部的消息数量不一定相等,这种分段存储的特性特性方便旧的LogSegment file快速被删除,同时加快了文件查找速度。
每个逻辑segment段主要包含:消息日志文件(以log结尾)、偏移量索引文件(以index结尾)、时间戳索引文件(以timeindex结尾)、快照文件(以.snaphot结尾)等等文件。
如下表示一个某个topic文件夹下的主要文件结构:
每个分区是由多个Segment组成,当Kafka要写数据到一个partition时,它会写入到状态为active的segment中。如果该segment被写满,则一个新的segment将会被新建,然后变成新的“active” segment。
log文件作为日志文件,存储了消息内容以及该消息相关的元数据。Kafka会将消息以追加的方式顺序持久化到partition下面的最新的日志段下面的日志文件中,即只有最后一个logSegment文件才能执行写入操作。
每一个条消息日志的主要包含offset,MessageSize,data三个属性(还有一些其他属性):
index文件作为偏移量索引文件,主要用于加快查找消息的速度。该文件中的每一条索引记录都对应着log文件中的一条消息记录。
一条索引记录包含相对offset和position两个属性(均为4字节):
相对offset大小为4字节,因此最大值为Integer.MAX_VALUE。在写入消息时会对要写入的相对offset进行校验,超过该值时将会自动进行日志段切分。
假设某个LogSegment段中的数据文件名1234.log,索引文件中某个索引条目为(3,497)为例,那么这个索引条目对应的消息就是在数据文件中的第4个消息,该消息全局offset为1238,该消息的物理偏移地址(相对数据文件)为497。
稀疏索引
index索引文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏索引,默认每间隔4k的数据建立一条索引,时间戳索引文件(以timeindex结尾)也是这个规则。通过log.index.interval.bytes属性可以更新间隔数量大小。
稀疏索引避免了索引文件占用过多的空间,从而可以将索引文件长期保留在内存中,但缺点是没有建立索引的Message也不能一次性定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
index索引文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏索引,默认每间隔4k的数据建立一条索引,时间戳索引文件(以timeindex结尾)也是这个规则。通过log.index.interval.bytes属性可以更新间隔数量大小。
稀疏索引避免了索引文件占用过多的空间,从而可以将索引文件长期保留在内存中,但缺点是没有建立索引的Message也不能一次性定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
Kafka还采用了mmap的方式直接将 index 文件映射到内存中(Java中就是MappedByteBuffer),这样对 index 的操作就不需要操作磁盘 IO,大大的减少了磁盘IO次数和时间。
例如读取offset=368776的message,需要通过下面2个步骤查找。
第一步查找segment file 上述图2为例,
其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.
第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,
其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log
第二步通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
从上述图3可知这样做的优点,segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
本质上,page cache 可以看成是一个 write- through 的缓存。
一般情况下,用户直接使用 write/fflush 只是 写到了 page cache 里面。
这个时候如果宕机,那么依赖会丢失掉数据。
可以依赖于 Linux 自动刷新,也可以自己强 制发起系统调用来刷新
Linux 自动刷新 page cache:
linux 对每一个设备维持了一个后台线程,用 于回写 page cache。 会在两种情况下触发:
Linux内核由于存在page cache, 一般修改的文件数据并不会马上同步到磁盘,会缓存在内存的page cache中,我们把这种和磁盘数据不一致的页称为脏页,脏页会在合适的时机同步到磁盘。为了回写page cache中的脏页,需要标记页为脏(dirty)。
其实对于用户代码来说,整个过程是不可控的。因此依赖于自动刷新机制,就要容忍数据丢失
MySQL binlog刷盘可以通过 sync_binlog参数来控制
MySQL redo log:通过参数 innodb_flush_log_at_trx_commit 控制
Redis AOF 刷盘时机:
Kafka 刷盘时机:
Kafka producer有三种ack机制 初始化producer时在config中进行配置
ack=0
意味着producer不等待broker同步完成的确认,继续发送下一条(批)信息
提供了最低的延迟。但是最弱的持久性,当服务器发生故障时,就很可能发生数据丢失。例如leader已经死亡,producer不知情,还会继续发送消息broker接收不到数据就会数据丢失
ack=1
意味着producer要等待leader成功收到数据并得到确认,才发送下一条message。此选项提供了较好的持久性较低的延迟性。
Partition的Leader死亡,follwer尚未复制,数据就会丢失
ack=-1
意味着producer得到follwer确认,才发送下一条数据,延时性最差,性能最慢,但可靠性最强(数据强一致性)
三种机制性能递减,可靠性递增。
同时,Ack默认值为1,此时吞吐量与可靠性折中。实际生产中可以根据实际需求进行调整。
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了
好了,我们来总结一下今天的内容。Kafka Consumer 的位移提交,是实现 Consumer 端语义保障的重要手段。位移提交分为自动提交和手动提交,而手动提交又分为同步提交和异步提交。在实际使用过程中,推荐你使用手动提交机制,因为它更加可控,也更加灵活。另外,建议你同时采用同步提交和异步提交两种方式,这样既不影响 TPS,又支持自动重试,改善 Consumer 应用的高可用性。总之,Kafka Consumer API 提供了多种灵活的提交方法,方便你根据自己的业务场景定制你的提交策略。
https://www.cnblogs.com/zz-ksw/p/12801632.html
传统的文件读写或者网络传输,通常需要将数据从内核态转换为用户态。
应用程序读取用户态内存数据,写入文件 / Socket之前,需要从用户态转换为内核态之后才可以写入文件或者网卡当中。
例如消息中间件 Kafka 就是这个应用场景,从磁盘中读取一批消息后原封不动地写入网卡(NIC,Network interface controller)进行发送。
在没有任何优化技术使用的背景下,操作系统为此会进行 4 次数据拷贝,以及 4 次上下文切换,如下图所示:
在Consumer从Broker中读取数据,按照传统的IO模型,磁盘 —> 内核态 —> 用户态 —> 内核态 —> 网卡,一共要经历四次拷贝,四次上下文切换。
传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:
显然,第二次和第三次数据copy 其实在这种场景下没有什么帮助反而带来开销,这也正是零拷贝出现的意义。
DMA(Direct Memory Access)直接内存访问: DMA是允许外设组件将 I/O 数据直接传送到主存储器中并且传输不需要CPU的参与,以此将CPU解放出来去完成其他的事情。
允许某些电脑内部的硬件子系统(电脑外设),可以独立地直接读写系统内存,而不需要中央处理器(CPU)介入处理 。DMA是一种快速的数据传送方式
DMA实现
DMA传输将数据从一个地址空间复制到另一个地址空间,提供在外设和存储器之间或者存储器和存储器之间的高速数据传输。当CPU初始化这个传输动作,传输动作本身是由DMA控制器来实现和完成的。DMA传输方式无需CPU直接控制传输,也没有中断处理方式那样保留现场和恢复现场过程,通过硬件为RAM和IO设备开辟一条直接传输数据的通道,使得CPU的效率大大提高。
而用户空间与内核空间之间的数据传输并没有类似DMA这种可以不需要CPU参与的传输工具,因此用户空间与内核空间之间的数据传输是需要CPU全程参与的。所有也就有了通过零拷贝技术来减少和避免不必要的CPU数据拷见过程
现在,DMA 代替了 CPU 负责内存与磁盘以及内存与网卡之间的数据搬运,CPU 作为 DMA 的控制者,如下图所示:
零拷贝技术是一个思想,指的是指计算机执行操作时,CPU 不需要先将数据从某处内存复制到另一个特定区域。
可见,**零拷贝的特点是 CPU 不全程负责内存中的数据写入其他组件,CPU 仅仅起到管理的作用。但注意,零拷贝不是不进行拷贝,而是 CPU 不再全程负责数据拷贝时的搬运工作。**如果数据本身不在内存中,那么必须先通过某种方式拷贝到内存中(这个过程 CPU 可以不参与),因为数据只有在内存中,才能被转移,才能被 CPU 直接读取计算。
sendfile
mmap
splice
直接 Direct I/O
不同的零拷贝技术适用于不同的应用场景,下面依次进行 sendfile、mmap、Direct I/O 的分析。
DMA 技术回顾:DMA 负责内存与其他组件之间的数据拷贝,CPU 仅需负责管理,而无需负责全程的数据拷贝;
使用 page cache 的 zero copy:
sendfile:一次代替 read/write 系统调用,通过使用 DMA 技术以及传递文件描述符,实现了 zero copy
mmap:仅代替 read 系统调用,将内核空间地址映射为用户空间地址,write 操作直接作用于内核空间。通过 DMA 技术以及地址映射技术,用户空间与内核空间无须数据拷贝,实现了 zero copy
不使用 page cache 的 Direct I/O:读写操作直接在磁盘上进行,不使用 page cache 机制,通常结合用户空间的用户缓存使用。通过 DMA 技术直接与磁盘/网卡进行数据交互,实现了 zero copy
snedfile 的应用场景是:用户从磁盘读取一些文件数据后不需要经过任何计算与处理就通过网络传输出去。此场景的典型应用是消息队列。
在传统 I/O 下,正如第一节所示,上述应用场景的一次数据传输需要四次 CPU 全权负责的拷贝与四次上下文切换,正如本文第一节所述。
sendfile 主要使用到了两个技术:
DMA 技术;
传递文件描述符代替数据拷贝;
1.page cache 以及 socket buffer 都在内核空间中;
2.数据传输过程前后没有任何写操作;
在Kafka中,transferFrom和transferTo方法。
sendfile零拷贝技术过程
先从用户态切换到内核态,把磁盘数据拷贝到内核缓冲区,同时从内环缓冲区拷贝一些offset和length数据到socket缓冲区,
接着从内核态切换到用户态,从内核缓冲区直接把数据拷贝到网络协议引擎里去,同时从Socket缓冲区拷贝一些offset和length信息到网络协议引擎里去,offset和length量几乎可以忽略。
只要2次切换,2次拷贝。
mmap,内存映射,直接将磁盘文件数基于DMA引擎拷贝据映射到内核缓冲区,同时用户缓冲区是跟内核缓冲区共享一块映射数据,建立映射后,不需要从内核缓冲区拷贝到用户缓冲区。故,可减少一次拷贝。总共是4次切换,3次拷贝。
mmap 适合小数据量读写,sendFile 适合大文件传输。
mmap 需要 4 次上下文切换,3 次数据拷贝;sendFile 需要 3 次上下文切换,最少 2 次数据拷贝。
sendFile 可以利用 DMA 方式,减少 CPU 拷贝,mmap 则不能(必须从内核拷贝到 Socket 缓冲区)。
包括mmap和sendfile两种形式,在各种框架中常有体现。
Kafka中存在大量的网络数据持久化到磁盘和磁盘文件通过网络发送的过程,使用Sendfile方式
把磁盘文件映射到内存中,然后把映射到内存的数据通过Socket发送出去。
Consumer消费消息过程使用零拷贝,零拷贝包括2种方式,RocketMQ使用第一种方式,因小块数据传输的效果比sendfile方式好
使用sendfile方式
优点:可以利用DMA方式,消耗CPU资源少,大块文件传输效率高,无内存安全新问题
缺点:小块文件效率低于mmap方式,只能是BIO方式传输(同步阻塞),不能使用NIO(同步非阻塞)
在这个选择上:rocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。
3.BIO和NIO的区别
https://blog.csdn.net/weixin_48872249/article/details/113845526
顺序写:
磁盘IO慢主要慢在寻道和旋转,顺序写减少了磁盘寻道和旋转次数。因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
Kafka中每个分区是一个有序的,不可变的消息序列,新的消息不断的追加到Partition的末尾,Partition是一个逻辑概念,每个Partition被划分成多个Segment,每个Segment对应一个物理文件,Kafka对Segment文件追加写,就是顺序写文件。
Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术, 顺序写入 和 MMFile 。
kafka将来自Producer的数据,顺序追加在partition,partition就是一个文件,以此实现顺序写入。
Consumer从broker读取数据时,因为自带了偏移量,接着上次读取的位置继续读,以此实现顺序读。
顺序读写,是kafka利用磁盘特性的一个重要体现。
PageCache:
producer 生成消息到 Broker 时,Broker 会使用 pwrite() 系统调用,按偏移量写入数据。写入时,会先写入 page cache。
Consumer 消费消息时,Broker会使用sendfile() 系统调用,零拷贝的将数据从 page cache 传输到 Broker 的 Socket Buffer,通过网络传输。因此当Kafka的生产速率和消费速率相差不大时,就能几乎只靠 page cache 的读写完成整个生产-消费过程,磁盘访问非常少
什么页缓存?
页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘I/O的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。为了弥补性能上的差异 ,现代操作系统越来越多地将内存作为磁盘缓存,甚至会将所有可用的内存用途磁盘缓存,这样当内存回收时也几乎没有性能损失,所有对于磁盘的读写也将经由统一的缓存。
**Kafka中大量使用了页缓存,这是Kafka实现高吞吐的重要因此之一。**虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务,但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过log.flush.interval.message、log.flush.interval.ms等参数来控制。
同步刷盘可以提高 消息的可行性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过一般不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。
kafka 为了解决内核态和用户态数据不必要 Copy 这个问题, 在读取数据的时候就引入了「零拷贝技术」。即让操作系统的 os cache 中的数据直接发送到网卡后传出给下游的消费者,中间跳过了两次拷贝数据的步骤,从而减少拷贝的 CPU 开销, 减少用户态内核态的上下文切换次数, 从而优化数据传输的性能, 而Socket缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到Socket缓存,如下图所示:
在 Kafka 中主要有以下两个地方使用到了「零拷贝技术」:
1)基于 mmap 机制实现的索引文件:首先索引文件都是基于 MappedByBuffer 实现,即让用户态和内核态来共享内核态的数据缓冲区,此时数据不需要 Copy 到用户态空间。虽然 mmap 避免了不必要的 Copy,但是在不同操作系统下, 其创建和销毁成功是不一样的,不一定都能保证高性能。所以在 Kafka 中只有索引文件使用了 mmap。
2)基于sendfile 机制实现的日志文件读写:在 Kafka 传输层接口中有个 TransportLayer 接口,它的实现类中有使用了 Java FileChannel 中 transferTo 方法。该方法底层就是使用 sendfile 实现的零拷贝机制, 目前只是在 I/O 通道是普通的 PLAINTEXT 的时候才会使用到零拷贝机制。
2、Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
3、Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。
Kafka Producer 向 Broker 发送消息不是一条一条发送,而是按批发送。且roducer、Broker 和 Consumer 使用相同的压缩算法,在 producer 向 Broker 写入数据,Consumer 向 Broker 读取数据时甚至可以不用解压缩,最终在 Consumer Poll 到消息时才解压,这样节省了大量的网络和磁盘开销。
如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
Kafka 消息是以 Topic 为单位进行归类,各个 Topic 之间是彼此独立的,互不影响。每个 Topic 又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。
Kafka 每个分区日志在物理上实际按大小被分成多个 Segment。
index 采用稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的操作就不需要操作磁盘 IO。
Kafka 充分利用二分法来查找对应 offset 的消息位置
一个Acceptor线程处理新的连接,多个Processor线程select 和 read socket请求,多个Handler线程处理请求并响应(I/O多路复用)。
Kafka 的网络通信模型是基于 NIO 的Reactor 多线程模型来设计的。其中包含一个Acceptor线程用于处理连接,多个 Processor 线程 select 和 read socket 请求,一个Processor 由包含多个 Handler 线程处理请求并响应。
Kafka 的 Topic 可以分成多个 Partition,每个 Paritition 类似于一个队列,保证数据有序。同一个 Group 下的不同 Consumer 并发消费 Paritition,分区实际上是调优 Kafka 并行度的最小单元,因此,可以说,每增加一个 Paritition 就增加了一个消费并发。
Kafka 对于单一读写请求均拥有很好的吞吐和延迟。
处理写请求时,数据写入 PageCache 后立即返回,数据通过异步方式批量刷入磁盘,既保证了多数写请求都能有较低的延迟,同时批量顺序刷盘对磁盘更加友好。
处理读请求时,实时消费的作业可以直接从 PageCache 读取到数据,请求延迟较小,
同时 ZeroCopy 机制 能够减少数据传输过程中用户态与内核态的切换,大幅提升了数据传输的效率。
Zookeeper 主要用于在集群中不同节点之间进行通信,在 Kafka 中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取,除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。
Kafka 集群能够正常工作,目前还是需要依赖于 ZooKeeper,主要用来「负责 Kafka集群元数据管理,集群协调工作」,在每个 Kafka 服务器启动的时候去连接并将自己注册到 Zookeeper,类似注册中心。
Kafka 使用 Zookeeper 存放「集群元数据」、「集群成员管理」、 「Controller 选举」、「其他管理类任务」等。待 KRaft 提案完成后,Kafka 将完全不依赖 Zookeeper。
1)集群元数据:Topic 对应 Partition 的所有数据都存放在 Zookeeper 中,且以 Zookeeper 保存的数据为准。
2)集群成员管理:Broker 节点的注册、删除以及属性变更操作等。主要包括两个方面:成员数量的管理,主要体现在新增成员和移除现有成员;单个成员的管理,如变更单个 Broker 的数据等。
3)Controller 选举:即选举 Broker 集群的控制器 Controller。其实它除了具有一般 Broker 的功能之外,还具有选举主题分区 Leader 节点的功能。在启动 Kafka系统时,其中一个 Broker 会被选举为控制器,负责管理主题分区和副本状态,还会执行分区重新分配的管理任务。如果在 Kafka 系统运行过程中,当前的控制器出现故障导致不可用,那么 Kafka 系统会从其他正常运行的 Broker 中重新选举出新的控制器。
4)其他管理类任务:包括但不限于 Topic 的管理、参数配置等等。
原因有以下2点:
1)集群运维层面:Kafka 本身就是一个分布式系统,如果还需要重度依赖 Zookeeper,集群运维成本和系统复杂度都很高。
2)集群性能层面:Zookeeper 架构设计并不适合这种高频的读写更新操作, 由于之前的提交位移的操作都是保存在 Zookeeper 里面的,这样的话会严重影响 Zookeeper 集群的性能。
为了保证数据的可靠性,Kafka会给每个分区找一个节点当带头大哥(Leader),以及若干个节点当随从(Follower)。消息写入分区时,带头大哥除了自己复制一份外还会复制到多个随从。如果随从挂了,Kafka会再找一个随从从带头大哥那里同步历史消息;如果带头大哥挂了,随从中会选举出新一任的带头大哥,继续笑傲江湖。
https://www.cnblogs.com/listenfwind/p/12465409.html
1.持久化
2.副本机制
先同步持久化,再同步副本
如果有一副本,网络不太好,
ISR集合,检测到副本长时间没有同步数据,会剔除 ISR集合,
Kafka通过消费者组机制同时实现了发布/订阅模型和点对点模型。多个组的消费者消费同一个分区属于多订阅者的模式,自然没有什么问题;
而在单个组内某分区只交由一个消费者处理的做法则属于点对点模式。其实这就是设计上的一种取舍,如果Kafka真的允许组内多个消费者消费同一个分区,也不是什么灾难性的事情,只是没什么意义,而且还会重复消费消息。
顺序性也会被破坏
Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从broker 拉取消息。
(1)节点必须维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
(2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久
request.required.acks 有三个值 0 1 -1(all),具体如下:
这个问题换种问法,就是kafka如何保证消息的幂等性。对于消息队列来说,出现重复消息的概率还是挺大的,不能完全依赖消息队列,而是应该在业务层进行数据的一致性幂等校验。
比如你处理的数据要写库(mysql,redis等),你先根据主键查一下,如果这数据都有了,你就别插入了,进行一些消息登记或者update等其他操作。另外,数据库层面也可以设置唯一健,确保数据不要重复插入等 。一般这里要求生产者在发送消息的时候,携带全局的唯一id。
1.基于时间或者文件大小 某种策略批量删除
队列模型:P2P、Pub/Sub
Kafka 采用的是 Pub/Sub 模型
kafka出现消息重复消费的原因:
服务端侧已经消费的数据没有成功提交 offset(根本原因)。
Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键等天然的幂等功能。这种方法最有效。
将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交offset合适?
方法:
1 增大partion数量,
2 消费者加了并发,服务, 扩大消费线程
3 增加消费组服务数量
4 kafka单机升级成了集群
5 避免消费者消费消息时间过长,导致超时
6 使Kafka分区之间的数据均匀分布
场景:
1 如果是Kafka消费能力不足,则可以考虑增加 topic 的 partition 的个数,
同时提升消费者组的消费者数量,消费数 = 分区数 (二者缺一不可)
2 若是下游数据处理不及时,则提高每批次拉取的数量。批次拉取数量过少
(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。
解决方案:重试表
顺序消息消费时,可以先查询订单是否在重试表有记录,如果有记录,则记录到重试表
消费失败时,也可以加到重试表中
在 Linux 的实现中,文件 Cache 分为两个层面,一是 Page Cache,另一个是 Buffer Cache(块缓存)。
page cache用于缓存文件的页数据,大小通常为4K;
Buffer cache用于缓存块设备(如磁盘)的块数据,大小通常为1K。
page cache,又称pcache,其中文名称为页高速缓冲存储器,简称页高缓。
page cache的大小为一页,通常为4K。在linux读写文件时,它用于缓存文件的逻辑内容,从而加快对磁盘上映像和数据的访问。
单位:页。通常为4K
大小:动态变化,因为操作系统会将所有未直接分配给应用程序的物理内存都用于页面缓存。
文件系统层级的缓存:page cache用于缓存文件的页数据,从磁盘中读取到的内容是存储在page cache里的。
Kafka对page cache的利用
Kafka为什么不自己管理缓存,而非要用page cache?原因有如下三点:
JVM中一切皆对象,数据的对象存储会带来所谓object overhead,浪费空间;
如果由JVM来管理缓存,会受到GC的影响,并且过大的堆也会拖累GC的效率,降低吞吐量;
一旦程序崩溃,自己管理的缓存数据会全部丢失。
Kafka三大件(broker、producer、consumer)与page cache的关系可以用下面的简图来表示。
集群规模过大的kafka
pagecache会被污染,会存老的数据,因为内存不是无限大的,会有资源限制
kafka底层优化
多个group消费速度接近时,再开启page cache
多个group消费速度差距大时:不开启预读pagecache,直接走direct Io,绕过linux buffer IO
https://github.com/flycash/interview-baguwen/tree/main/mq
Kafka 是一个分布式的流式处理平台,用于实时构建流处理应用。主要应用在大数据实时处理领域。Kafka 凭借「高性能」、「高吞吐」、「高可用」、「低延迟」、「可伸缩」几大特性,成为「消息队列」的首选。
其主要设计目标如下:
1)高性能:以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
2)高吞吐、低延迟:在非常廉价的机器上也能做到单机支持每秒几十万条消息的传输,并保持毫秒级延迟。
3)持久性、可靠性:消息最终被持久化到磁盘,且提供数据备份机制防止数据丢失。
4)容错性:支持集群节点故障容灾恢复,即使 Kafka 集群中的某一台 Kafka 服务节点宕机,也不会影响整个系统的功能(若副本数量为N, 则允许N-1台节点故障)。
5)高并发:可以支撑数千个客户端同时进行读写操作。
答案:
解耦:将不同的系统之间解耦开来。尤其是当你在不希望感知到下游的情况。(后面我们用一个一对多的项目来进一步说明,这是一个很常见的场景,可以自行替换自己的例子)通常而言,当我需要对接多个系统,需要告知我的某些情况的时候,但是我又不知道究竟有多少人关心,以及他们为啥关心的时候,就会考虑采用消息队列来通信。(这个例子是我的例子)例如我们退款,会用消息队列来暴露我们的退款信息,比如说退款成功与否。很多下游关心,但是实际上,我们退款部门根本不关心有谁关心。在不使用消息队列的时候,我们就需要一个个循环调用过去;
异步:是指将一个同步流程,利用消息队列来拆成多个步骤。(下面是我准备的一个刷亮点的方面,我们从事件驱动设计上来讨论)这个特性被广泛应用在事件驱动之中。(下面是我退款的例子)比如说在退款的时候,退款需要接入风控,多个款项资金转移,这些步骤都是利用消息队列解耦,上一个步骤完成,发出事件来驱动执行下一步;
削峰:(这是最大的考点,而且如果你的简历里面有类似电商之类的经历,那么就很可能追问下去,接连考秒杀啥的)削峰主要是为了应对突如其来的流量。一般来说,数据库只能承受每秒上千的写请求,如果在这个时候,突然来了几十万的请求,那么数据库可能就会崩掉。消息队列这时候就起到一个缓冲的效果,服务器可以根据自己的处理能力,一批一批从消息队列里面拉取请求并进行处理。
1)日志收集方向:可以用 Kafka 来收集各种服务的 log,然后统一输出,比如日志系统 elk,用 Kafka 进行数据中转。
2)消息系统方向:Kafka 具备系统解耦、副本冗余、流量削峰、消息缓冲、可伸缩性、容错性等功能,同时还提供了消息顺序性保障以及消息回溯功能等。
3)大数据实时计算方向:Kafka 提供了一套完整的流式处理框架, 被广泛应用到大数据处理,如与 flink、spark、storm 等整合。
分析:典型的反直觉题。因为我们只会说消息队列怎么怎么好,很少有人会思考使用消息队列会带来什么问题。
**答:
可用性降低:引入任何一个中间件,或者多任何一个模块,都会导致你的可用性降低。(所以这个其实不是MQ的特性,而是所有中间件的特性)**
一致性难保证:引入消息队列往往意味着本地事务不可用,那么就容易出现数据一致性的问题。例如业务成功了,但是消息没发出去;
复杂性上升:复杂性分两方面,一方面是消息队列集群维护的复杂性,一方面是代码的复杂性
(升华主题)几乎所有的中间件的引入,都会引起类似的问题。
关键字:可用性,一致性,复杂性
核心是零拷贝,Page Cache,顺序写,批量操作,数据压缩,日志分段存储;
核心是理解Kafka 如何维护 ISR,什么情况下会导致一个 partition 进去(或者出来)ISR
kafka因为Full GC而抖动,导致副本被踢出ISR
列举策略,要注意分析优缺点。更进一步可以讨论更加宽泛的负载均衡的做法,和 RPC 之类的负载均衡结合做对比
违背直觉的问题,关键是要协调偏移量的代价太大;
注意和 PUSH 模型做对比。最好是能够举一个适用 PUSH 的例子
这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。
消息系统,实时性要求高
一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。
又是一个违背直觉的问题,核心在于顺序写
属实没必要,做好消费幂等简单多了
核心把 rebalance 的过程背下来
没啥好办法,也就是加快消费,合并消息
分析:消息积压,核心就在于生产者太快而消费者太慢。解决思路要么控制生产者的发送速率,要么提高消费者的效率。一般我们不太会倾向于控制发送者的速率,所以解决问题的思路就变成了如何提高消费者效率。
提高消费者的效率,要么提高消费单条消息的效率,要么是增加消费者的数量。
答案:整体上有两个思路:
(刷亮点)其实消息积压要看是突然的积压,即偶然的,那么只需要扩大集群规模,确保突然起来的消息都能在消息中间件上保存起来,就可以了。因为后续生产者的速率回归正常,消费者可以逐步消费完积压的消息。如果是常态化的生产者速率大于消费者,那么说明容量预估就不对。这时候就要调整集群规模,并且增加消费者。典型的就是,Kafka增加新的Partition。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。