赞
踩
接下来我们来看下kafka的副本
kafka的副本作用:就是提高数据的可靠性
kafka的默认副本1个,生产环境一般配置为2个,太多的副本会
增加磁盘存储空间,增加网络上数据传输,降低效率
kafka的副本分为leader和follower,生产者只会把数据发送给leader,然后follower找leader
同步数据
kafka中所有的副本统称AR
AR=ISR+OSR
ISR:表示和leader保持同步的follower集合,如果follower长时间未向leader发送通信请求
或者同步数据,那么把follwer剔除isr,放入osr,默认超时时间是30秒,
leader发送故障之后,就会从isr中选举新的leader
OSR:表示follower与leader副本同步时,延迟过多的副本;
接下来我们在来看下leader的选举流程
1:broker启动后在zk注册
2:controller谁先注册,谁说了算
3:由选举出来的controller监听brokers节点变化
4:controller决定leader选举,
选举规则:在isr中存活为前提,按照ar中排在前面的优先;
例如ar[1,0,2],isr[1,0,2],那么leader就会按照1,0,2的顺序轮训
5:controller将节点信息上传到zk
6: 其他controller从zk同步相关信息
7:假设broker1中leader挂了
8:controller监听到节点变化
9:获取isr
10:选择新的leader,在isr存活为前提,ar中排在前面的优先
11:更新leader和isr
创建一个新的主题为zt,3个分区,3个副本
- cd /opt/kafka
-
- bin/kafka-topics.sh --bootstrap-server linux1:9092 --create --topic zt --partitions 3 --replication-factor 3
查看下主题的详情
bin/kafka-topics.sh --bootstrap-server linux1:9092 --describe --topic zt
我们可以看到在leader1节点后面是2,按照的是ar优先的
我们把leader1停掉,就是linux2这台机器
- cd /opt/kafka/
- ./bin/kafka-server-stop.sh
我们在看下主题的详情,可以看到leader2上位了,isr踢出了1
我们在恢复1
./bin/kafka-server-start.sh -daemon config/server.properties
再次查看主题详情,可以看到isr已经恢复回来了
接下来我们在来看下leader和Follwer故障处理细节
leader会先收到数据,数据会多一些
follower同步数据,会少一些
假设leader数据是 0 1 2 3 4 5 6 7(offset)
follower 是0 1 2 3 4(offset)
follower 是0 1 2 3 4 5(offset)
LEO:每个副本的最后一个offset,LEO就是最新的offset+1;
HW:所有副本中最小的LEO,就是leader的4,follower的4,follower的4
follower故障
1:follower发送故障后会被临时提出isr
2: 这个期间leader和follower继续接收数据
3:等到follower恢复后,follower会读取本地磁盘纪录的上次hw,
并将log文件高于hw的部分截取掉,从hw开始向leader进行同步
4:等该followe的leo大于等于该分区的hw,则follower追上leader之后,
就可以重新加入isr了
Leader故障处理细节
1:leader发送故障之后,会从isr中选出一个新的leader
2:为保证多个副本之间的数据一致性,其余的follower会先将
各自的log文件高于hw的部分截掉,然后从新的leader同步数据
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
接下来我们来看下分区副本分配
创建主题为sl,16个分区,3个副本
bin/kafka-topics.sh --bootstrap-server linux1:9092 --create --partitions 16 --replication-factor 3 --topic sl
查看主题详情
bin/kafka-topics.sh --bootstrap-server linux1:9092 --describe --topic sl
可以看到分区和副本的情况
接下来我们手动分配分区副本
把主题sl的0,1,2,3分区 都设置副本为0,1
- cd /opt/kafka
- vim increase-replication-factor.json
- {
- "version":1,
- "partitions":[{"topic":"sl","partition":0,"replicas":[0,1]},
- {"topic":"sl","partition":1,"replicas":[0,1]},
- {"topic":"sl","partition":2,"replicas":[1,0]},
- {"topic":"sl","partition":3,"replicas":[1,0]}]
- }
执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server linux1:9092 --reassignment-json-file increase-replication-factor.json --execute
验证副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server linux1:9092 --reassignment-json-file increase-replication-factor.json --verify
再次查看主题详情
bin/kafka-topics.sh --bootstrap-server linux1:9092 --describe --topic sl
可以看到已经发送了变化
kafka文件存储机制
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。
Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。
每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:zhuti-0。
cd /opt/kafka/datas
可以看到下面的就是主题+分区序号
进入其中一个目录,可以看到index,log,timeindex文件
如果直接使用cat是看不到的,使用下面的命令
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.timeindex
kafka的数据默认存储7天,7天后自动删除
高效读写数据
1:kafka本身是分布式集群,可以采用分区技术,并行度高
2:读数据采用稀疏索引,可以快速定位要消费的数据
3:顺序写磁盘
4:页缓存+零拷贝技术
kafka消费方式
pull(拉取)模式:消费者从broker中主动拉取数据,kafka采用这种方式
push(推送)模式:kafka没有采用这种方式,因为broker决定消息发送速率,
很难适应所有消费者的消费速率,例如推送的速度是50m/s,
消费者1,消费者2就来不及处理消息
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据
消费组
消费组由多个消费者组成,所有消费者的groupid相同;
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的消费者消费
消费者组之间互不影响,所有的消费者都属于某个消费者组,消费者组是逻辑上的一个订阅者
在消费者代码中必须配置消费者组id,在使用命令行的时候不用写,他底层自动生成了随机的消费者组id
接下来我们创建消费者的代码
- package com.example.client.entity;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
-
- /**
- *
- * 消费者
- * @param
- * @return
- * @throws Exception
- */
- public class Consumer {
- public static void main(String[] args) {
- //创建生产者的配置对象
- Properties properties=new Properties();
- //给kafka配置对象添加配置信息 bootstrap.serve
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
- //key value 反序列化 生产者发送的数据序列化,那么我们这里就要反序列化
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- //配置消费者组 必须 随便起名字
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
- //创建消费者对象
- KafkaConsumer<String,String>kafkaConsumer=new KafkaConsumer<String, String>(properties);
- //注册要消费的主题
- List<String>list=new ArrayList<>();
- //主题的名字
- list.add("zt");
- //订阅主题
- kafkaConsumer.subscribe(list);
- //拉取数据打印
- while (true){
- //设置1秒消费一批数据 poll 拉取模式
- ConsumerRecords<String,String>consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(2));
- //打印消费到的数据
- for (ConsumerRecord<String, String> x : consumerRecords) {
- System.out.println(x);
- }
- }
-
-
- }
- }

