赞
踩
-
-
- /**
- * Round robin load balance.
- */
- public class RoundRobinLoadBalance extends AbstractLoadBalance {
- public static final String NAME = "roundrobin";
-
- private static final int RECYCLE_PERIOD = 60000;
-
- protected static class WeightedRoundRobin {
- private int weight;
- private AtomicLong current = new AtomicLong(0);
- private long lastUpdate;
- public int getWeight() {
- return weight;
- }
- public void setWeight(int weight) {
- this.weight = weight;
- current.set(0);
- }
- public long increaseCurrent() {
- return current.addAndGet(weight);
- }
- public void sel(int total) {
- current.addAndGet(-1 * total);
- }
- public long getLastUpdate() {
- return lastUpdate;
- }
- public void setLastUpdate(long lastUpdate) {
- this.lastUpdate = lastUpdate;
- }
- }
-
- private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
- private AtomicBoolean updateLock = new AtomicBoolean();
-
- @Override
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- //1.首先通过请求方法来获取服务提供者和轮询对象的Map,如果不存在就创建一个。
- String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
- ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
- if (map == null) {
- methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
- map = methodWeightMap.get(key);
- }
-
- int totalWeight = 0;
- // 2.这里有一个long maxCurrent = Long.MIN_VALUE;挺有意思的,目的是为了判断轮询值的底线
- long maxCurrent = Long.MIN_VALUE;
- long now = System.currentTimeMillis();
- // 3.定义一个指向最终服务提供者的selectedInvoker,和对应的selectedWRR轮询对象。
- Invoker<T> selectedInvoker = null;
- WeightedRoundRobin selectedWRR = null;
- //4.接下来就是整体循环传进来的所有服务提供者列表,然后在上面活动到的map中去寻找对应的轮询对象,计算权重值等信息。
- for (Invoker<T> invoker : invokers) {
- String identifyString = invoker.getUrl().toIdentityString();
- WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
- int weight = getWeight(invoker, invocation);
-
- if (weightedRoundRobin == null) {
- weightedRoundRobin = new WeightedRoundRobin();
- weightedRoundRobin.setWeight(weight);
- map.putIfAbsent(identifyString, weightedRoundRobin);
- }
- if (weight != weightedRoundRobin.getWeight()) {
- //weight changed
- weightedRoundRobin.setWeight(weight);
- }
- //5.取得到当前的轮询对象之后,进行原子自增操作,记录操作时间,这个时候就要判断轮询原子值是否大于maxCurrent的值了,
- long cur = weightedRoundRobin.increaseCurrent();
- weightedRoundRobin.setLastUpdate(now);
- //6.如果大于证明当前这个服务提供者的Invoker是有效的,
- if (cur > maxCurrent) {
- //7 然后将maxCurrent的值指向这个轮询值,同时把invoker和轮询对象的引用指过来。
- // 后续Invoker的轮询值没有当前这个轮询值大的时候,那么就不会被选中,反过来说也就是这里会在所有的Invoker列表中找到一个轮询值最大的那个
- //也就是说每次调用doSelect方法的时候都是取轮询值最大的那个Invoker作为返回。
- maxCurrent = cur;
- selectedInvoker = invoker;
- selectedWRR = weightedRoundRobin;
- }
- totalWeight += weight;
- }
-
- // 优化代码 是为了在有Invoker下线,或者服务不可用的时候,将其从轮询队列中剔除
- if (!updateLock.get() && invokers.size() != map.size()) {
- if (updateLock.compareAndSet(false, true)) {
- try {
- // copy -> modify -> update reference
- ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
- newMap.putAll(map);
- Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, WeightedRoundRobin> item = it.next();
- if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
- it.remove();
- }
- }
- methodWeightMap.put(key, newMap);
- } finally {
- updateLock.set(false);
- }
- }
- }
- //如果在上面遍历Invoker的过程中已经找到了轮询值最大的那个Invoker,
- if (selectedInvoker != null) {
- selectedWRR.sel(totalWeight);
- return selectedInvoker;
- }
- // should not happen here
- return invokers.get(0);
- }
- }
- //就将其对应的轮询值调用sel方法将其设置为一个负值。然后每次如果不被选中,就会加上权重值。
- //所以正常情况下每个Invoker的轮询值都应该是负数的且大于Long.MIN_VALUE。
- public void sel(int total) {
- current.addAndGet(-1 * total);
- }
-
-
-
- // 轮询调度中cas算法
-
- // Cas 算法理解
- public long increaseCurrent() {
- return current.addAndGet(weight);
- }
-
- //AtomicLong
- public final long addAndGet(long delta) {
- // valueOffset 定位对象中字段内存偏移量 以内存地址为为版本的概念,delta 为业务数据值
- // 模型是version-biz
- return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
- }
- //这个方法以100 为入参
- // (this,0,100)+100
-
- //Unsafe
- public final long getAndAddLong(Object o, long valueOffset, long delta) {
- long value; // volatile value;
- do {
- value = this.getLongVolatile(o, valueOffset);
- // 新值和旧值根据 可以理解 取得 以内存地址为版本概念offSet 和o中的全局的offset最终比较一次
- } while(!this.compareAndSwapLong(o, valueOffset, value, value + delta));
-
- return value;
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。