当前位置:   article > 正文

Rocketmq之消息队列分配策略算法实现的源码分析_allocatemessagequeueaveragelybycircle

allocatemessagequeueaveragelybycircle

Rocketmq之消息队列分配策略算法实现的源码分析

本文中包含下面的内容

  • 平均分配策略(默认)(AllocateMessageQueueAveragely)
  • 环形分配策略(AllocateMessageQueueAveragelyByCircle)
  • 手动配置分配策略(AllocateMessageQueueByConfig)
  • 机房分配策略(AllocateMessageQueueByMachineRoom)
  • 一致性哈希分配策略(AllocateMessageQueueConsistentHash)

一、平均分配策略(AllocateMessageQueueAveragely)

下面是Rocketmq中 AllocateMessageQueueAveragely 的源码:

  1. public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {
  2. //省略参数校验、当前消费者id是否存在的校验
  3. //走到下面的代码, 说明参数校验通过
  4. int index = cidAll.indexOf(currentCID);
  5. int mod = mqAll.size() % cidAll.size();
  6. int averageSize =
  7. mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
  8. + 1 : mqAll.size() / cidAll.size());
  9. int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
  10. int range = Math.min(averageSize, mqAll.size() - startIndex);
  11. for (int i = 0; i < range; i++) {
  12. result.add(mqAll.get((startIndex + i) % mqAll.size()));
  13. }
  14. return result;
  15. }

对代码分析如下:

  • 第4行, 计算当前消费者在消费者集合(List<String> cidAll)中下标的位置(index)
  • 第5行, 计算当前消息队列(Message Queue)中的消息是否能被消费者集合(cidAll)平均消费掉
  • 第6-8行, 计算当前消费者消费的平均数量
    • mqAll.size() <= cidAll.size() ? 1 如果消费者的数量 >= 消息的数量, 当前消费者消耗的消息数量为1
    • mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size() 如果消息不能被消费者平均消费掉, 且当前消费者在消费者集合中的下标(index) < 平均消费后的余数mod , 则当前消费者消费的数量为 mqAll.size() / cidAll.size() + 1 , 否则是 mqAll.size() / cidAll.size()
  • 第9行,计算当前消费者开始消费消息的下标
    • 如果消息不能被平均消费掉, 且当前消费者在消费者集合中的下标 < 平均消费后的余数mod , 则消息开始的下标为index * averageSize , 否则为index * averageSize + mod
      第10行, 根据Math.min()计算消费者最终需要消费的数量
  • 第11 - 14 行, 从startIndex开始的下标位置,加载数量为range的消息到result集合中,最后返回这个result

二、环形分配策略(AllocateMessageQueueAveragelyByCircle)

下面是Rocketmq中 AllocateMessageQueueAveragelyByCircle的源码

  1. public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
  2. //省略参数校验、当前消费者id是否存在的校验
  3. //走到下面的代码, 说明参数校验通过
  4. int index = cidAll.indexOf(currentCID);
  5. for (int i = index; i < mqAll.size(); i++) {
  6. if (i % cidAll.size() == index) {
  7. result.add(mqAll.get(i));
  8. }
  9. }
  10. return result;
  11. }

对代码分析如下:

  • 第2-3行省略了参数校验、当前消费者是否存在的部分代码
  • 第4行, 计算当前消费者在消费者集合(List<String>)中的下标(index)
  • 第5-10行,遍历消息的下标, 对下标取模(mod), 如果与index相等, 则存储到result集合中,最后返回该集合 。

针对知识点一、二可以通过下面的图示进行进一步了解
普通消费和环形消费比较

图1、普通消费和环形消费比较

 

再举个例子:
    假设有三个消费者、八个消息, 对普通分配方式和环形分配方式,分别如下:

  • 普通消费方式
Message QueueConsumerId
消息队列[0]Consumer[0]
消息队列[1]Consumer[0]
消息队列[2]Consumer[0]
消息队列[3]Consumer[1]
消息队列[4]Consumer[1]
消息队列[5]Consumer[1]
消息队列[6]Consumer[2]
消息队列[7]Consumer[2]

- 环形消费方式

Message QueueConsumerId
消息队列[0]Consumer[0]
消息队列[1]Consumer[1]
消息队列[2]Consumer[2]
消息队列[3]Consumer[0]
消息队列[4]Consumer[1]
消息队列[5]Consumer[2]
消息队列[6]Consumer[0]
消息队列[7]Consumer[1]

三、手动配置策略(AllocateMessageQueueByConfig)

