当前位置:   article > 正文

分布式消息队列RocketMQ原生API_rocketmq admin api

rocketmq admin api

消费模式:

  • 集群消费模式:当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。
  • 广播消费模式:当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。

         集群消费模式适用于每条消息只需要被处理一次的场景,也就是说整个消费组会Topic收到全量的消息,而消费组内的消费分担消费这些消息,因此可以通过扩缩消费者数量,来提升或降低消费能力。

         广播消费模式适用于每条消息需要被消费组的每个消费者处理的场景,也就是说消费组内的每个消费者都会收到订阅Topic的全量消息,因此即使扩缩消费者数量也无法提升或降低消费能力。

一、普通消息

1 消息发送分类

        Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。

同步发送消息

        同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式 的消息可靠性最高,但消息发送效率太低。

异步发送消息 

        异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息 可靠性可以得到保障,消息发送效率也可以。

 单向发送消息

        单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返 回ACK。该方式的消息发送效率最高,但消息可靠性较差。

 2.案例

pom

  1. <!-- rocketmq -->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>4.4.0</version>
  6. </dependency>

1.定义同步消息发送生产者

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.client.producer.SendResult;
  3. import org.apache.rocketmq.common.message.Message;
  4. /**
  5. * Producer端发送同步消息
  6. * 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
  7. */
  8. public class SyncProducer {
  9. public static void main(String[] args) throws Exception {
  10. // 创建一个producer,参数为Producer Group名称
  11. DefaultMQProducer producer = new DefaultMQProducer("pg");
  12. // 指定nameServer地址
  13. producer.setNamesrvAddr("127.0.0.1:9876");
  14. // 设置当发送失败时重试发送的次数,默认为2次
  15. producer.setRetryTimesWhenSendFailed(3);
  16. // 设置发送超时时限为5s,默认3s
  17. producer.setSendMsgTimeout(5000);
  18. // 开启生产者
  19. producer.start();
  20. // 生产并发送100条消息
  21. for (int i = 0; i < 100; i++) {
  22. byte[] body = ("Hi," + i).getBytes();
  23. Message msg = new Message("sync-someTopic", "someTag", body);
  24. // 为消息指定key
  25. msg.setKeys("key-" + i);
  26. // 同步发送消息
  27. SendResult sendResult = producer.send(msg);
  28. System.out.println(sendResult);
  29. }
  30. // 关闭producer
  31. producer.shutdown();
  32. }
  33. }
  1. // 消息发送的状态
  2. public enum SendStatus {
  3. SEND_OK, // 发送成功
  4. FLUSH_DISK_TIMEOUT, // 刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出
  5. 现这种异常状态。异步刷盘不会出现
  6. FLUSH_SLAVE_TIMEOUT, // Slave同步超时。当Broker集群设置的Master-Slave的复
  7. 制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
  8. SLAVE_NOT_AVAILABLE, // 没有可用的Slave。当Broker集群设置为Master-Slave的
  9. 复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
  10. }

2.定义异步消息发送生产者

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.client.producer.SendCallback;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.common.CountDownLatch2;
  5. import org.apache.rocketmq.common.message.Message;
  6. import java.util.concurrent.TimeUnit;
  7. /**
  8. * Producer端 异步发送
  9. * 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
  10. */
  11. public class AsyncProducer {
  12. public static void main(String[] args) throws Exception {
  13. DefaultMQProducer producer = new DefaultMQProducer("pg");
  14. producer.setNamesrvAddr("127.0.0.1:9876");
  15. // 指定异步发送失败后不进行重试发送
  16. producer.setRetryTimesWhenSendAsyncFailed(0);
  17. // 指定新创建的Topic的Queue数量为2,默认为4
  18. producer.setDefaultTopicQueueNums(2);
  19. producer.start();
  20. int messageCount = 100;
  21. // 根据消息数量实例化倒计时计算器
  22. final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
  23. for (int i = 0; i < messageCount; i++) {
  24. byte[] body = ("Hi," + i).getBytes();
  25. try {
  26. Message msg = new Message("async-someTopic", "myTag", body);
  27. // 异步发送。指定回调
  28. producer.send(msg, new SendCallback() {
  29. // 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行
  30. @Override
  31. public void onSuccess(SendResult sendResult) {
  32. countDownLatch.countDown();
  33. System.out.println(sendResult);
  34. }
  35. @Override
  36. public void onException(Throwable e) {
  37. countDownLatch.countDown();
  38. e.printStackTrace();
  39. }
  40. });
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. }
  44. } // end-for
  45. // sleep一会儿
  46. // 由于采用的是异步发送,所以若这里不sleep,
  47. // 则消息还未发送就会将producer给关闭,报错
  48. // 等待5s
  49. countDownLatch.await(5, TimeUnit.SECONDS);
  50. // 如果不再发送消息,关闭Producer实例。
  51. producer.shutdown();
  52. }
  53. }

3.定义单向消息发送生产者

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.common.message.Message;
  3. /**
  4. * 单向发送消息
  5. * 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
  6. * 通常消息的发送是这样一个过程:
  7. *
  8. * 客户端发送请求到服务器
  9. * 服务器处理请求
  10. * 服务器向客户端返回应答
  11. * 所以,一次消息发送的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用,
  12. * oneway形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个操作系统系统调用的开销,即将数据写入客户端的socket缓冲区,此过程耗时通常在微秒级。
  13. */
  14. public class OnewayProducer {
  15. public static void main(String[] args) throws Exception{
  16. DefaultMQProducer producer = new DefaultMQProducer("pg");
  17. producer.setNamesrvAddr("127.0.0.1:9876");
  18. producer.start();
  19. for (int i = 0; i < 10; i++) {
  20. byte[] body = ("Hi," + i).getBytes();
  21. Message msg = new Message("oneway-someTopic", "someTag", body);
  22. // 单向发送
  23. producer.sendOneway(msg);
  24. }
  25. producer.shutdown();
  26. System.out.println("producer shutdown");
  27. }
  28. }

