赞
踩
限流效果,对应有DefaultController快速失败
WarmUpController慢启动(令牌桶算法)
RateLimiterController(漏桶算法)
在统计插槽StatisticsSlot类中有ArrayMetric的类的成员变量,用于统操作和获取统计数据,而ArrayMetric类有一个成员变量LeapArray data,并提供了一些操作这个成员变量data信息的方法。
LeapArray提供两个参数sampleCount 样本数量,intervalInMs 间隔时间,意思就是在这一段的间隔时间内,被分成了sampleCount 个样本去分别进行统计,默认间隔时间是1s,sampleCount 是2。
下面看看LeapArray的一部分源码,它也是实现滑动窗口最为重要的地方。
public abstract class LeapArray<T> {
//每一个样本窗口的时间长度
protected int windowLengthInMs;
//一个滑动窗口被划分成了多少个样本
protected int sampleCount;
//时间窗口的时间长度
protected int intervalInMs;
private double intervalInSecond;
//一个样本窗口数组,将一个滑动窗口划分为sampleCount个样本窗口的数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
//部分代码省略
}
在LeapArray有这几个成员变量:
public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } //计算当前样本窗口的在array中的索引,以当前时间除以样本时间长度,获得timeId //再将TimeId对数组长度取余,得到索引,这就相当于把array当做了一个样本窗口圆环,就像官网上的图一样 int idx = calculateTimeIdx(timeMillis); // 计算当前时间的样本窗口的开始时间 long windowStart = calculateWindowStart(timeMillis); while (true) { WindowWrap<T> old = array.get(idx); if (old == null) { //如果原来这个位置的样本窗口就是null的,就说明以前还没有网array的这个位置放过样本窗口, // 这时就新建一个样本窗口用cas操作放到数组的这个位置,并返回该窗口 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { //如果计算出来的窗口开始时间和现在存在的这个串口开始时间是一样的,就说明目前正处于这个样本窗口的时间内 //直接返回当前样本窗口 return old; } else if (windowStart > old.windowStart()) { //如果计算出来的窗口开始时间大于现在存在的样本窗口开始时间,就说明现存的样本窗口已经过时了 //这边相当于在array数组圆环上不断旋转着设置新的样本窗口,要去生成新的样本窗口把以前的老的给覆盖了 if (updateLock.tryLock()) { try { // 获取到锁,再去覆盖样本窗口,防止并发更新问题 return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // 没有获取到就让出cpu一小段时间再回去重新尝试获取锁 Thread.yield(); } } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
LeapArray中有一个currentWindow方法,用于获取当前样本窗口,它的逻辑是这样的:
在LeapArray里,最重要的就是这个样本窗口数组,它将一个完整的滑动时间窗口划分成了sampleCount个样本窗口WindowWrap,而样本窗口WindowWrap的结构如下:
public class WindowWrap<T> { /** * Time length of a single window bucket in milliseconds. */ //该样本窗口的长度 private final long windowLengthInMs; /** * Start timestamp of the window in milliseconds. */ //该样本窗口开始的时间 private long windowStart; /** * Statistic data. */ //每一个样本窗口里统计的数据都存在这儿 private T value; /** * 其余代码省略 */ }
每一个样本窗口都包含了三个数据:
这就是滑动窗口的原理,而每次需要对统计数据做操作的时候,就会获取当前滑动窗口的样本窗口,并对样本窗口里面的数据进行操作。简单的示范一下调用的一个流程:
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // Do some checking. //会一层一层的去执行完所有的slot规则检查 fireEntry(context, resourceWrapper, node, count, prioritized, args); // Request passed, add thread count and pass count. //如果到这里了就说明规则检查圈通过了,可以做成功的统计了 //添加成功的线程数 node.increaseThreadNum(); //添加成功通过的请求数量 node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. //一个资源对应一个ClusterNode,给他添加全局的统计 Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) { //添加被Block的线程和请求数量的相关统计 context.getCurEntry().setBlockError(e); // 添加被阻塞数量 node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { // Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } }
在StatisticSlot的Entry方法中,就会添加成功的数量统计,或者根据各种不同的异常,添加不同的数据统计,如请求成功通过的话,就会调用node.addPassRequest(count),而它最后会去调用ArrayMetric类的addPass方法,获取当前的样本时间窗口,并在当前的样本时间窗口上进行数据统计操作。
@Override
public void addPass(int count) {
//获取当前样本窗口,在它上面添加一个成功的统计
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。