赞
踩
保证一个消息发送出去,至少被消费一次。
可能在多个步骤中给消息弄丢了
不建议使用, 会增加网络和资源的消耗
第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断
当RabbitTemplate与MQ连接超时后,多次重试
修改publisher模块的application.yaml文件,添加下面的内容:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
Rabbitmq之ConfirmCallback与ReturnCallback使用_rabbitmq returncallback-CSDN博客
在publisher模块的application.yaml中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type有三种模式可选:
定义ReturnCallback:
package com.itheima.publisher.config; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Slf4j @AllArgsConstructor @Configuration public class MqConfig { private final RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.error("触发return callback,"); log.debug("exchange: {}", returned.getExchange()); log.debug("routingKey: {}", returned.getRoutingKey()); log.debug("message: {}", returned.getMessage()); log.debug("replyCode: {}", returned.getReplyCode()); log.debug("replyText: {}", returned.getReplyText()); } }); } }
定义ConfirmCallback:
由于每个消息发送时的处理逻辑不一定相同,因此**ConfirmCallback需要在每次发消息时定义。**具体来说,是在调用RabbitTemplate中的convertAndSend方法 时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:
我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback:
@Test void testPublisherConfirm() { // 1.创建CorrelationData,需要一个UUID,回调的时候通过id识别 CorrelationData cd = new CorrelationData(); // 2.给Future添加ConfirmCallback cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { // 2.1.Future发生异常时的处理逻辑,基本不会触发 log.error("send message fail", ex); } @Override public void onSuccess(CorrelationData.Confirm result) { // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容 if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执 log.debug("发送消息成功,收到 ack!"); }else{ // result.getReason(),String类型,返回nack时的异常描述 log.error("发送消息失败,收到 nack, reason : {}", result.getReason()); } } }); // 3.发送消息 rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd); }
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
- 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
- 交换机名称错误:同样是编程错误导致
- MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化
设置为Durable就是持久化模式,Transient就是临时模式。
说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
当内存占满, page out会影响MQ阻塞
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。官方推荐所有队列都为LazyQueue模式。
在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}
当然,我们也可以基于注解来声明队列并设置为Lazy模式:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
可以基于命令行设置policy:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
当然,也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:
当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态
SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回
通过下面的配置可以修改SpringAMQP的ACK处理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 不做处理
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
结论:
Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
在consumer服务中定义处理失败消息的交换机和队列
定义一个RepublishMessageRecoverer,关联队列和交换机
/** * 错误消息配置类,用于配置 RabbitMQ 错误消息处理相关的 Bean。 * 当前类会根据 spring.rabbitmq.listener.simple.retry.enabled 属性的值来决定是否创建相关的 Bean。 * 如果该属性值为 true,则会创建错误消息交换机、错误队列和绑定关系,并配置消息恢复器。 */ @Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorMessageConfig { /** * 创建错误消息交换机 */ @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } /** * 创建错误队列 */ @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } /** * 创建错误队列与错误消息交换机的绑定关系,"error"是路由键 */ @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } /** * 创建消息恢复器 */ @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
消费者如何保证消息一定被消费?
- 开启消费者确认机制为auto, 由spring确认消息处理成功后返回ack, 异常时返回nack
- 开启消费者失败重试机制, 并设置MessageRecoverer, 多次重试失败后将信息投递到异常交换机
保证消息处理的幂等性。这里给出两种方案:
SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可
以Jackson的消息转换器为例( 加在启动类 ):
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);// 在底层会自动创建一个UUID
return jjmc;
}
在Spring AMQP中,当使用Jackson2JsonMessageConverter并开启setCreateMessageIds(true)功能时,底层会自动在消息的属性中添加一个名为amqp_messageId的字段,其值为自动生成的UUID。
具体来说,UUID会成为消息的一部分,保存在消息的AMQP(Advanced Message Queuing Protocol)属性中。这些属性是与消息一起传递的元数据,包含了关于消息的一些信息。amqp_messageId字段就是用于唯一标识消息的UUID。
当消息被发送给消费者时,消费者可以通过**message.getMessageProperties().getMessageId()**方法来获取消息的ID,然后根据业务需求将该ID保存到数据库中。在处理相同消息时,消费者可以在数据库中查询是否存在相同的消息ID,以判断是否为重复消息。
相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。
@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}
上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。
@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}
既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。
图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。
综上,支付服务与交易服务之间的订单状态一致性是如何保证的?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
死信交换机有什么作用呢?
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
docker volume inspect mq-plugins
结果如下:
[
{
"CreatedAt": "2024-01-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
接下来执行命令,启用插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
基于注解方式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
基于@Bean的方式:
package com.itheima.consumer.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class DelayExchangeConfig { @Bean public DirectExchange delayExchange(){ return ExchangeBuilder .directExchange("delay.direct") // 指定交换机类型和名称 .delayed() // 设置delay的属性为true .durable(true) // 持久化 .build(); } @Bean public Queue delayedQueue(){ return new Queue("delay.queue"); } @Bean public Binding delayQueueBinding(){ return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay"); } }
发送消息时,必须通过x-delay属性设定延迟时间:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
可以写一个延迟时间的类, 不用每次都new一个,具体代码如下:
/**
* @author Ccoo
* 2024/1/22
*/
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {
private final int delay;
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delay);
return message;
}
}
最后上面的业务代码变为:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message,
new DelayMessageProcessor(message.removeNextDelay().intValue());
}
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息。
由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体,处于通用性考虑,我们将其定义到hm-common模块下:
@Data public class MultiDelayMessage<T> { /** * 消息体 */ private T data; /** * 记录延迟时间的集合 */ private List<Long> delayMillis; public MultiDelayMessage(T data, List<Long> delayMillis) { this.data = data; this.delayMillis = delayMillis; } public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){ return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis)); } /** * 获取并移除下一个延迟时间 * @return 队列中的第一个延迟时间 */ public Long removeNextDelay(){ return delayMillis.remove(0); } /** * 是否还有下一个延迟时间 */ public boolean hasNextDelay(){ return !delayMillis.isEmpty(); } }
/**
* @author Ccoo
* 2024/1/22
*/
public interface MqConstants {
String DELAY_EXCHANGE = "trade.delay.topic";
String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
String DELAY_ORDER_ROUTING_KEY = "order.query";
}
在nacos中定义一个名为shared-mq.xml的配置文件,内容如下:
spring:
rabbitmq:
host: ${hm.mq.host:192.168.164.128} # 主机名
port: ${hm.mq.port:5672} # 端口
virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机
username: ${hm.mq.un:itheima} # 用户名
password: ${hm.mq.pw:123321} # 密码
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
在trade-service模块添加共享配置:
在trade-service模块的pom.xml中引入amqp的依赖:
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
首先,在hm-api模块定义三个类:
说明:
@Data @ApiModel(description = "支付单数据传输实体") public class PayOrderDTO { @ApiModelProperty("id") private Long id; @ApiModelProperty("业务订单号") private Long bizOrderNo; @ApiModelProperty("支付单号") private Long payOrderNo; @ApiModelProperty("支付用户id") private Long bizUserId; @ApiModelProperty("支付渠道编码") private String payChannelCode; @ApiModelProperty("支付金额,单位分") private Integer amount; @ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付") private Integer payType; @ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功") private Integer status; @ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段") private String expandJson; @ApiModelProperty("第三方返回业务码") private String resultCode; @ApiModelProperty("第三方返回提示信息") private String resultMsg; @ApiModelProperty("支付成功时间") private LocalDateTime paySuccessTime; @ApiModelProperty("支付超时时间") private LocalDateTime payOverTime; @ApiModelProperty("支付二维码链接") private String qrCodeUrl; @ApiModelProperty("创建时间") private LocalDateTime createTime; @ApiModelProperty("更新时间") private LocalDateTime updateTime; }
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {
/**
* 根据交易订单id查询支付单
* @param id 业务订单id
* @return 支付单信息
*/
@GetMapping("/pay-orders/biz/{id}")
PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {
@Override
public PayClient create(Throwable cause) {
return new PayClient() {
@Override
public PayOrderDTO queryPayOrderByBizOrderNo(Long id) {
return null;
}
};
}
}
最后,在pay-service模块的PayController中实现该接口:
@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){
PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();
return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}
接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:
@Slf4j @Component @RequiredArgsConstructor public class OrderStatusListener { private final IOrderService orderService; private final PayClient payClient; private final RabbitTemplate rabbitTemplate; @RabbitListener(bindings = @QueueBinding( value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"), exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC), key = MqConstants.DELAY_ORDER_ROUTING_KEY )) public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) { // 1.获取消息中的订单id Long orderId = msg.getData(); // 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭 Order order = orderService.getById(orderId); if (order == null || order.getStatus() > 1) { // 订单不存在或交易已经结束,放弃处理 return; } // 3.可能是未支付,查询支付服务 PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId); if (payOrder != null && payOrder.getStatus() == 3) { // 支付成功,更新订单状态 orderService.markOrderPaySuccess(orderId); return; } // 4.确定未支付,判断是否还有剩余延迟时间 if (msg.hasNextDelay()) { // 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值 int delayVal = msg.removeNextDelay().intValue(); // 4.2.发送延迟消息 rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg, message -> { message.getMessageProperties().setDelay(delayVal); return message; }); return; } // 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单 orderService.cancelOrder(orderId); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。