在以前的HTTP請求初探的文章中,大致梳理了Soul插件的處理流程,也得知了DividePlugin、GlobalPlugin,WebClientPlugin,WebCilentResponsePlugin插件的具體做用,在梳理流程中,發現Soul的插件是有前後順序的,在DividePlugin插件以前作了不少前置插件的操做,其中包含了咱們本章分析的主題RateLimiterPlugin 限流插件(其中一種)。html
rateLimiter插件react
經過官方文檔的閱讀咱們得知了RateLimiterPlugin的兩個核心點速率、容量redis
如下講解來源於官方文檔算法
能夠看出RateLimiterPlugin限流核心在於令牌桶算法的實現。緩存
ps:關於限流算法常見的有四種實現令牌桶算法,漏斗算法,計數器(固定窗口)算法,滑動窗口算法,詳情看對應博客介紹markdown
在Soul網關系統管理-插件管理處,將狀態更改成啓用狀態,注意此處須要填寫redis相關配置,Soul令牌桶基於Redis。ide
爲何Soul的令牌桶算法要基於redis?oop
在集羣部署狀況下單機的令牌桶算法沒法知足集羣狀態下的限流功能。post
在Soul網關插件列表處,選擇rate_limiter處添加規則及選擇器配置,不懂如何添加的能夠先閱讀選擇器\規則的匹配邏輯.
在此處添加的容量及速率都爲1 主要爲了驗證插件是否啓用。性能
調用_http://127.0.0.1:9195/http/test/findByUserId?userId=10_ 進行訪問,速率高於1的狀況下出現以下接口返回結果,表明插件成功使用。
{
"code": 429,
"message": "You have been restricted, please try again later!",
"data": null
}
複製代碼
答案天然數據同步脫不了干係。
在修改插件的配置時,也發佈了一個插件數據變動的事件通知,在以前梳理Soul網關同步數據總體流程時,已經得知修改的插件數據除了更改了JVM緩存內的數據外,還對對應的插件進行下發操做,以下圖 而針對於RateLimiterPlugin而言,其主要實現了handlePlugin的接口,那這個對應的實現到底作了哪些事呢?
具體的方法爲RateLimiterPluginDataHandler的handlerPlugin。
public void handlerPlugin(final PluginData pluginData) {
if (Objects.nonNull(pluginData) && pluginData.getEnabled()) {
//加載限流插件配置
RateLimiterConfig rateLimiterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), RateLimiterConfig.class);
//判斷是否須要從新加載redis鏈接值
if (Objects.isNull(Singleton.INST.get(ReactiveRedisTemplate.class))
|| Objects.isNull(Singleton.INST.get(RateLimiterConfig.class))
|| !rateLimiterConfig.equals(Singleton.INST.get(RateLimiterConfig.class))) {
LettuceConnectionFactory lettuceConnectionFactory = createLettuceConnectionFactory(rateLimiterConfig);
lettuceConnectionFactory.afterPropertiesSet();
RedisSerializer<String> serializer = new StringRedisSerializer();
RedisSerializationContext<String, String> serializationContext =
RedisSerializationContext.<String, String>newSerializationContext().key(serializer).value(serializer).hashKey(serializer).hashValue(serializer).build();
ReactiveRedisTemplate<String, String> reactiveRedisTemplate = new ReactiveRedisTemplate<>(lettuceConnectionFactory, serializationContext);
Singleton.INST.single(ReactiveRedisTemplate.class, reactiveRedisTemplate);
Singleton.INST.single(RateLimiterConfig.class, rateLimiterConfig);
}
}
}
複製代碼
上述代碼有幾個較爲關鍵的點:
在上述代碼中將限流插件的配置和對應的redisTemplate實例放入了Singleton.INST對應map中。
在插件數據過來時,判斷是否存在redis鏈接實例,是否存在限流配置實例,判斷當前的限流配置實例是否和傳遞的限流實例一致,不一致就認爲配置是有更改的,就從新初始化限流實例和鏈接池實例放入Singleton.INST的map中,由此而言就保證了更改redis配置的熱部署。
if判斷中的代碼就是基於SpringDataRedis封裝成一個對應redis鏈接池。
ps:Singleton.INST是枚舉實現的單例模式。
RateLimiterPlugin因爲須要對特定規則進行限流,因此依舊實現了AbstractSoulPlugin,以前依舊梳理過AbstractSoulPlugin的excute的方法和做用了,因此這裏不重複解釋,可觀看Http 調用流程梳理,加深對該類的印象。
本節重點仍是看具體的doexcute方法作了哪些事。
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final String handle = rule.getHandle();
final RateLimiterHandle limiterHandle = GsonUtils.getInstance().fromJson(handle, RateLimiterHandle.class);
return redisRateLimiter.isAllowed(rule.getId(), limiterHandle.getReplenishRate(), limiterHandle.getBurstCapacity())
.flatMap(response -> {
if (!response.isAllowed()) {
//返回的錯誤信息 429錯誤編碼
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
Object error = SoulResultWrap.error(SoulResultEnum.TOO_MANY_REQUESTS.getCode(), SoulResultEnum.TOO_MANY_REQUESTS.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
return chain.execute(exchange);
});
}
複製代碼
在上述代碼中能夠看出是經過redisRateLimiter.isAllowed來判斷是否獲取令牌成功的。 該方法以下
public Mono<RateLimiterResponse> isAllowed(final String id, final double replenishRate, final double burstCapacity) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
//獲取redis Key
List<String> keys = getKeys(id);
//封裝lua腳本執行所需的參數 第一位是速率 第二位是容量 第三位是當前時間戳10位 第四位固定參數值1 表明申請的令牌數
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1");
//執行lua腳本
Flux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(this.script, keys, scriptArgs);
return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
//allowed 表明執行的結果 爲1 表明執行成功
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
RateLimiterResponse rateLimiterResponse = new RateLimiterResponse(allowed, tokensLeft);
log.info("RateLimiter response:{}", rateLimiterResponse.toString());
return rateLimiterResponse;
}).doOnError(throwable -> log.error("Error determining if user allowed from redis:{}", throwable.getMessage()));
}
複製代碼
該方法是獲取redis須要操做的key,一共獲取了兩個類型的Key,格式以下:
中間那位特別長的數字是規則ID,由於限流的最小粒度是規則。
第一個timestamp記錄的是上一次調用的時間戳
第二個tokens記錄的是上一次調用完成後剩餘的令牌數量
執行lua腳本 keys傳遞的是getKeys(id)返回值,scriptArgs傳遞的是所需的參數
經過閱讀上述代碼已經知曉 限流規則的具體實現是交給特定的lua腳本的。
ps:這裏須要提醒一下限流算法是令牌桶算法,令牌桶算法一共有兩種大致實現,一種是有個線程不斷生成令牌,當請求進來時,先從對應的隊列中獲取令牌,但這種令牌生成方式在設定閾值特別大時,會很是消耗性能,因此有了第二種令牌桶算法,在獲取令牌時實時計算令牌數量,而soul就是基於第二種實現的。
-- 當前規則令牌剩餘數量存儲key
local tokens_key = KEYS[1]
-- 當前規則上次調用時間
local timestamp_key = KEYS[2]
-- 速率
local rate = tonumber(ARGV[1])
-- 容量
local capacity = tonumber(ARGV[2])
-- 時間戳
local now = tonumber(ARGV[3])
-- 值爲1
local requested = tonumber(ARGV[4])
-- 容量除以速率 計算填充時間
local fill_time = capacity/rate
-- 計算過時時間 取下限
local ttl = math.floor(fill_time*2)
-- 獲取當前存有的令牌數
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
-- 將令牌數量賦值爲設定的容量
last_tokens = capacity
end
-- 獲取上一次調用時間
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
-- 計算出上次調用和本次調用之間的時間差
local delta = math.max(0, now-last_refreshed)
-- 計算出當前剩餘的令牌數量
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 判斷當前令牌數量 數量>=1 表明獲取成功
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
-- 申請一個令牌
new_tokens = filled_tokens - requested
allowed_num = 1
end
-- setex 設置過時key 過時時間 新值
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
return { allowed_num, new_tokens }
複製代碼
推薦先了解一下luaKEYS ARGS的做用redis lua 中keys[1] 和argv[1] 的理解.
Lua代碼總體邏輯仍是很是明朗的,在這裏細講也講不出個啥來,代碼註釋已經打全了。
本人在這裏疑惑的有兩點