4.定义消息消费者

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.client.exception.MQClientException;
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import java.util.List;
  9. /**
  10. * 消费消息
  11. */
  12. public class SomeConsumer {
  13. public static void main(String[] args) throws MQClientException {
  14. // 定义一个pull消费者
  15. // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
  16. // 定义一个push消费者
  17. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("syncOrAsyncCustomerGroup");
  18. // 指定nameServer
  19. consumer.setNamesrvAddr("127.0.0.1:9876");
  20. // 指定从第一条消息开始消费
  21. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  22. // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
  23. consumer.subscribe("sync-someTopic", "*");
  24. consumer.subscribe("async-someTopic", "*");
  25. consumer.subscribe("oneway-someTopic", "*");
  26. // 指定采用“广播模式”进行消费,默认为“集群模式”
  27. // consumer.setMessageModel(MessageModel.BROADCASTING);
  28. // 注册消息监听器
  29. consumer.registerMessageListener(new MessageListenerConcurrently() {
  30. // 一旦broker中有了其订阅的消息就会触发该方法的执行,
  31. // 其返回值为当前consumer消费的状态
  32. @Override
  33. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  34. ConsumeConcurrentlyContext context) {
  35. // 逐条消费消息
  36. for (MessageExt msg : msgs) {
  37. System.out.println(msg);
  38. }
  39. // 返回消费状态:消费成功
  40. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  41. }
  42. });
  43. // 开启消费者消费
  44. consumer.start();
  45. System.out.println("Consumer Started");
  46. }
  47. }

二、顺序消息

1 什么是顺序消息

        顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。 默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从 多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个 Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。

2 为什么需要顺序消息

        例如,现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于 描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。

根据以上订单状态,生产者从时序上可以生成如下几个消息:

        订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单 T0000001:发货失败

消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:

         这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方 式,我们无法保证顺序是正确的。对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

 基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个 Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序 性),能够保证消费的顺序性

3 有序性分类

根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序与全局有序

全局有序

当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序。 

在创建Topic时指定Queue的数量。有三种指定方式:

1)在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量

2)在RocketMQ可视化控制台中手动创建Topic时指定Queue数量

3)使用mqadmin命令手动创建Topic时指定Queue数量

分区有序

 如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序

如何实现Queue的选择?

        在定义Producer时我们可以指定消息队列选择器,而这个选择器是我们 自己实现了MessageQueueSelector接口定义的。

在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它 数据。但无论谁做选择key,都不能重复,都是唯一的。

一般性的选择算法是,让选择key(或其hash值)与该Topic所包含的Queue的数量取模,其结果 即为选择出的Queue的QueueId

取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的 消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问 题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。若是当前Consumer需 要消费的消息,则直接消费,否则,什么也不做。这种做法要求选择key要能够随着消息一起被 Consumer获取到。此时使用消息key作为选择key是比较好的做法。

以上做法会不会出现如下新的问题呢?不属于那个Consumer的消息被拉取走了,那么应该消费 该消息的Consumer是否还能再消费该消息呢?同一个Queue中的消息不可能被同一个Group中的 不同Consumer同时消费。所以,消费现一个Queue的不同选择key的消息的Consumer一定属于不同的Group。而不同的Group中的Consumer间的消费是相互隔离的,互不影响的。

案例

全局有序:

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.client.producer.SendResult;
  3. import org.apache.rocketmq.common.message.Message;
  4. /**
  5. * 全局有序
  6. */
  7. public class GlobalOrderedProducer {
  8. public static void main(String[] args) throws Exception {
  9. DefaultMQProducer producer = new DefaultMQProducer("pg");
  10. producer.setNamesrvAddr("127.0.0.1:9876");
  11. // 若为全局有序,则需要设置Queue数量为1 默认四个队列
  12. //发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
  13. //默认4个队列
  14. producer.setDefaultTopicQueueNums(1);
  15. producer.start();
  16. for (int i = 0; i < 100; i++) {
  17. byte[] body = ("Hi," + i).getBytes();
  18. Message msg = new Message("TopicA", "TagA", body);
  19. SendResult sendResult = producer.send(msg);
  20. System.out.println(sendResult);
  21. }
  22. producer.shutdown();
  23. }
  24. }

分区有序:

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.client.producer.MessageQueueSelector;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.common.message.Message;
  5. import org.apache.rocketmq.common.message.MessageQueue;
  6. import java.util.List;
  7. /**
  8. * 分区有序
  9. */
  10. public class GlobalOrderedProducer {
  11. public static void main(String[] args) throws Exception {
  12. DefaultMQProducer producer = new DefaultMQProducer("pg");
  13. producer.setNamesrvAddr("127.0.0.1:9876");
  14. producer.start();
  15. for (int i = 0; i < 100; i++) {
  16. Integer orderId = i;
  17. byte[] body = ("Hi," + i).getBytes();
  18. Message msg = new Message("TopicA", "TagA", body);
  19. SendResult sendResult = producer.send(msg, new
  20. MessageQueueSelector() {
  21. @Override
  22. public MessageQueue select(List<MessageQueue> mqs,
  23. Message msg, Object arg) {
  24. // 分区有序
  25. Integer id = (Integer) arg;
  26. int index = id % mqs.size();
  27. return mqs.get(index);
  28. }
  29. }, orderId);
  30. System.out.println(sendResult);
  31. }
  32. producer.shutdown();
  33. }
  34. }

