赞
踩
先上一张图,大致了解 Spring Cloud Alibaba RocketMq 架构图:
RocketMQ的顺序消息分为:局部有序和全局有序。
顺序消息分为两部分:顺序发送
(发消息)、顺序消费
(收消息)。以下为实现局部有序示例:
顺序发消息方式一
① 修改生产端项目配置为同步发送
#在项目spring.propeties配置文件中,设置同步发送。(默认市异步发送)
# 所有通过 RocketMQTemplate 或其他相关组件发送的消息都会以同步的方式发送。
spring.cloud.stream.rocketmq.output.producer.sync=true
② MessageBuilder 设置Header信息头,表示该消息是一条顺序消息,将消息固定发送到指定的消息队列。
@RestController public class sendMsgController { @Autowired private Source source; @GetMapping(value = "/sendOrderlyMsg") public String sendOrderlyMsg() { List<String> list = Arrays.asList("状态1","状态2","状态3"); for (String msg : list) { // 这里指定消息发送到第0各消息队列 MessageBuilder builder = MessageBuilder.withPayload(msg) .setHeader(BinderHeaders.PARTITION_HEADER, 0); Message message = builder.build(); source.output().send(message); } return "success"; } }
顺序发消息方式二
@Slf4j @SpringBootTest public class SendSyncOrderlyMessageTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void sendOrderedStringMessage() { String message = "这是一条同步顺序消息:"; for (int i = 0; i < 5; i++) { // hashkey是为了确保这些消息被路由到同一个消息队列,这样消费者就能够按照顺序处理它们 rocketMQTemplate.syncSendOrderly("orderilMessageTopic", message + i, "syncOrderlyHashKey"); } } }
修改消费端项目配置为顺序消费。
# 指定为顺序消费,这里默认配置为并发消费
spring.cloud.stream.rocketmq.binding.input.consumer.orderly=true
RocketMQ 发送消息的三种方式:同步、异步、单向。RocketMQ 发送顺序消息的原理,就是同一类消息发送到相同的队列即可
。为保证先发送的消息先存储到消息队列,必须使用同步发送的方式
,否则可能出现先发的消息后到消息队列,此时消息就会乱序。
同步
:发送网络请求后会同步等待 Broker 服务器的返回结果,支持发送失败重试,适用于比较重要的消息通知场景。异步
:异步发送网络请求,不会阻塞当前线程,不支持失败重试,适用于对响应时间要求更高的场景。单向
:单向和异步发送的原理一致,但是不支持回调。适用于响应时间非常短,对可靠性要求不高的场景,如日志收集。
撸一撸源码,首先看看 org.apache.rocketmq.spring.core.RocketMQTemplate#syncSendOrderly()
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean { // …… 省略该类其他代码 private DefaultMQProducer producer; private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash(); public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) { return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout()); } public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("syncSendOrderly failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { long now = System.currentTimeMillis(); // 转成 RocketMQ API 中的 Message 对象 org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); // 调用发送消息接口,发送消息,选择队列由 messageQueueSelector 实现 SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); } return sendResult; } catch (Exception e) { log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } } // …… 省略该类其他代码 }
RocketMQTemplate#syncSendOrderly() 中选择队列的过程,由 MessageQueueSelector 和 hashKey 在实现类 SelectMessageQueueByHash 中完成。
package org.apache.rocketmq.client.producer.selector; import java.util.List; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; public class SelectMessageQueueByHash implements MessageQueueSelector { // hash 值相同且 队列数相同,则消息发送到的队列相同 // 这里的 mqs队列列表,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便下次从缓存获取。 @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 根据 hashKey计算hash值,再用hash值和队列数 mqs.size()取模,得到一个索引值,结果小于队列数。 int value = arg.hashCode() % mqs.size(); if (value < 0) { // 索引取绝对值 value = Math.abs(value); } // 根据索引值从队列中取出一个队列 return mqs.get(value); } }
RocketMQ支持集群消费、广播消费两种模式。
广播消费
模式:每条消息会被 ConsumerGroup 的每个 Consumer 消费。集群消费
模式:每条消息只会被 ConsumerGroup 的一个 Consumer 消费。(默认模式) 顺序消费原理是,同一个消息队列只允许 Consumer 中的一个消费线程拉取消费
。Consumer 中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到 Broker 时,会先申请独占锁,获得锁的请求则允许消费。
消息消费成功后,会向 Broker 提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息
。顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中后续的消息,从而保证顺序消费。
顺序消费需特别注意对异常的处理
,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。
撸一下 org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService 类源码:
public class ConsumeMessageOrderlyService implements ConsumeMessageService { private final MessageQueueLock messageQueueLock = new MessageQueueLock(); // ……省略其他代码 class ConsumeRequest implements Runnable { private final ProcessQueue processQueue; // ……省略其他代码 @Override public void run() { final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { // ……省略其他代码 try { this.processQueue.getConsumeLock().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue), e); hasException = true; } finally { this.processQueue.getConsumeLock().unlock(); } // ……省略其他代码 } } } }
org.apache.rocketmq.client.impl.consumer.ProcessQueue 类源码:
public class ProcessQueue {
// ……省略其他代码
private final Lock consumeLock = new ReentrantLock();
public Lock getConsumeLock() {
return consumeLock;
}
// ……省略其他代码
}
RocketMQ 采用2PC方案来提交事务消息。
Producer 向 Broker 发送预处理消息(也称半消息),此时消息还未被投递出去,Consumer 不能消费
。Producer 向 Broker 发送提交或回滚消息
。
RocketMQ 事务消息流程:
1、发送预处理消息成功后,开始执行本地事务。
2、如果本地事务执行成功,发送提交请求提交事务消息,消息会投递给 Consumer.
3、如果本地事务执行失败,发送回滚请求回滚事务消息,消息不会投递给 Consumer.
4、如果本地事务状态未知,网络故障或 Producer 宕机,Broker 未收到二次确认的消息。由 Broker 端发送请求给 Producer 进行消息回查,确认提交或回滚。如果消息状态一直未被确认,需要人工介入处理。
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.apache.rocketmq.common.message.MessageConst; public class TransactionalMessageExample { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendTransactionalMessage() { // 构建消息 Message<String> message = MessageBuilder.withPayload("Transactional message") .setHeader(RocketMQHeaders.TRANSACTION_ID, "transactionId-test") // 设置事务ID .build(); // 发送事务消息 rocketMQTemplate.sendMessageInTransaction("Topic:Tag", message, new RocketMQLocalTransactionListener() { // 执行本地事务,及本地事务执行成功后的操作 @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 获取事务ID : transactionId-test String transactionId = (String)message.getHeaders() .get(RocketMQHeaders.TRANSACTION_ID); // 假设以事务ID 作为主键,执行本地事务, save(transactionId) 为执行数据库操作 boolean result = save(transactionId); return result ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } // 检查本地事务状态,返回事务状态 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 获取事务ID : transactionId-test String transactionId = (String)message.getHeaders() .get(RocketMQHeaders.TRANSACTION_ID); // 假设 isSuccess(transactionId) 是以事务ID为主键,查询本地事务执行情况 if (isSuccess(transactionId)) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.ROLLBACK; } }); } }
不支持延时消息和批量消息
。事务性消息可能不止一次被检查或消费
。事务回查的间隔时间
:BrokerConfig.transactionCheckInterval,通过Broker的配置文件设置好。事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者
。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。