当前位置:   article > 正文

Kafka内容分享(五):Kafka 分区和副本机制_kafka 3.7 kraft模式如何调节分区和副本配hi

kafka 3.7 kraft模式如何调节分区和副本配hi

目录

一、分区和副本机制

1.1 生产者分区写入策略

1.1.1、轮询策略

1.1.2、随机策略(不用)

1.1.3、按key分配策略

1.1.4、乱序问题

1.1.5、自定义分区策略

1.2、消费者组Rebalance机制

1.2.1、Rebalance再均衡

1.2.2、Rebalance的不良影响

1.3、消费者分区分配策略

1.3.1、Range范围分配策略

1.3.2、RoundRobin轮询策略

1.3.3、Stricky粘性分配策略

1.4 副本机制

1.4.1、producer的acks参数

1.4.2、acks配置为0

1.4.3、acks配置为1

1.4.4、acks配置为-1或者all

二、高级(High Level)API与低级(Low Level)API

2.1、高级API

2.2、低级API

2.3、手动消费分区数据

三、监控工具Kafka-eagle介绍

3.1、Kafka-Eagle简介

3.2、安装Kafka-Eagle

3.2.1、开启Kafka JMX端口

3.2.1.1、JMX接口

3.2.1.2、开启Kafka JMX

3.2.2、安装Kafka-Eagle

3.3、Kafka度量指标


一、分区和副本机制

1.1 生产者分区写入策略

生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中

  • 轮询分区策略
  • 随机分区策略
  • 按key分区分配策略
  • 自定义分区策略
1.1.1、轮询策略
  • 默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区
  • 如果在生产消息时,key为null,则使用轮询算法均衡地分配分区

1.1.2、随机策略(不用)

随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。

1.1.3、按key分配策略

按key分配策略,有可能会出现「数据倾斜」,key.hash() % 分区的数量。例如:某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。

1.1.4、乱序问题

轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。

  • 在Kafka中生产者是有写入策略,如果topic有多个分区,就会将数据分散在不同的partition中存储
  • 当partition数量大于1的时候,数据(消息)会打散分布在不同的partition中
  • 如果只有一个分区,消息是有序的

1.1.5、自定义分区策略

实现步骤:

创建自定义分区器

  1. public class KeyWithRandomPartitioner implements Partitioner {
  2. private Random r;
  3. @Override
  4. public void configure(Map<String, ?> configs) {
  5. r = new Random();
  6. }
  7. @Override
  8. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  9. // cluster.partitionCountForTopic 表示获取指定topic的分区数量
  10. return r.nextInt(1000) % cluster.partitionCountForTopic(topic);
  11. }
  12. @Override
  13. public void close() {
  14. }
  15. }

在Kafka生产者配置中,自定使用自定义分区器的类名

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyWithRandomPartitioner.class.getName());

1.2、消费者组Rebalance机制

1.2.1、Rebalance再均衡

Kafka中的Rebalance称之为再均衡,是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。

Rebalance触发的时机有:

消费者组中consumer的个数发生变化。例如:有新的consumer加入到消费者组,或者是某个consumer停止了。

订阅的topic个数发生变化
消费者可以订阅多个主题,假设当前的消费者组订阅了三个主题,但有一个主题突然被删除了,此时也需要发生再均衡。​​​​​​​

订阅的topic分区数发生变化

1.2.2、Rebalance的不良影响
  • 发生Rebalance时,consumer group下的所有consumer都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配
  • Rebalance过程会对consumer group产生非常严重的影响,Rebalance的过程中所有的消费者都将停止工作,直到Rebalance完成

1.3、消费者分区分配策略

保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少

1.3.1、Range范围分配策略

Range范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。

注意:Rangle范围分配策略是针对每个Topic的。

配置

配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。

算法公式

n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个
剩余消费者消费n个​​​​​​​

1.3.2、RoundRobin轮询策略

RoundRobinAssignor轮询策略是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。

消费者挨个分配消费的分区

配置

配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor。

1.3.3、Stricky粘性分配策略

从Kafka 0.11.x开始,引入此类分配策略。主要目的:

  1. 分区分配尽可能均匀
  2. 在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同

没有发生rebalance时,Striky粘性分配策略和RoundRobin分配策略类似。

发生了rebalance,轮询分配策略,重新走一遍轮询分配的过程。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可

减少上下文的切换

上面如果consumer2崩溃了,此时需要进行rebalance。如果是Range分配和轮询分配都会重新进行分配,例如:

