当前位置:   article > 正文

RocketMQ发送多个消息及mysql操作的事务一致性保证

RocketMQ发送多个消息及mysql操作的事务一致性保证

一段逻辑中有这样的技术需求:

1、需要发送多个不同topic的消息(一般可以避免);

2、需要进行本地数据库事务操作;

3、需要保证多条消息的发送跟数据库事务的事务一致性;

这里使用RocketMQ4.3及以后版本的事务消息进行尝试实现。

这段代码实现了 RocketMQ 的事务消息监听器 MyTransactionListener,它实现了 RocketMQ 的 TransactionListener 接口,其中包含了两个方法:

  1. executeLocalTransaction(Message msg, Object arg) 方法用于执行本地事务。当发送事务消息时,RocketMQ 会调用此方法来执行本地事务。在方法中,首先从参数 arg 中获取事务消息相关的数据,然后从队列中取出下一条待发送的消息 nextMsg,如果存在下一条消息,则使用事务生产者发送该消息,并返回发送结果对应的本地事务状态;如果队列中已经没有下一条消息,则说明当前消息是最后一条消息,模拟数据库连接并保存订单信息,最后返回提交消息的状态。如果在任何步骤中发生异常,则返回回滚消息的状态。

  2. checkLocalTransaction(MessageExt msg) 方法用于检查本地事务的执行状态。RocketMQ 在发送事务消息后会定期调用此方法来检查本地事务的执行状态。在方法中,首先解析消息体获取用户信息,然后模拟数据库连接并查询用户信息的状态,如果用户信息状态为正常,则返回提交消息的状态,否则返回回滚消息的状态。如果在查询过程中发生异常,则记录错误日志,并返回状态为未知的状态。

需要注意的是,在 checkLocalTransaction 方法中,如果发生异常,则返回 LocalTransactionState.UNKNOW 表示本地事务状态未知,这可能会导致 RocketMQ 在无法确定本地事务状态时采取一定的策略(如重试)来保证消息最终一致性。因此,在实际生产环境中,应尽量确保本地事务的状态能够被正确地查询和确认,以确保消息的可靠传递和数据的一致性。

