赞
踩
最近要写一个根据版本分配权重的负载策略,因为是基于springcloud的策略,先参考了项目中的轮询策略即com.netflix.loadbalancer.RoundRobinRules,使用的是加权随机算法,较为原始。
后面参考dubbo中的加权轮询算法,使用特(不)殊(懂)算法使轮询更为合理,下面就要对这种算法进行分析吧。
主要涉及RoundRobinLoadBalance和AbstractLoadBalance两个类。
名词:
- package org.apache.dubbo.rpc.cluster.loadbalance;
-
- import org.apache.dubbo.common.URL;
- import org.apache.dubbo.rpc.Invocation;
- import org.apache.dubbo.rpc.Invoker;
-
- import java.util.Collection;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.atomic.AtomicLong;
-
- /**
- * 轮询负载均衡策略
- * Round robin load balance.
- */
- public class RoundRobinLoadBalance extends AbstractLoadBalance {
- //策略名称
- public static final String NAME = "roundrobin";
- //动态权重更新时间
- private static final int RECYCLE_PERIOD = 60000;
-
- //路由权重
- protected static class WeightedRoundRobin {
- private int weight;
- //动态权重
- private AtomicLong current = new AtomicLong(0);
- //最后选取时间
- private long lastUpdate;
- public int getWeight() {
- return weight;
- }
- public void setWeight(int weight) {
- this.weight = weight;
- current.set(0);
- }
- //每次选取操作增加原始权重
- public long increaseCurrent() {
- return current.addAndGet(weight);
- }
- //每次选中减去动态总权重
- public void sel(int total) {
- current.addAndGet(-1 * total);
- }
- public long getLastUpdate() {
- return lastUpdate;
- }
- public void setLastUpdate(long lastUpdate) {
- this.lastUpdate = lastUpdate;
- }
- }
-
- private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
- //更新锁
- private AtomicBoolean updateLock = new AtomicBoolean();
-
- /**
- * get invoker addr list cached for specified invocation
- * <p>
- * <b>for unit test only</b>
- * 获取url对应的权重路由
- * 结构如下:
- * {
- * "bike.get":{
- * "url1": WeightedRoundRobin,
- * "url2": WeightedRoundRobin,
- * ...
- * },
- * "bike.update:{
- * "url1": WeightedRoundRobin,
- * "url2": WeightedRoundRobin,
- * ...
- * }
- * }
- * @param invokers
- * @param invocation
- * @return
- */
- protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
- String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
- //获取url对应的权重路由
- Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
- if (map != null) {
- return map.keySet();
- }
- return null;
- }
-
- /**
- * 根据动态权重表选取服务
- * @param invokers 实例列表
- * @param url 请求url 在这没啥用
- * @param invocation 请求调用信息
- * @param <T>
- * @return 选出的实例调度器
- */
- @Override
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
- //获取url对应的动态权重表
- ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
- //如果权重表为空,则新建
- if (map == null) {
- methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
- map = methodWeightMap.get(key);
- }
- //动态权重总和,用于计算更新动态权重
- int totalWeight = 0;
- //计算时动态权重最小值
- long maxCurrent = Long.MIN_VALUE;
- //当前时间,设置为动态权重表最后选取时间
- long now = System.currentTimeMillis();
- Invoker<T> selectedInvoker = null;
- WeightedRoundRobin selectedWRR = null;
- //循环所有注册服务
- for (Invoker<T> invoker : invokers) {
- //获取服务id
- String identifyString = invoker.getUrl().toIdentityString();
- //获取服务对应的本地动态权信息
- WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
- //获取权重,预热期返回预热权重,否则为原始权重
- int weight = getWeight(invoker, invocation);
- //新建本地动态权重信息
- if (weightedRoundRobin == null) {
- weightedRoundRobin = new WeightedRoundRobin();
- weightedRoundRobin.setWeight(weight);
- map.putIfAbsent(identifyString, weightedRoundRobin);
- }
- //是否为预热权重,预热情况更新权重
- if (weight != weightedRoundRobin.getWeight()) {
- //weight changed
- weightedRoundRobin.setWeight(weight);
- }
- //每次选取调整对应的动态选择
- long cur = weightedRoundRobin.increaseCurrent();
- //更新最后选取时间,为什么不在increaseCurrent方法里面更新?
- //入long cur = weightedRoundRobin.increaseCurrent(now);
- weightedRoundRobin.setLastUpdate(now);
- //获取最大权重服务
- if (cur > maxCurrent) {
- maxCurrent = cur;
- selectedInvoker = invoker;
- selectedWRR = weightedRoundRobin;
- }
- //相加计算总的权重
- totalWeight += weight;
- }
- //移除过期的实例,默认60秒没访问移除
- //调度器数和权重集合数不一致是,更新权重集合
- if (!updateLock.get() && invokers.size() != map.size()) {
- if (updateLock.compareAndSet(false, true)) {
- try {
- // copy -> modify -> update reference
- ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
- newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
- methodWeightMap.put(key, newMap);
- } finally {
- updateLock.set(false);
- }
- }
- }
- //减少选中服务的动态权重值
- if (selectedInvoker != null) {
- selectedWRR.sel(totalWeight);
- return selectedInvoker;
- }
- // should not happen here
- // 没有选出调度器的时候返回第一个服务。
- return invokers.get(0);
- }
-
- }

