当前位置:   article > 正文

kafka3.x入门教程(四)_kafka通常要几个副本

kafka通常要几个副本

接下来我们来看下kafka的副本

kafka的副本作用:就是提高数据的可靠性

kafka的默认副本1个,生产环境一般配置为2个,太多的副本会

增加磁盘存储空间,增加网络上数据传输,降低效率

kafka的副本分为leader和follower,生产者只会把数据发送给leader,然后follower找leader

同步数据

kafka中所有的副本统称AR

AR=ISR+OSR

ISR:表示和leader保持同步的follower集合,如果follower长时间未向leader发送通信请求

或者同步数据,那么把follwer剔除isr,放入osr,默认超时时间是30秒,

leader发送故障之后,就会从isr中选举新的leader

OSR:表示follower与leader副本同步时,延迟过多的副本;

接下来我们在来看下leader的选举流程

1:broker启动后在zk注册

2:controller谁先注册,谁说了算

3:由选举出来的controller监听brokers节点变化

4:controller决定leader选举,

选举规则:在isr中存活为前提,按照ar中排在前面的优先;

例如ar[1,0,2],isr[1,0,2],那么leader就会按照1,0,2的顺序轮训

5:controller将节点信息上传到zk

6: 其他controller从zk同步相关信息

7:假设broker1中leader挂了

8:controller监听到节点变化

9:获取isr

10:选择新的leader,在isr存活为前提,ar中排在前面的优先

11:更新leader和isr

创建一个新的主题为zt,3个分区,3个副本

  1. cd /opt/kafka
  2. bin/kafka-topics.sh --bootstrap-server linux1:9092 --create --topic zt --partitions 3 --replication-factor 3

查看下主题的详情

bin/kafka-topics.sh --bootstrap-server linux1:9092 --describe --topic zt

我们可以看到在leader1节点后面是2,按照的是ar优先的

我们把leader1停掉,就是linux2这台机器

  1. cd /opt/kafka/
  2. ./bin/kafka-server-stop.sh

我们在看下主题的详情,可以看到leader2上位了,isr踢出了1

我们在恢复1

./bin/kafka-server-start.sh -daemon config/server.properties

再次查看主题详情,可以看到isr已经恢复回来了

接下来我们在来看下leader和Follwer故障处理细节

leader会先收到数据,数据会多一些

follower同步数据,会少一些

假设leader数据是 0 1 2 3 4 5 6 7(offset)

follower 是0 1 2 3 4(offset)

follower 是0 1 2 3 4 5(offset)

LEO:每个副本的最后一个offset,LEO就是最新的offset+1;

HW:所有副本中最小的LEO,就是leader的4,follower的4,follower的4

follower故障

1:follower发送故障后会被临时提出isr

2: 这个期间leader和follower继续接收数据

3:等到follower恢复后,follower会读取本地磁盘纪录的上次hw,

并将log文件高于hw的部分截取掉,从hw开始向leader进行同步

4:等该followe的leo大于等于该分区的hw,则follower追上leader之后,

就可以重新加入isr了

Leader故障处理细节

1:leader发送故障之后,会从isr中选出一个新的leader

2:为保证多个副本之间的数据一致性,其余的follower会先将

各自的log文件高于hw的部分截掉,然后从新的leader同步数据

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

接下来我们来看下分区副本分配

创建主题为sl,16个分区,3个副本

 bin/kafka-topics.sh --bootstrap-server linux1:9092 --create --partitions 16 --replication-factor 3 --topic sl

查看主题详情

bin/kafka-topics.sh --bootstrap-server linux1:9092 --describe --topic sl

可以看到分区和副本的情况

接下来我们手动分配分区副本

把主题sl的0,1,2,3分区 都设置副本为0,1

  1. cd /opt/kafka
  2. vim increase-replication-factor.json
  1. {
  2. "version":1,
  3. "partitions":[{"topic":"sl","partition":0,"replicas":[0,1]},
  4. {"topic":"sl","partition":1,"replicas":[0,1]},
  5. {"topic":"sl","partition":2,"replicas":[1,0]},
  6. {"topic":"sl","partition":3,"replicas":[1,0]}]
  7. }

