当前位置:   article > 正文

Dubbo源码解析之加权轮询策略RoundRobinLoadBalance

roundrobinloadbalance

引言

最近要写一个根据版本分配权重的负载策略,因为是基于springcloud的策略,先参考了项目中的轮询策略即com.netflix.loadbalancer.RoundRobinRules,使用的是加权随机算法,较为原始。

后面参考dubbo中的加权轮询算法,使用特(不)殊(懂)算法使轮询更为合理,下面就要对这种算法进行分析吧。

逻辑

  • 使用本地权重表,根据调用情况动态调整。
  • 每次调用根据算法更新权重表,设置本地权重为本地所有权重加上配置权重,选出本地权重最大的服务,并设置它的本地权重减去本本轮总权重。
  • 权重表回收,删除1分内未被调用的实例
  • 预热期权重算法,预热期默认10分钟,warmWeight = uptime/(warmup/weight),如20权重服务,在启动5分钟时的预热权重 = 5/(10/20) = 5/0.5=10

源码

主要涉及RoundRobinLoadBalance和AbstractLoadBalance两个类。

名词:

  1. 原始权重:服务设置中的weight
  2. 动态权重:每次选取操作调整后的权重
  3. 动态权重总和:每次调整完后的所有服务动态权重总和
  4. 本地动态权重表:记录本地服务选取时的动态权重信息,每次调用选取算法都会更新。
  •  RoundRobinLoadBalance

  1. package org.apache.dubbo.rpc.cluster.loadbalance;
  2. import org.apache.dubbo.common.URL;
  3. import org.apache.dubbo.rpc.Invocation;
  4. import org.apache.dubbo.rpc.Invoker;
  5. import java.util.Collection;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.concurrent.ConcurrentHashMap;
  9. import java.util.concurrent.ConcurrentMap;
  10. import java.util.concurrent.atomic.AtomicBoolean;
  11. import java.util.concurrent.atomic.AtomicLong;
  12. /**
  13. * 轮询负载均衡策略
  14. * Round robin load balance.
  15. */
  16. public class RoundRobinLoadBalance extends AbstractLoadBalance {
  17. //策略名称
  18. public static final String NAME = "roundrobin";
  19. //动态权重更新时间
  20. private static final int RECYCLE_PERIOD = 60000;
  21. //路由权重
  22. protected static class WeightedRoundRobin {
  23. private int weight;
  24. //动态权重
  25. private AtomicLong current = new AtomicLong(0);
  26. //最后选取时间
  27. private long lastUpdate;
  28. public int getWeight() {
  29. return weight;
  30. }
  31. public void setWeight(int weight) {
  32. this.weight = weight;
  33. current.set(0);
  34. }
  35. //每次选取操作增加原始权重
  36. public long increaseCurrent() {
  37. return current.addAndGet(weight);
  38. }
  39. //每次选中减去动态总权重
  40. public void sel(int total) {
  41. current.addAndGet(-1 * total);
  42. }
  43. public long getLastUpdate() {
  44. return lastUpdate;
  45. }
  46. public void setLastUpdate(long lastUpdate) {
  47. this.lastUpdate = lastUpdate;
  48. }
  49. }
  50. private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
  51. //更新锁
  52. private AtomicBoolean updateLock = new AtomicBoolean();
  53. /**
  54. * get invoker addr list cached for specified invocation
  55. * <p>
  56. * <b>for unit test only</b>
  57. * 获取url对应的权重路由
  58. * 结构如下:
  59. * {
  60. * "bike.get":{
  61. * "url1": WeightedRoundRobin,
  62. * "url2": WeightedRoundRobin,
  63. * ...
  64. * },
  65. * "bike.update:{
  66. * "url1": WeightedRoundRobin,
  67. * "url2": WeightedRoundRobin,
  68. * ...
  69. * }
  70. * }
  71. * @param invokers
  72. * @param invocation
  73. * @return
  74. */
  75. protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
  76. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
  77. //获取url对应的权重路由
  78. Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
  79. if (map != null) {
  80. return map.keySet();
  81. }
  82. return null;
  83. }
  84. /**
  85. * 根据动态权重表选取服务
  86. * @param invokers 实例列表
  87. * @param url 请求url 在这没啥用
  88. * @param invocation 请求调用信息
  89. * @param <T>
  90. * @return 选出的实例调度器
  91. */
  92. @Override
  93. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  94. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
  95. //获取url对应的动态权重表
  96. ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
  97. //如果权重表为空,则新建
  98. if (map == null) {
  99. methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
  100. map = methodWeightMap.get(key);
  101. }
  102. //动态权重总和,用于计算更新动态权重
  103. int totalWeight = 0;
  104. //计算时动态权重最小值
  105. long maxCurrent = Long.MIN_VALUE;
  106. //当前时间,设置为动态权重表最后选取时间
  107. long now = System.currentTimeMillis();
  108. Invoker<T> selectedInvoker = null;
  109. WeightedRoundRobin selectedWRR = null;
  110. //循环所有注册服务
  111. for (Invoker<T> invoker : invokers) {
  112. //获取服务id
  113. String identifyString = invoker.getUrl().toIdentityString();
  114. //获取服务对应的本地动态权信息
  115. WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
  116. //获取权重,预热期返回预热权重,否则为原始权重
  117. int weight = getWeight(invoker, invocation);
  118. //新建本地动态权重信息
  119. if (weightedRoundRobin == null) {
  120. weightedRoundRobin = new WeightedRoundRobin();
  121. weightedRoundRobin.setWeight(weight);
  122. map.putIfAbsent(identifyString, weightedRoundRobin);
  123. }
  124. //是否为预热权重,预热情况更新权重
  125. if (weight != weightedRoundRobin.getWeight()) {
  126. //weight changed
  127. weightedRoundRobin.setWeight(weight);
  128. }
  129. //每次选取调整对应的动态选择
  130. long cur = weightedRoundRobin.increaseCurrent();
  131. //更新最后选取时间,为什么不在increaseCurrent方法里面更新?
  132. //入long cur = weightedRoundRobin.increaseCurrent(now);
  133. weightedRoundRobin.setLastUpdate(now);
  134. //获取最大权重服务
  135. if (cur > maxCurrent) {
  136. maxCurrent = cur;
  137. selectedInvoker = invoker;
  138. selectedWRR = weightedRoundRobin;
  139. }
  140. //相加计算总的权重
  141. totalWeight += weight;
  142. }
  143. //移除过期的实例,默认60秒没访问移除
  144. //调度器数和权重集合数不一致是,更新权重集合
  145. if (!updateLock.get() && invokers.size() != map.size()) {
  146. if (updateLock.compareAndSet(false, true)) {
  147. try {
  148. // copy -> modify -> update reference
  149. ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
  150. newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
  151. methodWeightMap.put(key, newMap);
  152. } finally {
  153. updateLock.set(false);
  154. }
  155. }
  156. }
  157. //减少选中服务的动态权重值
  158. if (selectedInvoker != null) {
  159. selectedWRR.sel(totalWeight);
  160. return selectedInvoker;
  161. }
  162. // should not happen here
  163. // 没有选出调度器的时候返回第一个服务。
  164. return invokers.get(0);
  165. }
  166. }

 

  • AbstractLoadBalance

  1. package org.apache.dubbo.rpc.cluster.loadbalance;
  2. import org.apache.dubbo.common.URL;
  3. import org.apache.dubbo.common.utils.CollectionUtils;
  4. import org.apache.dubbo.rpc.Invocation;
  5. import org.apache.dubbo.rpc.Invoker;
  6. import org.apache.dubbo.rpc.cluster.LoadBalance;
  7. import java.util.List;
  8. import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
  9. import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
  10. import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
  11. import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
  12. import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
  13. /**
  14. * AbstractLoadBalance
  15. */
  16. public abstract class AbstractLoadBalance implements LoadBalance {
  17. /**
  18. * Calculate the weight according to the uptime proportion of warmup time
  19. * the new weight will be within 1(inclusive) to weight(inclusive)
  20. * 计算预热期权重,最小为1
  21. * warmWeight = uptime/(warmup/weight),
  22. * 如20权重服务,在启动5分钟时的预热权重 = 5/(10/20) = 5/0.5=10
  23. * @param uptime the uptime in milliseconds 上线时间
  24. * @param warmup the warmup time in milliseconds 预热时间
  25. * @param weight the weight of an invoker 原值权重
  26. * @return weight which takes warmup into account
  27. */
  28. static int calculateWarmupWeight(int uptime, int warmup, int weight) {
  29. int ww = (int) ( uptime / ((float) warmup / weight));
  30. return ww < 1 ? 1 : (Math.min(ww, weight));
  31. }
  32. @Override
  33. public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  34. if (CollectionUtils.isEmpty(invokers)) {
  35. return null;
  36. }
  37. if (invokers.size() == 1) {
  38. return invokers.get(0);
  39. }
  40. return doSelect(invokers, url, invocation);
  41. }
  42. protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
  43. /**
  44. * Get the weight of the invoker's invocation which takes warmup time into account
  45. * if the uptime is within the warmup time, the weight will be reduce proportionally
  46. * 获取调用程序的调用权重,其中考虑了预热时间如果正常运行时间在预热时间内,则权重将按比例减少
  47. * @param invoker the invoker
  48. * @param invocation the invocation of this invoker
  49. * @return weight
  50. */
  51. int getWeight(Invoker<?> invoker, Invocation invocation) {
  52. int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
  53. if (weight > 0) {
  54. //请求时间
  55. long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
  56. if (timestamp > 0L) {
  57. //处理时间,当前时间-invoker上线时间
  58. long uptime = System.currentTimeMillis() - timestamp;
  59. if (uptime < 0) {
  60. return 1;
  61. }
  62. //预热时间10分钟
  63. int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
  64. //上线时间小于预热时间,返回预热中的权重
  65. if (uptime > 0 && uptime < warmup) {
  66. weight = calculateWarmupWeight((int)uptime, warmup, weight);
  67. }
  68. }
  69. }
  70. //正常情况返回invoker权重
  71. return Math.max(weight, 0);
  72. }
  73. }

效果

举例

a、b、c权重分别为2、7、1如下:

结果执行为,a2次,b7次,c1次,结果喜人。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/41299
推荐阅读
相关标签
  

闽ICP备14008679号