赞
踩
MQ全称为Message Queue,即消息队列;RabbitMQ (Messaging that just works — RabbitMQ)由erlang语言开发,基于AMQP协议实现的消息队列;
思考:为什么需要用到消息队列呢?它有什么好处呢?
传统模式

多线程模式

消息队列模式
常见的其它消息队列 :ActiveMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis(也可做消息队列)
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|
| 语言 | Java | Erlang | Java | Scala |
| 吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
| 速度 | 毫秒 | 微秒 | 毫秒 | 毫秒 |
| 集群 | 主从架构 | 主从架构 | 分布式架构 | 分布式架构 |
| 场景 | - | - | - | 大数据实时处理 |
消息队列的优势:
1)解耦:系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

2)异步:如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能

3)削峰:如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。

NameServer:主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。
Broker:负责存储消息,转发消息。单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer。
1)单集群启动
# 启动NameServer
nohup sh bin/mqnamesrv &
# 验证Name Server 是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log

NameServer成功启动后,我们启动Broker和Proxy。
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
tail -f ~/logs/rocketmqlogs/proxy.log


关闭服务器
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

2)管控台
https://github.com/apache/rocketmq-dashboard
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar


1)生产者

- public class Send {
- public static void main(String[] args) throws Exception{
-
- DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producer-demo");
- //连接nameserver
- defaultMQProducer.setNamesrvAddr("192.168.67.5:9876");
- //启动生产者
- defaultMQProducer.start();
- //设置消息发送的目的地
- String topic = "producer-demo";
- //发送消息
- for(int i=0 ; i<10 ; i++){
- Message msg = new Message(topic,("rocketmq消息 " + i).getBytes(StandardCharsets.UTF_8));
- //发送到broker
- SendResult send = defaultMQProducer.send(msg);
- System.out.println(i + "发送状态:" + send.getSendStatus());
- }
- //关闭资源
- defaultMQProducer.shutdown();
- }



2)消费者
- public class Custom {
- public static void main(String[] args) throws Exception{
- DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer01");
- defaultMQPushConsumer.setNamesrvAddr("192.168.67.5:9876");
- //设置订阅主题
- defaultMQPushConsumer.subscribe("producer-demo","*");
- //设置消息监听器
- defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- for (MessageExt messageExt : list) {
- String s = new String(messageExt.getBody(), StandardCharsets.UTF_8);
- System.out.println("thread: " + Thread.currentThread() + ",content: " + s );
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- //启动消费者
- defaultMQPushConsumer.start();
- }
- }



(1)集群模式
一个ConsumerGroup中的Consumer实例平均分摊消费消息。
(2)广播模式
一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次。






- @Component
- @RocketMQMessageListener(consumerGroup = "tConBootGroup",topic = "TestTopic")
- public class TestTopicListener implements RocketMQListener<MessageExt> {
- @Override
- public void onMessage(MessageExt messageExt) {
- String mess = new String(messageExt.getBody(), Charset.defaultCharset());
- System.out.println("接受消息:" + mess);
- }
- }












@RocketMQMessageListener(consumerGroup = "tConBootGroup",topic = "TestTopic",selectorType = SelectorType.SQL92,selectorExpression = "age >= 96 and grade >95")

ActiveMQ主要有两种模式的消息,分别是Queue(队列)和Topic(订阅)。

#1.解压
tar -zxvf apache-activemq-5.12.0-bin.tar.gz#2.启动
cd apache-activemq-5.12/bin
./activemq start#3.停止
./activemq stop#4.查看状态
./activemq status
ActiveMQ有两个核心端口分别是8161(后台系统的端口),61616(客户端通讯端口)
启动完ActiveMQ之后,可以在浏览器登录后台系统:
http://ip地址:8161,账号和密码都是admin
第一步:导入坐标
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-activemq</artifactId>
- </dependency>
第二步:配置文件
- spring.activemq.broker-url=tcp://192.168.1.8:61616
- spring.activemq.password=admin
- spring.activemq.user=admin
第三步:发送Queue消息
- @Component
- public class Producer {
- @Autowired
- private JmsMessagingTemplate jmt;
-
- public void send(String content) {
- //注意:项目启动的时候,会在后台自动创建queue名称
- jmt.convertAndSend(new ActiveMQQueue("队列名称"),content);
- }
- }
第四步:发送Topic消息
- @Component
- public class Producer {
- @Autowired
- private JmsMessagingTemplate jmt;
-
- public void send(String content) {
- //注意:项目启动的时候,会在后台自动创建topic名称
- jmt.convertAndSend(new ActiveMQTopic("sample.topic"),content);
- }
- }
提示:SpringBoot默认发送Queue消息
- #如果为True,则是Topic;如果是false或者默认,则是queue
- spring.jms.pub-sub-domain=true
第一步:导入坐标
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-activemq</artifactId>
- </dependency>
第二步:配置文件
- spring.activemq.broker-url=tcp://192.168.1.8:61616
- spring.activemq.password=admin
- spring.activemq.user=admin
第三步:监听消息
- @Component
- public class Consumer {
- @JmsListener(destination = "队列名称")
- public void receiveQueue(String text) {
- System.out.println(text);
- }
- }
提示:SpringBoot可以同时监听Queue和Topic消息,不需要任何配置
注意:
消息队列中间件宕机了,重启之后会不会丢失数据呢?
首先,ActiveMQ默认有三种持久化方式,分别是内存持久化、日志持久化和JDBC持久化,内存持久化是不安全的;日志和JDBC则是安全的;ActiveMQ默认是日志持久化。
Queue重启之后不会丢失数据,直到消费被消费成功才被删除;Topic无法再消费之前的消息,所以重启会丢失。
消息队列会不会出现重复消息的问题?什么情况会导致重复消费?
正常情况下,多个消费端监听同一个queue,同一条消息不会被多个消费端同时消费,只有一个消费端能消费成功。
但是,如果由于网络原因,消息队列可能无法及时收到消费端的应答,会重发消息,这样的话消费端相当于消费了两次。
解决方案:消费端每次接受消息的时候,先根据msgId去Redis查询是否存在;如果存在则不处理;如果不存在,则把消息ID存储到Redis。
什么是死信队列?它有什么作用?
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。