赞
踩
kafka是一个分布式的基于发布\订阅模式的消息队列,主要用于大数据实时处理领域
(1)生产者:向kafka集群发送数据
(2)broker:启动的kafka集群
(3)消费者:从kafka集群消费数据
(4)zookeeper:帮助kafka实现分布式
(1)大量使用操作系统页缓存,内存操作速度快;
(2)不直接参与IO操作,而是交由操作系统来完成;
(3)采用追加写入方式,摒弃了缓慢的磁盘随机读写操作;
(4)使用以sendfile为代表的0拷贝技术加强网络间的数据传输效率;
(1)负载均衡:通过分区领导者选举实现。kafka默认提供了智能的leader算法,可以在集群的所有机器上以均等机会分散各个patition的leader。
(2)故障转移:每台kafka服务器启动会以会话机制把自己注册到zk上。
(1)消息传输
(2)网站行为日志追踪
(3)审计数据收集
(4)日志收集
(5)event sourcing
(6)流式处理
可以是flume、MySQL、java等,其实就是向kafka发送数据的。
(可能是MySQL、Hadoop、spark、flink、java),就是向kafka取数据的。
由一个或者多个consumer组成,在kafka中,消费者都是有组的,即使是在consumer创建时没有没有设置组,但是kafka会默认一个有一个组,是组直接从kafka中的leader中拉取数据,消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
kafka服务器的官方名字,一个集群由多个broker组成,一个broker可以容纳多个topic。
可以理解成队列,但是和点对点队列不同的是,不同的消费者组都可以从topic拉去相同的消息。
由此引出推模型和拉模型的区别:
推模型 push :指定消息推送给谁,如果要给多个对象推送的话,需要推送多份。
拉模型 pull :消息发布出去,放到某个地方,感兴趣的自己来拉。只需要推一份数据。
我们通常使用topic来区分实际业务。
(1)Topic是Kafka中消息的逻辑分类,可以看作是一个消息的存储类别;
(2)Partition则是Kafka 中实际保存数据的单位。每个Topic可以被划分为多个Partition,而这些 Partition 会尽量平均的分配到各个 Broker 上。当一条消息发送到Kafka时,它会被分配到一个特定的Partition中,并最终写入 Partition 对应的日志文件里。
为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition的数量设置为broker数量的整数倍。
一个partition在kafka上就是一个目录,目录名称为:[topicName]-[partitionNo];
一个partition可以被在不同消费者组的消费者同时消费。多个partition也可以被同一个消费者消费。
kafka中的topic通常会被多个消费者订阅,出于性能的考量,kafka并不是topic-message的两级结构,而是采用了topic-partition-message的三级结构来分散负载。从本质上来讲,topic是由若干个partition组成的。而topic的partition是不可修改的有序消息序列,每个partitiony有自己的编号,用户对partition唯一能做的就是在消息序列的尾部追加写入消息。
说明:partition没有太多的业务含义,它的引入就是单纯的提升系统的吞吐量,创建topic时可以根据集群实际配置设置具体的partition数,实现整体性能的最大化。
(1)方便在集群中扩展(相当于负载) :每个Partition 可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发:因为可以以Partition为单位读写了;
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
(1)指明 partition 的情况下,直接将指明的值作为partition的值
(2)没有指明partition但有key的情况下,将key的hash值与topic的partition数据进行取余得到partition值
(3)既没有partition值又没有key值的情况下,第一次调用是随机生成一个整数(后面调用在这个整数上自增),将这个值与topic可用的parition总数取余得到partition值,也就是常说的round-robin算法。
有四种,见后面集成篇介绍。
将partition进一步细分为了若干的segment,每个segment文件的最大大小相等。
segment是一个逻辑概念,其由两类物理文件组成,分别为“.index”文件和“.log”文件。“.log”文件中存放的是消息,而“.index”文件中存放的是“.log”文件中消息的索引。
.log的文件名为当前该segment前有多少条消息,00000000000000170210.log 表示该segment中,当前文件前面有170210条消息:
partition是有序消息日志,如果只存一份,一旦保存partition的kafaka挂了,其上保存的消息也就丢失了,所以需要有备份,这些备份日志在kafka中称为副本,replica的唯一目的是防止数据丢失。每个分区都有一个或者多个副本。kafka保证同一个partition的多个replica一定不会分配在同一台broker上,否则实现不了冗余备份的效果。
副本分为两类,领导者副本leader和追随者follower副本。如果leader所在的broker宕机,kafka会从剩余的replica中选举新的leader。
每个分区可能有多个副本,但是这些副本中会选出一个leader,即多个副本中的“主”,producer向kafka发送数据时,和consumer拉取数据时都是和leader做交互,leader会和follower之间会同步数据。
多个副本中的的“从”,有了副本保证数据的安全性,如果有leader挂掉,从follower选取新的leader,所以follower肯定不能和leader在同一个服务器上。
Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
从图中的流程可以看出,生产者和kafka集群之间还有一个RecordAccumulator队列默认大小是32M,topic分区的话,producer会对应有一个分区器,数据在进入中间队列前,已经被分区器进行了分区,sender()方法在发送数据时,就直接根据分区进行拉取了,拉取时有两个参数,也就是调优参数。
(1)batch.size :也就是批大小,只有数据累计到batch.size后,sender才会发送数据,默认16k (2)linger.ms :也就是等待时间,如果数据未达到batch.size,sender等待linger.ms设置的时间就会发送数据,单位ms,默认值就是0ms,就是有了一条数据直接发(默认为0是因为kafka要接实时数仓,所以设置为0)。
参数名称 | 描述 |
bootstrap.servers | 生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。 |
key.serializer和value.serializer | 指定发送消息的key和value的序列化类型。一定要写全类名。 |
buffer.memory | RecordAccumulator缓冲区总大小,默认32m。 |
batch.size | 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。 |
0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。 | |
max.in.flight.requests.per.connection | 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是100ms。 |
是否开启幂等性,默认true,开启幂等性。 compression.type 生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。 |
提高吞吐量,就是提高批次传输大小,还有就是效率问题
- //调优参数,还是需要根据业务需求来调整
- //batch.size 批次大小,默认是16k,将批次大小增大,进而提高吞吐量
- properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32768);
- //linger.ms 等待时长,默认是0ms,增加等待时长
- properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
- //双端队列大小,默认是32M,可以提高到64M
- properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,67108864);
- //调整压缩格式,默认没有压缩
- properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
数据可靠性基于ack应答机制。为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后都需要向producer发送ack(acknowledgement 确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
数据完全可靠的条件:Acks级别设置为-1,分区副本大于等于2,ISR应答的最小副本数大于等于2。具体来看下:
方案 | 优点 | 缺点 |
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点故障,需要2n+1个副本 |
全部完成同步,才发送ack | 选举新的leader时,容忍n台节点故障,需要n+1 个副本 | 延迟高 |
Kafka选择了第二种方案,原因如下:
(1)同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1 个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
(2)虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。
采用第二种方案之后,设想以下情景:leader 收到数据,所有 follower 都开始同步数据, 但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去, 直到它完成同步,才能发送 ack。这个问题怎么解决呢?
Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集 合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower 长时间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失, 所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡, 选择以下的配置。
(1) 0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还 没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
(2)producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据;
(3) -1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会 造成数据重复。
Log文件中的HW和LEO。
LEO:指的是每个副本最大的 offset;
HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。
(1)follower 故障
follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘 记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。 等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重 新加入 ISR 了
(2)leader 故障
leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
分区内有序,分区间无序
参数名称 | 描述 |
replica.lag.time.max.ms | ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s。 |
auto.leader.rebalance.enable | 默认是true。 自动Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。 |
leader.imbalance.check.interval.seconds | 默认值300秒。检查leader负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。 |
log.index.interval.bytes | 默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。 |
log.retention.hours | Kafka中数据保存的时间,默认7天。 |
log.retention.minutes | Kafka中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms | Kafka中数据保存的时间,毫秒级别,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是5分钟。 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。 |
log.cleanup.policy | 默认是delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的50%的1/3 |
num.network.threads | 默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。 |
描述 | |
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。 |
key.deserializer和value.deserializer | 指定接收消息的key和value的反序列化类型。一定要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。 |
auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets的分区数,默认是50个分区。 |
heartbeat.interval.ms | Kafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。 |
session.timeout.ms | Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认1个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条。 |
(2)RoundRobin:
在这种策略下,每个消费者依次按顺序获得一个分区。当消费者数量多于分区数量时,多余的消费者将没有分配到任何分区。
(2)Range:
这是最常用的分配策略,消费者将根据分区的范围来均匀分配给各个消费者。例如,如果有10个分区和4个消费者,则每个消费者将被分配2-3个分区。
(3)Sticky(粘性):
这种策略下,消费者会尽量保持与之前分配的分区相同。如果有新的消费者加入或有消费者退出,分区的重新分配会尽量减少。这个策略对于需要保持状态的应用程序比较有用。
(4)CooperativeSticky(合作者粘性)后面版本加的分配策略:
这是Kafka 2.4.0版本引入的新策略,通过考虑消费者的健康状况、处理速度、网络延迟等因素,动态地进行分区分配,以实现更好的负载均衡和消费者协作。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。