三、延时消息

1 什么是延时消息

        当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。

        采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。

        在电商平台中,订单创建时会发送一条延迟消息。这条消息将会在30分钟后投递给后台业务系 统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完 成,则取消订单,将商品再次放回到库存;如果完成支付,则忽略。

        在12306平台中,车票预订成功后就会发送一条延迟消息。这条消息将会在45分钟后投递给后台 业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如 果未完成,则取消预订,将车票再次放回到票池;如果完成支付,则忽略。

2 延时等级

        延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在 RocketMQ服务端的MessageStoreConfig类中的如下变量中:

org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

即,若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。

        当然,如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1 天这个等级1d)。配置文件在RocketMQ安装目录下的conf目录中。

3 延时消息实现原理

具体实现方案是: 

修改消息

         Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相 应的consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有则需要经历一个复杂的过程:

  • 修改消息的Topic为SCHEDULE_TOPIC_XXXX
  • 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId 目录与consumequeue文件(如果没有这些目录与文件的话)。

延迟等级delayLevel与queueId的对应关系为queueId = delayLevel -1 

需要注意,在创建queueId目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕, 而是用到哪个延迟等级创建哪个目录

  • 修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的 Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到 commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息 被发送到Broker时的时间戳。 
  • 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中

SCHEDULE_TOPIC_XXXX目录中各个延时等级Queue中的消息是如何排序的?

是按照消息投递时间排序的。一个Broker中同一等级的所有延时消息会被写入到consumequeue 目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等级时间是相同的。那么投递时间就取决于于消息存储时间了。即按照消息被发送到Broker的时间进行排序的。

投递延时消息

        Broker内部有⼀个延迟消息服务类ScheuleMessageService其会取消SCHEDULE_TOPIC_XXXX中的消 息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。不过,在投递之前会从commitlog 中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消 息。然后再次将消息投递到目标Topic中

        ScheuleMessageService在Broker启动时,会创建并启动一个定时器TImer,用于执行相应的定时 任务。系统会根据延时等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟 等级消息的消费与投递。每个TimerTask都会检测相应Queue队列的第一条消息是否到期。

若第 一条消息未到期,则后面的所有消息更不会到期(消息是按照投递时间排序的);

若第一条消 息到期了,则将该消息投递到目标Topic,即消费该消息。

将消息重新写入commitlog

        延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。

这其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类 ScheuleMessageService。

案例

定义DelayProducer类

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.client.producer.SendResult;
  3. import org.apache.rocketmq.common.message.Message;
  4. import java.text.SimpleDateFormat;
  5. import java.util.Date;
  6. /**
  7. * 延迟MQ
  8. * 延时消息的使用场景
  9. * 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
  10. *
  11. * 延时消息的使用限制
  12. * // org/apache/rocketmq/store/config/MessageStoreConfig.java
  13. * private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  14. * 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,
  15. * 消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java
  16. *
  17. */
  18. public class DelayProducer {
  19. public static void main(String[] args) throws Exception {
  20. DefaultMQProducer producer = new DefaultMQProducer("delayProductGroup");
  21. producer.setNamesrvAddr("127.0.0.1:9876");
  22. producer.start();
  23. for (int i = 0; i < 1; i++) {
  24. byte[] body = ("Hi," + i).getBytes();
  25. Message msg = new Message("TopicB", "someTag", body);
  26. // 指定消息延迟等级为3级,即延迟10s
  27. // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
  28. msg.setDelayTimeLevel(3);
  29. SendResult sendResult = producer.send(msg);
  30. // 输出消息被发送的时间
  31. System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
  32. System.out.println(" ," + sendResult);
  33. }
  34. producer.shutdown();
  35. }
  36. }

定义OtherConsumer类

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.client.exception.MQClientException;
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import java.text.SimpleDateFormat;
  9. import java.util.Date;
  10. import java.util.List;
  11. public class OtherConsumer {
  12. public static void main(String[] args) throws MQClientException {
  13. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delayCustomerGroup");
  14. consumer.setNamesrvAddr("127.0.0.1:9876");
  15. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  16. consumer.subscribe("TopicB", "*");
  17. consumer.registerMessageListener(new MessageListenerConcurrently() {
  18. @Override
  19. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  20. ConsumeConcurrentlyContext context) {
  21. for (MessageExt msg : msgs) {
  22. // 输出消息被消费的时间
  23. System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
  24. System.out.println(" ," + msg);
  25. }
  26. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  27. }
  28. });
  29. consumer.start();
  30. System.out.println("Consumer Started");
  31. }
  32. }

四、事务消息

五、批量消息

1 批量发送消息

发送限制

        生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以下几点:

  • 批量发送的消息必须具有相同的Topic
  • 批量发送的消息必须具有相同的刷盘策略
  • 批量发送的消息不能是延时消息与事务消息

批量发送大小

默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:

  • 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
  • 方案二:在Producer端与Broker端修改属性

** Producer端需要在发送之前设置Producer的maxMessageSize属性

** Broker端需要修改其加载的配置文件中的maxMessageSize属性

生产者发送的消息大小

        生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这 个Message生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志 (占20字节),及用于描述消息的一堆属性key-value。这些属性中包含例如生产者地址、生产时间、 要发送的QueueId等。最终写入到Broker中消息单元中的数据都是来自于这些属性。