先启动消费者,在启动生产者,可以看到消费到数据了
接下来我们来看下消费某一个分区的代码
先对生产者改造
- package com.example.client.entity;
-
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
- import java.util.UUID;
- import java.util.concurrent.ExecutionException;
-
- //生产者
- public class Producer {
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- //创建生产者的配置对象
- Properties properties=new Properties();
- //给kafka配置对象添加配置信息 bootstrap.serve
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
- //key value序列化
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- //设置事物id
- properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "shiwu_id");
- //acks级别
- properties.put(ProducerConfig.ACKS_CONFIG,"-1");
- //重试次数
- properties.put(ProducerConfig.RETRIES_CONFIG,3);
- //使用幂等性
- properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
-
-
- //创建kafka生产者对象
- KafkaProducer<String,String>kafkaProducer=new KafkaProducer<String, String>(properties);
- //初始化事物
- kafkaProducer.initTransactions();
- //开启事物
- kafkaProducer.beginTransaction();
-
- //调用send方法 发送消息
- try {
- for (int i = 0; i < 5; i++) {
- //给zhuti这个主题 发送消息
- ProducerRecord<String, String> var1=new ProducerRecord("zt",0,"","你好");
- kafkaProducer.send(var1, new Callback() {
- //异步回调 完成时
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if(e==null){
- System.out.println("主题:"+recordMetadata.topic()+",分区:"+recordMetadata.partition());
- }else {
- //打印异常信息
- e.printStackTrace();
- }
- }
- });
- }
- //模拟报错
- // int a=1/0;
- //提交事物
- kafkaProducer.commitTransaction();
- }catch (Exception e){
- //终止事物
- kafkaProducer.abortTransaction();
- }
- //关闭资源
- kafkaProducer.close();
- }
- }

主要就是生产者发送到0号分区
- package com.example.client.entity;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
-
- /**
- *
- * 消费者
- * @param
- * @return
- * @throws Exception
- */
- public class Consumer {
- public static void main(String[] args) {
- //创建生产者的配置对象
- Properties properties=new Properties();
- //给kafka配置对象添加配置信息 bootstrap.serve
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
- //key value 反序列化 生产者发送的数据序列化,那么我们这里就要反序列化
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- //配置消费者组 必须 随便起名字
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
- //创建消费者对象
- KafkaConsumer<String,String>kafkaConsumer=new KafkaConsumer<String, String>(properties);
- //主题分区
- List<TopicPartition>list=new ArrayList<>();
- //主题的名字 消费的分区
- TopicPartition topicPartition=new TopicPartition("zt",0);
- list.add(topicPartition);
- //分配
- kafkaConsumer.assign(list);
- //拉取数据打印
- while (true){
- //设置1秒消费一批数据 poll 拉取模式
- ConsumerRecords<String,String>consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
- //打印消费到的数据
- for (ConsumerRecord<String, String> x : consumerRecords) {
- System.out.println(x);
- }
- }
-
-
- }
- }

