当前位置:   article > 正文

消息中间件RocketMQ_消息队列的地址是

消息队列的地址是

1 MQ 消息队列

MQ全称为Message Queue,即消息队列;RabbitMQ (Messaging that just works — RabbitMQ)由erlang语言开发,基于AMQP协议实现的消息队列; 

思考:为什么需要用到消息队列呢?它有什么好处呢?

传统模式

  • 账号注册完成之后,发送短信,再发送邮件;整个流程都是主线程执行,一个花了150ms。
  • 缺点:代码臃肿、执行时间相对长、如果后期新增发微信,则不好扩展。

多线程模式

  • 执行完成主业务(账号注册),则通过创建子线程来处理次要业务。它只需要花50ms即可给用户响应。
  • 优点:响应速度比较快
  • 缺点:1)代码还是臃肿;2)需要扩展性不好,比如后期新增发送微信功能还得改源代码;3)需要实现容错比较复杂,比如子线程处理错误,则怎么实现补偿机制;4)子线程和主业务代码在同一台服务器,如果并发量很高的情况,其实性能还是受限。

消息队列模式

  • 执行完成主业务(账号注册),则发送消息给消息队列,由消息队列的消费端监听并处理后续业务。它只需要花50ms即可给用户响应。
  • 优点:1)响应速度比较快;2)业务彻底解耦,编码变的更加简单;3)由于服务都是独立部署在不同服务器,所以性能更加的高。4)需求扩展性好,比如后期新增发送微信功能,则只需要创建一个微信服务监听消息队列即可。5)可以灵活的处理容错性。
  • 缺点:架构变的更加复杂了;服务数量比较多。

常见的其它消息队列 :ActiveMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis(也可做消息队列)

特性ActiveMQRabbitMQRocketMQKafka
语言JavaErlangJavaScala
吞吐量万级万级10万级10万级
速度毫秒微秒毫秒毫秒
集群主从架构主从架构分布式架构分布式架构
场景---大数据实时处理
  • 性能排序:Kafka>RocketMQ>RabbitMQ>ActiveMQ
  • 部署难度:ActiveMQ<RabbitMQ<Kafka<RocketMQ

消息队列的优势:

1)解耦:系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

 2)异步:如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能

 3)削峰:如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。

2 RocketMQ

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。

NameServer:主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。

Broker:负责存储消息,转发消息。单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer。

2.1 环境安装

快速开始 | RocketMQ

1)单集群启动

# 启动NameServer

nohup sh bin/mqnamesrv &

# 验证Name Server 是否启动成功

tail -f ~/logs/rocketmqlogs/namesrv.log

NameServer成功启动后,我们启动Broker和Proxy。

nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

tail -f ~/logs/rocketmqlogs/proxy.log 

关闭服务器

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

2)管控台

https://github.com/apache/rocketmq-dashboard

java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

2.2 rocketmq

2.2.1 生产者与消费者

1)生产者

  1. public class Send {
  2. public static void main(String[] args) throws Exception{
  3. DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producer-demo");
  4. //连接nameserver
  5. defaultMQProducer.setNamesrvAddr("192.168.67.5:9876");
  6. //启动生产者
  7. defaultMQProducer.start();
  8. //设置消息发送的目的地
  9. String topic = "producer-demo";
  10. //发送消息
  11. for(int i=0 ; i<10 ; i++){
  12. Message msg = new Message(topic,("rocketmq消息 " + i).getBytes(StandardCharsets.UTF_8));
  13. //发送到broker
  14. SendResult send = defaultMQProducer.send(msg);
  15. System.out.println(i + "发送状态:" + send.getSendStatus());
  16. }
  17. //关闭资源
  18. defaultMQProducer.shutdown();
  19. }

2)消费者

  1. public class Custom {
  2. public static void main(String[] args) throws Exception{
  3. DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer01");
  4. defaultMQPushConsumer.setNamesrvAddr("192.168.67.5:9876");
  5. //设置订阅主题
  6. defaultMQPushConsumer.subscribe("producer-demo","*");
  7. //设置消息监听器
  8. defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
  9. @Override
  10. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  11. for (MessageExt messageExt : list) {
  12. String s = new String(messageExt.getBody(), StandardCharsets.UTF_8);
  13. System.out.println("thread: " + Thread.currentThread() + ",content: " + s );
  14. }
  15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  16. }
  17. });
  18. //启动消费者
  19. defaultMQPushConsumer.start();
  20. }
  21. }

2.2.2 消息消费模式

(1)集群模式

一个ConsumerGroup中的Consumer实例平均分摊消费消息。

(2)广播模式

一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次。

2.3 SpringBoot配置

2.3.1 生产者

