赞
踩
目录
二、高级(High Level)API与低级(Low Level)API
生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中
随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。
按key分配策略,有可能会出现「数据倾斜」,key.hash() % 分区的数量。例如:某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。
轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。
- 在Kafka中生产者是有写入策略,如果topic有多个分区,就会将数据分散在不同的partition中存储
- 当partition数量大于1的时候,数据(消息)会打散分布在不同的partition中
- 如果只有一个分区,消息是有序的
实现步骤:
创建自定义分区器
- public class KeyWithRandomPartitioner implements Partitioner {
-
- private Random r;
-
- @Override
- public void configure(Map<String, ?> configs) {
- r = new Random();
- }
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // cluster.partitionCountForTopic 表示获取指定topic的分区数量
- return r.nextInt(1000) % cluster.partitionCountForTopic(topic);
- }
-
- @Override
- public void close() {
- }
- }

在Kafka生产者配置中,自定使用自定义分区器的类名
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyWithRandomPartitioner.class.getName());
Kafka中的Rebalance称之为再均衡,是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。
Rebalance触发的时机有:
消费者组中consumer的个数发生变化。例如:有新的consumer加入到消费者组,或者是某个consumer停止了。
订阅的topic个数发生变化
消费者可以订阅多个主题,假设当前的消费者组订阅了三个主题,但有一个主题突然被删除了,此时也需要发生再均衡。
订阅的topic分区数发生变化
保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少
Range范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。
注意:Rangle范围分配策略是针对每个Topic的。
配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。
算法公式
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个
剩余消费者消费n个
RoundRobinAssignor轮询策略是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
消费者挨个分配消费的分区
配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor。
从Kafka 0.11.x开始,引入此类分配策略。主要目的:
没有发生rebalance时,Striky粘性分配策略和RoundRobin分配策略类似。
发生了rebalance,轮询分配策略,重新走一遍轮询分配的过程。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可
减少上下文的切换
上面如果consumer2崩溃了,此时需要进行rebalance。如果是Range分配和轮询分配都会重新进行分配,例如:
通过上图,我们发现,consumer0和consumer1原来消费的分区大多发生了改变。接下来我们再来看下粘性分配策略。
我们发现,Striky粘性分配策略,保留rebalance之前的分配结果。这样,只是将原先consumer2负责的两个分区再均匀分配给consumer0、consumer1。这样可以明显减少系统资源的浪费,例如:之前consumer0、consumer1之前正在消费某几个分区,但由于rebalance发生,导致consumer0、consumer1需要重新消费之前正在处理的分区,导致不必要的系统开销。(例如:某个事务正在进行就必须要取消了)
副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的。
对副本关系较大的就是,producer配置的acks参数了,acks参数表示当生产者生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。
注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常
配置:
- Properties props = new Properties();
- props.put("bootstrap.servers", "node1.angyan.cn:9092");
- props.put("acks", "all");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
ACK为0,基准测试:
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=0
ACK基准测试:
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=all
根据业务情况来选择ack机制,是要求性能最高,一部分数据丢失影响不大,可以选择0/1。如果要求数据一定不能丢失,就得配置为-1/all。
分区中是有leader和follower的概念,为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据,Backup。
- /**
- * 消费者程序:从test主题中消费数据
- */
- public class consumerTest {
- public static void main(String[] args) {
- // 1. 创建Kafka消费者配置
- Properties props = new Properties();
- props.setProperty("bootstrap.servers", "192.168.88.100:9092");
- props.setProperty("group.id", "test");
- props.setProperty("enable.auto.commit", "true");
- props.setProperty("auto.commit.interval.ms", "1000");
- props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- // 2. 创建Kafka消费者
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-
- // 3. 订阅要消费的主题
- consumer.subscribe(Arrays.asList("test"));
-
- // 4. 使用一个while循环,不断从Kafka的topic中拉取消息
- while (true) {
- // 定义100毫秒超时
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records)
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
- }
- }

