当前位置:   article > 正文

RocketMQ批量消息介绍和入门demo以及消息超过了限制如何解决_consume message failed

consume message failed

介绍

批量消息是指将多条小的消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。

比如说原本我有三条消息,如果三条消息分三次发的话,会走三次网络IO,如果我给三条消息整成一起发送,这样就走一次网络了.

不足

批量消息虽然好用,但是也有一些不足,官方说一次批量消息不能大于1MB, 实际上实际使用的时候一次发送最大的消息是4MB左右.

使用限制

这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。

入门代码案例

说明

要想批量发送消息,很简单,只需要在生产者代码那里 producer.send 方法的时候传入一个List集合即可,这List集合里面存放的就是多个Message消息.

消费者代码那里什么都不需要动,不需要其它额外的配置

	List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
        producer.send(messages);// 批量发送
  • 1
  • 2
  • 3
  • 4
  • 5

生产者

package org.apache.rocketmq.example.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

public class SimpleBatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");

        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();
        //如果您一次发送的消息不超过1MiB,那么很容易使用批处理
        String topic = "BatchTest";
        // 将三个消息都放到一个List,然后把这个List发送过去,这就是批量消息
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
        producer.send(messages);
        
        producer.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

消费者

package org.apache.rocketmq.example.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        //使用指定的消费者组名称实例化
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchProducerGroupName");

        consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        /*从上次偏移量开始消耗*/
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //再订阅一个主题来消费
        consumer.subscribe("BatchTest", "*");


        AtomicLong atomicLong = new AtomicLong(1); //创建一个计数器,初始值是1

        //注册回调以在从代理获取的消息到达时执行
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {


                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

                long andIncrement = atomicLong.getAndIncrement();
                System.out.println("当前接收到了消息的个数" + andIncrement);

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //启动消费者实例
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

验证代码查看效果

先启动消费者, 再启动生产者

可以发现 消费者这里接收到了三条消息, 但其实生产者就发送了一次,发送的参数是一个List集合,这List集合里面就是三条消息.

注意:不要看到 “当前接收到了消息的个数” 这个输出顺序变了就以为是bug, 其实不是, 因为 消费者接收消息的时候是多线程的, 可能打印 “当前接收到了消息的个数1” 这个的线程比打印"当前接收到了消息的个数2"的线程执行的早, 所以就打印在前面了, 顺序乱了,

只要看最大的数是多大就可以了,因为AtomicLong是原子类,在多线程是线程安全的,不会出现计数错误问题.

最大的输出 收到消息个数是 3 , 就说明一共接收到了三条消息.

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581891, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7A9C, commitLogOffset=467499676, bodyCRC=1841171634, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID001, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390000, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 48], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581892, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7B5A, commitLogOffset=467499866, bodyCRC=448347172, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID002, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390001, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 49], transactionId='null'}]] 
当前接收到了消息的个数2
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581893, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7C18, commitLogOffset=467500056, bodyCRC=61894046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID003, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390002, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 50], transactionId='null'}]] 
当前接收到了消息的个数1
当前接收到了消息的个数3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

如果消息超过了限制如何解决

批量消息虽然好用,但是也有一些不足,官方说一次批量消息不能大于1MB, 实际上实际使用的时候一次发送最大的消息是4MB左右. 一次发送的超过了限制,MQ会报错的,

最简单粗暴的解决方案就是 一次发送的消息不要太多, 还有个解决办法就是将消息分成多份儿来发送.

生产者

下面代码我添加了很多注释,大部分人应该都能看懂

package org.apache.rocketmq.example.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class SplitBatchProducer {


    public static void main(String[] args) throws Exception {


        int cycleIndex = 50000;  // 循环次数 这个参数是循环设置多少个参数的


        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();

        //large batch
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>(100 * 1000);
        // 一次10万条消息,一次发送出去肯定是超过限制了.


        for (int i = 0; i < cycleIndex; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }
        // 直接发送出去会报错
//        producer.send(messages);


        // 如果超过大小限制了,就用下面的代码把一个大的消息进行拆分多个小的消息,然后多次发送出去
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        }
        producer.shutdown();
    }

}

