赞
踩
kafka是大数据,大并发的杀手锏
kafka + zookeeper
kafka官网:https://kafka.apache.org/
把同步变成异步操作。
同步存在的问题:
在业务的上游(下单)和下游(积分,数据库,优惠券等)之间建立一个消息队列;
异步的优势:
针对同步的通信方式来说,异步的方法,可以让上游快速成功,极大的提高了系统的吞吐量。而且在分布式系统中,通过下游多个服务的分布式事务,也能保障业务执行之后的最终一致性。
MQ(MESSAGE QUEUE)
消息队列解决的是通信问题,使用的是生产者/消费者模型。
rabbitMQ,rocketMQ, kafka,zeroMQ等等消息队列中间件,分成了有无Broker(进行消息分发进队列的功能)的MQ
kafka 是全球消息处理性能最快的一款MQ
通过有一台服务器作为broker,所有的消息都通过他来中转。生产者吧消息发送给它就结束了自己的任务,broker把消息给队里,再有队列推送给消费者(或者消费者主动轮询获得消息)
zeroMQ
是一个分布式、支持分区(partition)、多副本(replica),给予zookeeper协调的分布式消息系统。
可以实现:
使用场景:
前提需要有jdk,以及部署一台zookeeper的服务器
kafka官网:https://kafka.apache.org/
可以下载2.4.1的版本使用
1.在usr/local中创建一个文件夹
2.吧下载的kafka安装包放进文件夹中,然后解压
3.删除压缩包
4.加压成功后,进入文件夹中查看
5.在config里面关注一下这个文件 server.properties
vim server.properties查看配置
6. 启动kafka
进入到bin的文件夹里面,然后输入:
kafka-server-start.sh -daemon …/config/server.properties
表示:带着配置文件启动kafka
进程:ps -aux | grep server.properties
kafka创建的topic主题是存储在zookeeper上的。
消息才是保存在kafka上的usr/local/kafka/data/kafka-logs/test-0/0000000.log(日志路径/topic-id中)文件里面
问:在一个kafka的topic中,启动两个消费者,那生产者发送消息,是否同时被这这两消费者消息吗?
答:如果在同一个消费组,name只有一个消费组可以收到消息;换言之,同一个消费组中 一个topic中的消息 只能有一个消费者收到
目的:为了保证消费的顺序
不同的消费组订阅同一个topic,那么每个消费组中只能有一个消费者收到消息;
单,多播 图解:
是一个逻辑的概念,kafka通过topic将消息进行分类。不同的topic会被订阅该topic的消费组消费。
但是有一个问题,假如这个topic中的消息非常的多,多到需要几T存储,因为消息会被保存在log日志文件中的,此时,为了解决文件过大的问题,kafka提出了partition分区的概念,
通过partition将一个topic中的消息分区存储,这样的好处有多个
举例:假如消费者1在消费第50个消息的时候挂了,然后消费者2继续消费是从51开始的,原理如上图:会在_consimer_offsets里面存储消费的offset(偏移量信息) + _consimer_offsets默认有50个分区。
文件中保存的消息,默认保存7天,7天后会被删除。
准备3个server.properties⽂件
每个⽂件中的这些内容要调整
broker.id=0
listeners=PLAINTEXT://192.168.65.60:9092
log.dir=/usr/local/data/kafka-logs
broker.id=1
listeners=PLAINTEXT://192.168.65.60:9093
log.dir=/usr/local/data/kafka-logs-1
broker.id=2
listeners=PLAINTEXT://192.168.65.60:9094
log.dir=/usr/local/data/kafka-logs-2
使⽤如下命令来启动3台服务器
./kafka-server-start.sh -daemon ../config/server0.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
搭建完后通过查看zk中的/brokers/ids 看是否启动成功
副本是对分区的备份。在集群中,不同的副本会被部署在不同的briker上。
副本是为了主题中的分区创建多个备份,多个副本子啊kafka集群的偶的个broker中,会有一个副本作为leader,其他都是follower
下面例子:创建一个主题,两个分区,三个副本。
注意:localhost是zk的ip地址,输入命令创建和查看的命令
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 3 --partitions 2 --topic my-replicated-topic
通过查看主题信息,其中的关键数据:
replicas:
当前副本存在的broker节点
leader:副本⾥的概念
每个partition都有⼀个broker作为leader。
消息发送⽅要把消息发给哪个broker?就看副本的leader是在哪个broker上⾯。副本⾥的leader专⻔⽤来接收消息。
接收到消息,其他follower通过poll的⽅式来同步数据。
消费者也是消费leader的broker的消息
follower:leader处理所有针对这个partition的读写请求,⽽follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的⼀致性),如果leader所在的broker挂掉,那么就会进⾏新leader的选举,⾄于怎么选,在之后的controller的概念中介绍。
isr:可以同步的broker节点和已同步的broker节点,存放在isr集合中;如果isr中的节点服务器性能差,会被踢出isr集合。
broker、topic主题。partition分区。replication副本之间的关系是如何的呢?
集群中有多个broker,创建主题时可以致命主题的分区数(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker里.
1)向集群发送消息:
2)从集群中消费消息:
3)指定消费组消费消息
4)分区分消息组的集群消费中的细节
图中kafka集群有两个broker,每个broker中有多个partition。一个partition只能被一个消费组中的某一个消费组消费,从而保证消费顺序。 但是,卡发卡只在prtition的范围内保证消息消费的局部顺序,不能再不同的partition内保证总的消费顺序。
一个消费组可以消费多个partition
消费组中消费者的数量不能比一个topic中的partition数量多,否则多出的消费组消费不到消息的(由于单边播放原则)
1.依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
2.具体实现
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MySimpleProducer { private final static String TOPIC_NAME = "my-replicated-topic"; public static void main(String[] args) throws ExecutionException,InterruptedException { //1.设置参数,存放键值对的,相当于map Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094"); //把发送的key从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); //2.创建⽣产消息的客户端,传⼊参数 Producer<String,String> producer = new KafkaProducer<String, String>(props); //3.创建消息 // 参数一:主题名, 参数二: key,作⽤是决定了往哪个分区上发,参数三:value,具体要发送的消息内容 // 具体的发送分区就算公司:hash(key)%partitionNum , 或者直接 ProducerRecord<String,String> producerRecord = new ProducerRecord<> (TOPIC_NAME,"mykeyvalue","hellokafka"); // new ProducerRecord<> (TOPIC_NAME, 0 ,"mykeyvalue","hellokafka"); 也可以指定分区id //4.发送消息,得到消息发送的元数据并输出 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步⽅式发送消息结果:"+"topic-"+metadata.topic() +"|partition-" + metadata.partition()+"|offset-"+metadata.offset()); producer.close(); } }
生产者同步发送消息,在收到kafka的ack告知发送成功之前一直处于阻塞状态;
通过配置我们知道:阻塞3S的时间,如果还没有收到ack确定,会重试3次,还没有就报错了抛InterruptedException异常,我们可以捕获处理
// 发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get(); // 同步发送
不需要等待kafka返回一个ack信息给生产者,直接默认成功,发送完消息后就可以直接执行后面的业务,kafka的breker在收到消息后异步调用生产者提供的callback回调方法。
异步发送 是指使用 异步回调方式 发送消息
问题:那发送消息使用异步多还是同步多? – 同步
虽然异步会提升性能,但是不能保证消息是否不会丢失,且发送消息用异步性能提升不明显,所以使用同步发送消息较多。
在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢?此时ack有三个配置:
下⾯是关于ack和重试(如果没有收到ack,就开启重试)的配置
props.put(ProducerConfig.ACKS_CONFIG, "1"); // 设置ack = 1
/*
发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,
⽐如⽹络抖动,所以需要在接收者那边做好消息接收的**幂等性处理**
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
//重试间隔设置,ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
重试发送消息,此时可能会有重复发送的问题,解决方法见下;
消息不是一条一条的发送到kafka;
消息从生产者到kafka之间有缓存区;
代码:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class MySimpleConsumer { private final static String TOPIC_NAME= "my-replicated-topic"; private final static String CONSUMER_GROUP_NAME ="testGroup"; public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094"); //消费分组名,及其序列化 props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); //1.创建⼀个消费者的客户端 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //2.消费者订阅主题列表 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { /* * 3.poll() API是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { // 消费poll下来的消息 //4.打印消息 System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } } }
1)提交的内容
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。
2)自动提交
消费组poll消息下来以后就会自动提交offset
// 是否⾃动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//⾃动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COM`在这里插入代码片`MIT_INTERVAL_MS_CONFIG, "1000");
注意:自动提交会丢消息。因为消费者在消费之前提交offset,有可能提交完成后还没有消费时消费者就挂了。
3)手动提交
需要把自动提交的配置改成fasle
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “false”);
手动提交分为两种
手动同步提交(常用)
在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑
consumer.commitSync();此时会阻塞,等ack
手动异步提交
在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
consumer.commitASync();此时不会阻塞
一般使用同步提交就可以了。
默认情况下,消费者一次会poll500条消息。
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
代码中设置了⻓轮询的时间是1000毫秒
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
意味着:
如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让⼀次poll的消息条数少⼀点
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);
消费者每隔1s向kafka集群发送⼼跳,集群发现如果有超过10s没有续约的消费者,将被踢出消费组,触发该消费组的rebalance机制,将该分区交给消费组⾥的其他消费者进⾏消费。
//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);
指定分区消费
从头消费(回溯消费)
指定offset消费
指定时间点开始消费
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。
默认会从当前分区的最后⼀条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
1.引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.编写配置文件
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/msg") public class MyKafkaController { private final static String TOPIC_NAME = "my-replicated-topic"; @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("/send") publicStringsendMessage(){ kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!"); return "send success!"; } }
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component publicclassMyConsumer { @KafkaListener(topics="my-replicated-topic",groupId="MyGroup1") // 监听主题,属于的消费组 public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { // record是指一条消息记录 String value =record.value(); System.out.println(value); System.out.println(record); //⼿动提交offset,自动提交,这个ack没有作用。 ack.acknowledge(); } @KafkaListener(topics="my-replicated-topic",groupId="MyGroup2") // 监听主题,属于的消费组 public void listensGroup(ConsumerRecord<String, String> records, Acknowledgment ack) { // record是指一批消息记录 records进行操作 //⼿动提交offset ack.acknowledge(); } }
@KafkaListener(groupId="testGroup", topicPartitions= {
@TopicPartition(topic="topic1", partitions= {"0", "1"}), // topic1主题的0,1分区
@TopicPartition(topic="topic2", partitions="0", // topic2主题的0 分区
partitionOffsets=@PartitionOffset(partition="1",initialOffset="100"))} // 对于任意主题的1号分区,从100偏移量开始消费
, concurrency="3") //concurrency就是同组下创建的消费者个数,就是并发消费数,建议⼩于等于分区总数
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿动提交offset
ack.acknowledge();
}
集群中谁来充当controller
每个broker启动时会在zk创建一个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller(所以很多时候是0的broker当controller)
controller的作用:负责管理整个集群中所有分区和副本的状态。
range:根据公式计算得到某个消费者 消费哪个分区;前面的消费者是 分区总数/消费者数量 + 1,之后的消费者是分区总数/消费者数量
轮询:大家轮着来(消费者1消费p036,消费者2消费p147,消费者3消费25)
sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没开,那么就进行全部的重新分配,建议开启。
LEO是某个副本最后消息的消息位置(log-end-offset)
HW俗称高水位 ,是已完成同步的位置。消息在写⼊broker时,且每个broker完成这条消息的同步后,hw才会变化。
在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的⽬的是防⽌消息的丢失。
作用:防止重复消费
生产者:使用同步发送消息 + 把ack设置成1或者all + 设置同步的分区数 >=2
消费者:把自动提交改成手动提交
在防止消息丢失的方案中,如果生产者发送完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了。
此时,生产者会进行重试,于是broker就会收到多条相同的消息,而照成消费者的重复消费。
解决:
- 生产者关闭重试:会造成丢失消息(不建议)
- 消费者解决非幂等性消息问题:
所谓幂等性:多次访问的结果都是一样的。对于rest的请求(get、put、delete幂等,post非幂等)
方案:
1. 在数据库中创建联合主键,防止相同的主键创建出多条记录
2. 使用分布式锁,以业务id为锁,保证只有一条记录能创建成功
生产者:保证消息按顺序消费,且消息不丢失 – 使用同步发送,ack设置成非0的值
消费者:主题只能设置一个分区,消费组中只能有一个消费者(1对1)
此时,是牺牲了性能。
消息积压(生产能力远大于消费能力)会导致很多问题,比如磁盘呗打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,就需要有相应的手段:
延迟队列的应用场景:在订单创建成功后如果超过30分钟没有付款,则需要取消订单
1.去kafka-eagle官网下载压缩包 : https://www.kafka-eagle.org/
2.分配一台虚拟机,安装jdk
3.解压缩第一步的压缩包
4.给kafka-eagle配置环境变量,修改zk和数据库的地址
启动查看
看到
请不要吝啬你发财的小手,点赞收藏评论,谢谢!
请不要吝啬你发财的小手,点赞收藏评论,谢谢!
请不要吝啬你发财的小手,点赞收藏评论,谢谢!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。