我们创建2个消费者,代码都一样
生产者在这里发送不同的分区
启动消费者,在启动生产者
可以看到一个消费者组,每个消费者只能消费一个分区的数据
如果生产者发送的数据,造成了数据积压怎么办?
1:增加消费者数,增加分区数,两者保持一致,一个消费者对应一个分区
2:提高每批次拉取数量
接下来我们在来看下Kafka-Eagle监控
Kafka-Eagle可以监控kafka集群的整体运行情况
先安装mysql 主要用来存储可视化展示的数据
docker run -d -p 13306:3306 --privileged=true -v /root/mysql/log:/var/log/mysql -v /root/mysql/data:/var/lib/mysql -v /root/mysql/conf:/etc/mysql/conf.d -e MYSQL_ROOT_PASSWORD=123456 --name mysql57 mysql:5.7
关闭所有kafka,调整jvm
- cd /opt/kafka
- ./bin/kafka-server-stop.sh
- cd /opt/kafka/bin
- vim kafka-server-start.sh
把export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"注释掉
然后添加下面的内容
- export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G
- -XX:PermSize=128m
- -XX:+UseG1GC
- -XX:MaxGCPauseMillis=200
- -XX:ParallelGCThreads=8
- -XX:ConcGCThreads=5
- -XX:InitiatingHeapOccupancyPercent=70"
- export JMX_PORT="9999"
再次启动所有kafka
- cd /opt/kafka
- ./bin/kafka-server-start.sh -daemon config/server.properties
接下来安装Kafka-Eagle
访问https://www.kafka-eagle.org/
下载kafka-eagle-bin-2.0.8.tar.gz
上传到/opt目录
解压
- tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
- cd kafka-eagle-bin-2.0.8/
- tar -zxvf efak-web-2.0.8-bin.tar.gz
- mv efak-web-2.0.8 efak
修改配置文件
- cd /opt/kafka-eagle-bin-2.0.8/efak/conf
- vi system-config.properties
主要关注箭头的位置去修改
注释这里
配置数据库
保存退出
添加环境变量
vi /etc/profile
- # kafkaEFAK
- export KE_HOME=/opt/kafka-eagle-bin-2.0.8/efak
- export PATH=$PATH:$KE_HOME/bin
刷新环境变量
source /etc/profile
启动efak
- cd /opt/kafka-eagle-bin-2.0.8/efak
- ./bin/ke.sh start
停止efak
- cd /opt/kafka-eagle-bin-2.0.8/efak
- ./bin/ke.sh stop
前置环境要安装好jdk1.8,java_home要有环境变量,不能用系统自带的
浏览器访问
账号admin
密码123456
可以看到主题的信息
在这里还可以创建主题的信息
在这个位置可以看到图表信息
接下来我们在看下kraft模式
就是不用在使用zookeeper了,kafka可以独立运行,不需要从zk中读取数据了
我们把所有的kafka都停掉
- cd /opt/kafka/
- ./bin/kafka-server-stop.sh
然后进入kraft目录
- cd /opt/kafka/config/kraft
- vi server.properties
我们按照ip来区分节点id
node.id=5
controller.quorum.voters=5@linux1:9093,6@linxu2:9093,7@linux3:9093
这里改一下
# broker对外暴露的地址
advertised.listeners=PLAINTEXT://linux1:9092
这里改一下
#数据存储目录
log.dirs=/opt/kafka/data
其他2个ip node.id=6,node.id=7 其他配置都一样
先在第一台机器生成存储目录唯一id
- cd /opt/kafka
- bin/kafka-storage.sh random-uuid
拿着这个uuid
MQIfWd38RWuohebCszAneA
在三台节点上面格式化kafka存储目录
bin/kafka-storage.sh format -t MQIfWd38RWuohebCszAneA -c /opt/kafka/config/kraft/server.properties
在三台机器上面启动kafka,注意 启动是kraft目录的server.properties
bin/kafka-server-start.sh -daemon config/kraft/server.properties
我们来创建一个主题aaa
- cd /opt/kafka/
- bin/kafka-topics.sh --bootstrap-server linux1:9092 --create --partitions 1 --replication-factor 3 --topic aaa
发送下消息
bin/kafka-console-producer.sh --bootstrap-server linux1:9092 --topic aaa
在来消费一条消息
bin/kafka-console-consumer.sh --bootstrap-server linux1:9092 --topic aaa
可以看到收到消息了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。