赞
踩
结合前面所说的问题:
如果以后有其它系统也依赖商品服务的数据,同样监听消息即可,商品服务无需任何代码修改。
MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
RabbitMQ是基于AMQP的一款消息管理系统
docker pull rabbitmq:management
docker run -d --name rabbitmq --publish 5671:5671 \ --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \ rabbitmq:management
注:
4369 – erlang 发现口
5672 --client 端通信口
15672 – 管理界面ui端口
25672 – server 间内部通信口
rpm -ivh erlang-21.3.8.9-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.1-1.el7.noarch.rpm
安装成功后rabbitmq命令存放在:
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.1/sbin/
rabbitmq-plugins enable rabbitmq_management
systemctl start rabbitmq-server.service #启动
systemctl status rabbitmq-server.service #查看状态
systemctl restart rabbitmq-server.service #重启
systemctl stop rabbitmq-server.service #停止
ps -ef | grep rabbitmq
启动成功后查看占用的端口 : netstat -anp
在web浏览器中输入地址:http://虚拟机ip:15672
/
输入默认账号: guest
: guest
,默认不允许远程连接
增加管理员账号:rabbitmqctl add_user admin admin
给账号分配角色:rabbitmqctl set_user_tags admin administrator
使用刚才创建好的角色登录, 进入首页
overview
:概览
connections
:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
channels
:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges
:交换机,用来实现消息的路由
Queues
:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
修改角色密码:rabbitmqctl change_password admin 123456
查看用户列表:rabbitmqctl list_users
端口:
5672
: rabbitMq的编程语言客户端连接端口
15672
:rabbitMq管理界面端口
25672
:rabbitMq集群的端口
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
无法登陆管理控制台,通常就是普通的生产者和消费者。
但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package com.rabbitmq.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 获取 Rabbitmq 连接的工具类
* Created by YongXin Xue on 2020/06/11 21:06
*/
public class ConnectionUtil {
/**
* 获取连接对象
* @return
*/
public static Connection getConnection() throws Exception {
// 创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
// 给工厂对象设置参数: rabbitmq服务的参数[IP, 端口号, 账号密码, 虚拟主机]
factory.setHost("192.168.230.205"); // 服务器IP
factory.setPort(5672); // 端口号
// 设置连接超时的时间
factory.setConnectionTimeout(3000);
// 设置账号密码, 创建的虚拟机名
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/host_admin");
// 使用工厂对象,创建连接对象
Connection connection = factory.newConnection();
return connection;
}
}
@Test
void contextLoads() throws Exception {
// 测试获取 rabbitmq 连接
Connection connection = ConnectionUtil.getConnection();
System.out.println("connection = " + connection);
}
结果 : connection = amqp://admin@192.168.230.205:5672//host_admin
官方介绍:
总之:
两个变化:
package com.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
/**
* 第一种 : 广播模式的生产者
* Created by YongXin Xue on 2020/06/11 21:52
*/
public class Send {
private static final String EXCHANGE_NAME = "fanout_exchange";
private static final String MSG = "Hi 来自广播模式的一条新消息 !";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 建立通道
Channel channel = connection.createChannel();
// 声明交换机, 并指定类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 使用通道将消息发布到交换机 参数1: 指定交换机, 如果使用空字符串,表会使用默认的交换机
channel.basicPublish(EXCHANGE_NAME, "", null, MSG.getBytes());
// 关闭通道
channel.close();
// 关闭连接
connection.close();
}
}
package com.rabbitmq.fanout;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
/**
* 广播模式的消费者 1
* Created by YongXin Xue on 2020/06/11 22:12
*/
public class Consumer_1 {
private static final String RECV1_QUEUE_NAME = "recv1_fanout_queue";
// 交换机名称
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列 : 生产者 -> 交换机 -> 队列 ->消费者
channel.queueDeclare(RECV1_QUEUE_NAME,false, false, false, null);
// 绑定队列到交换机
// 参数1: 队列名称 参数2: 交换机名称 参数3: routing Key
channel.queueBind(RECV1_QUEUE_NAME, EXCHANGE_NAME, "");
// 创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 接收到数据的回调方法
* @param consumerTag
* @param envelope
* @param properties
* @param body 生产者发送过来的数据的字节数组
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[广播模式的消费者 1 = " + msg + " ]");
}
};
// 阻塞监听队列 true 设置自动对回执
channel.basicConsume(RECV1_QUEUE_NAME, true, consumer);
}
}
package com.rabbitmq.fanout;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
/**
* 广播模式的消费者 2
* Created by YongXin Xue on 2020/06/11 22:12
*/
public class Consumer_2 {
private static final String RECV2_QUEUE_NAME = "recv2_fanout_queue";
// 交换机名称
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列 : 生产者 -> 交换机 -> 队列 ->消费者
channel.queueDeclare(RECV2_QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
// 参数1: 队列名称 参数2: 交换机名称 参数3: routing Key
channel.queueBind(RECV2_QUEUE_NAME, EXCHANGE_NAME, "");
// 创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 接收到数据的回调方法
* @param consumerTag
* @param envelope
* @param properties
* @param body 生产者发送过来的数据的字节数组
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[广播模式的消费者 2 = " + msg + " ]");
}
};
// 阻塞监听队列 true 设置自动对回执
channel.basicConsume(RECV2_QUEUE_NAME, true, consumer);
}
}
我们运行两个消费者,然后发送1条消息:
应用场景:文字直播
有选择性的接收消息
此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete
package com.rabbitmq.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
/**
* direct 模式
* Created by YongXin Xue on 2020/06/11 23:17
*/
public class Send {
private static final String EXCHANGE_NAME = "direct_exchange";
private static final String MSG = "[error]: java.lang.NullPointerException";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 建立通道
Channel channel = connection.createChannel();
// 声明交换机, 并指定类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 使用通道将消息发布到交换机 参数1: 指定交换机, 如果使用空字符串,表会使用默认的交换机
channel.basicPublish(EXCHANGE_NAME, "error", null, MSG.getBytes());
System.out.println("生产者发送消息 = " + MSG);
// 关闭通道
channel.close();
// 关闭连接
connection.close();
}
}
package com.rabbitmq.direct;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
/**
* direct 模式的消费者 1
* Created by YongXin Xue on 2020/06/11 22:12
*/
public class Consumer_1 {
private static final String RECV1_QUEUE_NAME = "recv1_direct_queue";
// 交换机名称
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列 : 生产者 -> 交换机 -> 队列 ->消费者
channel.queueDeclare(RECV1_QUEUE_NAME,false, false, false, null);
// 绑定队列到交换机
// 参数1: 队列名称 参数2: 交换机名称 参数3: 接收打印的日志
channel.queueBind(RECV1_QUEUE_NAME, EXCHANGE_NAME, "debug");
channel.queueBind(RECV1_QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(RECV1_QUEUE_NAME, EXCHANGE_NAME, "warn");
channel.queueBind(RECV1_QUEUE_NAME, EXCHANGE_NAME, "error");
// 创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 接收到数据的回调方法
* @param consumerTag
* @param envelope
* @param properties
* @param body 生产者发送过来的数据的字节数组
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[direct 消费者 1 = " + msg + " ]");
}
};
// 阻塞监听队列 true 设置自动对回执
channel.basicConsume(RECV1_QUEUE_NAME, true, consumer);
}
}
package com.rabbitmq.direct;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
/**
* direct 模式的消费者 1
* Created by YongXin Xue on 2020/06/11 22:12
*/
public class Consumer_2 {
private static final String RECV2_QUEUE_NAME = "recv2_direct_queue";
// 交换机名称
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列 : 生产者 -> 交换机 -> 队列 ->消费者
channel.queueDeclare(RECV2_QUEUE_NAME,false, false, false, null);
// 绑定队列到交换机
// 参数1: 队列名称 参数2: 交换机名称 参数3: 接收打印的error日志
channel.queueBind(RECV2_QUEUE_NAME, EXCHANGE_NAME, "error");
// 创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 接收到数据的回调方法
* @param consumerTag
* @param envelope
* @param properties
* @param body 生产者发送过来的数据的字节数组
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[direct 消费者 2 = " + msg + " ]");
}
};
// 阻塞监听队列 true 设置自动对回执
channel.basicConsume(RECV2_QUEUE_NAME, true, consumer);
}
}
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
`#`:匹配一个或多个词
`*`:匹配不多不少恰好1个词
举例:
`audit.#`:能够匹配`audit.irs.corporate` 或者 `audit.irs`
`audit.*`:只能匹配`audit.irs`
在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的routing key发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“<speed>.<color>.<species>
”。
我们创建了三个绑定:Q1绑定了绑定键“.orange.”,Q2绑定了“..rabbit”和“lazy.#”。
Q1匹配所有的橙色动物。
Q2匹配关于兔子以及懒惰动物的消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。