当前位置:   article > 正文

RocketMq(六)消息传输方式_rocketmq默认发送方式

rocketmq默认发送方式

在前面的基础上,

一、消息传输方式:在RocketMQ中,可以通过设置消费组的方式实现消息的广播和点对点传输。

1、默认方式:多个消费者轮询消费,若只有一个消费者则全部消费。通过下面的举例可以看到这其实就是点对点模式。

 (1) 生产者

  1. @RequestMapping("/sendUser")
  2. public void sendUser(@RequestBody UserDTO userDTO,int count){
  3. try{
  4. String userName = userDTO.getUserName();
  5. //同步发送多条消息
  6. for(int i=0;i<=count;i++){
  7. userDTO.setUserName(userName+i);
  8. Message msg = new Message(userTopic,userTag, JSON.toJSONString(userDTO).getBytes(StandardCharsets.UTF_8));
  9. msg.setKeys("key"+i);
  10. SendResult sendResult = defaultMQProducer.send(msg);
  11. System.out.println(userDTO.getUserName()+"发送结果:"+sendResult);
  12. }
  13. }

(2)消费者 :

  1. @Component
  2. public class DefaultMQConsumeListener {
  3. private static Logger logger = LoggerFactory.getLogger(DefaultMQConsumeListener.class);
  4. @Value("${mq.groupname}")
  5. private String groupName;
  6. @Value("${mq.nameserveraddress}")
  7. private String nameserveraddress;
  8. @Value("${mq.user.topic}")
  9. private String userTopic;
  10. @Value("${mq.school.topic}")
  11. private String schoolTopic;
  12. /**
  13. * 订阅用户、学校mq
  14. */
  15. @PostConstruct
  16. public void defaultMQProducer(){
  17. try{
  18. logger.info("mq producer 配置 start");
  19. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer (groupName);
  20. consumer.setNamesrvAddr(nameserveraddress);
  21. // 订阅一个或多个Topic,以及Tag来过滤需要消费的消息
  22. //1、订阅用户消息
  23. consumer.subscribe(userTopic,"*");
  24. //consumer.registerMessageListener(new UserListener());
  25. //2、订阅学校消息
  26. consumer.subscribe(schoolTopic,"*");
  27. //consumer.registerMessageListener(new SchoolListener());
  28. consumer.registerMessageListener(new UserAndSchoolListener());
  29. //设置消费最大批量消息条数为2
  30. consumer.setConsumeMessageBatchMaxSize(2);
  31. consumer.start();
  32. logger.info("mq producer 配置 end");
  33. }
  34. catch (Exception e){
  35. logger.error("mq consume启动失败,errorMsg={}",e.getMessage(),e);
  36. }
  37. }
  38. }
  1. public class UserAndSchoolListener implements MessageListenerConcurrently {
  2. private static Logger logger = LoggerFactory.getLogger(UserAndSchoolListener.class);
  3. @Override
  4. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  5. try{
  6. logger.info("{}消息条数:{} ", Thread.currentThread().getName(), list.size());
  7. for(MessageExt message : list){
  8. String body = new String(message.getBody(), "UTF-8");
  9. System.out.println("消息:"+body);
  10. }
  11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  12. }catch (Exception e){
  13. logger.error("接收消息异常{}",e.getMessage(),e);
  14. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  15. }
  16. }
  17. }

 (3)消费者1:配置同(2),打印处加了个区分

  1. logger.info("{}消息1条数:{} ", Thread.currentThread().getName(), list.size());
  2. for(MessageExt message : list){
  3. String body = new String(message.getBody(), "UTF-8");
  4. System.out.println("消费者-1 :"+body);
  5. }

访问http://localhost:8888/mqProviderTest/sendMessage/sendUser?count=6

生产者发送成功

  1. 张三0发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F0D0000, offsetMsgId=AC1F070900002A9F00000000000709C8, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=1], queueOffset=81]
  2. 张三1发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F1F0001, offsetMsgId=AC1F070900002A9F0000000000070ACC, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=0], queueOffset=82]
  3. 张三2发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F260002, offsetMsgId=AC1F070900002A9F0000000000070BD0, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=1], queueOffset=82]
  4. 张三3发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F2D0003, offsetMsgId=AC1F070900002A9F0000000000070CD4, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=0], queueOffset=83]
  5. 张三4发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F370004, offsetMsgId=AC1F070900002A9F0000000000070DD8, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=1], queueOffset=83]
  6. 张三5发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F3F0005, offsetMsgId=AC1F070900002A9F0000000000070EDC, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=0], queueOffset=84]
  7. 张三6发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F4A0006, offsetMsgId=AC1F070900002A9F0000000000070FE0, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=1], queueOffset=84]

 消费者消费

  1. 2023-10-16 15:39:15.633 INFO 22376 --- [-mq-groupname_1] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_1消息条数:1
  2. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三1"}
  3. 2023-10-16 15:39:15.639 INFO 22376 --- [-mq-groupname_2] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_2消息条数:1
  4. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三3"}
  5. 2023-10-16 15:39:15.658 INFO 22376 --- [-mq-groupname_3] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_3消息条数:1
  6. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三5"}

