赞
踩
5.1.1. 模式说明

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。
- package com.atguigu.rabbitmq.util;
-
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class ConnectionUtil {
- public static Connection getConnection() throws Exception {
- //定义连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置服务地址
- factory.setHost("192.168.6.100");
- //端口
- factory.setPort(5672);
- //设置账号信息,用户名、密码、vhost
- factory.setVirtualHost("/");
- factory.setUsername("admin");
- factory.setPassword("123456");
- // 通过工程获取连接
- Connection connection = factory.newConnection();
- return connection;
- }
-
- public static void main(String[] args) throws Exception {
- Connection con = ConnectionUtil.getConnection();
- System.out.println(con);
- // amqp://admin@192.168.6.100:5672/
- con.close();
- }
- }
- package com.atguigu.rabbitmq.work;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class Producer {
- static final String QUEUE_NAME = "work_queue";
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
- for (int i = 1; i <= 10; i++) {
- String body = i+"hello rabbitmq~~~";
- channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
- }
- channel.close();
- connection.close();
- }
- }

- package com.atguigu.rabbitmq.work;
-
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Consumer1 {
- static final String QUEUE_NAME = "work_queue";
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- }
- };
- channel.basicConsume(QUEUE_NAME,true,consumer);
- }
- }
Consumer2与Consumer1类一样。省略。
运行两个消费者
启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。
订阅模式示例图:

前面2个案例中,只有3个角色:
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
- package com.atguigu.rabbitmq.fanout;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class Producer {
- public static void main(String[] args) throws Exception {
-
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- /*
- exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
- 参数:
- 1. exchange:交换机名称
- 2. type:交换机类型
- DIRECT("direct"),:定向
- FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
- TOPIC("topic"),通配符的方式
- HEADERS("headers");参数匹配
- 3. durable:是否持久化
- 4. autoDelete:自动删除
- 5. internal:内部使用。 一般false
- 6. arguments:参数
- */
- String exchangeName = "test_fanout";
- //5. 创建交换机
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
- //6. 创建队列
- String queue1Name = "test_fanout_queue1";
- String queue2Name = "test_fanout_queue2";
- channel.queueDeclare(queue1Name,true,false,false,null);
- channel.queueDeclare(queue2Name,true,false,false,null);
- //7. 绑定队列和交换机
- /*
- queueBind(String queue, String exchange, String routingKey)
- 参数:
- 1. queue:队列名称
- 2. exchange:交换机名称
- 3. routingKey:路由键,绑定规则
- 如果交换机的类型为fanout ,routingKey设置为""
- */
- channel.queueBind(queue1Name,exchangeName,"");
- channel.queueBind(queue2Name,exchangeName,"");
-
- String body = "日志信息:张三调用了findAll方法...日志级别:info...";
- //8. 发送消息
- channel.basicPublish(exchangeName,"",null,body.getBytes());
-
- //9. 释放资源
- channel.close();
- connection.close();
- }
- }
运行

- package com.atguigu.rabbitmq.fanout;
-
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Consumer1 {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue1Name = "test_fanout_queue1";
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- System.out.println("将日志信息打印到控制台.....");
- }
- };
- channel.basicConsume(queue1Name,true,consumer);
- }
- }
- package com.atguigu.rabbitmq.fanout;
-
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Consumer2 {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue2Name = "test_fanout_queue2";
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- System.out.println("将日志信息打印到控制台.....");
- }
- };
- channel.basicConsume(queue2Name,true,consumer);
- }
- }
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达**广播**的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
路由模式特点:
RoutingKey(路由key)RoutingKey。Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
图解:
在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
- package com.atguigu.rabbitmq.routing;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class Producer {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String exchangeName = "test_direct";
- // 创建交换机
- channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
- // 创建队列
- String queue1Name = "test_direct_queue1";
- String queue2Name = "test_direct_queue2";
- // 声明(创建)队列
- channel.queueDeclare(queue1Name,true,false,false,null);
- channel.queueDeclare(queue2Name,true,false,false,null);
- // 队列绑定交换机
- // 队列1绑定error
- channel.queueBind(queue1Name,exchangeName,"error");
- // 队列2绑定info error warning
- channel.queueBind(queue2Name,exchangeName,"info");
- channel.queueBind(queue2Name,exchangeName,"error");
- channel.queueBind(queue2Name,exchangeName,"warning");
-
- String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
- // 发送消息
- channel.basicPublish(exchangeName,"warning",null,message.getBytes());
- System.out.println(message);
-
- channel.close();
- connection.close();
- }
- }
运行


- package com.atguigu.rabbitmq.routing;
-
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Consumer1 {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue1Name = "test_direct_queue1";
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- System.out.println("将日志信息打印到控制台.....");
- }
- };
- channel.basicConsume(queue1Name,true,consumer);
- }
- }
- package com.atguigu.rabbitmq.routing;
-
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Consumer2 {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue2Name = "test_direct_queue2";
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- System.out.println("将日志信息存储到数据库.....");
- }
- };
- channel.basicConsume(queue2Name,true,consumer);
- }
- }
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达**按照需要接收**的效果。
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候**使用通配符**!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配零个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert


图解:
usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到#.news ,因此凡是以 .news结尾的 routing key 都会被匹配使用topic类型的Exchange,发送消息的routing key有3种: order.info
- package com.atguigu.rabbitmq.topic;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class Producer {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String exchangeName = "test_topic";
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
- String queue1Name = "test_topic_queue1";
- String queue2Name = "test_topic_queue2";
- channel.queueDeclare(queue1Name,true,false,false,null);
- channel.queueDeclare(queue2Name,true,false,false,null);
- // 绑定队列和交换机
- /**
- * 参数:
- 1. queue:队列名称
- 2. exchange:交换机名称
- 3. routingKey:路由键,绑定规则
- 如果交换机的类型为fanout ,routingKey设置为""
- */
- // routing key 系统的名称.日志的级别。
- //需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
- channel.queueBind(queue1Name,exchangeName,"#.error");
- channel.queueBind(queue1Name,exchangeName,"order.*");
- channel.queueBind(queue2Name,exchangeName,"*.*");
- String body = "日志信息:张三调用了findAll方法...日志级别:info...";
- //发送消息goods.info,goods.error
- channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
- channel.close();
- connection.close();
- }
- }
运行程序


接收两种类型的消息:更新商品和删除商品
- package com.atguigu.rabbitmq.topic;
-
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Consumer1 {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue1Name = "test_topic_queue1";
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- }
- };
- channel.basicConsume(queue1Name,true,consumer);
- }
- }
接收所有类型的消息:新增商品,更新商品和删除商品。
- package com.atguigu.rabbitmq.topic;
-
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Consumer2 {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue2Name = "test_topic_queue2";
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- }
- };
- channel.basicConsume(queue2Name,true,consumer);
- }
- }
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达**按照需要接收**的效果;并且这些routing key可以使用通配符。

Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
RabbitMQ工作模式:
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

3、发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。