执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server linux1:9092 --reassignment-json-file increase-replication-factor.json --execute

验证副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server linux1:9092 --reassignment-json-file increase-replication-factor.json --verify

再次查看主题详情

bin/kafka-topics.sh --bootstrap-server linux1:9092 --describe --topic sl

可以看到已经发送了变化

kafka文件存储机制

Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。

Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。

每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:zhuti-0。

cd /opt/kafka/datas

可以看到下面的就是主题+分区序号

进入其中一个目录,可以看到index,log,timeindex文件

如果直接使用cat是看不到的,使用下面的命令

kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.timeindex

kafka的数据默认存储7天,7天后自动删除

高效读写数据

1:kafka本身是分布式集群,可以采用分区技术,并行度高

2:读数据采用稀疏索引,可以快速定位要消费的数据

3:顺序写磁盘

4:页缓存+零拷贝技术

kafka消费方式

pull(拉取)模式:消费者从broker中主动拉取数据,kafka采用这种方式

push(推送)模式:kafka没有采用这种方式,因为broker决定消息发送速率,

很难适应所有消费者的消费速率,例如推送的速度是50m/s,

消费者1,消费者2就来不及处理消息

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据

消费组

消费组由多个消费者组成,所有消费者的groupid相同;

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的消费者消费

消费者组之间互不影响,所有的消费者都属于某个消费者组,消费者组是逻辑上的一个订阅者

在消费者代码中必须配置消费者组id,在使用命令行的时候不用写,他底层自动生成了随机的消费者组id

接下来我们创建消费者的代码

  1. package com.example.client.entity;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.clients.producer.ProducerConfig;
  7. import org.apache.kafka.common.serialization.StringDeserializer;
  8. import org.apache.kafka.common.serialization.StringSerializer;
  9. import java.time.Duration;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. import java.util.Properties;
  13. /**
  14. *
  15. * 消费者
  16. * @param
  17. * @return
  18. * @throws Exception
  19. */
  20. public class Consumer {
  21. public static void main(String[] args) {
  22. //创建生产者的配置对象
  23. Properties properties=new Properties();
  24. //给kafka配置对象添加配置信息 bootstrap.serve
  25. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
  26. //key value 反序列化 生产者发送的数据序列化,那么我们这里就要反序列化
  27. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  28. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  29. //配置消费者组 必须 随便起名字
  30. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
  31. //创建消费者对象
  32. KafkaConsumer<String,String>kafkaConsumer=new KafkaConsumer<String, String>(properties);
  33. //注册要消费的主题
  34. List<String>list=new ArrayList<>();
  35. //主题的名字
  36. list.add("zt");
  37. //订阅主题
  38. kafkaConsumer.subscribe(list);
  39. //拉取数据打印
  40. while (true){
  41. //设置1秒消费一批数据 poll 拉取模式
  42. ConsumerRecords<String,String>consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(2));
  43. //打印消费到的数据
  44. for (ConsumerRecord<String, String> x : consumerRecords) {
  45. System.out.println(x);
  46. }
  47. }
  48. }
  49. }

先启动消费者,在启动生产者,可以看到消费到数据了

接下来我们来看下消费某一个分区的代码