2 批量消费消息

修改批量属性

  1. // 指定每次可以消费10条消息,默认为1
  2. consumer.setConsumeMessageBatchMaxSize(10);
  3. // 指定每次可以从Broker拉取40条消息,默认为32
  4. consumer.setPullBatchSize(40);
  5. consumer.registerMessageListener(new MessageListenerConcurrently() {
  6. @Override
  7. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  8. for (MessageExt msg : msgs) {
  9. System.out.println("rocketMq 消费的数据信息----------"+msg);
  10. }
  11. // 消费成功的返回结果
  12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  13. // 消费异常时的返回结果
  14. // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  15. }
  16. });

        Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列 表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改 Consumer的consumeMessageBatchMaxSize属性来指定。不过,该值不能超过32。因为默认情况下消 费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改Consumer的pullBatchSize属性来指定。

存在的问题

        Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性是否设置的越大越好?当然不 是。

  • pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现 问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉 取。
  • consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消 费的消息具有相同的消费结果。因为consumeMessageBatchMaxSize指定的一批消息只会使用一 个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费 处理。

3.案例

该批量发送的需求是,不修改最大发送4M的默认值,但要防止发送的批量消息超出4M的限制。

定义消息列表分割器

  1. import org.apache.rocketmq.common.message.Message;
  2. import java.util.Iterator;
  3. import java.util.List;
  4. import java.util.Map;
  5. // 消息列表分割器:其只会处理每条消息的大小不超4M的情况。
  6. // 若存在某条消息,其本身大小大于4M,这个分割器无法处理,
  7. // 其直接将这条消息构成一个子列表返回。并没有再进行分割
  8. public class MessageListSplitter implements Iterator<List<Message>> {
  9. // 指定极限值为4M
  10. private final int SIZE_LIMIT = 4 *1024 * 1024;
  11. // 存放所有要发送的消息
  12. private final List<Message> messages;
  13. // 要进行批量发送消息的小集合起始索引
  14. private int currIndex;
  15. public MessageListSplitter(List<Message> messages) {
  16. this.messages = messages;
  17. }
  18. @Override
  19. public boolean hasNext() {
  20. // 判断当前开始遍历的消息索引要小于消息总数
  21. return currIndex < messages.size();
  22. }
  23. @Override
  24. public List<Message> next() {
  25. int nextIndex = currIndex;
  26. // 记录当前要发送的这一小批次消息列表的大小
  27. int totalSize = 0;
  28. for (; nextIndex < messages.size(); nextIndex++) {
  29. // 获取当前遍历的消息
  30. Message message = messages.get(nextIndex);
  31. // 统计当前遍历的message的大小
  32. int tmpSize = message.getTopic().length() + message.getBody().length;
  33. Map<String, String> properties = message.getProperties();
  34. for (Map.Entry<String, String> entry : properties.entrySet()) {
  35. tmpSize += entry.getKey().length() + entry.getValue().length();
  36. }
  37. tmpSize = tmpSize + 20;
  38. // 判断当前消息本身是否大于4M
  39. if (tmpSize > SIZE_LIMIT) {
  40. if (nextIndex - currIndex == 0) {
  41. nextIndex++;
  42. }
  43. break;
  44. }
  45. if (tmpSize + totalSize > SIZE_LIMIT) {
  46. break;
  47. } else {
  48. totalSize += tmpSize;
  49. }
  50. } // end-for
  51. // 获取当前messages列表的子集合[currIndex, nextIndex)
  52. List<Message> subList = messages.subList(currIndex, nextIndex);
  53. // 下次遍历的开始索引
  54. currIndex = nextIndex;
  55. return subList;
  56. }
  57. }

