赞
踩
rabbitmq六种模式 |
---|
![]() |
rabbitmq六种模式.png)
rabbitmq直连模式示意图 |
---|
![]() |
public void sendMessage() { // 队列的名称 String QUEUENAME = "HELLOQUEUE"; // 链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); // 虚拟主机 密码 用户名(全部为默认,创建自定义虚拟主机后面说) connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); try { // 创建一个connection Connection connection = connectionFactory.newConnection(); // 创建channel Channel channel = connection.createChannel(); /** 向管道声明队列: 参数1 String queue: 队列的名字如果没有就新创建一个(幂等) 参数2 boolean durable:是否持久化 (持久化->信息持久化到队列重启服务依旧存在;不持久化->重启rabbitmq服务消息删除) 参数3 boolean exclusive:是否独占队列 (独占是相对于链接而言的,A链接设置AA队列是独占的则B链接就无法创建同名AA的队列;一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。) 参数4 boolean autoDelete:是否自动删除(自动删除true时->如果有consumer连接过并且所有的consumer都断开了队列自动删除,重点是有consumer连接过刚创建没连接过不会自动删除) 参数5:设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。 */ channel.queueDeclare(QUEUENAME, true, false, false, null); // 要发送的消息 String message = "product:{'name':'手机','价格':12}"; /** 发布消息: 参数1 String exchange:交换机名称 参数2 String routingKey:routingKey没有就使用队列名称 参数3 BasicProperties props:一些其他属性 参数4 byte[] body:消息的具体内容 */ channel.basicPublish("",QUEUENAME,null,message.getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) { // 生产者对应的队列 String QUEUENAME = "HELLOQUEUE"; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); // 虚拟主机 密码 用户名(全部为默认,创建自定义虚拟主机后面说) connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); /** 向管道声明队列: 参数1 String queue: 队列的名称如果没有就新创建一个(幂等) 参数2 boolean durable:是否持久化 (持久化->信息持久化到队列重启服务依旧存在;不持久化->重启rabbitmq服务消息删除) 参数3 boolean exclusive:是否独占队列 (独占是相对于链接而言的,A链接设置AA队列是独占的则B链接就无法创建同名AA的队列;一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。) 参数4 boolean autoDelete:是否自动删除(自动删除true时->如果有consumer连接过并且所有的consumer都断开了队列自动删除,重点是有consumer连接过刚创建没连接过不会自动删除) 参数5:设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。 */ channel.queueDeclare(QUEUENAME, true, false, false, null); DeliverCallback deliverCallback = (s, delivery) -> { String s1 = new String(delivery.getBody(), "UTF-8"); System.out.println("消息为"+s1); }; /** 参数1 String queue:队列的名称 参数1 boolean autoAck:true 接收到传递过来的消息后自动acknowledged(应答服务器),false 接收到消息后不应答服务器 参数1 DeliverCallback deliverCallback:当一个消息发送过来后的回调接口 参数1 CancelCallback cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法 */ channel.basicConsume(QUEUENAME,true,deliverCallback,dd->{}); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
rabbitmq任务队列模式示意图 |
---|
![]() |
同上代码
// 创建多个消费者和同一个消息队列绑定 public static void main(String[] args) { RabbitmqUtils rabbitmqUtils = new RabbitmqUtils(); String QUEUENAME = "HELLOQUEUE"; try { Connection connection = rabbitmqUtils.getConnection(); Channel channel = null; channel = connection.createChannel(); channel.queueDeclare(QUEUENAME, true, false, false, null); channel.basicConsume(QUEUENAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { // 该消费者即使很慢但会处理同样的任务(消息)数 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new String(body)); } }); } catch ( Exception e) { e.printStackTrace(); } finally { } }
同上
// 启动多个consumer,能者多劳 public static void main(String[] args) { String QUEUENAME = "HELLOQUEUE"; RabbitmqUtils rabbitmqUtils = new RabbitmqUtils(); Connection connection = rabbitmqUtils.getConnection(); try{ Channel channel = connection.createChannel(); // 每次确认一条消息 channel.basicQos(1); channel.queueDeclare(QUEUENAME,true,false,false,null); // 自动确认设置为false channel.basicConsume(QUEUENAME,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(1000); System.out.println("comsumer1处理"+new String(body)); } catch (InterruptedException e) { e.printStackTrace(); } // 第一个参数时 标记队列中的一个message的long类型的值;第二个参数是是否开启多个参数同时确认 channel.basicAck(envelope.getDeliveryTag(),false); } }); }catch (Exception e){ } }
Envelope envelope参数
Envelope(deliveryTag=1, redeliver=false, exchange=shortMessageExchange, routingKey=): |
---|
deliveryTag:标记一个消息的long类型的值 |
exchange:交换机名 |
routingKey:路由规则 |
rabbitmq广播模式官方图 |
---|
![]() |
使用交换机exchange
一个交换机绑定多个队列,一个队列绑定多个消费者
相比任务队列模式,多了exchange
具体交换机把消息按照那种策略分发给队列,根据交换机的类型确认;有几种可用的交换类型:直接、主题、标题和扇出。我们将关注 — 扇出。
扇出模式交换器会把消息扇出到所有和它绑定的队列;实际一个消息多次处理(可以用在注册成功短信、邮箱等多种方式通知)
public void sendMessageToExchange() { String QUEUENAME = "HELLOQUEUE"; ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); // 声明广播类型交换机 channel.exchangeDeclare("shortMessageExchange", BuiltinExchangeType.FANOUT); // 向交换机中发送消息 for (int i = 0; i < 10; i++) { /** * 参数1:交换机名称 * 参数2:routing-key * 参数3:BasicProperties * 参数4:消息体 */ channel.basicPublish("shortMessageExchange", "", null, ("消息-" + i).getBytes()); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (channel != null) { channel.close(); } if(connection!=null){ channel.close(); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
public static void main(String[] args) { String exchangeName = "shortMessageExchange"; RabbitmqUtils rabbitmqUtils = new RabbitmqUtils(); Connection connection = null; Channel channel = null; try { connection = rabbitmqUtils.getConnection(); channel = connection.createChannel(); String queueName = channel.queueDeclare("queue1", true, false, true, null).getQueue(); //String queueName = channel.queueDeclare().getQueue(); /** * 参数1:destination 队列名 * 参数2:交换器名 * 参数3:路由规则 */ // 队列和交换机绑定 channel.queueBind(queueName, exchangeName, ""); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } catch (Exception e) { e.printStackTrace(); } }
路由模型工作图: |
---|
![]() |
public void routeModdeSend() { String exchangeName = "777"; ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); try { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT); // 根据routing-key将消息发布到交换器 for(int i=0;i<5;i++){ channel.basicPublish(exchangeName,"error",null,("routing-key = error-queue"+i).getBytes()); } for(int i=0;i<5;i++){ channel.basicPublish(exchangeName,"warn",null,("routing-key = warn-queue"+i).getBytes()); } for(int i=0;i<5;i++){ channel.basicPublish(exchangeName,"right",null,("routing-key = right-queue"+i).getBytes()); } } catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) { String exchangeName = "777"; RabbitmqUtils rabbitmqUtils = new RabbitmqUtils(); Connection connection ; try { connection = rabbitmqUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("error-queue", true, false, true, null); channel.queueDeclare("warn-queue", true, false, true, null); channel.queueDeclare("right-queue", true, false, true, null); // 声明交换机 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT); channel.queueBind("error-queue", exchangeName, "error", null); channel.queueBind("warn-queue", exchangeName, "warn", null); channel.queueBind("right-queue", exchangeName, "right", null); channel.basicConsume("error-queue", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("error队列处理 "+new String(body)); } }); channel.basicConsume("warn-queue", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("warn队列处理 "+new String(body)); } }); channel.basicConsume("right-queue", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("right队列处理 "+new String(body)); } }); } catch (Exception e) { e.printStackTrace(); } }
一个交换器对应三个队列,交换器中的message按照路由规则进入队列被consumer error队列处理 routing-key = error-queue0 error队列处理 routing-key = error-queue1 error队列处理 routing-key = error-queue2 error队列处理 routing-key = error-queue3 error队列处理 routing-key = error-queue4 warn队列处理 routing-key = warn-queue0 warn队列处理 routing-key = warn-queue1 warn队列处理 routing-key = warn-queue2 warn队列处理 routing-key = warn-queue3 warn队列处理 routing-key = warn-queue4 right队列处理 routing-key = right-queue0 right队列处理 routing-key = right-queue1 right队列处理 routing-key = right-queue2 right队列处理 routing-key = right-queue3 right队列处理 routing-key = right-queue4
routing-key
**Topics通配符模型官方图: |
---|
![]() |
★ 和路由模式区别是 1.交换机的类型 2.Topics模式的routing-key可以根据通配符规则
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);
// 根据routing-key将消息发布到交换器
for(int i=0;i<5;i++){
channel.basicPublish(exchangeName,"error.ff",null,("error队列处理的第"+i+"条").getBytes());
}
for(int i=0;i<5;i++){
channel.basicPublish(exchangeName,"warn.ff.ff",null,("warn队列处理的第"+i+"条").getBytes());
} for(int i=0;i<5;i++){
channel.basicPublish(exchangeName,"right",null,("right队列处理的第"+i+"条").getBytes());
}
★ 和路由模式区别是 1.交换机的类型 2.Topics模式的routing-key可以根据通配符规则
// 声明交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);
// 将队列和交换机绑定
channel.queueBind("error-queue", exchangeName, "error.*", null);
channel.queueBind("warn-queue", exchangeName, "warn.#", null);
channel.queueBind("right-queue", exchangeName, "right", null);
@RabbitListener(queues = "listened-queue-name")
public void consumerMessage(Object o){
System.out.println("消息"+o);
}
mq消息确认 |
---|
![]() |
confirmCallback : 消息到达broke就是调用confirmCallBack(),如果是集群必须所有的broke都接收到才会调用confirmCallBack()方法。
被broke接收到只能说明message到达了mq服务器并不能保证消息一定会被投递到queue里。所以需要用到接下来的returnCallBack()。
// 配置文件=》开启发送端确认
spring.rabbitmq.publisher-confirms=true
// 回调逻辑 =》服务器收到消息就回调
@Autowired
RabbitTemplate rabbitTemplate;
public void RabbitMqConfirmCallback(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息到达rabbitMq时服务器回调[correlationData]:"+correlationData+"[ack]:"+b);
}
});
}
returnCallback:未正常投递到queue的情况下执行回调
// 配置文件 =》 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
// 配置文件 =》 发送端消息抵达队列优先以异步的方式回调
spring.rabbitmq.template.mandatory=true
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息投递到队列失败"+returnedMessage);
}
});
// 配置文件 = 》 设置消息的确认模式为手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = "listened-queue-name")
public void consumerMessage(Message message, Object o, Channel channel) throws IOException {
// 消息的确认 参数一:消息的唯一Long值 ;参数二:是否批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息" + o);
}
@RabbitListener(queues = "listened-queue-name")
public void consumerMessage(Message message, Object o, Channel channel) throws IOException {
// 消息的确认 参数一:消息的唯一Long值 ;参数二:是否批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
// 拒绝消息 参数一:消息的唯一Long值;参数二:是否批量拒绝;参数三:是否重回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
// 第二种拒绝方式
// 拒绝消息 参数一:消息的唯一Long值;参数二:是否重回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息" + o);
}
rabbitmq实现延迟消息队列图示 |
---|
![]() |
private static final String DELAYEXCHANGE = "delay_exchange";
private static final String DELAYQUEUE = "delay_queue";
private static final String DEADEXCHANGE = "dead_exchange";
private static final String DEADQUEUE = "dead_queue";
private static final String ROUTING_KEY = "sms";
// 绑定延迟队列的交换机(正常) @Bean(name = "delayExchange") public DirectExchange getDelayExchange() { return ExchangeBuilder.directExchange(DELAYEXCHANGE).durable(true).build(); } /** * 延迟队列 * @return */ @Bean(name = "delayQueue") public Queue getDelayQueue(){ // 交换机的名称就是死信交换机相同 return QueueBuilder.durable(DELAYQUEUE) // 延迟时间 .withArgument("x-message-ttl", 10000) // 超过延迟时间 流向的交换机 .withArgument("x-dead-letter-exchange",DEADEXCHANGE) .withArgument("x-dead-letter-routing-key", ROUTING_KEY) .build(); } // 延迟交换机 & 延迟队列绑定 @Bean public Binding delayBind(Queue delayQueue,DirectExchange delayExchange){ return BindingBuilder.bind(delayQueue).to(delayExchange).with(ROUTING_KEY); }
// 死信交换机(延迟队列用) @Bean(name = "deadExchange") public DirectExchange getDeadExchange(){ return ExchangeBuilder.directExchange(DEADEXCHANGE).durable(true).build(); } // 死信队列(普通队列) @Bean(name = "deadQueue") public Queue getDeadQueue(){ return QueueBuilder.durable(DEADQUEUE).build(); } @Bean // 死信交换机 & 死信队列绑定 public Binding deadBind(Queue deadQueue,DirectExchange deadExchange){ return BindingBuilder.bind(deadQueue).to(deadExchange).with(ROUTING_KEY); }
public void send() {
for (int i = 0; i < 100; i++) {
CorrelationData correlationData = new CorrelationData(i+"");
rabbitTemplate.convertAndSend(DELAYEXCHANGE, ROUTING_KEY, "延迟消息" + i,correlationData);
}
}
延迟实现结果: |
---|
![]() |
@Component
@RabbitListener(queues = {"dead_queue"})
public class ConsumerLinster {
@RabbitHandler
public void handlerMessage(String message){
System.out.println(message);
}
}
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.9.0
安装启动 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
<!--消息队列相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: localhost # rabbitmq的连接地址
port: 5672 # rabbitmq的连接端口号
virtual-host: /mall # rabbitmq的虚拟host
username: mall # rabbitmq的用户名
password: mall # rabbitmq的密码
publisher-confirms: true #如果对异步消息需要回调必须设置为true
/** * 消息队列配置 * Created by macro on 2018/9/14. */ @Configuration public class RabbitMqConfig { /** * 订单延迟插件消息队列所绑定的交换机 */ @Bean CustomExchange orderPluginDirect() { //创建一个自定义交换机,可以发送延迟消息 Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args); } /** * 订单延迟插件队列 */ @Bean public Queue orderPluginQueue() { return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName()); } /** * 将订单延迟插件队列绑定到交换机 */ @Bean public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) { return BindingBuilder .bind(orderPluginQueue) .to(orderPluginDirect) .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey()) .noargs(); } }
/** * 取消订单消息的发出者 * Created by macro on 2018/9/14. */ @Component public class CancelOrderSender { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class); @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(Long orderId,final long delayTimes){ //给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setHeader("x-delay",delayTimes); return message; } }); LOGGER.info("send delay message orderId:{}",orderId); } }
/** * 取消订单消息的处理者 * Created by macro on 2018/9/14. */ @Component @RabbitListener(queues = "mall.order.cancel.plugin") public class CancelOrderReceiver { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class); @Autowired private OmsPortalOrderService portalOrderService; @RabbitHandler public void handle(Long orderId){ LOGGER.info("receive delay message orderId:{}",orderId); portalOrderService.cancelOrder(orderId); } }
死信队列是这样一个队列,如果消息发送到该队列并超过了设置的时间,就会被转发到设置好的处理超时消息的队列当中去,利用该特性可以实现延迟消息。
通过安装插件,自定义交换机,让交换机拥有延迟发送消息的能力,从而实现延迟消息。
由于死信队列方式需要创建两个交换机(死信队列交换机+处理队列交换机)、两个队列(死信队列+处理队列),而延迟插件方式只需创建一个交换机和一个队列,所以后者使用起来更简单。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。