赞
踩
MQ( Message queue ),从字面意思上看,本质是个队列,FIFO 先进先出,只不过队列中存放的内容是消息(message),消息可以非常简单,比如只包含文本字符串,JSON 等,也可以很复杂,比如内嵌对象,MQ 多用于分布式系统之间进行通信;
MQ 主要工作是接收并转发消息,在不同的应用场景下可以展现不同的作用:
异步解耦:在业务流程中,一些操作可能非常耗时,但并不需要即时返回结果。借助MQ可以将这些操作异步化。
流量削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用MQ能够使关键组件支撑突发访问压力,不会因为突发流量而崩溃。
消息分发:当多个系统需要对同一数据做出响应时,可以使用MQ进行消息分发。例如,支付成功后,支付系统可以向MQ发送消息,其他系统订阅该消息,而无需轮询数据库。
(官网 RabbitMQ: One broker to queue them all | RabbitMQ)
RabbitMQ 是一个采用 Erlang 语言开发的消息队列系统,以其完备的功能、对多种主流语言的支持、友好的开源界面、良好的性能、以及活跃的社区而闻名。它特别适合中小型公司的应用场景,在数据量和并发量没有超大需求的情况下表现优异。
Kafka:Kafka 是由 Apache 软件基金会开发的一种分布式流处理平台,其最初的目的是用于日志收集和传输。Kafka 以其高吞吐量和卓越的性能而著称,特别适用于需要处理大量数据流的场景。
RocketMQ:RocketMQ 是一个由阿里巴巴开发并捐赠给 Apache 基金会的分布式消息中间件。它基于 Java 开发,在设计上借鉴了 Kafka 的思想,但也引入了一些自己的改进,但支持的客户端语言不多,且社区活跃度一般。
- sudo apt-get update
- #更新软件包
-
- sudo apt-get install erlang
- #安装erlang
安装完成之后输入 erl 命令查看 erlang 版本
- sudo apt-get install rabbitmq-server
- #安装rabbitmq
-
- systemctl status rabbitmq-server
- #确认安装结果
rabbitmq-plugins enable rabbitmq_management
添加管理员用户 - # rabbitmqctl add_user ${账号} ${密码}
- rabbitmqctl add_user admin admin
-
-
- #给用户添加权限
- #rabbitmqctl set_user_tags ${账号} ${⻆⾊名称}
- rabbitmqctl set_user_tags admin administrator
-
sudo service rabbitmq-server start
通过 IP:port 访问管理界面 公网ip + 15672(默认端口号)
输入刚才添加的用户名和密码进行登录,来到管理界面
- #获取镜像
- docker pull rabbitmq:management
-
- #运⾏镜像
- docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
-
- #查看正在运⾏的容器
- docker ps
访问管理界面,可通过guest,guest 登录
添加用户
- #查看正在运⾏的容器
- docker ps
-
- #进⼊容器内部
- docker exec -it 容器ID /bin/bash
-
- #添加⽤⼾admin
- rabbitmqctl add_user admin admin
-
- #给⽤⼾授权
- rabbitmqctl set_user_tags admin administrator
RabbitMQ 是一个消息中间件,也是一个生产者消费者模型,它负责接收,存储并转发消息;
Producer:生产者,是 RabbitMQ Server 的客户端,向 RabbitMQ 发送消息
Consumer:消费者,也是 RabbitMQ Server 的客户端,从 RabbitMQ 接收消息
Broker:代理,其实就是 RabbitMQ Server,主要是接收、存储和转发消息
Connection: 连接是客户端和 RabbitMQ 服务器之间的一个TCP连接,这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息
Channel:通道,信道,Channel 是在 Connection 之上的一个抽象层,在 RabbitMQ 中,一个TCP 连接可以有多个 Channel,每个 Channel 都是独立的虚拟连接,消息的发送和接收都是基于 Channel 的,通道的主要作用是将消息的读写操作复用到同一个TCP连接上,这样可以减少建立和关闭连接的开销提高性能
Virtual host:虚拟主机,这是一个虚拟概念,它为消息队列提供了一种逻辑上的隔离机制,对于RabbitMQ 而言,一个 BrokerServer 上可以存在多个 Virtual Host,当多个不同的用户使用同一个RabbitMo Server 提供的服务时,可以虚拟划分出多个 vhost,每个用户在自己的 vhost 创建exchange/queue等
类似MySOL的"database",是一个逻辑上的集合,一个MySOL服务器可以有多个database
Queue:队列,是 RabbitMQ 的内部对象,用于存储消息,多个消费者,可以订阅同一个队列
Exchange: 交换机,message 到达 broker 的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或多个 Queue 列中 Exchange 起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息
创建一个Maven 项目,在 pom.xml 中引入 RabbitMQ 的依赖
- <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.20.0</version>
- </dependency>
- package rabbitmq;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("101.42.44.62"); // 主机
- factory.setPort(5672); // 端口号
- factory.setUsername("rht"); // 账号
- factory.setPassword("47298"); // 密码
- factory.setVirtualHost("test"); // 虚拟主机
- Connection connection = factory.newConnection();
- // 2. 开启通道
- Channel channel = connection.createChannel();
- // 3. 声明交换机, 此处使用内置的交换机
-
- // 4. 声明队列
- /**
- * queueDeclare(String queue 队列名称,
- * boolean durable 是否可持久化,
- * boolean exclusive 该队列是否被独占,
- * boolean autoDelete 该队列没有消费者时 是否自动删除,
- * Map<String, Object> arguments 参数) throws IOException
- */
- channel.queueDeclare("hello", true, false, false, null);
- // 5. 发送消息
- /**
- * basicPublish(String exchange 交换机名称,
- * String routingKey 内置交换机 队列名,
- * BasicProperties props 属性配置,
- * byte[] body 消息)
- */
- String message = "hello rabbitmq";
- channel.basicPublish("", "hello", null, message.getBytes());
- System.out.println("消息发送成功");
- // 6. 资源释放
- channel.close();
- connection.close();
- }
- }