下面是手动配置的代码:

  1. public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
  2. private List<MessageQueue> messageQueueList;
  3. @Override
  4. public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
  5. List<String> cidAll) {
  6. return this.messageQueueList;
  7. }
  8. @Override
  9. public String getName() {
  10. return "CONFIG";
  11. }
  12. public List<MessageQueue> getMessageQueueList() {
  13. return messageQueueList;
  14. }
  15. public void setMessageQueueList(List<MessageQueue> messageQueueList) {
  16. this.messageQueueList = messageQueueList;
  17. }
  18. }

代码分析:
    进行分配的核心方法是allocate(), 从代码中可以看出分配的方式是从配置文件中获取相关的信息, 这中方式自己用的比较少,暂时忽略,后面有研究会进行相关内容更新。

四、机房分配策略(AllocateMessageQueueByMachineRoom)

下面是机房分配策略的代码:

  1. public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
  2. List<String> cidAll) {
  3. List<MessageQueue> result = new ArrayList<MessageQueue>();
  4. int currentIndex = cidAll.indexOf(currentCID);
  5. if (currentIndex < 0) {
  6. return result;
  7. }
  8. List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
  9. for (MessageQueue mq : mqAll) {
  10. String[] temp = mq.getBrokerName().split("@");
  11. if (temp.length == 2 && consumeridcs.contains(temp[0])) {
  12. premqAll.add(mq);
  13. }
  14. }
  15. int mod = premqAll.size() / cidAll.size();
  16. int rem = premqAll.size() % cidAll.size();
  17. int startIndex = mod * currentIndex;
  18. int endIndex = startIndex + mod;
  19. for (int i = startIndex; i < endIndex; i++) {
  20. result.add(mqAll.get(i));
  21. }
  22. if (rem > currentIndex) {
  23. result.add(premqAll.get(currentIndex + mod * cidAll.size()));
  24. }
  25. return result;
  26. }
  • 第4-7行, 计算当前消费者在消费者集合中的下标(index), 如果下标 < 0 , 则直接返回
  • 第8-14行, 根据brokerName解析出所有有效机房信息(其实是有效mq), 用Set集合去重, 结果存储在premqAll中
  • 第16行, 计算消息整除的平均结果mod
  • 第17行, 计算消息是否能够被平均消费rem,(即消息平均消费后还剩多少消息(remaing))
  • 第18行, 计算当前消费者开始消费的下标(startIndex)
  • 第19行, 计算当前消费者结束消费的下标(endIndex)
  • 第20-26行, 将消息的消费分为两部分, 第一部分 – (cidAllSize * mod) , 第二部分 – (premqAll - cidAllSize * mod) ; 从第一部分中查询startIndex ~ endIndex之间所有的消息, 从第二部分中查询 currentIndex + mod * cidAll.size() , 最后返回查询的结果result

可以通过下面的例子进一步了解,假设有三个消费者, 八个消息队列

Message QueueConsumer
消息队列[0]Consumer[0]
消息队列[1]Consumer[0]
消息队列[2]Consumer[1]
消息队列[3]Consumer[1]
消息队列[4]Consumer[2]
消息队列[5]Consumer[2]
消息队列[6]Consumer[0]
消息队列[7]Consumer[1]

五、一致性哈希分配策略(AllocateMessageQueueConsistentHash)

下面是一致性哈希算法的代码

  1. public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
  2. //省略参数校验、当前消费者id是否存在的校验
  3. //走到下面的代码, 说明参数校验通过
  4. Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
  5. for (String cid : cidAll) {
  6. cidNodes.add(new ClientNode(cid));
  7. }
  8. final ConsistentHashRouter<ClientNode> router; //for building hash ring
  9. if (customHashFunction != null) {
  10. router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
  11. } else {
  12. router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
  13. }
  14. List<MessageQueue> results = new ArrayList<MessageQueue>();
  15. for (MessageQueue mq : mqAll) {
  16. ClientNode clientNode = router.routeNode(mq.toString());
  17. if (clientNode != null && currentCID.equals(clientNode.getKey())) {
  18. results.add(mq);
  19. }
  20. }
  21. return results;
  22. }

关于一致性哈希算法的讲解,可以通过下面的连接进行了解
https://blog.csdn.net/xianghonglee/article/details/25718099
https://blog.csdn.net/sparkliang/article/details/5279393
https://akshatm.svbtle.com/consistent-hash-rings-theory-and-implementation
https://github.com/gholt/ring/blob/master/BASIC_HASH_RING.md

注:本文转自:https://blog.csdn.net/yewandemty/article/details/81989695

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/925337
推荐阅读
相关标签
  

闽ICP备14008679号