赞
踩
本文中包含下面的内容
下面是Rocketmq中 AllocateMessageQueueAveragely 的源码:
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {
- //省略参数校验、当前消费者id是否存在的校验
- //走到下面的代码, 说明参数校验通过
- int index = cidAll.indexOf(currentCID);
- int mod = mqAll.size() % cidAll.size();
- int averageSize =
- mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
- + 1 : mqAll.size() / cidAll.size());
- int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
- int range = Math.min(averageSize, mqAll.size() - startIndex);
- for (int i = 0; i < range; i++) {
- result.add(mqAll.get((startIndex + i) % mqAll.size()));
- }
- return result;
- }
对代码分析如下:
下面是Rocketmq中 AllocateMessageQueueAveragelyByCircle的源码
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
- //省略参数校验、当前消费者id是否存在的校验
- //走到下面的代码, 说明参数校验通过
- int index = cidAll.indexOf(currentCID);
- for (int i = index; i < mqAll.size(); i++) {
- if (i % cidAll.size() == index) {
- result.add(mqAll.get(i));
- }
- }
- return result;
- }
对代码分析如下:
针对知识点一、二可以通过下面的图示进行进一步了解
图1、普通消费和环形消费比较
再举个例子:
假设有三个消费者、八个消息, 对普通分配方式和环形分配方式,分别如下:
Message Queue | ConsumerId |
---|---|
消息队列[0] | Consumer[0] |
消息队列[1] | Consumer[0] |
消息队列[2] | Consumer[0] |
消息队列[3] | Consumer[1] |
消息队列[4] | Consumer[1] |
消息队列[5] | Consumer[1] |
消息队列[6] | Consumer[2] |
消息队列[7] | Consumer[2] |
- 环形消费方式
Message Queue | ConsumerId |
---|---|
消息队列[0] | Consumer[0] |
消息队列[1] | Consumer[1] |
消息队列[2] | Consumer[2] |
消息队列[3] | Consumer[0] |
消息队列[4] | Consumer[1] |
消息队列[5] | Consumer[2] |
消息队列[6] | Consumer[0] |
消息队列[7] | Consumer[1] |
下面是手动配置的代码:
- public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
- private List<MessageQueue> messageQueueList;
-
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- return this.messageQueueList;
- }
-
- @Override
- public String getName() {
- return "CONFIG";
- }
-
- public List<MessageQueue> getMessageQueueList() {
- return messageQueueList;
- }
-
- public void setMessageQueueList(List<MessageQueue> messageQueueList) {
- this.messageQueueList = messageQueueList;
- }
- }

代码分析:
进行分配的核心方法是allocate(), 从代码中可以看出分配的方式是从配置文件中获取相关的信息, 这中方式自己用的比较少,暂时忽略,后面有研究会进行相关内容更新。
下面是机房分配策略的代码:
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- List<MessageQueue> result = new ArrayList<MessageQueue>();
- int currentIndex = cidAll.indexOf(currentCID);
- if (currentIndex < 0) {
- return result;
- }
- List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
- for (MessageQueue mq : mqAll) {
- String[] temp = mq.getBrokerName().split("@");
- if (temp.length == 2 && consumeridcs.contains(temp[0])) {
- premqAll.add(mq);
- }
- }
-
- int mod = premqAll.size() / cidAll.size();
- int rem = premqAll.size() % cidAll.size();
- int startIndex = mod * currentIndex;
- int endIndex = startIndex + mod;
- for (int i = startIndex; i < endIndex; i++) {
- result.add(mqAll.get(i));
- }
- if (rem > currentIndex) {
- result.add(premqAll.get(currentIndex + mod * cidAll.size()));
- }
- return result;
- }

可以通过下面的例子进一步了解,假设有三个消费者, 八个消息队列
Message Queue | Consumer |
---|---|
消息队列[0] | Consumer[0] |
消息队列[1] | Consumer[0] |
消息队列[2] | Consumer[1] |
消息队列[3] | Consumer[1] |
消息队列[4] | Consumer[2] |
消息队列[5] | Consumer[2] |
消息队列[6] | Consumer[0] |
消息队列[7] | Consumer[1] |
下面是一致性哈希算法的代码
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
- //省略参数校验、当前消费者id是否存在的校验
- //走到下面的代码, 说明参数校验通过
- Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
- for (String cid : cidAll) {
- cidNodes.add(new ClientNode(cid));
- }
-
- final ConsistentHashRouter<ClientNode> router; //for building hash ring
- if (customHashFunction != null) {
- router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
- } else {
- router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
- }
-
- List<MessageQueue> results = new ArrayList<MessageQueue>();
- for (MessageQueue mq : mqAll) {
- ClientNode clientNode = router.routeNode(mq.toString());
- if (clientNode != null && currentCID.equals(clientNode.getKey())) {
- results.add(mq);
- }
- }
-
- return results;
-
- }

关于一致性哈希算法的讲解,可以通过下面的连接进行了解
https://blog.csdn.net/xianghonglee/article/details/25718099
https://blog.csdn.net/sparkliang/article/details/5279393
https://akshatm.svbtle.com/consistent-hash-rings-theory-and-implementation
https://github.com/gholt/ring/blob/master/BASIC_HASH_RING.md
注:本文转自:https://blog.csdn.net/yewandemty/article/details/81989695
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。