如果不进行资源释放,在管理界面可以看到 channel 和 connection 的信息
同时也可以看到生产的队列中未被消费的元素
- package rabbitmq;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- // 1. 建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("101.42.44.62"); // 主机
- factory.setPort(5672); // 端口号
- factory.setUsername("rht"); // 账号
- factory.setPassword("47298"); // 密码
- factory.setVirtualHost("test"); // 虚拟主机
- Connection connection = factory.newConnection();
- // 2. 创建 channel
- Channel channel = connection.createChannel();
- // 3. 声明队列(如果存在, 可以不声明)
- channel.queueDeclare("test", true, false, false, null);
- // 4. 消费消息
- /**
- * String basicConsume(String queue 队列名称,
- * boolean autoAck 是否自动确认,
- * Consumer callback 接收到消息后执行的逻辑) throws IOException
- */
- DefaultConsumer consumer = new DefaultConsumer(channel){
- // 从队列中收到消息就会执行的方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接收到的消息 " + new String(body));
- }
- };
- channel.basicConsume("test", true, consumer);
- // 5. 关闭资源
- channel.close();
- connection.close();
- }
- }

P:生产者,也就是要发送消息的程序
C:消费者,消息的接收者
Queue:消息队列,生产者向其中投递消息,消费者从其中取出消息
特点:一个生产者P,一个消费者C,消息只能被消费一次,也称为点对点(Point-to-Point)模式
适用场景:消息只能被单个消费者处理
一个生产者P,多个消费者 C1,C2;
在多个消息的情况下,Work Queue 会将消息分派给不同的消费者,每个消费者都会接收到不同消息,若 P 向队列中发送 10 条消息,则 C1 消费 + C2 消费 = 10
特点:消息不会重复,分配给不同的消费者
适用场景:集群环境中做异步处理
图中 X 表示交换机,作用:生产者将消息发送到 Exchange,由交换机将消息按一定规则路由到一个或多个队列中;
RabbitMQ 交换机有四种类型: fanout,direct,topic,headers,不同类型有着不同的路由策略,AMQP协议里还有另外两种类型:System 和自定义
1. Fanout:广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
2. Direct:定向,把消息交给符合指定 routing key 的队列(Routing模式)
3. Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列(Topics模式)
4. Headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers 属性进行匹配,headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在
Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失;
RoutingKey:路由键,生产者将消息发给交换器时,指定的一个字符串,用来告诉交换机应该如何处理这个消息.
Binding Key:绑定.RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候一般会指定一个Binding Key,这样RabbitMO就知道如何正确地将消息路由到队列了
路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由 key 发布订阅模式是无条件的将所有消息分发给所有消费者,路由模式是 Exchange 根据 RoutingKey 的规则将数据筛选后发给对应的消费者队列
适合场景:需要根据特定规则分发消息的场景;比如系统打印日志,日志等级分为error,warning,info,debug,就可以通过这种模式,把不同的日志发送到不同的队列,最终输出到不同的文件
路由模式的升级版,在 routingKey 的基础上,增加了通配符的功能,使之更加灵活 Topics 和 Routing 的基本原理相同,即:生产者将消息发给交换机,交换机根据 RoutingKey 将消息转发给与 Routing Key 匹配的队列,类似于正则表达式的方式来定义 Routingkey 的模式
不同之处是:routingKey 的匹配方式不同,Routing 模式是相等匹配,topics 模式是通配符匹配
适合场景:需要灵活匹配和过滤消息的场景
在RPC通信的过程中,没有生产者和消费者,是通过两个队列实现了一个可回调的过程
1. 客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段指定了一个回调队列,用于接收服务端的响应
2. 服务端接收到请求后,处理请求并发送响应消息到 replyTo 指定的回调队列
3. 客户端在回调队列上等待响应消息,一旦收到响应,客户端会检查消息的correlationld属性,以确保它是所期望的响应
Publisher Confirms 模式是 RabbitMQ 提供的一种确保消息可靠发送到 RabbitMQ 服务器的机制,在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理
1. 生产者将 Channel 设置为 confirm 模式(通过调用channel.confirmSelect()完成)后,发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便跟踪消息的状态
2. 当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送一个确认(ACK)给生产者(包含消息的唯一ID),表明消息已经送达
通过 Publisher Confirms 模式,生产者可以确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失的问题
适用场景:对数据安全性要求较高的场景,比如金融交易,订单处理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。