当前位置:   article > 正文

Dubbo RoundRobinLoadBalance 轮询算法_dubbo的轮训算法

dubbo的轮训算法
  1. /**
  2. * Round robin load balance.
  3. */
  4. public class RoundRobinLoadBalance extends AbstractLoadBalance {
  5. public static final String NAME = "roundrobin";
  6. private static final int RECYCLE_PERIOD = 60000;
  7. protected static class WeightedRoundRobin {
  8. private int weight;
  9. private AtomicLong current = new AtomicLong(0);
  10. private long lastUpdate;
  11. public int getWeight() {
  12. return weight;
  13. }
  14. public void setWeight(int weight) {
  15. this.weight = weight;
  16. current.set(0);
  17. }
  18. public long increaseCurrent() {
  19. return current.addAndGet(weight);
  20. }
  21. public void sel(int total) {
  22. current.addAndGet(-1 * total);
  23. }
  24. public long getLastUpdate() {
  25. return lastUpdate;
  26. }
  27. public void setLastUpdate(long lastUpdate) {
  28. this.lastUpdate = lastUpdate;
  29. }
  30. }
  31. private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
  32. private AtomicBoolean updateLock = new AtomicBoolean();
  33. @Override
  34. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  35. //1.首先通过请求方法来获取服务提供者和轮询对象的Map,如果不存在就创建一个。
  36. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
  37. ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
  38. if (map == null) {
  39. methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
  40. map = methodWeightMap.get(key);
  41. }
  42. int totalWeight = 0;
  43. // 2.这里有一个long maxCurrent = Long.MIN_VALUE;挺有意思的,目的是为了判断轮询值的底线
  44. long maxCurrent = Long.MIN_VALUE;
  45. long now = System.currentTimeMillis();
  46. // 3.定义一个指向最终服务提供者的selectedInvoker,和对应的selectedWRR轮询对象。
  47. Invoker<T> selectedInvoker = null;
  48. WeightedRoundRobin selectedWRR = null;
  49. //4.接下来就是整体循环传进来的所有服务提供者列表,然后在上面活动到的map中去寻找对应的轮询对象,计算权重值等信息。
  50. for (Invoker<T> invoker : invokers) {
  51. String identifyString = invoker.getUrl().toIdentityString();
  52. WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
  53. int weight = getWeight(invoker, invocation);
  54. if (weightedRoundRobin == null) {
  55. weightedRoundRobin = new WeightedRoundRobin();
  56. weightedRoundRobin.setWeight(weight);
  57. map.putIfAbsent(identifyString, weightedRoundRobin);
  58. }
  59. if (weight != weightedRoundRobin.getWeight()) {
  60. //weight changed
  61. weightedRoundRobin.setWeight(weight);
  62. }
  63. //5.取得到当前的轮询对象之后,进行原子自增操作,记录操作时间,这个时候就要判断轮询原子值是否大于maxCurrent的值了,
  64. long cur = weightedRoundRobin.increaseCurrent();
  65. weightedRoundRobin.setLastUpdate(now);
  66. //6.如果大于证明当前这个服务提供者的Invoker是有效的,
  67. if (cur > maxCurrent) {
  68. //7 然后将maxCurrent的值指向这个轮询值,同时把invoker和轮询对象的引用指过来。
  69. // 后续Invoker的轮询值没有当前这个轮询值大的时候,那么就不会被选中,反过来说也就是这里会在所有的Invoker列表中找到一个轮询值最大的那个
  70. //也就是说每次调用doSelect方法的时候都是取轮询值最大的那个Invoker作为返回。
  71. maxCurrent = cur;
  72. selectedInvoker = invoker;
  73. selectedWRR = weightedRoundRobin;
  74. }
  75. totalWeight += weight;
  76. }
  77. // 优化代码 是为了在有Invoker下线,或者服务不可用的时候,将其从轮询队列中剔除
  78. if (!updateLock.get() && invokers.size() != map.size()) {
  79. if (updateLock.compareAndSet(false, true)) {
  80. try {
  81. // copy -> modify -> update reference
  82. ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
  83. newMap.putAll(map);
  84. Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
  85. while (it.hasNext()) {
  86. Entry<String, WeightedRoundRobin> item = it.next();
  87. if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
  88. it.remove();
  89. }
  90. }
  91. methodWeightMap.put(key, newMap);
  92. } finally {
  93. updateLock.set(false);
  94. }
  95. }
  96. }
  97. //如果在上面遍历Invoker的过程中已经找到了轮询值最大的那个Invoker,
  98. if (selectedInvoker != null) {
  99. selectedWRR.sel(totalWeight);
  100. return selectedInvoker;
  101. }
  102. // should not happen here
  103. return invokers.get(0);
  104. }
  105. }
  106. //就将其对应的轮询值调用sel方法将其设置为一个负值。然后每次如果不被选中,就会加上权重值。
  107. //所以正常情况下每个Invoker的轮询值都应该是负数的且大于Long.MIN_VALUE
  108. public void sel(int total) {
  109. current.addAndGet(-1 * total);
  110. }
  111. // 轮询调度中cas算法
  112. // Cas 算法理解
  113. public long increaseCurrent() {
  114. return current.addAndGet(weight);
  115. }
  116. //AtomicLong
  117. public final long addAndGet(long delta) {
  118. // valueOffset 定位对象中字段内存偏移量 以内存地址为为版本的概念,delta 为业务数据值
  119. // 模型是version-biz
  120. return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
  121. }
  122. //这个方法以100 为入参
  123. // (this,0,100)+100
  124. //Unsafe
  125. public final long getAndAddLong(Object o, long valueOffset, long delta) {
  126. long value; // volatile value;
  127. do {
  128. value = this.getLongVolatile(o, valueOffset);
  129. // 新值和旧值根据 可以理解 取得 以内存地址为版本概念offSet 和o中的全局的offset最终比较一次
  130. } while(!this.compareAndSwapLong(o, valueOffset, value, value + delta));
  131. return value;
  132. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/41305
推荐阅读
相关标签
  

闽ICP备14008679号