- package org.apache.dubbo.rpc.cluster.loadbalance;
-
- import org.apache.dubbo.common.URL;
- import org.apache.dubbo.common.utils.CollectionUtils;
- import org.apache.dubbo.rpc.Invocation;
- import org.apache.dubbo.rpc.Invoker;
- import org.apache.dubbo.rpc.cluster.LoadBalance;
-
- import java.util.List;
-
- import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
- import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
- import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
- import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
- import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
-
- /**
- * AbstractLoadBalance
- */
- public abstract class AbstractLoadBalance implements LoadBalance {
- /**
- * Calculate the weight according to the uptime proportion of warmup time
- * the new weight will be within 1(inclusive) to weight(inclusive)
- * 计算预热期权重,最小为1
- * warmWeight = uptime/(warmup/weight),
- * 如20权重服务,在启动5分钟时的预热权重 = 5/(10/20) = 5/0.5=10
- * @param uptime the uptime in milliseconds 上线时间
- * @param warmup the warmup time in milliseconds 预热时间
- * @param weight the weight of an invoker 原值权重
- * @return weight which takes warmup into account
- */
- static int calculateWarmupWeight(int uptime, int warmup, int weight) {
- int ww = (int) ( uptime / ((float) warmup / weight));
- return ww < 1 ? 1 : (Math.min(ww, weight));
- }
-
- @Override
- public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- if (CollectionUtils.isEmpty(invokers)) {
- return null;
- }
- if (invokers.size() == 1) {
- return invokers.get(0);
- }
- return doSelect(invokers, url, invocation);
- }
-
- protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
-
-
- /**
- * Get the weight of the invoker's invocation which takes warmup time into account
- * if the uptime is within the warmup time, the weight will be reduce proportionally
- * 获取调用程序的调用权重,其中考虑了预热时间如果正常运行时间在预热时间内,则权重将按比例减少
- * @param invoker the invoker
- * @param invocation the invocation of this invoker
- * @return weight
- */
- int getWeight(Invoker<?> invoker, Invocation invocation) {
- int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
- if (weight > 0) {
- //请求时间
- long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
- if (timestamp > 0L) {
- //处理时间,当前时间-invoker上线时间
- long uptime = System.currentTimeMillis() - timestamp;
- if (uptime < 0) {
- return 1;
- }
- //预热时间10分钟
- int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
- //上线时间小于预热时间,返回预热中的权重
- if (uptime > 0 && uptime < warmup) {
- weight = calculateWarmupWeight((int)uptime, warmup, weight);
- }
- }
- }
- //正常情况返回invoker权重
- return Math.max(weight, 0);
- }
- }

举例
a、b、c权重分别为2、7、1如下:

结果执行为,a2次,b7次,c1次,结果喜人。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。