定义批量消息生产者

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.common.message.Message;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. //批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,
  6. // 相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
  7. public class BatchProducer {
  8. public static void main(String[] args) throws Exception {
  9. //控制台看不到生产者 因为 生产者运行完 就自动删除了
  10. DefaultMQProducer producer = new DefaultMQProducer("pg");
  11. producer.setNamesrvAddr("127.0.0.1:9876");
  12. // 指定要发送的消息的最大大小,默认是4M
  13. //不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的 maxMessageSize属性
  14. // producer.setMaxMessageSize(8 * 1024 * 1024);
  15. producer.start();
  16. // 定义要发送的消息集合
  17. List<Message> messages = new ArrayList<>();
  18. for (int i = 0; i < 100; i++) {
  19. byte[] body = ("Hi," + i).getBytes();
  20. Message msg = new Message("TopicD", "someTag", body);
  21. messages.add(msg);
  22. }
  23. // 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
  24. MessageListSplitter splitter = new MessageListSplitter(messages);
  25. while (splitter.hasNext()) {
  26. try {
  27. List<Message> listItem = splitter.next();
  28. producer.send(listItem);
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. producer.shutdown();
  34. }
  35. }

定义批量消息消费者

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.client.exception.MQClientException;
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import java.util.List;
  9. public class BatchConsumer {
  10. public static void main(String[] args) throws MQClientException {
  11. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
  12. consumer.setNamesrvAddr("127.0.0.1:9876");
  13. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  14. consumer.subscribe("TopicD", "*");
  15. // 指定每次可以消费10条消息,默认为1
  16. consumer.setConsumeMessageBatchMaxSize(10);
  17. // 指定每次可以从Broker拉取40条消息,默认为32
  18. consumer.setPullBatchSize(40);
  19. consumer.registerMessageListener(new MessageListenerConcurrently() {
  20. @Override
  21. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  22. for (MessageExt msg : msgs) {
  23. System.out.println("rocketMq 消费的数据信息----------"+msg);
  24. }
  25. // 消费成功的返回结果
  26. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  27. // 消费异常时的返回结果
  28. // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  29. }
  30. });
  31. consumer.start();
  32. System.out.println("Consumer Started");
  33. }
  34. }

六、消息过滤

        消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定 条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。

对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤

1 Tag过滤

        通过consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

2 SQL过滤

        SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤, 可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。

SQL过滤表达式中支持多种常量类型与运算符。

支持的常量类型:

  • 数值:比如:123,3.1415
  • 字符:必须用单引号包裹起来,比如:'abc'
  • 布尔:TRUE 或 FALSE
  • NULL:特殊的常量,表示空

支持的运算符有:

  • 数值比较:>,>=,<,<=,BETWEEN,=
  • 字符比较:=,<>,IN
  • 逻辑运算 :AND,OR,NOT
  • NULL判断:IS NULL 或者 IS NOT NULL

        默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:

enablePropertyFilter = true

        在启动Broker时需要指定这个修改过的配置文件。例如对于单机Broker的启动,其修改的配置文件是 conf/broker.conf,启动时使用如下命令:

sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

3 代码举例

定义Tag过滤Producer

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.client.producer.SendResult;
  3. import org.apache.rocketmq.common.message.Message;
  4. public class FilterBySQLProducer {
  5. public static void main(String[] args) throws Exception {
  6. DefaultMQProducer producer = new DefaultMQProducer("sqlFilterCustomerGroup");
  7. producer.setNamesrvAddr("127.0.0.1:9876");
  8. producer.start();
  9. for (int i = 0; i < 10; i++) {
  10. try {
  11. byte[] body = ("Hi," + i).getBytes();
  12. Message msg = new Message("TopicE", "myTag", body);
  13. // 事先埋入用户属性age
  14. msg.putUserProperty("age", i + "");
  15. SendResult sendResult = producer.send(msg);
  16. System.out.println(sendResult);
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. producer.shutdown();
  22. }
  23. }

定义Tag过滤Consumer

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  6. import org.apache.rocketmq.common.message.MessageExt;
  7. import java.util.List;
  8. /**
  9. * tag过滤
  10. */
  11. public class FilterByTagConsumer {
  12. public static void main(String[] args) throws Exception {
  13. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterTagConsumerGroup");
  14. consumer.setNamesrvAddr("127.0.0.1:9876");
  15. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  16. // 仅订阅Tag为myTagA与myTagB的消息,不包含myTagC
  17. consumer.subscribe("TopicC", "myTagA || myTagB");
  18. consumer.registerMessageListener(new MessageListenerConcurrently() {
  19. @Override
  20. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  21. ConsumeConcurrentlyContext context) {
  22. for (MessageExt me:msgs){
  23. System.out.println(me);
  24. }
  25. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  26. }
  27. });
  28. consumer.start();
  29. System.out.println("Consumer Started");
  30. }
  31. }

定义SQL过滤Producer 

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.client.producer.SendResult;
  3. import org.apache.rocketmq.common.message.Message;
  4. public class FilterBySQLProducer {
  5. public static void main(String[] args) throws Exception {
  6. DefaultMQProducer producer = new DefaultMQProducer("sqlFilterCustomerGroup");
  7. producer.setNamesrvAddr("127.0.0.1:9876");
  8. producer.start();
  9. for (int i = 0; i < 10; i++) {
  10. try {
  11. byte[] body = ("Hi," + i).getBytes();
  12. Message msg = new Message("TopicE", "myTag", body);
  13. // 事先埋入用户属性age
  14. msg.putUserProperty("age", i + "");
  15. SendResult sendResult = producer.send(msg);
  16. System.out.println(sendResult);
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. producer.shutdown();
  22. }
  23. }

定义SQL过滤Consumer

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.MessageSelector;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import java.util.List;
  9. public class FilterBySQLConsumer {
  10. public static void main(String[] args) throws Exception {
  11. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterProductGroup");
  12. consumer.setNamesrvAddr("127.0.0.1:9876");
  13. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  14. // 要从TopicE的消息中过滤出age在[0, 6]间的消息
  15. consumer.subscribe("TopicE", MessageSelector.bySql("age between 0 and 6"));
  16. // consumer.subscribe("TopicE", MessageSelector.bySql("age > 5 "));
  17. consumer.registerMessageListener(new MessageListenerConcurrently() {
  18. @Override
  19. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  20. ConsumeConcurrentlyContext context) {
  21. for (MessageExt me : msgs) {
  22. System.out.println(me);
  23. }
  24. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  25. }
  26. });
  27. consumer.start();
  28. System.out.println("Consumer Started");
  29. }
  30. }

七、生产者消息发送重试机制

1 说明

        Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。

对于消息重投,需要注意以下几点:

  • 生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式 发送失败是没有重试机制的
  • 只有普通消息具有发送重试机制,顺序消息是没有的
  • 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。消息重复在 RocketMQ中是无法避免的问题
  • 消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会成为大概率事件
  • producer主动重发、consumer负载变化(发生Rebalance,不会导致消息重复,但可能出现重复 消费)也会导致重复消息
  • 消息重复无法避免,但要避免消息的重复消费。
  • 避免消息重复消费的解决方案是,为消息添加唯一标识(例如消息key),使消费者对消息进行消 费判断来避免重复消费
  • 消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略

2 同步发送失败策略

        对于普通消息,消息发送默认采用round-robin策略来选择所发送到的队列。如果发送失败,默认重试2次。但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker。当然,若只有一个Broker其也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue

// 创建一个producer,参数为Producer Group名称

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");

// 指定nameServer地址

producer.setNamesrvAddr("127.0.01:9876");

// 设置同步发送失败时重试发送的次数,默认为2次 producer.setRetryTimesWhenSendFailed(3);

// 设置发送超时时限为5s,默认3s

producer.setSendMsgTimeout(5000);

        同时,Broker还具有失败隔离功能,使Producer尽量选择未发生过发送失败的Broker作为目标 Broker。其可以保证其它消息尽量不发送到问题Broker,为了提升消息发送效率,降低消息发送耗时。

思考:让我们自己实现失败隔离功能,如何来做?

        1)方案一:Producer中维护某JUC的Map集合,其key是发生失败的时间戳,value为Broker实例。Producer中还维护着一个Set集合,其中存放着所有未发生发送异常的Broker实例。选择目 标Broker是从该Set集合中选择的。再定义一个定时任务,定期从Map集合中将长期未发生发送 异常的Broker清理出去,并添加到Set集合.

        2)方案二:为Producer中的Broker实例添加一个标识,例如是一个AtomicBoolean属性。只要该 Broker上发生过发送异常,就将其置为true。选择目标Broker就是选择该属性值为false的 Broker。再定义一个定时任务,定期将Broker的该属性置为false。

        3)方案三:为Producer中的Broker实例添加一个标识,例如是一个AtomicLong属性。只要该Broker上发生过发送异常,就使其值增一。选择目标Broker就是选择该属性值最小的Broker。若该值相同,采用轮询方式选择。

        如果超过重试次数,则抛出异常,由Producer去保证消息不丢。当然当生产者出现 RemotingException、MQClientException和MQBrokerException时,Producer会自动重投消息

3 异步发送失败策略

        异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保证消息不丢

DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); producer.setNamesrvAddr("127.0.0.1:9876");