高级API的缺点
不能控制offset,例如:想从指定的位置读取
不能细化控制分区、副本、ZK等
通过使用低级API,我们可以自己来控制offset,想从哪儿读,就可以从哪儿读。而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前offset是自动保存在ZK中,使用低级API,我们可以将offset不一定要使用ZK存储,我们可以自己来存储offset。例如:存储在文件、MySQL、或者内存中。但是低级API,比较复杂,需要执行控制offset,连接到哪个分区,并找到分区的leader。
之前的代码,我们让Kafka根据消费组中的消费者动态地为topic分配要消费的分区。但在某些时候,我们需要指定要消费的分区,例如:
如何进行手动消费分区中的数据呢?
不再使用之前的 subscribe 方法订阅主题,而使用 「assign」方法指定想要消费的消息
- String topic = "test";
- TopicPartition partition0 = new TopicPartition(topic, 0);
- TopicPartition partition1 = new TopicPartition(topic, 1);
- consumer.assign(Arrays.asList(partition0, partition1));
一旦指定了分 区,就可以就像前面的示例一样,在循环中调用「poll」方法消费消息
注意
在开发工作中,当业务前提不复杂时,可以使用Kafka命令来进行一些集群的管理工作。但如果业务变得复杂,例如:我们需要增加group、topic分区,此时,我们再使用命令行就感觉很不方便,此时,如果使用一个可视化的工具帮助我们完成日常的管理工作,将会大大提高对于Kafka集群管理的效率,而且我们使用工具来监控消费者在Kafka中消费情况。
早期,要监控Kafka集群我们可以使用Kafka Monitor以及Kafka Manager,但随着我们对监控的功能要求、性能要求的提高,这些工具已经无法满足。
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。
官网地址:https://www.kafka-eagle.org/
JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口,来实现一些管理、监控功能。
在启动Kafka的脚本前,添加:
- cd ${KAFKA_HOME}
- export JMX_PORT=9988
- nohup bin/kafka-server-start.sh config/server.properties &
- cd cd /export/software/
- tar -xvzf kafka-eagle-bin-1.4.6.tar.gz -C ../server/
- cd /export/server/kafka-eagle-bin-1.4.6/
- tar -xvzf kafka-eagle-web-1.4.6-bin.tar.gz
- cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
配置 kafka_eagle 环境变量。
vim /etc/profile
- export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
- export PATH=$PATH:$KE_HOME/bin
source /etc/profile
配置 kafka_eagle。使用vi打开conf目录下的system-config.properties
vim conf/system-config.properties
- # 修改第4行,配置kafka集群别名
- kafka.eagle.zk.cluster.alias=cluster1
- # 修改第5行,配置ZK集群地址
- cluster1.zk.list=node1.angyan.cn:2181,node2.angyan.cn:2181,node3.angyan.cn:2181
- # 注释第6行
- #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
-
- # 修改第32行,打开图标统计
- kafka.eagle.metrics.charts=true
- kafka.eagle.metrics.retain=30
-
- # 注释第69行,取消sqlite数据库连接配置
- #kafka.eagle.driver=org.sqlite.JDBC
- #kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
- #kafka.eagle.username=root
- #kafka.eagle.password=www.kafka-eagle.org
-
- # 修改第77行,开启mys
- kafka.eagle.driver=com.mysql.jdbc.Driver
- kafka.eagle.url=jdbc:mysql://node1.angyan.cn:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
- kafka.eagle.username=root
- kafka.eagle.password=123456

配置JAVA_HOME
- cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
- vim ke.sh
- # 在第24行添加JAVA_HOME环境配置
- export JAVA_HOME=/export/server/jdk1.8.0_241
修改Kafka eagle可执行权限
- cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
- chmod +x ke.sh
启动 kafka_eagle。
./ke.sh start
访问Kafka eagle,默认用户为admin,密码为:123456
http://node1.angyan.cn:8048/ke
指标 | 意义 |
---|---|
Brokers Spread | broker使用率 |
Brokers Skew | 分区是否倾斜 |
Brokers Leader Skew | leader partition是否存在倾斜 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。