赞
踩
Kafka是一个高吞吐的分布式的消息系统,是基于发布/订阅模式的消息队列。
[song@hadoop102 kafka]$ bin/kafka-topics.sh
[song@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 –list
[song@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
选项说明:
--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数
[song@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --describe --topic first
[song@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --alter --topic first --partitions 3
[song@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --describe --topic first
[song@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --delete --topic first
[song@hadoop102 kafka]$ bin/kafka-console-producer.sh
[song@hadoop102 kafka]$ bin/kafka-console-producer.sh --
bootstrap-server hadoop102:9092 --topic first
> hello world
>atguigu atguigu
[song@hadoop102 kafka]$ bin/kafka-console-consumer.sh
[song@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
[song@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
外部数据 ----> 存储到kafka集群中
在数据发送的过程中会涉及到两个线程——main 线程和 Sender 线程,当接受外部传过来的数据的时候,会先创建一个main线程,在main线程中创建producer对象,然后调用send方法,将数据进行发送,然后会经过拦截器,对发送的数据进行处理、加工,再经过序列化器,对传输的数据进行序列化,在根据分区器的分区策略对传输的数据进行分区处理,到达缓冲区
(RecordAccumulator记录收集器,里面维护了一个一个双端的缓存队列和一个内存池,内存池的默认大小是32m,当向双端队列中发送数据的时候,会创建批次大小,当创建批次的时候,会从内存池中取出内存,当数据发送到kafka集中后,清理批次的时候,会释放内存,放回到内存池中)中指定的分区(一个分区会创建一个队列),在分区中按照批次的方式进行存储数据,每一个批次的默认大小(batch.size)为16k,当一个批次的(数据大小积累到batch.size(默认16k)大小时)或者(数据没有积累到batch.size,但是到达了延迟时间(linger.ms 默认是0ms,表示没有延迟)),唤醒sender线程,然后会发送请求从分区中拉取数据,拉取数据的方式是以brokerId为key,所有分区的请求为value放到队列中,如果发送数据的第一个请求到达集群中的某一个broker没有应答,允许继续发送请求,默认每个broker节点最多缓存5个请求,通过selector发送数据,数据发送成功之后,会有应答机制,返回acks,应答级别有3种,如果反馈回来的请求是成功,则会删除发送数据成功的请求以及清理分区中请求中拉取的数据,如果失败会进行重试,重试的次数(默认是Int的最大值,可以进行修改,一般是3-5次),
package com.song.kafka_demo.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取数据 song
String msgValues = value.toString();
int partition;
if (msgValues.contains("song")) {
partition = 0;
} else {
partition = 1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
===============================================
// 关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.song.kafka_demo.producer.MyPartitioner");
数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2,
但是使用此方式会造成数据重复性问题,解决方式看下面的数据去重
acks: -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答,接受完数据之后,在未进行应答反馈之前,leader挂掉,会在follower中重新选举leader,producer未接受到成功的反馈,所以回向leader再次发送数据,就造成了消息重复的问题。
在生产环境中:
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)。
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。
开启幂等性参数 enable.idempotence 默认为 true,false 关闭幂等性。如果想要在整个集群中保证数据不重复,则还需要依赖于事务。
开启事务的前提是必须开启幂等性。
事务的实现主要是依赖于事务协调器(Transaction Coordinator),但是在kafka中每一个broker都有事务协调器,所以应该使用哪一个事务协调器?
首先Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id,事务id不但可以判断出要使用哪一个事务协调器,还能够保证在在客户端挂掉,重新启动之后也能继续处理未完成的事务。
在每一个broker中都存在一个存储事务信息的特殊主题(__transaction_state-分区-Leader),在这个Topic中默认有50个分区,每个分区负责一部分事务,事务划分是根据设置的transactional-id的哈希值计算对_transaction_state分区数(默认值是50)取余运算,找到分区编号,该分区对应的leader副本所对应的broker的即为Tranaction Coordinator所在节点。
[song@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 2] ls /kafka
主要有以下几个比较主要的
Broker启动之后,都会在zookeeper中进行注册,然后每一个broker节点上的controller就会去抢占zookeeper中的controller节点,谁抢先注册到controller节点,谁才是负责分区副本Leader的选举的controller,当某一个controller抢占到注册权后,会先监听zookeeper节点下的brokers(下面的ids)里面的节点变化(当节点发生变化的时候,会第一时间知道),然后再进行Leader的真正选举,Leader的选举策略首先是以在ISR中存活为前提,按照AR(所有的副本分区的统称)中排列在前面的顺序进行优先选举。例如ar[1,0,2], isr [1,0,2],那么leader就会按照1,0,2的顺序轮询,AR的顺序是固定的,应该是在注册的时候确定的,选举出来topic分区副本的leader之后,会将Leader信息保存到zookeeper中topics节点下具体的topic节点中,其他broker节点中的controller会从zookeeper中同步相关的leader信息,其他的controller同步leader数据的目的是当已经抢占到注册权的controller宕机之后,能够更快的替代原先的controller,当选举出来的副本leader宕机之后,controller监控到brokers的节点的变化,会重新从zookeeper中拉取回leader信息和ISR信息,根据选举策略重新进行选举,选举完新的leader之后,会将新的Leader信息以及ISR信息更新到zookeeper中的topics节点下的具体topic节点中,其他的controller也会再次重新同步相关的数据。
当Follower发生故障后会被临时踢出ISR,这个期间没有出现故障的Leader和Follower继续接收数据,等待该故障的Follower恢复正常之后,Follower会读取本地磁盘记录的上次的HW(高水位线),并且将Log文件中高于HW的数据给截掉,从HW的位置开始向Leader进行数据同步,等到该Follower 的LEO (副本的最后一个offset+1)大于等于该Partition 的HW的时候,即Follower追上Leader之后,就可以重新加入ISR了。
当Leader发生故障之后,会从ISR中选出一个新的Leader,选出新的Leader后,会与其他分区进行比较,获取当前分区的HW,如果Follower中的数据高于Leader中的数据,为保证多个副本之间的数据一致性,所有的Follower会将各自的log文件中高于HW的部分数据进行截掉,然后从新的Leader中进行数据同步。
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。
解决方案,使用leader的再平衡,默认情况下再平衡机制是打开的(auto.leader.rebalance.enable,默认是true),每个broker允许的不平衡的leader的比率默认是10%,如果某个broker超过了这个值,控制器会触发leader的平衡。Kafka中检查leader负载是否平衡的间隔时间为5分钟(默认值300秒)。
计算不平衡的比例方式:
分区2的AR优先副本是0节点,但是0节点却不是Leader节点,说明发生了宕机。
但是一般情况下再平衡不建议开启,或者选择开启的话,再平衡比例可以选择大一些,因为触发再平衡机制后,在整个平衡的过程中是不能够处理数据的,所以再平衡会影响效率。
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。
Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。
“.log”文件存放的是数据,“.index”存放的时候偏移量索引文件,“.timeindex”文件存放的是时间戳索引文件,而且“.log”和“.index”文件的命名规则是以当前segment的第一条消息的offset索引进行命名的,每个segment的默认大小是1G,这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:/opt/mode/kafka/data/first-0。
首先会根据offset去比较索引文件以及“.log”文件的文件名,定位到是哪一个segment文件,然后根据给定的offset,与(segment的起始offset+相对offset)然后进行比较,找到小于等于目标offset的最大offset对应的索引项的position的值,根据找到的position的值,去该segment下的“.log”文件中找对应的position位置,然后向下遍历寻找,就能找到目标offset=600的记录数据。
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
在Kafka 中提供的日志清理策略有 delete 和 compact两种。
配置文件清理策略为日志删除,log.cleanup.policy = delete 所有数据启用删除策略。有删除的时候有两种方式,基于时间以及基于大小。
log.retention.bytes,默认等于-1,表示无穷大。
当segment文件中有一部分数据过期,一部分没有过期的时候,会以该文件中的最大时间戳作为过期时间,如果最大过期时间没有达到要清理的时间的话则不用清理,等待下次检查过期文件的来进行清理。
compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
log.cleanup.policy = compact 所有数据启用压缩策略
压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。
PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。
生产者生产消息,将消息发送到kafka集群,接着kafka会将消息内容交给linux内核进行处理,linux内核会将数据缓存起来一份,放到页缓存中,最后由linux内核将数据存储到磁盘中,当消费者消费消息的时候,kafka会先去页缓存中找,页缓存如果找不到的话,会去磁盘中读取,返回消息信息,同时也会在页缓存中存储一份数据,当kafka向消费者发送数据的时候(有可能kafka和消费者不在一台服务器上,需要跨节点通信,跨节点通讯需要网卡), kafka会先将数据先发送到linux内核中的socket缓存中,再由socket缓存将数据通过网卡发送到消费者。
在非零拷贝的过程中一共经历了4次拷贝数据的过程,从磁盘文件到linux内核页缓存,从内核页缓存到kafka,接着从kafka到内核的socket缓存,再从socket缓存到网卡。
从磁盘文件到linux内核,从socket缓存到网卡,是属于DMA拷贝(直接存储器访问),不经过cpu,消耗的资源和时间比较小
从内核页缓存到kafka,接着从kafka到内核的socket缓存,是属于cpu拷贝,而且需要在用户态和内核态之间来回切换,会消耗大量的资源和时间。
零拷贝其实是根据内核状态划分的,在这里没有经过CPU的拷贝,数据在用户态的状态下,经历了零次CPU拷贝,所以才叫做零拷贝,但不是说不拷贝。
生产者生产消息,将消息发送到kafka集群,接着kafka会将消息内容交给linux内核进行处理,linux内核会将数据缓存起来一份,放到页缓存中,最后由linux内核将数据存储到磁盘中,当消费者消费消息的时候,kafka会先去页缓存中找,页缓存如果找不到的话,会去磁盘中读取,返回消息信息,接着会直接通过网卡将数据发送到消费者,由消费者消费数据。因为kafka操作数据的过程是在生产者端和消费者端,broker应用层不操作数据,所以不需要将数据拷贝到broker应用层,可以直接由页面缓存通过网卡将数据发送到消费者。
在零拷贝的过程中一共经历了2次拷贝数据的过程,从磁盘文件到linux内核页缓存,再从linux内核页缓存到网卡。
从磁盘文件到linux内核页缓存,再从linux内核页缓存到网卡,不经过cpu,消耗的资源和时间比较小,而且避免了需要在用户态和内核态之间来回切换,从而节省了大量的资源和时间。
consumer采用从broker中主动拉取数据,Kafka采用这种方式。pull模式不足之处是,如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。
由broker主动向消费者主动推送消息,Kafka没有采用这种方式的原因是,由broker决定消息发送速率,很难适应所有消费者的消费速率。
生产者生产消息到达broker的各个分区中,消费者从分区的leader中读取消息,一个消费者可以消费多个分区数据,每个分区的数据只能由消费者组中一个消费者消费,每个消费者消费消息的offset,由消费者提交到__consumer_offsets系统主题保存。
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
消费者组的初始化会借助coordinator辅助实现消费者组的初始化和分区的分配,首先每一个broker中都会存在一个coordinator,应该选择哪一个broker上的coordinator?coordinator的选择策略是根据groupId的hashcode值与offsets的分区数量求模(_comsumer_offsetsd的默认分区数是50,例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大,消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset),选择出coordinator之后,这个消费者组的所有消费者都会向coordinator发送加入消费者组的请求,coordinator收到请求之后,会从所有的消费者中随机选择一个作为leader,然后将要消费的topic信息发送给leader,由leader负责制定消费方案,制定完相应的消费计划之后,再将这个消费计划发送给coordinator,然后coordinator将计划发给每一个消费者,每一个消费者按照制定的消费计划进行消费,在消费过程中每个消费者都会和coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并且原本该消费者的任务,也会被分配到其他的消费者上,触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms5分钟),超过5分区未从分区中拉取数据,也会导致消费者被移除,并且原本该消费者的任务,也会被分配到其他的消费者上,触发再平衡。
消费者组要消费消息,首先会先创建消费者连接客户端(ConsumerNetworkClient),与kafka集群进行交互,调用sendFetches方法发送消费请求,进行抓取数据的初始化,抓取数据初始化主要有三个参数。
Kafka有四种主流的分区分配策略:
Range 是对每个topic而言的,首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序,通过 partitions(分区数)/consumer(消费者个数)来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费1个分区。消费者在进行消息消费的过程中,突然其中的一个消费者宕机,就会出现再平衡,消费者组会按照超时时间 45s 来判断它是否退出,时间到了45s 后,判断它真的退出就会把任务分配给其他的消费者进行消费,任务分配的方式是会整体被分配到其中的一个消费者上。
假如现在有7个分区,3个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。7/3 = 2 余 1 ,除不尽,那么消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个分区。
0 号消费者挂掉之后,0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
RoundRobin 针对集群中所有Topic而言。RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。消费者在进行消息消费的过程中,突然其中的一个消费者宕机,就会出现再平衡,消费者组会按照超时时间 45s 来判断它是否退出,时间到了45s 后,判断它真的退出就会把任务分配给其他的消费者进行消费,任务的分配方式是将已经宕机的消费者的分区数据按照轮询的方式添加到其他的消费者上。
0 号消费者挂掉之后,0 号消费者的分区会轮询的方式分配到1号消费者或者 2 号消费者。
首先会尽量均衡的放置分区到消费者上面,例如说7个分区,交给3个消费者进行消费,他的分配策略也是3,2,2的格式,和range类似,但不同的是黏性分区中的3不一定是第一个,而是随机的,尽可能的均衡,消费者在进行消息消费的过程中,突然其中的一个消费者宕机,就会出现再平衡,消费者组会按照超时时间 45s 来判断它是否退出,时间到了45s 后,判断它真的退出就会把任务分配给其他的消费者进行消费,任务的分配方式是会按照尽量保持均衡的方式进行分区。
和sticky类似只是说支持了cooperative的rebalance。
从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets ,__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
生产者向分区中生产数据,消费者从分区中拉取数据,在拉取的过程中,消费者每间隔5s会提交消费的offset到系统主题中(_consumer_offsets)主题中。
自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。
auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)。
100 万日活,每人每天 100 条日志,每天总共的日志条数是 100 万 * 100 条 = 1 亿条。 1 亿/24 小时/60 分/60 秒 = 1150 条/每秒钟。
每条日志大小:0.5k - 2k(取 1k)。
1150 条/每秒钟 * 1k ≈ 1m/s 。
高峰期每秒钟:1150 条 * 20 倍 = 23000 条。
每秒多少数据量:20MB/s。
服务器台数= 2 * (生产者峰值生产速率 * 副本 / 100) + 1
= 2 * (20m/s * 2 / 100) + 1
= 3 台
建议 3 台服务器。
kafka 底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。
建议选择普通的机械硬盘。
每天总数据量:1 亿条 * 1k ≈ 100g
100g * 副本 2 * 保存时间 3 天 / 0.7 ≈ 1T
建议三台服务器硬盘总大小,大于等于 1T。
Kafka 内存组成:堆内存 + 页缓存
1) Kafka 堆内存建议每个节点:10g ~ 15g
在 kafka-server-start.sh 中修改
if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then
export KAFKA_HEAP_OPTS=“-Xmx10G -Xms10G”
fi
2)页缓存:页缓存是 Linux 系统服务器的内存。我们只需要保证 1 个 segment(1g)中25%的数据在内存中就好。
每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如 10 个分区,页缓存大小=(10 * 1g * 25%)/ 3 ≈ 1g
建议服务器内存大于等于 11G。
num.io.threads = 8 负责写磁盘的线程数,整个参数值要占总核数的 50%。 num.replica.fetchers = 1 副本拉取线程数,这个参数占总核数的 50%的 1/3。
num.network.threads = 3 数据传输线程数,这个参数占总核数的 50%的 2/3。
建议 32 个 cpu core。
网络带宽 = 峰值吞吐量 ≈ 20MB/s 选择千兆网卡即可。
100Mbps 单位是 bit;10M/s 单位是 byte ; 1byte = 8bit,100Mbps/8 = 12.5M/s。
一般百兆的网卡(100Mbps )、千兆的网卡(1000Mbps)、万兆的网卡(10000Mbps)。
例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s;
分区数 = 100 / 20 = 5 分区
分区数一般设置为:3-10 个
分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。
在生产环境中,如果某个 Kafka 节点挂掉。
正常处理办法:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。