赞
踩
在RabbitMQ中可以将消息传递的链路简化成如下图:
从上图可以发现,主要分为三个角色:Producer、Consumer、RabbitMQ Broker
正常情况下,Producer生产消息,安全的到打Broker的Exchange,然后根据转发规则,存储在Queue上,最后再推送给订阅的Consumer。
因此可以将链路分成三个部分:
1.消息从Producer到RabbitMQ的Exchange
2.消息从Exchange到Queue
3.消息从Queue到Consumer
当然还有一种情况,就是消息已经到达服务器了,但服务器挂了,因此还需要有持久化来恢复消息。
综上,要想保证RabbitMQ的可靠性,可以从这四个方面去入手:
1.保证消息从Producer到达Broker的Exchange
2.保证消息从Exchange转发到对应的Queue上
3.保证消息的持久化
4.保证消息正确被消费者消费
发布确认主要分为两种模式:confirm模式和return模式
confirm模式是针对消息从Producer到Exchange
return模式是针对消息从Exchange到Queue
Spring中yml配置:
- spring:
- rabbitmq:
- port: 5672
- host: xxx
- username: admin
- password: admin
- virtual-host: demo
- #消息发送确认
- publisher-confirm-type: correlated
因为这个机制是在发送方这边,因此在创建RabbitTemplate时配置。
代码:
- @Bean("confirmRabbitTemplate")
- public RabbitTemplate confirmRabbitTemplate(ConnectionFactory
- connectionFactory){
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean ack,
- String cause) {
- System.out.printf("");
- if (ack){
- System.out.printf("消息接收成功, id:%s \n",
- correlationData.getId());
- }else {
- System.out.printf("消息接收失败, id:%s, cause: %s",
- correlationData.getId(), cause);
- }
- }
- });
- return rabbitTemplate;
- }

RabbitMQ无法去区分要确认的是哪条消息,因此在发送消息的时候还要添加一个标识。
代码:
- public String fanout(){
- for (int i = 0; i < 10; i++) {
- CorrelationData correlationData = new CorrelationData("" + i);
- rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", "hello——" + i, correlationData);
- }
- return "发送成功~";
- }
消息到达Exchange后,会根据路由规则进行转发到队列中,但可能存在队列与路由键不匹配或者没有绑定的队列等,消息无法转发到队列中,我们可以设置对这种情况的队列回退到Producer。
yml配置同上
代码:
- @Bean("confirmRabbitmqTemplate2")
- //回退模式
- public RabbitTemplate confirmRabbitTemplate2(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- System.out.println("消息被退回");
- }
- });
- return rabbitTemplate;
- }
生产者代码同上
持久化主要针对的是交换机、队列、消息。
代码:
- //交换机持久化
- ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
-
- //队列持久化
- QueueBuilder.durable(Constants.WORK_QUEUE).build();
-
- //消息持久化
- Message message = new Message("nihao".getBytes(), new MessageProperties());
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- User user = new User(i, "test--" + i, 10 + i);
- rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, user);
消息确认机制主要是用在消费者这边,当消费这从Broker获取了消息,处理完后需要发送一个Ack告诉Broker,然后Broker就可以将消息删除了。
在RabbitMQ中可以设置两种模式:手动确认和自动确认
但在Spring-AMQP中提供了三种机制:NONE、MANUAL、AUTO
NONE:表示不做任何处理,消息一旦发出去就会被确认删除。
AUTO:与NONE不同的是,如果消费者在处理消息的时候发生异常了,并不会确认消息。
MANUAL:手动确认,消费者需要显示调用ACK。
xml配置:
- spring:
- rabbitmq:
- port: 5672
- host: xxx
- username: admin
- password: admin
- virtual-host: demo
- listener:
- simple:
- # acknowledge-mode: none
- # acknowledge-mode: auto
- acknowledge-mode: manual
代码:
- @RabbitListener(queues = Constants.WORK_QUEUE)
- public void listenerQueue1(Message message, Channel channel) throws IOException {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try{
- System.out.println("listener1 : " + message);
- int a = 3 / 0;
- channel.basicAck(deliveryTag, false);
- } catch (Exception e) {
- channel.basicNack(deliveryTag, false, true);
- }
- }
总结:
保证消息的可靠性的方式总共分为四个方式:
1.保证消息从Producer到达Broker的Exchange -> Confirm模式
2.保证消息从Exchange转发到对应的Queue上 -> Return模式
3.保证消息的持久化 -> 设置durable参数
4.保证消息正确被消费者消费 -> 手动应答
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。