赞
踩
一段逻辑中有这样的技术需求:
1、需要发送多个不同topic的消息(一般可以避免);
2、需要进行本地数据库事务操作;
3、需要保证多条消息的发送跟数据库事务的事务一致性;
这里使用RocketMQ4.3及以后版本的事务消息进行尝试实现。
这段代码实现了 RocketMQ 的事务消息监听器 MyTransactionListener
,它实现了 RocketMQ 的 TransactionListener
接口,其中包含了两个方法:
executeLocalTransaction(Message msg, Object arg)
方法用于执行本地事务。当发送事务消息时,RocketMQ 会调用此方法来执行本地事务。在方法中,首先从参数 arg
中获取事务消息相关的数据,然后从队列中取出下一条待发送的消息 nextMsg
,如果存在下一条消息,则使用事务生产者发送该消息,并返回发送结果对应的本地事务状态;如果队列中已经没有下一条消息,则说明当前消息是最后一条消息,模拟数据库连接并保存订单信息,最后返回提交消息的状态。如果在任何步骤中发生异常,则返回回滚消息的状态。
checkLocalTransaction(MessageExt msg)
方法用于检查本地事务的执行状态。RocketMQ 在发送事务消息后会定期调用此方法来检查本地事务的执行状态。在方法中,首先解析消息体获取用户信息,然后模拟数据库连接并查询用户信息的状态,如果用户信息状态为正常,则返回提交消息的状态,否则返回回滚消息的状态。如果在查询过程中发生异常,则记录错误日志,并返回状态为未知的状态。
需要注意的是,在 checkLocalTransaction
方法中,如果发生异常,则返回 LocalTransactionState.UNKNOW
表示本地事务状态未知,这可能会导致 RocketMQ 在无法确定本地事务状态时采取一定的策略(如重试)来保证消息最终一致性。因此,在实际生产环境中,应尽量确保本地事务的状态能够被正确地查询和确认,以确保消息的可靠传递和数据的一致性。
如下代码,只是自己按对RocketMQ事务消息的理解设计的简单的demo,并未在生产上进行实践,如有疑问欢迎交流指正。
- public class NestedTransactionDemo {
-
- // 模拟数据库连接
- private static Connection getConnection() throws SQLException {
- // 这里使用简单的内存数据库 H2
- return DriverManager.getConnection("jdbc:h2:mem:test");
- }
-
- // 业务db操作
- private static void saveOrderToDB(Connection conn, UserInfoEntity userInfo) throws SQLException {
- // 业务db操作
- String sql = "UPDATE t_user SET status = ? WHERE id = ?";
- try (PreparedStatement ps = conn.prepareStatement(sql)) {
- ps.setString(1, userInfo.getStatus());
- ps.setLong(2, userInfo.getId());
- ps.executeUpdate();
- }
- }
-
- // 业务db操作查询确认
- private static UserInfoEntity getUserInfoById(Connection conn, long id) throws SQLException {
- // 查询用户信息
- String sql = "SELECT * FROM t_user WHERE id =?";
- try (PreparedStatement ps = conn.prepareStatement(sql)) {
- ps.setLong(1, id);
- try (ResultSet rs = ps.executeQuery()) {
- if (rs.next()) {
- return new UserInfoEntity(rs.getLong("id"), rs.getString("status"));
- } else {
- return null;
- }
- }
- }
- }
-
- public static void main(String[] args) throws Exception {
-
- // 业务逻辑 ....
-
-
- // 设置 NameServer 地址
- String nameServerAddr = "localhost:9876";
-
- // 创建一个事务生产者实例
- TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
- // 设置 NameServer 地址
- producer.setNamesrvAddr(nameServerAddr);
-
- // 设置事务监听器
- producer.setTransactionListener(new MyTransactionListener());
-
- // 启动事务生产者实例
- producer.start();
-
- try {
- // 需要进行db操作的数据entity
- UserInfoEntity userInfo = new UserInfoEntity(1L, "normal");
- // 构造消息1
- String jsonString = JsonUtils.toJsonString(userInfo);
- byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);
- Message msg1 = new Message("TopicTest1", "Tag", "k1", body);
- Message msg2 = new Message("TopicTest2", "Tag", "k2", body);
- Message msg3 = new Message("TopicTest3", "Tag", "k3", body);
- // 构建消息待发送队列
- Queue<Message> afterMessages = new LinkedList<>();
- afterMessages.offer(msg2);
- afterMessages.offer(msg3);
- // 发送事务消息1
- producer.sendMessageInTransaction(msg1,
- new MQMsgTransactionData(userInfo, afterMessages, producer)
- );
- } catch (Exception e) {
- // ignore
- }
-
- // 停止事务生产者实例
- Runtime.getRuntime().addShutdownHook(new Thread(producer::shutdown));
- }
-
- @Data
- public static class MQMsgTransactionData {
- private UserInfoEntity userInfo;
- private Queue<Message> afterMessages;
- TransactionMQProducer producer;
-
- public MQMsgTransactionData(UserInfoEntity userInfo, Queue<Message> afterMessages, TransactionMQProducer producer) {
- this.userInfo = userInfo;
- this.afterMessages = afterMessages;
- this.producer = producer;
- }
- }
-
- @Data
- public static class UserInfoEntity {
- private Long id;
- private String status;
-
- public UserInfoEntity(Long id, String status) {
- this.id = id;
- this.status = status;
- }
- }
-
- @Slf4j
- public static class MyTransactionListener implements TransactionListener {
-
- @Override
- public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- MQMsgTransactionData data = (MQMsgTransactionData) arg;
- Queue<Message> messages = data.getAfterMessages();
- // 出队列
- Message nextMsg = messages.poll();
- TransactionMQProducer producer = data.getProducer();
- try {
- if (nextMsg != null) {
- TransactionSendResult sendResult = producer.sendMessageInTransaction(nextMsg, arg);
- return sendResult.getLocalTransactionState();
- } else {
- // nextMsg为空,代表当前msg已经是最后一个
- // 模拟数据库连接
- try (Connection conn = getConnection()) {
- // 业务db操作
- saveOrderToDB(conn, data.getUserInfo());
- return LocalTransactionState.COMMIT_MESSAGE;
- } catch (Exception e) {
- // 返回 ROLLBACK_MESSAGE 表示本地事务提交失败
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }
- }
- } catch (Exception e) {
- // 这里需要返回 unknown :在 mq 发送 嵌套的过程中,担心 ack的结果不一致
- log.error("严重告警");
- return LocalTransactionState.UNKNOW;
- }
- }
-
- @Override
- public LocalTransactionState checkLocalTransaction(MessageExt msg) {
- // todo : 获取业务唯一id
- // todo : 查询数据判断业务db操作是否已完成
- // todo : 已完成则 ack true
- // todo : 未完成则 ack false
- byte[] body = msg.getBody();
- String userJson = new String(body, StandardCharsets.UTF_8);
- UserInfoEntity userInfoEntity = JsonUtils.parseJson2Object(userJson, UserInfoEntity.class);
- // 模拟数据库连接
- try (Connection conn = getConnection()) {
- // 在数据库中保存订单
- UserInfoEntity infoEntity = getUserInfoById(conn, userInfoEntity.getId());
- if (infoEntity != null && infoEntity.getStatus().equals("normal")) {
- return LocalTransactionState.COMMIT_MESSAGE;
- } else {
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }
- } catch (Exception e) {
- log.error("严重告警");
- return LocalTransactionState.UNKNOW;
- }
- }
- }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。