赞
踩
mq(Message Queue),一种提供消息队列服务的中间件,也称消息中间件,提供消息生产、存储、消费全过程API软件系统。
ActiveMQ,RabbitMQ,kafka,RocketMQ
RabbitMQ官网
1.限流消峰:系统后期慢慢处理,避免请求丢失或系统压垮
例如问卷填写,同时有上万人点击,而咱们的带宽只允许上百人同时访问,这个时候上万的访问很容易让系统崩溃,而解决这个问题的方法就可以用到限流消峰。
不直接访问系统,而是先访问消息队列(mq)在访问系统。
通过在mq中排队,限制访问到一百后不允许访问,防止访问多量使系统宕机。
这样让访问量变小时间变长的方法防止系统宕机。
2.应用解耦:同步调用,会降低系统的吞吐量与并发度,耦合度太高
例如某应用中有1、2、3系统。
如果耦合调用1、2、3,任何一个子系统出了故障都会使得操作异常。
可通过消息队列的方式减少系统间调用问题,mq对接每个子系统直至所有系统完成后在响应应用系统。
3.异步处理:分布式产生海量数据
例如部分服务间调用是异步的。
1调用2,2需要很长时间执行,但1不知道完成时间只能在一定时间去调用2的查询API查询,或1提供 callback api 在2执行完后调用通知1服务,进行等待完成。
可通过消息总线在1调用2服务后,只需要监听2执行完的消息,当2执行完成后发送到mq,mq将消息转发给1服务,1服务可以及时的得到异步执行成功的消息。
生产者发送消息流程:
1、生产者和消息队列服务进程建立TCP连接。
2、生产者和消息队列服务进程建立通道。
3、生产者通过通道消息发送给消息队列服务进程,由交换机将消息进行转发。
4、交换机将消息转发到指定的消息队列
消费者接收消息流程:
1、消费者和消息队列服务进程建立TCP连接
2、消费者和消息队列服务进程建立通道
3、消费者监听指定的消息队列
4、当有消息到达消息队列时消息队列服务进程默认将消息推送给消费者。
5、消费者接收到消息。
6、ack(确认字符)回复
P:发送消息
红色部分:消息队列。
C;消息的接受着
生产者
1.在新建的maven工程中填加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
2.编写连接类
public class ConnectionUtil { /** * 建立与RabbitMQ的连接 * @return * @throws Exception */ public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("192.168.200.129"); //设置账号信息,用户名、密码 factory.setUsername("admin"); factory.setPassword("123"); // 通过工厂获取连接 Connection connection = factory.newConnection(); return connection; } }
3.生产者发送消息
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { // 1、获取到连接 Connection connection = ConnectionUtil.getConnection(); // 2、从连接中创建通道,使用通道才能完成消息相关的操作 Channel channel = connection.createChannel(); //3、声明(创建)队列。 /** * 参数明细 * 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * queue 队列名称 * durable 是否持久化 * exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * arguments 可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4、消息内容 String message = "Hello World"; // 向指定的队列中发送消息 /** * 参数明细: * 参数:String exchange, String routingKey, BasicProperties props, byte[] body * exchange 交换机,如果不指定将使用mq的默认交换机(设置为"") * routingKey 路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * props 消息的属性 * body 消息内容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(message); //关闭通道和连接 channel.close(); connection.close(); } }
4.运行,在控制台显示
可以后台中找到一个队列,里面有显示一个消息正在准备被消费
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 /** //第一种正常情况需要new一个实现类 DefaultConsumer consumer = new DefaultConsumer(channel){ // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); // body 即消息体 String msg = new String(body,"utf-8"); System.out.println(msg); } }; */ //第二种直接可以用Lambada表达式 DeliverCallback consumer = (consumerTag,message) -> { System.out.println(new String(message.getBody())); }; //取消消息时回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息中断"); }; // 监听队列,第二个参数:是否自动进行消息确认。 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为true表示会自动回复mq,如果设置为false要通过手动实现回复 * 3、callback 消费方法,当消费者接收到消息要执行的方法 * 4、cancelCallback 消息者取消消息时回调 */ channel.basicConsume(QUEUE_NAME, true, consumer, cancelCallback); } }
一个消息只能处理一次,消费者间处于竞争关系依次执行。
P:生产者
C1:消费者1
C2:消费者2
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息完成:"+message); } // 关闭通道和连接 channel.close(); connection.close(); } }
public class work1 { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 DeliverCallback consumer = (consumerTag,message) -> { System.out.println(new String(message.getBody())); }; //消息者取消消息时回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息中断"); }; System.out.println("C1等待接受消息。。。"); channel.basicConsume(QUEUE_NAME, true, consumer, cancelCallback); } }
生产者发送
消费者接收
两个消费者各自消费了不同2条消息,这就实现了任务的分发。
消息应答(能者多劳)
以上的代码实现,消费者1比消费者2的效率要低,一次任务的耗时较长,然而两人最终消费的消息数量是一样的,消费者2大量时间处于空闲状态,消费者1一直忙碌。
现在的状态属于是把任务平均分配,更高效的做法是消费越快的人,消费的越多。
可通过 BasicQos 方法设置prefetchCount = 1 达到想要的效果(在手动ack的情况下才生效,自动ack不生效。)
work queues特点:
- 不用定义交换机
- 生产方是面向队列发送消息(底层使用默认交换机)。
- work queues会将队列绑定到默认的交换机
- 多个消费端监听同一个队列不会重复消费消息
Exchange(交换机)只负责转发消息,不具备存储消息的能力
public class Send { private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为fanout(Fanout 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列上) channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息内容 String message = "注册成功!!"; // 发布消息到Exchange channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(message); channel.close(); connection.close(); } }
public class worker1 { private final static String QUEUE_NAME = "exchange_queue_sms";//短信队列 private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义队列的消费者 DeliverCallback consumer = (consumerTag,message) -> { String msg = new String(message.getBody()); System.out.println("短信服务: " + msg); }; //取消消息时回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息中断"); }; // 监听队列,自动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer, cancelCallback); } }
运行两个消费者,发送一条消息
发送/订阅模型特点:
需要定义交换机
生产方是面向交换机发送消息
需要设置队列和交换机的绑定
多个消费端监听同一个队列不会重复消费消息
相比work queues更强大(也可以做到同一队列竞争)
发布订阅模式可以指定自己专用的交换机。
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,把消息递交给与routing key完全匹配的队列
C1:消费者
C2:消费者
public class Send { private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为direct(直接交换基于消息路由密钥将消息传递到队列。直接交换是消息单播路由的理想选择(尽管它们也可以用于多播路由)) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 消息内容, String message = "注册成功!"; // 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息 channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes()); System.out.println("短信服务: " + message); channel.close(); connection.close(); } }
public class worksms { private final static String QUEUE_NAME = "exchange_queue_sms";//短信队列 private final static String EXCHANGE_NAME = "test_direct_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个 //指定接收发送方指定routing key为sms的消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms"); //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email"); // 定义队列的消费者 DeliverCallback consumer = (consumerTag,message) -> { String msg = new String(message.getBody()); System.out.println("短信服务: " + msg); }; //取消消息时回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息中断"); }; // 监听队列,自动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer, cancelCallback); } }
发送sms的RoutingKey,发现结果:只有指定短信的消费者1收到消息了
发送到主题交换的消息不能有任意的 routing_key 。
它必须是单词列表,由点分隔。
这些词可以是任何东西,但通常它们指定与消息相关的一些特征。
一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。
路由键中可以有任意多的单词,最多为 255 个字节。
创建三个绑定:Q1 与绑定键“ .orange. ”绑定,Q2 与“ ..rabbit ”和“ lazy.# ”绑定。
从示意图可知,我们将发送所有描述动物的消息。消息将Routing key发送。路由关键字中的第一个单词将描述速度,第二个颜色,第三个种类:“..”。
Q1匹配所有的橙色动物。
Q2匹配关于兔子的一切和懒惰动物的一切。
假如生产者发送如下消息,会进入哪个队列:
quick.orange.rabbit Q1 Q2 routingKey="quick.orange.rabbit"的消息会同时路由到Q1与Q2
lazy.orange.elephant Q1 Q2
quick.orange.fox Q1
lazy.pink.rabbit Q2 (虽然这个routingKey与Q2的两个bindingKey都匹配,但是只会投递Q2一次)
quick.brown.fox 不匹配任意队列,被丢弃
quick.orange.male.rabbit 不匹配任意队列,被丢弃
orange 不匹配任意队列,被丢弃
我们以指定Routing key="quick.orange.rabbit"为例,验证上面的答案
public class Send { private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为topic channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 消息内容 String message = "这是一只迅速奔跑的橙色的兔子"; // 发送消息,并且指定routing key为:quick.orange.rabbit channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes()); System.out.println("动物描述:" + message); channel.close(); connection.close(); } }
消费者1
public class work01 { private final static String QUEUE_NAME = "exchange_queue_Q1"; private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。订阅所有的橙色动物 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*"); // 定义队列的消费者 DeliverCallback consumer = (consumerTag,message) -> { String msg = new String(message.getBody()); System.out.println("消费者1: " + msg); }; //消息者取消消息时回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息中断"); }; // 监听队列,自动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer, cancelCallback); } }
与消费者1相似,创建消费者2
public class work02 { private final static String QUEUE_NAME = "exchange_queue_Q2"; private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。订阅关于兔子以及懒惰动物的消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#"); // 定义队列的消费者 DeliverCallback consumer = (consumerTag,message) -> { String msg = new String(message.getBody()); System.out.println("消费者2: " + msg); }; //取消消息时回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息中断"); }; // 监听队列,自动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer, cancelCallback); } }
结果C1、C2是都接收到消息了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。