通过上图,我们发现,consumer0和consumer1原来消费的分区大多发生了改变。接下来我们再来看下粘性分配策略。

我们发现,Striky粘性分配策略,保留rebalance之前的分配结果。这样,只是将原先consumer2负责的两个分区再均匀分配给consumer0、consumer1。这样可以明显减少系统资源的浪费,例如:之前consumer0、consumer1之前正在消费某几个分区,但由于rebalance发生,导致consumer0、consumer1需要重新消费之前正在处理的分区,导致不必要的系统开销。(例如:某个事务正在进行就必须要取消了)

1.4 副本机制

副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的。

1.4.1、producer的acks参数

对副本关系较大的就是,producer配置的acks参数了,acks参数表示当生产者生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。

注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常

配置:

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "node1.angyan.cn:9092");
  3. props.put("acks", "all");
  4. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
1.4.2、acks配置为0
  • 生产者只管写入,不管是否写入成功,可能会数据丢失。性能是最好的
  • 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

ACK为0,基准测试:

bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=0
1.4.3、acks配置为1
  • 当生产者的ACK配置为1时,生产者会等待leader副本确认接收后,才会发送下一条数据,性能中等。
  • 默认值为1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。

1.4.4、acks配置为-1或者all
  • 确保消息写入到leader分区、还确保消息写入到对应副本都成功后,接着发送下一条,性能是最差的
  • 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。

ACK基准测试:

bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=all

根据业务情况来选择ack机制,是要求性能最高,一部分数据丢失影响不大,可以选择0/1。如果要求数据一定不能丢失,就得配置为-1/all。

分区中是有leader和follower的概念,为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据,Backup。

二、高级(High Level)API与低级(Low Level)API

  • 高级API就是直接让Kafka帮助管理、处理分配、数据
    • offset存储在ZK中
    • 由kafka的rebalance来控制消费者分配的分区
    • 开发起来比较简单,无需开发者关注底层细节
    • 无法做到细粒度的控制
  • 低级API:由编写的程序自己控制逻辑
    • 自己来管理Offset,可以将offset存储在ZK、MySQL、Redis、HBase、Flink的状态存储
    • 指定消费者拉取某个分区的数据
    • 可以做到细粒度的控制
    • 原有的Kafka的策略会失效,需要我们自己来实现消费机制

2.1、高级API

  1. /**
  2. * 消费者程序:从test主题中消费数据
  3. */
  4. public class consumerTest {
  5. public static void main(String[] args) {
  6. // 1. 创建Kafka消费者配置
  7. Properties props = new Properties();
  8. props.setProperty("bootstrap.servers", "192.168.88.100:9092");
  9. props.setProperty("group.id", "test");
  10. props.setProperty("enable.auto.commit", "true");
  11. props.setProperty("auto.commit.interval.ms", "1000");
  12. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  13. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  14. // 2. 创建Kafka消费者
  15. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  16. // 3. 订阅要消费的主题
  17. consumer.subscribe(Arrays.asList("test"));
  18. // 4. 使用一个while循环,不断从Kafka的topic中拉取消息
  19. while (true) {
  20. // 定义100毫秒超时
  21. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  22. for (ConsumerRecord<String, String> record : records)
  23. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  24. }
  25. }
  26. }
  • 上面是之前编写的代码,消费Kafka的消息很容易实现,写起来比较简单
  • 不需要执行去管理offset,直接通过ZK管理;也不需要管理分区、副本,由Kafka统一管理
  • 消费者会自动根据上一次在ZK中保存的offset去接着获取数据
  • 在ZK中,不同的消费者组(group)同一个topic记录不同的offset,这样不同程序读取同一个topic,不会受offset的影响

高级API的缺点

  • 不能控制offset,例如:想从指定的位置读取

  • 不能细化控制分区、副本、ZK等

2.2、低级API

通过使用低级API,我们可以自己来控制offset,想从哪儿读,就可以从哪儿读。而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前offset是自动保存在ZK中,使用低级API,我们可以将offset不一定要使用ZK存储,我们可以自己来存储offset。例如:存储在文件、MySQL、或者内存中。但是低级API,比较复杂,需要执行控制offset,连接到哪个分区,并找到分区的leader。

2.3、手动消费分区数据

之前的代码,我们让Kafka根据消费组中的消费者动态地为topic分配要消费的分区。但在某些时候,我们需要指定要消费的分区,例如:

  • 如果某个程序将某个指定分区的数据保存到外部存储中,例如:Redis、MySQL,那么保存数据的时候,只需要消费该指定的分区数据即可
  • 如果某个程序是高可用的,在程序出现故障时将自动重启(例如:后面我们将学习的Flink、Spark程序)。这种情况下,程序将从指定的分区重新开始消费数据。