先对生产者改造

  1. package com.example.client.entity;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. import java.util.UUID;
  6. import java.util.concurrent.ExecutionException;
  7. //生产者
  8. public class Producer {
  9. public static void main(String[] args) throws ExecutionException, InterruptedException {
  10. //创建生产者的配置对象
  11. Properties properties=new Properties();
  12. //给kafka配置对象添加配置信息 bootstrap.serve
  13. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
  14. //key value序列化
  15. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  16. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  17. //设置事物id
  18. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "shiwu_id");
  19. //acks级别
  20. properties.put(ProducerConfig.ACKS_CONFIG,"-1");
  21. //重试次数
  22. properties.put(ProducerConfig.RETRIES_CONFIG,3);
  23. //使用幂等性
  24. properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
  25. //创建kafka生产者对象
  26. KafkaProducer<String,String>kafkaProducer=new KafkaProducer<String, String>(properties);
  27. //初始化事物
  28. kafkaProducer.initTransactions();
  29. //开启事物
  30. kafkaProducer.beginTransaction();
  31. //调用send方法 发送消息
  32. try {
  33. for (int i = 0; i < 5; i++) {
  34. //给zhuti这个主题 发送消息
  35. ProducerRecord<String, String> var1=new ProducerRecord("zt",0,"","你好");
  36. kafkaProducer.send(var1, new Callback() {
  37. //异步回调 完成时
  38. @Override
  39. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  40. if(e==null){
  41. System.out.println("主题:"+recordMetadata.topic()+",分区:"+recordMetadata.partition());
  42. }else {
  43. //打印异常信息
  44. e.printStackTrace();
  45. }
  46. }
  47. });
  48. }
  49. //模拟报错
  50. // int a=1/0;
  51. //提交事物
  52. kafkaProducer.commitTransaction();
  53. }catch (Exception e){
  54. //终止事物
  55. kafkaProducer.abortTransaction();
  56. }
  57. //关闭资源
  58. kafkaProducer.close();
  59. }
  60. }

主要就是生产者发送到0号分区

  1. package com.example.client.entity;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.clients.producer.ProducerConfig;
  7. import org.apache.kafka.common.TopicPartition;
  8. import org.apache.kafka.common.serialization.StringDeserializer;
  9. import org.apache.kafka.common.serialization.StringSerializer;
  10. import java.time.Duration;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.Properties;
  14. /**
  15. *
  16. * 消费者
  17. * @param
  18. * @return
  19. * @throws Exception
  20. */
  21. public class Consumer {
  22. public static void main(String[] args) {
  23. //创建生产者的配置对象
  24. Properties properties=new Properties();
  25. //给kafka配置对象添加配置信息 bootstrap.serve
  26. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
  27. //key value 反序列化 生产者发送的数据序列化,那么我们这里就要反序列化
  28. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  29. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  30. //配置消费者组 必须 随便起名字
  31. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
  32. //创建消费者对象
  33. KafkaConsumer<String,String>kafkaConsumer=new KafkaConsumer<String, String>(properties);
  34. //主题分区
  35. List<TopicPartition>list=new ArrayList<>();
  36. //主题的名字 消费的分区
  37. TopicPartition topicPartition=new TopicPartition("zt",0);
  38. list.add(topicPartition);
  39. //分配
  40. kafkaConsumer.assign(list);
  41. //拉取数据打印
  42. while (true){
  43. //设置1秒消费一批数据 poll 拉取模式
  44. ConsumerRecords<String,String>consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
  45. //打印消费到的数据
  46. for (ConsumerRecord<String, String> x : consumerRecords) {
  47. System.out.println(x);
  48. }
  49. }
  50. }
  51. }

我们创建2个消费者,代码都一样

生产者在这里发送不同的分区

启动消费者,在启动生产者

可以看到一个消费者组,每个消费者只能消费一个分区的数据

如果生产者发送的数据,造成了数据积压怎么办?

1:增加消费者数,增加分区数,两者保持一致,一个消费者对应一个分区

2:提高每批次拉取数量

接下来我们在来看下Kafka-Eagle监控

Kafka-Eagle可以监控kafka集群的整体运行情况

先安装mysql 主要用来存储可视化展示的数据

docker run -d -p 13306:3306 --privileged=true -v /root/mysql/log:/var/log/mysql -v /root/mysql/data:/var/lib/mysql -v /root/mysql/conf:/etc/mysql/conf.d -e MYSQL_ROOT_PASSWORD=123456 --name mysql57 mysql:5.7

