Batch messages combine multiple small messages into one batch that is sent in a single call. This reduces network I/O and improves throughput.
For example, sending three messages one by one takes three network round trips; packing the three into a single batch takes only one.
Batch messages are handy but have limitations: the official recommendation is that one batch should not exceed 1 MB, and in practice a single send is capped at about 4 MB (the client's default maximum message size).
All messages in a batch must have the same topic and the same waitStoreMsgOK value, and none of them may be delayed or transactional messages.
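As an illustration only, the constraints above can be sketched as a self-contained precheck. The Msg class below is a stripped-down stand-in for RocketMQ's Message (carrying just the fields the batch rules mention), and isValidBatch is a hypothetical helper, not part of the RocketMQ API:

```java
import java.util.List;

public class BatchPrecheck {
    // Stand-in for RocketMQ's Message with only the batch-relevant fields
    record Msg(String topic, boolean waitStoreMsgOK, int delayTimeLevel) {}

    // True only if the list satisfies the batch constraints:
    // one shared topic, one shared waitStoreMsgOK value, no delayed messages.
    static boolean isValidBatch(List<Msg> msgs) {
        if (msgs.isEmpty()) return false;
        Msg first = msgs.get(0);
        for (Msg m : msgs) {
            if (!first.topic().equals(m.topic())) return false;          // mixed topics
            if (first.waitStoreMsgOK() != m.waitStoreMsgOK()) return false; // mixed waitStoreMsgOK
            if (m.delayTimeLevel() > 0) return false;                     // delayed message
        }
        return true;
    }

    public static void main(String[] args) {
        List<Msg> ok = List.of(new Msg("BatchTest", true, 0), new Msg("BatchTest", true, 0));
        List<Msg> mixed = List.of(new Msg("TopicA", true, 0), new Msg("TopicB", true, 0));
        System.out.println(isValidBatch(ok));    // true
        System.out.println(isValidBatch(mixed)); // false
    }
}
```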
Sending a batch is simple: in the producer code, pass a List of Message objects to producer.send; the List holds the individual messages.
Nothing changes on the consumer side, and no extra configuration is needed.
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages); // batch send
package org.apache.rocketmq.example.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

public class SimpleBatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();

        // Batching is easy to use as long as one send does not exceed 1 MiB
        String topic = "BatchTest";

        // Put the three messages into one List and send the List: that is a batch message
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
        producer.send(messages);

        producer.shutdown();
    }
}
package org.apache.rocketmq.example.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with the specified consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchProducerGroupName");
        consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");

        // Consume from the last offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // Subscribe to one topic to consume
        consumer.subscribe("BatchTest", "*");

        AtomicLong atomicLong = new AtomicLong(1); // counter, initial value 1

        // Register a callback to run when messages fetched from the broker arrive
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                long andIncrement = atomicLong.getAndIncrement();
                System.out.println("Messages received so far: " + andIncrement);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Start the consumer instance
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
Start the consumer first, then the producer.
The consumer receives three messages, yet the producer only called send once, passing a List that contained the three messages.
Note: if the "Messages received so far" lines print out of order, that is not a bug. The consumer handles messages on multiple threads, so the thread printing "Messages received so far: 1" may simply run earlier or later than the thread printing "Messages received so far: 2", scrambling the printed order.
Just look at the largest number printed: AtomicLong is an atomic class and is thread-safe under concurrency, so the count itself is never wrong.
The largest count printed is 3, so three messages were received in total.
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581891, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7A9C, commitLogOffset=467499676, bodyCRC=1841171634, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID001, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390000, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 48], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581892, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7B5A, commitLogOffset=467499866, bodyCRC=448347172, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID002, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390001, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 49], transactionId='null'}]]
Messages received so far: 2
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=581893, sysFlag=0, bornTimestamp=1634973433664, bornHost=/172.16.10.1:60630, storeTimestamp=1634973434070, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001BDD7C18, commitLogOffset=467500056, bodyCRC=61894046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=581894, KEYS=OrderID003, CONSUME_START_TIME=1634973433681, UNIQ_KEY=AC100A01492418B4AAC27493A7390002, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 50], transactionId='null'}]]
Messages received so far: 1
Messages received so far: 3
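The counting behavior can be reproduced outside RocketMQ. A minimal sketch (nothing here is RocketMQ-specific) showing that AtomicLong never loses an increment even when many threads bump it concurrently, although the order in which the threads run is arbitrary:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicLong counter = new AtomicLong(0);
        ExecutorService pool = Executors.newFixedThreadPool(8);

        // 1000 concurrent increments, like 1000 messages arriving on consumer threads
        for (int i = 0; i < 1000; i++) {
            pool.submit(counter::getAndIncrement);
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);

        // No increment is lost: the final value is exactly 1000,
        // even though the threads ran in an unpredictable order.
        System.out.println(counter.get()); // 1000
    }
}
```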
As noted above, one batch is officially recommended to stay under 1 MB, and in practice a single send is capped at about 4 MB; if a send exceeds the limit, MQ throws an error.
The simplest, bluntest fix is to not send too many messages at once; the other is to split the messages into several batches and send each one.
The code below is heavily commented, so most readers should be able to follow it.
package org.apache.rocketmq.example.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class SplitBatchProducer {
    public static void main(String[] args) throws Exception {
        int cycleIndex = 50000; // how many messages to generate

        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();

        // large batch
        String topic = "BatchTest";
        // 50,000 messages in one List; sending them all at once would certainly exceed the limit
        List<Message> messages = new ArrayList<>(100 * 1000);
        for (int i = 0; i < cycleIndex; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }

        // Sending the whole List directly would fail:
        // producer.send(messages);

        // If the size limit is exceeded, split the big batch into smaller ones and send each in turn
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        }
        producer.shutdown();
    }
}

class ListSplitter implements Iterator<List<Message>> {
    // the messages to split
    private final List<Message> messages;
    // size limit per batch
    private final int sizeLimit = 1000 * 1000;
    // current index
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    /**
     * Whether there is more data:
     * the current index is still smaller than the number of messages.
     */
    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex; // index we have consumed up to
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            // A message's size = topic length + body length + property lengths + 20 bytes of log overhead
            int tmpSize = getMessageSize(message);
            // If this single message already exceeds sizeLimit, stop here and record nextIndex
            if (tmpSize > sizeLimit) {
                // If nothing has been taken yet, advance by one so this oversized message
                // is returned on its own and the iterator still makes progress
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            // Once the accumulated size plus this message's size would exceed sizeLimit
            // (the configured maximum per batch), break out of the loop;
            // otherwise keep accumulating.
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        /* subList returns a slice of the List:
           arg 1: start index, inclusive
           arg 2: end index, exclusive */
        List<Message> subList = messages.subList(currIndex, nextIndex);
        System.out.println("currIndex: " + currIndex + ", nextIndex: " + nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    /**
     * Compute a message's size:
     * topic length + body length + property lengths, plus 20 bytes of log overhead.
     *
     * @param message the message
     * @return the message size
     */
    private int getMessageSize(Message message) {
        int tmpSize = message.getTopic().length() + message.getBody().length;
        // lengths of the message properties
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        tmpSize = tmpSize + 20; // overhead for the log
        return tmpSize;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}
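The subList call at the heart of the splitter returns a view of the original list from the start index (inclusive) to the end index (exclusive). A quick standalone illustration:

```java
import java.util.List;

public class SubListDemo {
    public static void main(String[] args) {
        List<Integer> all = List.of(10, 20, 30, 40, 50);
        // fromIndex is inclusive, toIndex is exclusive
        List<Integer> part = all.subList(1, 4);
        System.out.println(part); // [20, 30, 40]
    }
}
```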
Start the consumer first, then the producer. Note: restart the consumer before each repeated test, because the consumer holds an AtomicLong counter that keeps accumulating across runs if the consumer is not restarted.
Producer log:
The 50,000 messages were sent in four batches.
This is the benefit of batch messages: sending 50,000 messages one by one would take 50,000 network round trips, while the batched version in this example took only 4.
currIndex: 0, nextIndex: 13275
currIndex: 13275, nextIndex: 26262
currIndex: 26262, nextIndex: 39249
currIndex: 39249, nextIndex: 50000
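The four batches can be sanity-checked with rough arithmetic. Assuming the properties present at send time are TAGS=Tag, KEYS=OrderID&lt;i&gt;, and WAIT=true (what the Message constructor sets; the first batch boundary of 13275 in the log is consistent with this), each message is roughly 69-77 bytes under the getMessageSize formula, so 50,000 messages total about 3.8 MB, which at a 1,000,000-byte limit per batch lands at 4 sends. A sketch of that estimate:

```java
public class BatchCountEstimate {
    // Mirrors getMessageSize: topic + body + properties + 20 bytes of overhead.
    // The property contents are assumptions based on the Message constructor.
    static int estimateBatches() {
        final int sizeLimit = 1000 * 1000;
        int batches = 0, running = 0;
        for (int i = 0; i < 50_000; i++) {
            int msgSize = "BatchTest".length()
                    + ("Hello world " + i).length()                 // body
                    + ("TAGS" + "Tag").length()                     // TAGS property
                    + ("KEYS" + "OrderID" + i).length()             // KEYS property
                    + ("WAIT" + "true").length()                    // WAIT property
                    + 20;                                           // log overhead
            if (running + msgSize > sizeLimit) { // batch full: start a new one
                batches++;
                running = 0;
            }
            running += msgSize;
        }
        if (running > 0) batches++; // the final, partially filled batch
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(estimateBatches()); // 4
    }
}
```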
Consumer log:
The consumer received 50,000 messages in total.
... (earlier log lines omitted; too long to paste)
ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610976, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528A36, commitLogOffset=475171382, bodyCRC=1335665873, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39246, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D994E, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 54], transactionId='null'}]]
Messages received so far: 49998
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610978, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528BBE, commitLogOffset=475171774, bodyCRC=673483222, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39248, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D9950, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 56], transactionId='null'}]]
Messages received so far: 49999
ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=196, queueOffset=610977, sysFlag=0, bornTimestamp=1634973749605, bornHost=/172.16.10.1:61128, storeTimestamp=1634973750044, storeHost=/172.16.10.102:10911, msgId=AC100A6600002A9F000000001C528AFA, commitLogOffset=475171578, bodyCRC=949720135, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=610979, KEYS=OrderID39247, CONSUME_START_TIME=1634973752038, UNIQ_KEY=AC100A015AD018B4AAC27498792D994F, WAIT=true, TAGS=Tag}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 51, 57, 50, 52, 55], transactionId='null'}]]
Messages received so far: 50000