当前位置:   article > 正文

Sentinel限流及其滑动窗口算法_微服务限流、滑动窗口

微服务限流、滑动窗口

Sentinel的限流原理

限流效果,对应有DefaultController快速失败

WarmUpController慢启动(令牌桶算法)

RateLimiterController(漏桶算法)

滑动时间窗口算法

  1. 固定时间窗口算法
    即比如每一秒作为一个固定的时间窗口,在一秒内最多可以通过100个请求,那么在统计数据的时候,如果0-500ms没有请求,而500-1000ms有100个请求,那么这一百个请求都能通过,在1000-1500ms的时候,又有100个请求过来了,它依然能够通过,因为在1000ms的时候又开启了一个新的固定时间窗口。这样,500-1500ms这一秒内有了200个请求,但是它依然能够通过,所以这就会造成数据统计的不准确性,并不能保证在任意的一秒内都使得通过请求数小于100,。
  2. 普通的滑动窗口做法
    因为固定时间窗口带来的数据同的不准确性,就会造成可能局部的时间压力过高,所以就需要采用滑动窗口算法来进行统计,滑动窗口时间算法意思就是,从请求过来的时刻开始,统计往前一秒中的数据,通过这个数据来判断是否进行限流等操作。这样的话准确性就会有很大的提升,但是由于每一次请求过来都需要重新统计前一秒的数据,就会造成巨大的性能损失。所以这也是他的不合理的地方。
  3. Sentinel的滑动时间窗口算法
    由于固定时间窗口带来的不准确性和普通滑动窗口带来的性能损失的缺点,所以Sentinel对这两种方案采取了折中的方案。
    在Sentinel中会将原本的固定的时间窗口划分成很多更小的样本窗口,每一次请求的数据都会被保存在小的样本窗口中去,而每一次获取的时候都会去获取这些样本时间窗口中的数据,从而不需要进行重新统计,就减小了性能损耗,同时时间窗口被细粒度化了,不准确性也会降低很多。

在统计插槽StatisticsSlot类中有ArrayMetric的类的成员变量,用于统操作和获取统计数据,而ArrayMetric类有一个成员变量LeapArray data,并提供了一些操作这个成员变量data信息的方法。

LeapArray提供两个参数sampleCount 样本数量,intervalInMs 间隔时间,意思就是在这一段的间隔时间内,被分成了sampleCount 个样本去分别进行统计,默认间隔时间是1s,sampleCount 是2。

下面看看LeapArray的一部分源码,它也是实现滑动窗口最为重要的地方。

public abstract class LeapArray<T> {
    //每一个样本窗口的时间长度
    protected int windowLengthInMs;
    //一个滑动窗口被划分成了多少个样本
    protected int sampleCount;
    //时间窗口的时间长度
    protected int intervalInMs;
    private double intervalInSecond;
    //一个样本窗口数组,将一个滑动窗口划分为sampleCount个样本窗口的数组
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    //部分代码省略
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在LeapArray有这几个成员变量:

  1. 每一个样本窗口的时间长度
  2. 一个滑动窗口被划分成的样本数量
  3. 滑动时间窗口的时间长度
  4. 样本窗口数组
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        //计算当前样本窗口的在array中的索引,以当前时间除以样本时间长度,获得timeId
        //再将TimeId对数组长度取余,得到索引,这就相当于把array当做了一个样本窗口圆环,就像官网上的图一样
        int idx = calculateTimeIdx(timeMillis);
        // 计算当前时间的样本窗口的开始时间
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
            //如果原来这个位置的样本窗口就是null的,就说明以前还没有网array的这个位置放过样本窗口,
            // 这时就新建一个样本窗口用cas操作放到数组的这个位置,并返回该窗口
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                //如果计算出来的窗口开始时间和现在存在的这个串口开始时间是一样的,就说明目前正处于这个样本窗口的时间内
                //直接返回当前样本窗口
                return old;
            } else if (windowStart > old.windowStart()) {
                //如果计算出来的窗口开始时间大于现在存在的样本窗口开始时间,就说明现存的样本窗口已经过时了
                //这边相当于在array数组圆环上不断旋转着设置新的样本窗口,要去生成新的样本窗口把以前的老的给覆盖了
                if (updateLock.tryLock()) {
                    try {
                        // 获取到锁,再去覆盖样本窗口,防止并发更新问题
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // 没有获取到就让出cpu一小段时间再回去重新尝试获取锁
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

LeapArray中有一个currentWindow方法,用于获取当前样本窗口,它的逻辑是这样的:

  1. 计算当前样本窗口的在array中的索引,以当前时间除以样本时间长度,获得timeId
  2. 再将TimeId对数组长度取余,得到索引,这就相当于把array当做了一个样本窗口圆环,就像官网上的图一样,这样所有的时间窗口都会在这个数组里进行循环。
  3. 获取当前时间所对应的样本窗口开始时间,与目前数组中的样本窗口进行比较。
  4. 如果数组最终该索引不存在样本窗口,就创建一个样本窗口放到数组中
  5. 如果计算出来的窗口开始时间和现在存在的这个串口开始时间是一样的,就说明目前正处于这个样本窗口的时间内,直接返回该窗口就好了
  6. 如果计算出来的窗口开始时间大于现在存在的样本窗口开始时间,就说明现存的样本窗口已经过时了,需要重新覆盖一个新的样本窗口

在LeapArray里,最重要的就是这个样本窗口数组,它将一个完整的滑动时间窗口划分成了sampleCount个样本窗口WindowWrap,而样本窗口WindowWrap的结构如下:

public class WindowWrap<T> {

    /**
     * Time length of a single window bucket in milliseconds.
     */
    //该样本窗口的长度
    private final long windowLengthInMs;

    /**
     * Start timestamp of the window in milliseconds.
     */
    //该样本窗口开始的时间
    private long windowStart;

    /**
     * Statistic data.
     */
    //每一个样本窗口里统计的数据都存在这儿
    private T value;
    
    /**
     * 其余代码省略
     */
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

每一个样本窗口都包含了三个数据:

  1. 该样本窗口的时间长度
  2. 该样本窗口开始的时间
  3. 每一个样本窗口里统计的数据

这就是滑动窗口的原理,而每次需要对统计数据做操作的时候,就会获取当前滑动窗口的样本窗口,并对样本窗口里面的数据进行操作。简单的示范一下调用的一个流程:

  • 首先统计数据,直接可以去看StatisticSlot插槽,因为这个插槽本身就是做这个统计数据相关的事情的
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            //会一层一层的去执行完所有的slot规则检查
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            // Request passed, add thread count and pass count.
            //如果到这里了就说明规则检查圈通过了,可以做成功的统计了
            //添加成功的线程数
            node.increaseThreadNum();
            //添加成功通过的请求数量
            node.addPassRequest(count);
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                //一个资源对应一个ClusterNode,给他添加全局的统计
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            //添加被Block的线程和请求数量的相关统计
            context.getCurEntry().setBlockError(e);
            // 添加被阻塞数量
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }
            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }
            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);
            throw e;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

在StatisticSlot的Entry方法中,就会添加成功的数量统计,或者根据各种不同的异常,添加不同的数据统计,如请求成功通过的话,就会调用node.addPassRequest(count),而它最后会去调用ArrayMetric类的addPass方法,获取当前的样本时间窗口,并在当前的样本时间窗口上进行数据统计操作。

    @Override
    public void addPass(int count) {
        //获取当前样本窗口,在它上面添加一个成功的统计
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/274853
推荐阅读
相关标签
  

闽ICP备14008679号