关闭所有kafka,调整jvm

  1. cd /opt/kafka
  2. ./bin/kafka-server-stop.sh
  3. cd /opt/kafka/bin
  4. vim kafka-server-start.sh

把export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"注释掉

然后添加下面的内容

  1. export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G
  2. -XX:PermSize=128m
  3. -XX:+UseG1GC
  4. -XX:MaxGCPauseMillis=200
  5. -XX:ParallelGCThreads=8
  6. -XX:ConcGCThreads=5
  7. -XX:InitiatingHeapOccupancyPercent=70"
  8. export JMX_PORT="9999"

再次启动所有kafka

  1. cd /opt/kafka
  2. ./bin/kafka-server-start.sh -daemon config/server.properties

接下来安装Kafka-Eagle

访问https://www.kafka-eagle.org/

下载kafka-eagle-bin-2.0.8.tar.gz

上传到/opt目录

解压

  1. tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
  2. cd kafka-eagle-bin-2.0.8/
  3. tar -zxvf efak-web-2.0.8-bin.tar.gz
  4. mv efak-web-2.0.8 efak

修改配置文件

  1. cd /opt/kafka-eagle-bin-2.0.8/efak/conf
  2. vi system-config.properties

主要关注箭头的位置去修改

注释这里

配置数据库

保存退出

添加环境变量

 vi /etc/profile
  1. # kafkaEFAK
  2. export KE_HOME=/opt/kafka-eagle-bin-2.0.8/efak
  3. export PATH=$PATH:$KE_HOME/bin

刷新环境变量

source /etc/profile

启动efak

  1. cd /opt/kafka-eagle-bin-2.0.8/efak
  2. ./bin/ke.sh start

停止efak

  1. cd /opt/kafka-eagle-bin-2.0.8/efak
  2. ./bin/ke.sh stop

前置环境要安装好jdk1.8,java_home要有环境变量,不能用系统自带的

浏览器访问

http://192.168.1.5:8048/

账号admin

密码123456

可以看到主题的信息

在这里还可以创建主题的信息

在这个位置可以看到图表信息

接下来我们在看下kraft模式

就是不用在使用zookeeper了,kafka可以独立运行,不需要从zk中读取数据了

我们把所有的kafka都停掉

  1. cd /opt/kafka/
  2. ./bin/kafka-server-stop.sh

然后进入kraft目录

  1. cd /opt/kafka/config/kraft
  2. vi server.properties

我们按照ip来区分节点id

node.id=5

controller.quorum.voters=5@linux1:9093,6@linxu2:9093,7@linux3:9093

这里改一下

# broker对外暴露的地址

advertised.listeners=PLAINTEXT://linux1:9092

这里改一下

#数据存储目录

log.dirs=/opt/kafka/data

其他2个ip node.id=6,node.id=7 其他配置都一样

先在第一台机器生成存储目录唯一id

  1. cd /opt/kafka
  2. bin/kafka-storage.sh random-uuid

拿着这个uuid

MQIfWd38RWuohebCszAneA

在三台节点上面格式化kafka存储目录

bin/kafka-storage.sh format -t MQIfWd38RWuohebCszAneA -c /opt/kafka/config/kraft/server.properties

在三台机器上面启动kafka,注意 启动是kraft目录的server.properties

bin/kafka-server-start.sh -daemon config/kraft/server.properties

我们来创建一个主题aaa

  1. cd /opt/kafka/
  2. bin/kafka-topics.sh --bootstrap-server linux1:9092 --create --partitions 1 --replication-factor 3 --topic aaa

发送下消息

bin/kafka-console-producer.sh --bootstrap-server linux1:9092 --topic aaa

在来消费一条消息

bin/kafka-console-consumer.sh --bootstrap-server linux1:9092 --topic aaa

可以看到收到消息了

kafka3.x入门教程(五)

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号