赞
踩
前面《RabbitMQ上篇》我们使用SpringAMQP来演示如何用Java代码操作RabbitMQ,当时采用的是生产者直接将消息发布给队列,但是实际开发中不建议这么做,更加推荐生产者将消息发布到交换机(exchange),然后由exchange路由到队列,其架构如下所示:
可以看出,在发布-订阅模型中新增一个"交换机"角色,此后各个角色的任务如下:
注意:由于我们的exchange不暂存消息,只做消息的路由,因此如果没有queue与exchange绑定或者routing key设置错误,就会导致消息丢失!!!
RabbitMQ提供的交换机类型有如下四种:
下面是Fanout Exchange的工作流程图:
特征:Fanout Exchange将消息路由给全部跟它绑定的queue
操作步骤:
/** * 订阅fanout.queue1队列 * @param msg 消息 */ @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { log.info("listener1 从【fanout.queue1】接收到消息:" + msg); } /** * 订阅fanout.queue2队列 * @param msg 消息 */ @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { log.info("listener2 从【fanout.queue2】接收到消息:" + msg); }
/**
* 测试FanoutExchange交换机类型
*/
@Test
public void testFanoutExchange() {
// 1. 定义exchange名称
String exchangeName = "fanout.exchange";
// 2. 定义消息体
String msg = "震惊!某大学频频被曝出食堂安全问题";
// 3. 发送消息
rabbitTemplate.convertAndSend(exchangeName, "", msg);
}
结果如上图所示:说明fanout.exchange雀氏将消息广播给了所有与之绑定的queue
特点:Direct Exchange要求在与queue建立binding关系的时候定义一个BindingKey,之后publisher生产者携带消息的同时也会指定RoutingKey,只有RoutingKey与BindingKey一致的queue才会被路由消息
工作流程如上图所示,其中queue1与exchange的Binding Key为"blue"以及"red",queue2与exchange的Binding Key为"yellow"以及"red",此时当Routing Key为"blue",Direct Exchange只会将消息路由给queue1
操作步骤:
/** * 订阅direct.queue1队列 * @param msg 消息 */ @RabbitListener(queues = "direct.queue1") public void listenDirectQueue1(String msg) { log.info("listener1 从【direct.queue1】接收到消息:" + msg); } /** * 订阅direct.queue2队列 * @param msg 消息 */ @RabbitListener(queues = "direct.queue2") public void listenDirectQueue2(String msg) { log.info("listener2 从【direct.queue2】接收到消息:" + msg); }
/**
* 测试DirectExchange交换机类型
*/
@Test
public void testDirectExchange() {
// 1. 定义交换机名称
String exchangeName = "direct.exchange";
// 2. 定义消息体
String msg = "今日份消息只交给幸运色为blue的哦~";
// 3. 发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
结果符合预期,只有direct.queue1能够接受到消息!
Topic Exchange与Direct Exchange非常类似,都可以依据BindingKey以及RoutingKey的匹配程度进而路由给特定符合条件的queue,但是Topic Exchange定义Binding Key可以为一组词,中间用"."进行分隔,并且支持使用通配符,规则如下:
#
:匹配0个或者多个词*
:匹配1个单词例如现在queue1的BindingKey为"china.#“,而queue2的BindingKey为”#.news",而RoutingKey为"china.reports",此时可以路由给queue1,但是无法路由给queue2,如果RoutingKey为"china.news"则queue1、queue2均可以被路由
操作步骤:
/** * 订阅topic.queue1队列 * @param msg 消息 */ @RabbitListener(queues = "topic.queue1") public void listenTopicQueue1(String msg) { log.info("listener1 从【topic.queue1】接收到消息:" + msg); } /** * 订阅topic.queue2队列 * @param msg 消息 */ @RabbitListener(queues = "topic.queue2") public void listenTopicQueue2(String msg) { log.info("listener2 从【topic.queue2】接收到消息:" + msg); }
/**
* 测试TopicExchange交换机类型
*/
@Test
public void testTopicExchange() {
// 1. 定义交换机名称
String exchangeName = "topic.exchange";
// 2. 定义消息体
String msg = "中国新闻报,快来买呀!";
// 3. 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
}
证明通配符生效!
前面我们收发消息的过程是使用Java代码实现的,但是创建Queues以及Exchanges仍然需要我们在RabbitMQ提供的控制台实现,那么如何使用Java代码来创建Queue以及Exchange呢?
SpringAMQP API:
new Queue("队列名称")
创建new FanoutExchange("交换机名称")
(以FanoutExchange为例)BindingBuilder.bind(队列对象).to(交换机对象)
构建步骤:
@Configuration
声明@Bean
声明@Configuration public class FanoutConfig { /** * 声明FanoutExchange交换机 * @return 返回FanoutExchange对象 */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("code.fanout.exchange"); } /** * 声明FanoutQueue队列 * @return 返回FanoutQueue队列 */ @Bean public Queue fanoutQueue() { return new Queue("code.fanout.queue"); } /** * 声明绑定关系 * @param fanoutExchange 交换机 * @param fanoutQueue 队列 * @return 绑定关系 */ @Bean public Binding fanoutBinding(FanoutExchange fanoutExchange, Queue fanoutQueue) { return BindingBuilder.bind(fanoutQueue).to(fanoutExchange); } }
步骤:
@Configuration
声明@Bean
声明@Configuration public class DirectConfig { /** * 声明一个DirectExchange交换机 * @return 返回一个DirectExchange类型对象 */ @Bean public DirectExchange directExchange() { return new DirectExchange("code.direct.exchange"); } /** * 声明一个Queue队列 * @return 返回一个Queue类型对象 */ @Bean public Queue directQueue() { return new Queue("code.direct.queue"); } /** * 声明一个绑定关系 * @return 返回Binding对象 */ @Bean public Binding directBinding(DirectExchange directExchange, Queue directQueue) { return BindingBuilder.bind(directQueue).to(directExchange).with(""); } }
注解声明格式:
@Component
@Slf4j
public class AnnotateRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("annotate.direct.queue"),
key = {"blue", "red"},
exchange = @Exchange(name = "annotate.direct.exchange", type = ExchangeTypes.DIRECT)
))
public void listenAnnotateDirect(String msg) {
log.info("接收到消息:" + msg);
}
}
前面我们都是将字符串类型的数据作为消息进行传输,那么如果是对象类型的消息呢,我们尝试发送一个自定义User类型作为消息传输:
/**
* 自定义User类型
* @author 米饭好好吃
*/
@Data
@AllArgsConstructor
public class User implements Serializable {
private String name;
private Integer age;
}
@Test
public void testSendObject() {
// 1. 声明队列名称
String queueName = "work.queue";
// 2. 定义消息体
User user = new User("jack", 22);
// 3. 发送消息
rabbitTemplate.convertAndSend(queueName, user);
}
从RabbitMQ控制台中查看消息内容如下:
我们发现实际调用了convertMessageIfNecessary(object)
方法,我们继续追踪进去:
该方法判断object是否为Message类型,如果不是就调用getRequiredMessageConverter()
获取所需的消息转换器,继续追踪进去:
该方法返回了一个SimpleMessageConverter实例对象,因此我们回到上一层,获取到MessageConverter实例后又调用了toMessage
方法,我们继续追踪进去观察是如何转换消息的:
在AbstruectMessageConverter中实现了toMessage方法,而createMessage方法在子类 SimpleMessageConverter重写了该方法:
可以看出调用了SerialzationUtils.serialize(object)
进行了序列化,继续追踪观察到底是如何序列化的:
可以看出是借助ObjectOutputStream
进行序列化的,而这这个是JDK默认的序列化方式,该方式有如下缺点:
因此我们需要重写消息转换器中的序列化机制:
因此JDK原生序列化器有诸多确定,因此我们需要使用自定义的JSON序列化器,此处需要引入jackson-databind
相关依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
/**
* 消息转换器配置
* @author 米饭好好吃
*/
@Configuration
public class MessageConvertConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
验证结果:
在控制台中我们可以发现消息格式就是熟悉的JSON格式了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。