消费者1消费

  1. 2023-10-16 15:39:15.623 INFO 7496 --- [-mq-groupname_1] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_1消息1条数:1
  2. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三0"}
  3. 2023-10-16 15:39:15.630 INFO 7496 --- [-mq-groupname_2] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_2消息1条数:1
  4. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三2"}
  5. 2023-10-16 15:39:15.647 INFO 7496 --- [-mq-groupname_3] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_3消息1条数:1
  6. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三4"}
  7. 2023-10-16 15:39:15.664 INFO 7496 --- [-mq-groupname_4] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_4消息1条数:1
  8. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三6"}

多次访问,可以看到两个消费者在轮询消费

2、消息的广播:广播模式下,一个消息可以被多个消费者消费,每个消费者都能够接收到该消息的一个副本。实现消息的广播可以通过设置消费组的方式,将消费组的模式设置为广播模式。

consumer.setMessageModel(MessageModel.BROADCASTING);

在两个消费者的配置中都加上这句

生产者发送成功后,两个消费者打印如下

  1. 2023-10-16 16:04:01.886 INFO 8260 --- [-mq-groupname_7] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_7消息条数:1
  2. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三0"}
  3. 2023-10-16 16:04:01.893 INFO 8260 --- [-mq-groupname_8] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_8消息条数:1
  4. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三1"}
  5. 2023-10-16 16:04:01.905 INFO 8260 --- [-mq-groupname_9] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_9消息条数:1
  6. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三2"}
  7. 2023-10-16 16:04:01.923 INFO 8260 --- [mq-groupname_10] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_10消息条数:1
  8. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三3"}
  9. 2023-10-16 16:04:01.944 INFO 8260 --- [mq-groupname_11] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_11消息条数:1
  10. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三4"}
  11. 2023-10-16 16:04:01.946 INFO 8260 --- [mq-groupname_12] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_12消息条数:1
  12. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三5"}
  13. 2023-10-16 16:04:01.955 INFO 8260 --- [mq-groupname_13] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_13消息条数:1
  14. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三6"}
  1. 2023-10-16 16:04:01.886 INFO 23564 --- [-mq-groupname_7] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_7消息1条数:1
  2. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三0"}
  3. 2023-10-16 16:04:01.893 INFO 23564 --- [-mq-groupname_8] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_8消息1条数:1
  4. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三1"}
  5. 2023-10-16 16:04:01.904 INFO 23564 --- [-mq-groupname_9] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_9消息1条数:1
  6. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三2"}
  7. 2023-10-16 16:04:01.922 INFO 23564 --- [mq-groupname_10] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_10消息1条数:1
  8. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三3"}
  9. 2023-10-16 16:04:01.944 INFO 23564 --- [mq-groupname_12] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_12消息1条数:1
  10. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三5"}
  11. 2023-10-16 16:04:01.944 INFO 23564 --- [mq-groupname_11] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_11消息1条数:1
  12. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三4"}
  13. 2023-10-16 16:04:01.951 INFO 23564 --- [mq-groupname_13] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_13消息1条数:1
  14. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三6"}

3、点对点传输:点对点传输模式下,一个消息只能被一个消费者消费,该消费者消费该消息后,其他消费者将无法再次接收到该消息的副本。实现消息的点对点传输可以通过设置消费组的方式,将消费组的模式设置为集群模式。这里将两个消费者设置如下

consumer.setMessageModel(MessageModel.CLUSTERING);

效果同(1)

4、两个消费者一个设置为点对点,一个设置为广播。如我设置消费者-1为广播式

发送者发送成功后,设置为广播式的全部消费,设置为点对点的轮询消费。

  1. 2023-10-16 16:11:45.793 INFO 10852 --- [mq-groupname_14] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_14消息条数:1
  2. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三0"}
  3. 2023-10-16 16:11:45.806 INFO 10852 --- [mq-groupname_15] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_15消息条数:1
  4. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三2"}
  5. 2023-10-16 16:11:45.821 INFO 10852 --- [mq-groupname_16] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_16消息条数:1
  6. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三4"}
  7. 2023-10-16 16:11:45.835 INFO 10852 --- [mq-groupname_17] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_17消息条数:1
  8. 消息:{"age":1,"userAccount":"zhangsan","userName":"张三6"}
  1. 2023-10-16 16:11:45.793 INFO 3812 --- [mq-groupname_16] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_16消息1条数:1
  2. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三0"}
  3. 2023-10-16 16:11:45.801 INFO 3812 --- [mq-groupname_17] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_17消息1条数:1
  4. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三1"}
  5. 2023-10-16 16:11:45.806 INFO 3812 --- [mq-groupname_18] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_18消息1条数:1
  6. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三2"}
  7. 2023-10-16 16:11:45.813 INFO 3812 --- [mq-groupname_19] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_19消息1条数:1
  8. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三3"}
  9. 2023-10-16 16:11:45.821 INFO 3812 --- [mq-groupname_20] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_20消息1条数:1
  10. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三4"}
  11. 2023-10-16 16:11:45.827 INFO 3812 --- [-mq-groupname_3] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_3消息1条数:1
  12. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三5"}
  13. 2023-10-16 16:11:45.833 INFO 3812 --- [-mq-groupname_6] com.demo.listener.UserAndSchoolListener : ConsumeMessageThread_test-mq-groupname_6消息1条数:1
  14. 消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三6"}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/871543
推荐阅读
相关标签
  

闽ICP备14008679号