// 指定异步发送失败后不进行重试发送

producer.setRetryTimesWhenSendAsyncFailed(0);

4 消息刷盘失败策略

        消息刷盘超时(Master或Slave)或slave不可用(slave在做数据同步时向master返回状态不是 SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。不过,对于重要消息可以通过在Broker 的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启

八、消费者消费重试机制

1 顺序消息的消费重试

        对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重 试,直到消费成功。消费重试默认间隔时间为1000毫秒。重试期间应用会出现消息消费被阻塞的情况.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); // 顺序消息消费失败的消费重试时间间隔,单位毫秒,默认为1000,其取值范围为[10, 30000]

consumer.setSuspendCurrentQueueTimeMillis(100);

由于对顺序消息的重试是无休止的,不间断的,直至消费成功,所以,对于顺序消息的消费, 务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。

注意,顺序消息没有发送失败重试机制,但具有消费失败重试机制

2 无序消息的消费重试

        对于无序消息(普通消息、延时消息、事务消息),当Consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不 提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息。

3 消费重试次数与间隔

        对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的间隔时间是不同 的,会逐渐变长。每次重试的间隔时间如下表。

 若一条消息在一直消费失败的前提下,将会在正常消费后的第4小时46分后进行第16次重试。 若仍然失败,则将消息投递到死信队列

修改消费重试次数

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("customerGroup");

// 修改消费重试次数

consumer.setMaxReconsumeTimes(10);

对于修改过的重试次数,将按照以下策略执行:

        若修改值小于16,则按照指定间隔进行重试

        若修改值大于16,则超过16次的重试时间间隔均为2小时

        对于Consumer Group,若仅修改了一个Consumer的消费重试次数,则会应用到该Group中所有 其它Consumer实例。若出现多个Consumer均做了修改的情况,则采用覆盖方式生效。即最后被 修改的值会覆盖前面设置的值。

4 重试队列

        对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而 是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的。这个特殊的队列就是重试队列。

        当出现需要进行重试消费的消息时,Broker会为每个消费组都设置一个Topic名称 为%RETRY%consumerGroup@consumerGroup 的重试队列。

1)这个重试队列是针对消息才组的,而不是针对每个Topic设置的(一个Topic的消息可以让多 个消费者组进行消费,所以会为这些消费者组各创建一个重试队列)

2)只有当出现需要进行重试消费的消息时,才会为该消费者组创建重试队列

注意,消费重试的时间间隔与延时消费延时等级十分相似,除了没有延时等级的前两个时间外,其它的时间都是相同的.

Broker对于重试消息的处理是通过延时消息实现的。先将消息保存到SCHEDULE_TOPIC_XXXX延迟队列中,延迟时间到后,会将消息投递到%RETRY%consumerGroup@consumerGroup重试队列中。

5 消费重试配置方式

  1. consumer.registerMessageListener(new MessageListenerConcurrently() {
  2. @Override
  3. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  4. ConsumeConcurrentlyContext context) {
  5. try {
  6. } catch (Throwable e) {
  7. // 消费失败
  8. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  9. // return null;
  10. // throw new RuntimeException("消费异常");
  11. }
  12. // 返回消费状态:消费成功
  13. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  14. }
  15. });

        集群消费方式下,消息消费失败后若希望消费重试,则需要在消息监听器接口的实现中明确进行如下三 种方式之一的配置:

  1.  方式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推荐)
  2. 方式2:返回Null
  3. 方式3:抛出异常

 6 消费不重试配置方式

  1. consumer.registerMessageListener(new MessageListenerConcurrently() {
  2. @Override
  3. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  4. ConsumeConcurrentlyContext context) {
  5. try {
  6. } catch (Throwable e) {
  7. // 消费失败 不重试
  8. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  9. }
  10. // 返回消费状态:消费成功
  11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  12. }
  13. });

        集群消费方式下,消息消费失败后若不希望消费重试,则在捕获到异常后同样也返回与消费成功后的相 同的结果,即ConsumeConcurrentlyStatus.CONSUME_SUCCESS,则不进行消费重试。