如下代码,只是自己按对RocketMQ事务消息的理解设计的简单的demo,并未在生产上进行实践,如有疑问欢迎交流指正。

  1. public class NestedTransactionDemo {
  2. // 模拟数据库连接
  3. private static Connection getConnection() throws SQLException {
  4. // 这里使用简单的内存数据库 H2
  5. return DriverManager.getConnection("jdbc:h2:mem:test");
  6. }
  7. // 业务db操作
  8. private static void saveOrderToDB(Connection conn, UserInfoEntity userInfo) throws SQLException {
  9. // 业务db操作
  10. String sql = "UPDATE t_user SET status = ? WHERE id = ?";
  11. try (PreparedStatement ps = conn.prepareStatement(sql)) {
  12. ps.setString(1, userInfo.getStatus());
  13. ps.setLong(2, userInfo.getId());
  14. ps.executeUpdate();
  15. }
  16. }
  17. // 业务db操作查询确认
  18. private static UserInfoEntity getUserInfoById(Connection conn, long id) throws SQLException {
  19. // 查询用户信息
  20. String sql = "SELECT * FROM t_user WHERE id =?";
  21. try (PreparedStatement ps = conn.prepareStatement(sql)) {
  22. ps.setLong(1, id);
  23. try (ResultSet rs = ps.executeQuery()) {
  24. if (rs.next()) {
  25. return new UserInfoEntity(rs.getLong("id"), rs.getString("status"));
  26. } else {
  27. return null;
  28. }
  29. }
  30. }
  31. }
  32. public static void main(String[] args) throws Exception {
  33. // 业务逻辑 ....
  34. // 设置 NameServer 地址
  35. String nameServerAddr = "localhost:9876";
  36. // 创建一个事务生产者实例
  37. TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
  38. // 设置 NameServer 地址
  39. producer.setNamesrvAddr(nameServerAddr);
  40. // 设置事务监听器
  41. producer.setTransactionListener(new MyTransactionListener());
  42. // 启动事务生产者实例
  43. producer.start();
  44. try {
  45. // 需要进行db操作的数据entity
  46. UserInfoEntity userInfo = new UserInfoEntity(1L, "normal");
  47. // 构造消息1
  48. String jsonString = JsonUtils.toJsonString(userInfo);
  49. byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);
  50. Message msg1 = new Message("TopicTest1", "Tag", "k1", body);
  51. Message msg2 = new Message("TopicTest2", "Tag", "k2", body);
  52. Message msg3 = new Message("TopicTest3", "Tag", "k3", body);
  53. // 构建消息待发送队列
  54. Queue<Message> afterMessages = new LinkedList<>();
  55. afterMessages.offer(msg2);
  56. afterMessages.offer(msg3);
  57. // 发送事务消息1
  58. producer.sendMessageInTransaction(msg1,
  59. new MQMsgTransactionData(userInfo, afterMessages, producer)
  60. );
  61. } catch (Exception e) {
  62. // ignore
  63. }
  64. // 停止事务生产者实例
  65. Runtime.getRuntime().addShutdownHook(new Thread(producer::shutdown));
  66. }
  67. @Data
  68. public static class MQMsgTransactionData {
  69. private UserInfoEntity userInfo;
  70. private Queue<Message> afterMessages;
  71. TransactionMQProducer producer;
  72. public MQMsgTransactionData(UserInfoEntity userInfo, Queue<Message> afterMessages, TransactionMQProducer producer) {
  73. this.userInfo = userInfo;
  74. this.afterMessages = afterMessages;
  75. this.producer = producer;
  76. }
  77. }
  78. @Data
  79. public static class UserInfoEntity {
  80. private Long id;
  81. private String status;
  82. public UserInfoEntity(Long id, String status) {
  83. this.id = id;
  84. this.status = status;
  85. }
  86. }
  87. @Slf4j
  88. public static class MyTransactionListener implements TransactionListener {
  89. @Override
  90. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  91. MQMsgTransactionData data = (MQMsgTransactionData) arg;
  92. Queue<Message> messages = data.getAfterMessages();
  93. // 出队列
  94. Message nextMsg = messages.poll();
  95. TransactionMQProducer producer = data.getProducer();
  96. try {
  97. if (nextMsg != null) {
  98. TransactionSendResult sendResult = producer.sendMessageInTransaction(nextMsg, arg);
  99. return sendResult.getLocalTransactionState();
  100. } else {
  101. // nextMsg为空,代表当前msg已经是最后一个
  102. // 模拟数据库连接
  103. try (Connection conn = getConnection()) {
  104. // 业务db操作
  105. saveOrderToDB(conn, data.getUserInfo());
  106. return LocalTransactionState.COMMIT_MESSAGE;
  107. } catch (Exception e) {
  108. // 返回 ROLLBACK_MESSAGE 表示本地事务提交失败
  109. return LocalTransactionState.ROLLBACK_MESSAGE;
  110. }
  111. }
  112. } catch (Exception e) {
  113. // 这里需要返回 unknown :在 mq 发送 嵌套的过程中,担心 ack的结果不一致
  114. log.error("严重告警");
  115. return LocalTransactionState.UNKNOW;
  116. }
  117. }
  118. @Override
  119. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  120. // todo : 获取业务唯一id
  121. // todo : 查询数据判断业务db操作是否已完成
  122. // todo : 已完成则 ack true
  123. // todo : 未完成则 ack false
  124. byte[] body = msg.getBody();
  125. String userJson = new String(body, StandardCharsets.UTF_8);
  126. UserInfoEntity userInfoEntity = JsonUtils.parseJson2Object(userJson, UserInfoEntity.class);
  127. // 模拟数据库连接
  128. try (Connection conn = getConnection()) {
  129. // 在数据库中保存订单
  130. UserInfoEntity infoEntity = getUserInfoById(conn, userInfoEntity.getId());
  131. if (infoEntity != null && infoEntity.getStatus().equals("normal")) {
  132. return LocalTransactionState.COMMIT_MESSAGE;
  133. } else {
  134. return LocalTransactionState.ROLLBACK_MESSAGE;
  135. }
  136. } catch (Exception e) {
  137. log.error("严重告警");
  138. return LocalTransactionState.UNKNOW;
  139. }
  140. }
  141. }
  142. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/384578
推荐阅读
相关标签
  

闽ICP备14008679号