2.3.2 消费者

  1. @Component
  2. @RocketMQMessageListener(consumerGroup = "tConBootGroup",topic = "TestTopic")
  3. public class TestTopicListener implements RocketMQListener<MessageExt> {
  4. @Override
  5. public void onMessage(MessageExt messageExt) {
  6. String mess = new String(messageExt.getBody(), Charset.defaultCharset());
  7. System.out.println("接受消息:" + mess);
  8. }
  9. }

2.3.3 异步消息发送

2.3.4 负载均衡测试

2.3.5 顺序消息发送

2.3.6 TAG过滤

2.3.7 SQL92过滤

@RocketMQMessageListener(consumerGroup = "tConBootGroup",topic = "TestTopic",selectorType = SelectorType.SQL92,selectorExpression = "age >= 96 and grade >95")

3 ActiveMQ

ActiveMQ主要有两种模式的消息,分别是Queue(队列)和Topic(订阅)。

  • Queue,每个消费者消费的数据不一样,一个消息只能被一个消费者消费
  • Topic,每个消费者消费的数据是一样的

3.1 安装

#1.解压
tar -zxvf apache-activemq-5.12.0-bin.tar.gz

#2.启动
cd apache-activemq-5.12/bin
./activemq start

#3.停止
./activemq stop

#4.查看状态
./activemq status

ActiveMQ有两个核心端口分别是8161(后台系统的端口),61616(客户端通讯端口)

启动完ActiveMQ之后,可以在浏览器登录后台系统:

http://ip地址:8161,账号和密码都是admin

3.2 消息提供者

第一步:导入坐标

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-activemq</artifactId>
  4. </dependency>

第二步:配置文件

  1. spring.activemq.broker-url=tcp://192.168.1.8:61616
  2. spring.activemq.password=admin
  3. spring.activemq.user=admin

第三步:发送Queue消息

  1. @Component
  2. public class Producer {
  3. @Autowired
  4. private JmsMessagingTemplate jmt;
  5. public void send(String content) {
  6. //注意:项目启动的时候,会在后台自动创建queue名称
  7. jmt.convertAndSend(new ActiveMQQueue("队列名称"),content);
  8. }
  9. }

第四步:发送Topic消息

  1. @Component
  2. public class Producer {
  3. @Autowired
  4. private JmsMessagingTemplate jmt;
  5. public void send(String content) {
  6. //注意:项目启动的时候,会在后台自动创建topic名称
  7. jmt.convertAndSend(new ActiveMQTopic("sample.topic"),content);
  8. }
  9. }

提示:SpringBoot默认发送Queue消息

  1. #如果为True,则是Topic;如果是false或者默认,则是queue
  2. spring.jms.pub-sub-domain=true

3.3 消息消费者

第一步:导入坐标

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-activemq</artifactId>
  4. </dependency>

第二步:配置文件

  1. spring.activemq.broker-url=tcp://192.168.1.8:61616
  2. spring.activemq.password=admin
  3. spring.activemq.user=admin

第三步:监听消息

  1. @Component
  2. public class Consumer {
  3. @JmsListener(destination = "队列名称")
  4. public void receiveQueue(String text) {
  5. System.out.println(text);
  6. }
  7. }

提示:SpringBoot可以同时监听Queue和Topic消息,不需要任何配置

注意:

消息队列中间件宕机了,重启之后会不会丢失数据呢?

  • 首先,ActiveMQ默认有三种持久化方式,分别是内存持久化、日志持久化和JDBC持久化,内存持久化是不安全的;日志和JDBC则是安全的;ActiveMQ默认是日志持久化。

  • Queue重启之后不会丢失数据,直到消费被消费成功才被删除;Topic无法再消费之前的消息,所以重启会丢失。

消息队列会不会出现重复消息的问题?什么情况会导致重复消费?

  • 正常情况下,多个消费端监听同一个queue,同一条消息不会被多个消费端同时消费,只有一个消费端能消费成功。

  • 但是,如果由于网络原因,消息队列可能无法及时收到消费端的应答,会重发消息,这样的话消费端相当于消费了两次。

  • 解决方案:消费端每次接受消息的时候,先根据msgId去Redis查询是否存在;如果存在则不处理;如果不存在,则把消息ID存储到Redis。

什么是死信队列?它有什么作用?

  • 消息被消费失败的时候,则消息队列默认重发6次,如果还是失败则把该消息加入到死信队列里面;或者消息过期也会被加入死信队列。
  • 默认的私信队列是ActiveMQ.DLQ,一般为了方便管理,针对每个队列都创建一个相应的死信队列。
  • 为了数据的一致性,可以对死信队列进行监听,进行数据的补偿处理
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/49752?site
推荐阅读
相关标签
  

闽ICP备14008679号