九、消息的幂等性

1.为什么会重复消费

        生产者发送消息的时候使用了重试机制,发送消息后由于网络原因没有收到MQ的响应信息,报了个超时异常,然后又去重新发送了一次消息。但其实MQ已经接到了消息,并返回了响应,只是因为网络原因超时了。导致一条消息就会被发送两次。

        在消费者处理了一条消息后会返回一个offset给MQ,证明这条消息被处理过了。但是,此时如果服务宕机了,MQ就没有接收到这条offset,那么服务重启后会再次消费这条消息。 

2.如何解决重复消费

  • 解决重复消费的关键就是引入幂等性机制,幂等性机制:确保数据库的结果的唯一性
  • 消费者端保证幂等性。

            这块其实就比较简单了,只要处理消息之前先根据业务判断一下本次操作是否已经执行过了,如果已经执行过了,那就不再执行了,这样就可以保证消费者的幂等性。
        举个例子,比如每条消息都会有一条唯一的消息ID,消费者接收到消息会存储消息日志,如果日志中存在相同ID的消息,就证明这条消息已经被处理过了。

案例:

生产者:

  1. public class SyncRetryProducer {
  2. public static void main(String[] args) throws Exception {
  3. // 创建一个producer,参数为Producer Group名称
  4. DefaultMQProducer producer = new DefaultMQProducer("pg");
  5. // 指定nameServer地址
  6. producer.setNamesrvAddr("127.0.0.1:9876");
  7. // 设置同步发送失败时重试发送的次数,默认为2次
  8. producer.setRetryTimesWhenSendFailed(3);
  9. // 设置发送超时时限为5s,默认3s
  10. producer.setSendMsgTimeout(5000);
  11. // 开启生产者
  12. producer.start();
  13. // 生产并发送100条消息
  14. for (int i = 0; i < 100; i++) {
  15. byte[] body = ("Hi," + i).getBytes();
  16. Message msg = new Message("someTopic", "someTag", body);
  17. // 为消息指定key setKey,做唯一标识
  18. msg.setKeys("key-" + i);
  19. // 同步发送消息
  20. SendResult sendResult = producer.send(msg);
  21. System.out.println(sendResult);
  22. }
  23. // 关闭producer
  24. producer.shutdown();
  25. }
  26. }

 消费者:

  1. public class SyncRetryConsumer {
  2. //保存标识的集合
  3. static private Map<String, String> logMap = new HashMap<>();
  4. public static void main(String[] args) throws MQClientException {
  5. // 定义一个pull消费者
  6. // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
  7. // 定义一个push消费者
  8. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
  9. // 指定nameServer
  10. consumer.setNamesrvAddr("127.0.0.1:9876");
  11. // 指定从第一条消息开始消费
  12. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  13. // 指定消费topic与tag
  14. consumer.subscribe("someTopic", "*");
  15. // 指定采用“广播模式”进行消费,默认为“集群模式”
  16. // consumer.setMessageModel(MessageModel.BROADCASTING);
  17. // 顺序消息消费失败的消费重试时间间隔,默认为1000毫秒,其取值范围为[10, 30000]毫秒
  18. consumer.setSuspendCurrentQueueTimeMillis(100);
  19. // 修改消费重试次数
  20. consumer.setMaxReconsumeTimes(20);
  21. // 注册消息监听器
  22. consumer.registerMessageListener(new MessageListenerConcurrently() {
  23. // 一旦broker中有了其订阅的消息就会触发该方法的执行,
  24. // 其返回值为当前consumer消费的状态
  25. @Override
  26. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  27. ConsumeConcurrentlyContext context) {
  28. String key = null;
  29. String msgId = null;
  30. try {
  31. for (MessageExt msg : msgs) {
  32. key = msg.getKeys();
  33. //判断集合当中有没有存在key,存在就不需要重试,不存在先存key再回来重试后消费消息
  34. if (logMap.containsKey(key)) {
  35. // 无需继续重试。
  36. System.out.println("key:"+key+",无需重试...");
  37. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  38. }
  39. msgId = msg.getMsgId();
  40. System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
  41. //模拟异常
  42. int i = 1 / 0;
  43. }
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. //重试
  47. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  48. } finally {
  49. //保存key
  50. logMap.put(key, msgId);
  51. }
  52. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  53. }
  54. });
  55. // 开启消费者消费
  56. consumer.start();
  57. System.out.println("Consumer Started");
  58. }
  59. }

十、死信队列

1 什么是死信队列 

        当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则 表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这个队列就是死信队列(Dead-Letter Queue,DLQ),而其中的消息 则称为死信消息(Dead-Letter Message,DLM)。

死信队列是用于处理无法被正常消费的消息的。

2 死信队列的特征

 

死信队列具有如下特征:

  • 死信队列中的消息不会再被消费者正常消费,即DLQ对于消费者是不可见的
  • 死信存储有效期与正常消息相同,均为 3 天(commitlog文件的过期时间),3 天后会被自动删除
  • 死信队列就是一个特殊的Topic,名称为%DLQ%消费组名称,即每个消费者组都有一个死信队列
  • 如果⼀个消费者组未产生死信消息,则不会为其创建相应的死信队列