class ListSplitter implements Iterator<List<Message>> {
    // 消息
    private final List<Message> messages;
    //大小限制
    private final int sizeLimit = 1000 * 1000;
    //当前索引
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    /**
     * 判断是否还有数据,
     * 判断逻辑: 当前索引是否小于 消息的长度
     *
     * @return
     */
    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;//当前记录的已经用过的索引
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);

            // 如何计算一个消息的大小: 就是 topic的长度加上消息body的长度,加一个自定义属性的长度 , 再加上20
            int tmpSize = getMessageSize(message);


            // 如果当前取出来的消息长度大于预先设置的sizeLimit(消息最大长度,)直接就跳出循环,然后记录下nextIndex索引位置
            if (tmpSize > sizeLimit) { // 如果消息长度超过了  sizeLimit(1百万)
                // 如果下一个索引减去当前索引为0,那么就给下一个索引进行加1,这样目的是下次循环的时候,就可以通过nextIndex属性拿取下一个索引的值
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }

            // 什么时候 多个消息累加的长度+当前取出来的消息的长度 >  sizeLimit(预先设置的消息最大长度),就执行break跳出当前循环
            //否则就接着 累加消息.
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {

                totalSize += tmpSize;
            }

        }
        /*subList方法,通过起始索引和结束索引获取List的一部分
            参数1: 截取元素的起始位置,包含该索引位置元素
            参数2: 截取元素的结束位置,不包含该索引位置元素

         */
        List<Message> subList = messages.subList(currIndex, nextIndex);

        System.out.println("当前的currIndex是: " + currIndex + " 当前的nextIndex是" + nextIndex);

        currIndex = nextIndex;


        return subList;
    }

    /**
     * 计算消息大小
     * 如何计算一个消息的大小: 就是 topic的长度加上消息body的长度,加一个自定义属性的长度 , 再加上20
     *
     * @param message 消息
     * @return 消息长度
     */
    private int getMessageSize(Message message) {

        int tmpSize = message.getTopic().length() + message.getBody().length;
        //消息属性的长度
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }

        tmpSize = tmpSize + 20; //对日志的开销
        return tmpSize;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141

消费者用上面入门代码案例的消费者

启动并且测试结果

先启动消费者, 再启动生产者. 注意,每次反复测试的时候需要重新启动消费者, 因为我在消费者弄了一个AtomicLong计数器,每次测试的时候不重启消费者的话,那么这个AtomicLong参数会接着累加.

生产者日志:

可以看到,分成了四份发送了50000条消息.
这就是批量消息的好处,50000条消息,如果一个一个发送的话,要走50000次网络IO, 如果用批量消息发送的话,下面这个案例,就走了4次IO.

当前的currIndex是: 0 当前的nextIndex是13275
当前的currIndex是: 13275 当前的nextIndex是26262
当前的currIndex是: 26262 当前的nextIndex是39249
当前的currIndex是: 39249 当前的nextIndex是50000
  • 1
  • 2
  • 3
  • 4

消费者日志:

可以看到 消费者一共接收到了50000条日志.

... 前面的日志不粘了,太长了.

ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610976, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528A36, commitLogOffset=475171382, bodyCRC=1335665873, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39246, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D994E, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 54], transactionId='null'}]] 
当前接收到了消息的个数49998
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610978, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528BBE, commitLogOffset=475171774, bodyCRC=673483222, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39248, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D9950, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 56], transactionId='null'}]] 
当前接收到了消息的个数49999
ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610977, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528AFA, commitLogOffset=475171578, bodyCRC=949720135, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39247, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D994F, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 55], transactionId='null'}]] 
当前接收到了消息的个数50000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/715364
推荐阅读
相关标签
  

闽ICP备14008679号