赞
踩
Redisson系列文章:
基于Redis的分布式限流器RateLimiter可以用来在分布式环境下现在请求方的调用频率。既适用于不同Redisson实例下的多线程限流,也适用于相同Redisson实例下的多线程限流。
RateLimter主要作用就是可以限制调用接口的次数。主要原理就是调用接口之前,需要拥有指定个令牌。限流器每秒会产生X个令牌放入令牌桶,调用接口需要去令牌桶里面拿令牌。如果令牌被其它请求拿完了,那么自然而然,当前请求就调用不到指定的接口。
RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");
// 初始化
// 最大流速 = 每10秒钟产生1个令牌
rateLimiter.trySetRate(RateType.OVERALL, 1, 10, RateIntervalUnit.SECONDS);
//需要1个令牌
if(rateLimiter.tryAcquire(1)){
//TODO:Do something
}
当前业务就是,向指定手机号发送短信。但是有每10秒只允许发送1次的限制。完全可以使用Redisson限流器来完成。
package com.tyzhou.redisson.service; import com.tyzhou.Constant; import org.apache.commons.lang3.StringUtils; import org.redisson.api.RRateLimiter; import org.redisson.api.RateIntervalUnit; import org.redisson.api.RateType; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RateLimiterService { private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterService.class); @Autowired private RedissonClient redisson; public void sendMsg(String phone) { if (StringUtils.isNotBlank(phone)) { RRateLimiter rateLimiter = redisson.getRateLimiter(Constant.REDISSON_RATE_LIMITER + phone); //每10秒产生1个令牌 rateLimiter.trySetRate(RateType.OVERALL, 1, 10, RateIntervalUnit.SECONDS); if (rateLimiter.tryAcquire(1)) { LOGGER.info("向手机:{}发送短信", phone); } } } }
调用者:
@PostMapping("/send_msg")
public void sendMsg(@RequestParam String phone) {
while(true){
rateLimiterService.sendMsg(phone);
}
}
用了一个死循环,但是从日志的打印效果上来看,每10秒才请求到一次我们的发送短信接口。
// 声明一个限流器 名称 叫key
redissonClient.getRateLimiter(key)
trySetRate方法跟进去底层实现如下:
@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
+ "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",
Collections.<Object>singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal());
}
举个例子,更容易理解:
比如下面这段代码,5秒中产生3个令牌,并且所有实例共享(RateType.OVERALL所有实例共享、RateType.CLIENT单实例端共享)
trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);
那么redis中就会设置3个参数:
hsetnx,key,rate,3
hsetnx,key,interval,5
hsetnx,key,type,0
接着看tryAcquire(1)方法:底层源码如下
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1], 'rate');" //1 + "local interval = redis.call('hget', KEYS[1], 'interval');" //2 + "local type = redis.call('hget', KEYS[1], 'type');" //3 + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')" //4 + "local valueName = KEYS[2];" //5 + "if type == 1 then " + "valueName = KEYS[3];" //6 + "end;" + "local currentValue = redis.call('get', valueName); " //7 + "if currentValue ~= false then " + "if tonumber(currentValue) < tonumber(ARGV[1]) then " //8 + "return redis.call('pttl', valueName); " + "else " + "redis.call('decrby', valueName, ARGV[1]); " //9 + "return nil; " + "end; " + "else " //10 + "redis.call('set', valueName, rate, 'px', interval); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end;", Arrays.<Object>asList(getName(), getValueName(), getClientValueName()), value, commandExecutor.getConnectionManager().getId().toString()); }
String getClientValueName() {
return suffixName(getValueName(), commandExecutor.getConnectionManager().getId().toString());
}
ConnectionManager().getId()如下:
public interface ConnectionManager {
UUID getId();
省略...
}
这个getId()是每个客户端初始化的时候生成的UUID,即每个客户端的getId是唯一的,这也就验证了trySetRate方法中RateType.ALL与RateType.PER_CLIENT的作用。
redission分布式限流采用令牌桶思想和固定时间窗口,trySetRate方法设置桶的大小,利用redis key过期机制达到时间窗口目的,控制固定时间窗口内允许通过的请求量。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。