死信队列中的数据需要通过新订阅该topic进行消费

每个topic被消费后,如果消费失败超过次数会进入重试队列、死信队列等。名称会以

  • %RETRY%消费组名称
  • %DLQ%消费组名称

3 死信消息的处理

        实际上,当⼀条消息进入死信队列,就意味着系统中某些地方出现了问题,从而导致消费者无法正常消 费该消息,比如代码中原本就存在Bug。因此,对于死信消息,通常需要开发人员进行特殊处理。最关键的步骤是要排查可疑因素,解决代码中可能存在的Bug,然后再将原来的死信消息再次进行投递消费。

解决方案:

        死信队列中的消息可以后台开一个线程,订阅%DLQ%消费组名称,并不停重试。

案例:

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.client.producer.SendResult;
  3. import org.apache.rocketmq.client.producer.SendStatus;
  4. import org.apache.rocketmq.common.message.Message;
  5. /**
  6. * 验证私信队列 生产者
  7. */
  8. public class DeadProducer {
  9. public static void main(String[] args) throws Exception{
  10. // 实例化生产者,并指定生产组名称
  11. DefaultMQProducer producer = new DefaultMQProducer("myproducer_group_topic_name_dle_01");
  12. //设置实例名称,一个jvm中有多个生产者可以根据实例名区分
  13. //默认default
  14. producer.setInstanceName("topic_name_dle");
  15. // 指定nameserver的地址
  16. producer.setNamesrvAddr("192.168.16.79:9876");
  17. //设置同步重试次数
  18. producer.setRetryTimesWhenSendFailed(2);
  19. //设置异步发送次数
  20. //producer.setRetryTimesWhenSendAsyncFailed(2);
  21. // 初始化生产者
  22. producer.start();
  23. for (int i = 0; i <10 ; i++) {
  24. Message message = new Message("topic_name_dle", ("key=" + i).getBytes("utf-8"));
  25. // 1 同步发送 如果发送失败会根据重试次数重试
  26. SendResult send = producer.send(message);
  27. SendStatus sendStatus = send.getSendStatus();
  28. System.out.println(sendStatus.toString());
  29. }
  30. }
  31. }
  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.common.message.MessageExt;
  6. import java.io.UnsupportedEncodingException;
  7. import java.util.List;
  8. /**
  9. * 验证私信队列 消费者
  10. */
  11. public class DeadCustomer {
  12. public static void main(String[] args) throws Exception {
  13. /**
  14. * 推消息消费
  15. */
  16. DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer_group_topic_name_dle_01");
  17. // 指定nameserver的地址
  18. defaultMQPushConsumer.setNamesrvAddr("192.168.16.79:9876");
  19. //指定主题
  20. defaultMQPushConsumer.subscribe("topic_name_dle", "*");
  21. /**
  22. * 推送消息 提高消费处理能力
  23. * 1 提高消费并行度
  24. * 2 以批量方式进行 消费
  25. * 3 检测延时情况,跳过非重要消息
  26. */
  27. //消费限流 只针对推送来设置,拉取消息自己控制
  28. // 1 提高消费并行度
  29. defaultMQPushConsumer.setConsumeThreadMax(10);
  30. defaultMQPushConsumer.setConsumeThreadMin(1);
  31. // 2 以批量方式进行 消费
  32. // 设置消息批处理的一个批次中消息的最大个数
  33. defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
  34. //设置重试次数 默认16次
  35. defaultMQPushConsumer.setMaxReconsumeTimes(1);
  36. // 添加消息监听器,一旦有消息推送过来,就进行消费
  37. defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
  38. @Override
  39. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  40. //final MessageQueue messageQueue = context.getMessageQueue();
  41. for (MessageExt msg : msgs) {
  42. System.out.println(msg);
  43. try {
  44. System.out.println(new String(msg.getBody(), "utf-8"));
  45. } catch (UnsupportedEncodingException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. // 消息消费成功
  50. //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  51. //null 也表示推送失败,会进行重试
  52. return null;
  53. // 消息消费失败
  54. // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  55. }
  56. });
  57. // 开启消费者消费
  58. defaultMQPushConsumer.start();
  59. System.out.println("Consumer Started");
  60. }
  61. }

订阅死信队列所属主题。

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.common.message.MessageExt;
  6. import java.io.UnsupportedEncodingException;
  7. import java.util.List;
  8. /**
  9. * 不断的监听 死信队列
  10. */
  11. public class DeadListener {
  12. public static void main(String[] args) throws Exception{
  13. // 初始化consumer,并设置consumer group name
  14. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("deal-listener");
  15. // 设置NameServer地址
  16. consumer.setNamesrvAddr("192.168.16.79:9876");
  17. //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
  18. consumer.subscribe("%DLQ%consumer_group_topic_name_dle_01", "*");
  19. //注册回调接口来处理从Broker中收到的消息
  20. consumer.registerMessageListener(new MessageListenerConcurrently() {
  21. @Override
  22. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  23. for (MessageExt msg : msgs) {
  24. System.out.println(msg);
  25. try {
  26. System.out.println(new String(msg.getBody(), "utf-8"));
  27. } catch (UnsupportedEncodingException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
  32. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  33. }
  34. });
  35. // 启动Consumer
  36. consumer.start();
  37. System.out.printf("Consumer Started.%n");
  38. }
  39. }
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号