如何进行手动消费分区中的数据呢?

 不再使用之前的 subscribe 方法订阅主题,而使用 「assign」方法指定想要消费的消息

  1. String topic = "test";
  2. TopicPartition partition0 = new TopicPartition(topic, 0);
  3. TopicPartition partition1 = new TopicPartition(topic, 1);
  4. consumer.assign(Arrays.asList(partition0, partition1));

一旦指定了分 区,就可以就像前面的示例一样,在循环中调用「poll」方法消费消息

注意

  1. 当手动管理消费分区时,即使GroupID是一样的,Kafka的组协调器都将不再起作用
  2. 如果消费者失败,也将不再自动进行分区重新分配

三、监控工具Kafka-eagle介绍

3.1、Kafka-Eagle简介

在开发工作中,当业务前提不复杂时,可以使用Kafka命令来进行一些集群的管理工作。但如果业务变得复杂,例如:我们需要增加group、topic分区,此时,我们再使用命令行就感觉很不方便,此时,如果使用一个可视化的工具帮助我们完成日常的管理工作,将会大大提高对于Kafka集群管理的效率,而且我们使用工具来监控消费者在Kafka中消费情况。

早期,要监控Kafka集群我们可以使用Kafka Monitor以及Kafka Manager,但随着我们对监控的功能要求、性能要求的提高,这些工具已经无法满足。

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。

官网地址:https://www.kafka-eagle.org/

3.2、安装Kafka-Eagle

3.2.1、开启Kafka JMX端口
3.2.1.1、JMX接口

JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口,来实现一些管理、监控功能。

3.2.1.2、开启Kafka JMX

在启动Kafka的脚本前,添加:

  1. cd ${KAFKA_HOME}
  2. export JMX_PORT=9988
  3. nohup bin/kafka-server-start.sh config/server.properties &
3.2.2、安装Kafka-Eagle
  1. 安装JDK,并配置好JAVA_HOME。
  2. 将kafka_eagle上传,并解压到 /export/server 目录中。
  1. cd cd /export/software/
  2. tar -xvzf kafka-eagle-bin-1.4.6.tar.gz -C ../server/
  3. cd /export/server/kafka-eagle-bin-1.4.6/
  4. tar -xvzf kafka-eagle-web-1.4.6-bin.tar.gz
  5. cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6

配置 kafka_eagle 环境变量。

vim /etc/profile

  1. export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
  2. export PATH=$PATH:$KE_HOME/bin

source /etc/profile

配置 kafka_eagle。使用vi打开conf目录下的system-config.properties

vim conf/system-config.properties

  1. # 修改第4行,配置kafka集群别名
  2. kafka.eagle.zk.cluster.alias=cluster1
  3. # 修改第5行,配置ZK集群地址
  4. cluster1.zk.list=node1.angyan.cn:2181,node2.angyan.cn:2181,node3.angyan.cn:2181
  5. # 注释第6行
  6. #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
  7. # 修改第32行,打开图标统计
  8. kafka.eagle.metrics.charts=true
  9. kafka.eagle.metrics.retain=30
  10. # 注释第69行,取消sqlite数据库连接配置
  11. #kafka.eagle.driver=org.sqlite.JDBC
  12. #kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
  13. #kafka.eagle.username=root
  14. #kafka.eagle.password=www.kafka-eagle.org
  15. # 修改第77行,开启mys
  16. kafka.eagle.driver=com.mysql.jdbc.Driver
  17. kafka.eagle.url=jdbc:mysql://node1.angyan.cn:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
  18. kafka.eagle.username=root
  19. kafka.eagle.password=123456

配置JAVA_HOME

  1. cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
  2. vim ke.sh
  3. # 在第24行添加JAVA_HOME环境配置
  4. export JAVA_HOME=/export/server/jdk1.8.0_241

修改Kafka eagle可执行权限

  1. cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
  2. chmod +x ke.sh

启动 kafka_eagle。

./ke.sh start

访问Kafka eagle,默认用户为admin,密码为:123456

http://node1.angyan.cn:8048/ke

3.3、Kafka度量指标

指标意义
Brokers Spreadbroker使用率
Brokers Skew分区是否倾斜
Brokers Leader Skewleader partition是否存在倾斜
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/700446
推荐阅读
相关标签
  

闽ICP备14008679号