在高併發的應用中,限流是一個繞不開的話題。限流能夠保障咱們的 API 服務對全部用戶的可用性,也能夠防止網絡攻擊。html
通常開發高併發系統常見的限流有:限制總併發數(好比數據庫鏈接池、線程池)、限制瞬時併發數(如 nginx 的 limit_conn 模塊,用來限制瞬時併發鏈接數)、限制時間窗口內的平均速率(如 Guava 的 RateLimiter、nginx 的 limit_req 模塊,限制每秒的平均速率);其餘還有如限制遠程接口調用速率、限制 MQ 的消費速率。另外還能夠根據網絡鏈接數、網絡流量、CPU 或內存負載等來限流。java
作限流 (Rate Limiting/Throttling) 的時候,除了簡單的控制併發,若是要準確的控制 TPS,簡單的作法是維護一個單位時間內的 Counter,如判斷單位時間已通過去,則將 Counter 重置零。此作法被認爲沒有很好的處理單位時間的邊界,好比在前一秒的最後一毫秒裏和下一秒的第一毫秒都觸發了最大的請求數,也就是在兩毫秒內發生了兩倍的 TPS。node
經常使用的更平滑的限流算法有兩種:漏桶算法和令牌桶算法。不少傳統的服務提供商如華爲中興都有相似的專利,參考採用令牌漏桶進行報文限流的方法。react
漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶裏,漏桶以必定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),而後就拒絕請求,能夠看出漏桶算法能強行限制數據的傳輸速率。nginx
可見這裏有兩個變量,一個是桶的大小,支持流量突發增多時能夠存多少的水(burst),另外一個是水桶漏洞的大小(rate)。由於漏桶的漏出速率是固定的參數,因此,即便網絡中不存在資源衝突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率。所以,漏桶算法對於存在突發特性的流量來講缺少效率。git
令牌桶算法(Token Bucket)和 Leaky Bucket 效果同樣但方向相反的算法,更加容易理解。隨着時間流逝,系統會按恆定 1/QPS 時間間隔(若是 QPS=100,則間隔是 10ms)往桶裏加入 Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),若是桶已經滿了就再也不加了。新請求來臨時,會各自拿走一個 Token,若是沒有 Token 可拿了就阻塞或者拒絕服務。github
令牌桶的另一個好處是能夠方便的改變速度。一旦須要提升速率,則按需提升放入桶中的令牌的速率。通常會定時(好比 100 毫秒)往桶中增長必定數量的令牌,有些變種算法則實時的計算應該增長的令牌的數量。Guava 中的 RateLimiter 採用了令牌桶的算法,設計思路參見 How is the RateLimiter designed, and why?,詳細的算法實現參見源碼。redis
本文討論在gateway集成的實現算法
在此基礎上pom中加入 spring
<!--RequestRateLimiter限流--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
package com.common.config; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import reactor.core.publisher.Mono; /** * @Title: * @Auther: * @Date: 2019/8/28 17:13 * @Version: 1.0 * @Description: */ @Configuration public class RequestRateLimiterConfig { @Bean @Primary KeyResolver apiKeyResolver() { //按URL限流,即以每秒內請求數按URL分組統計,超出限流的url請求都將返回429狀態 return exchange -> Mono.just(exchange.getRequest().getPath().toString()); } @Bean KeyResolver userKeyResolver() { //按用戶限流 return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user")); } @Bean KeyResolver ipKeyResolver() { //按IP來限流 return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); } }
application.yml
spring: application: name: gateway8710 cloud: gateway: default-filter: routes: - id: user-server predicates: - Path=/java/** filters: - StripPrefix=1 # 限流過濾器,使用gateway內置令牌算法 - name: RequestRateLimiter args: # 令牌桶每秒填充平均速率,即行等價於容許用戶每秒處理多少個請求平均數 redis-rate-limiter.replenishRate: 10 # 令牌桶的容量,容許在一秒鐘內完成的最大請求數 redis-rate-limiter.burstCapacity: 20 # 用於限流的鍵的解析器的 Bean 對象的名字。它使用 SpEL 表達式根據#{@beanName}從 Spring 容器中獲取 Bean 對象。 key-resolver: "#{@apiKeyResolver}" uri: lb://service-helloword # uri: "http://192.168.111.133:8708/project/hello" redis: #Redis數據庫索引(默認爲0) database: 0 #鏈接超時時間(毫秒) springboot2.0 中該參數的類型爲Duration,這裏在配置的時候須要指明單位 timeout: 20s #密碼 password: test cluster: # 獲取失敗 最大重定向次數 max-redirects: 3 #測試環境redis nodes: - 10.0.0.1:6380 - 10.0.0.2:6380 - 10.0.0.3:6380 - 10.0.0.1:6381 - 10.0.0.2:6381 - 10.0.0.3:6381 lettuce: pool: #鏈接池最大鏈接數(使用負值表示沒有限制) max-active: 300 #鏈接池最大阻塞等待時間(使用負值表示沒有限制) max-wait: -1s #鏈接池中的最大空閒鏈接 max-idle: 100 #鏈接池中的最小空閒鏈接 min-idle: 20 server: port: 8710 eureka: client: serviceUrl: #指向註冊中心 defaultZone: http://192.168.111.133:8888/eureka/ instance: # 每間隔1s,向服務端發送一次心跳,證實本身依然」存活「 lease-renewal-interval-in-seconds: 1 # 告訴服務端,若是我2s以內沒有給你發心跳,就表明我「死」了,將我踢出掉。 lease-expiration-duration-in-seconds: 2
目錄結構以下
須要用jmeter來作併發測試,一秒內啓30個進程,重複發請求10000次。詳情見併發測試JMeter及發送Json請求
測試結果,沒有搶到令牌的請求就返回429,這邊的限流至關於平均request:10/s
redis中存儲項
多個請求,如兩個(url分別爲/project/getToken,/project/login)不一樣的併發請求
6.原理
基於redis+lua
lua腳本路徑
local tokens_key = KEYS[1] local timestamp_key = KEYS[2] local rate = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local requested = tonumber(ARGV[4]) local fill_time = capacity/rate local ttl = math.floor(fill_time*2) local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) local delta = math.max(0, now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }
引入腳本的地方
相關源碼:
限流源碼RedisRateLimiter
/* * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.springframework.cloud.gateway.filter.ratelimit; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import javax.validation.constraints.Min; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.beans.BeansException; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.validation.Validator; import org.springframework.validation.annotation.Validated; /** * See https://stripe.com/blog/rate-limiters and * https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L11-L34. * * @author Spencer Gibb * @author Ronny Bräunlich */ @ConfigurationProperties("spring.cloud.gateway.redis-rate-limiter") public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Config> implements ApplicationContextAware { /** * @deprecated use {@link Config#replenishRate} */ @Deprecated public static final String REPLENISH_RATE_KEY = "replenishRate"; /** * @deprecated use {@link Config#burstCapacity} */ @Deprecated public static final String BURST_CAPACITY_KEY = "burstCapacity"; /** * Redis Rate Limiter property name. */ public static final String CONFIGURATION_PROPERTY_NAME = "redis-rate-limiter"; /** * Redis Script name. */ public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript"; /** * Remaining Rate Limit header name. */ public static final String REMAINING_HEADER = "X-RateLimit-Remaining"; /** * Replenish Rate Limit header name. */ public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate"; /** * Burst Capacity Header name. */ public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity"; private Log log = LogFactory.getLog(getClass()); private ReactiveRedisTemplate<String, String> redisTemplate; private RedisScript<List<Long>> script; private AtomicBoolean initialized = new AtomicBoolean(false); private Config defaultConfig; // configuration properties /** * Whether or not to include headers containing rate limiter information, defaults to * true. */ private boolean includeHeaders = true; /** * The name of the header that returns number of remaining requests during the current * second. */ private String remainingHeader = REMAINING_HEADER; /** The name of the header that returns the replenish rate configuration. */ private String replenishRateHeader = REPLENISH_RATE_HEADER; /** The name of the header that returns the burst capacity configuration. */ private String burstCapacityHeader = BURST_CAPACITY_HEADER; public RedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script, Validator validator) { super(Config.class, CONFIGURATION_PROPERTY_NAME, validator); this.redisTemplate = redisTemplate; this.script = script; initialized.compareAndSet(false, true); } public RedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) { super(Config.class, CONFIGURATION_PROPERTY_NAME, null); this.defaultConfig = new Config().setReplenishRate(defaultReplenishRate) .setBurstCapacity(defaultBurstCapacity); } static List<String> getKeys(String id) { // use `{}` around keys to use Redis Key hash tags // this allows for using redis cluster // Make a unique key per user. String prefix = "request_rate_limiter.{" + id; // You need two Redis keys for Token Bucket. String tokenKey = prefix + "}.tokens"; String timestampKey = prefix + "}.timestamp"; return Arrays.asList(tokenKey, timestampKey); } public boolean isIncludeHeaders() { return includeHeaders; } public void setIncludeHeaders(boolean includeHeaders) { this.includeHeaders = includeHeaders; } public String getRemainingHeader() { return remainingHeader; } public void setRemainingHeader(String remainingHeader) { this.remainingHeader = remainingHeader; } public String getReplenishRateHeader() { return replenishRateHeader; } public void setReplenishRateHeader(String replenishRateHeader) { this.replenishRateHeader = replenishRateHeader; } public String getBurstCapacityHeader() { return burstCapacityHeader; } public void setBurstCapacityHeader(String burstCapacityHeader) { this.burstCapacityHeader = burstCapacityHeader; } @Override @SuppressWarnings("unchecked") public void setApplicationContext(ApplicationContext context) throws BeansException { if (initialized.compareAndSet(false, true)) { this.redisTemplate = context.getBean("stringReactiveRedisTemplate", ReactiveRedisTemplate.class); this.script = context.getBean(REDIS_SCRIPT_NAME, RedisScript.class); if (context.getBeanNamesForType(Validator.class).length > 0) { this.setValidator(context.getBean(Validator.class)); } } } /* for testing */ Config getDefaultConfig() { return defaultConfig; } /** * This uses a basic token bucket algorithm and relies on the fact that Redis scripts * execute atomically. No other operations can run between fetching the count and * writing the new count. */ @Override @SuppressWarnings("unchecked")
// routeId也就是咱們的fsh-house,id就是限流的URL,也就是/project/hello。 public Mono<Response> isAllowed(String routeId, String id) {
// 會判斷RedisRateLimiter是否初始化了 if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); }
// 獲取routeId對應的限流配置 Config routeConfig = loadConfiguration(routeId);
// 容許用戶每秒作多少次請求 // How many requests per second do you want a user to be allowed to do? int replenishRate = routeConfig.getReplenishRate();
// 令牌桶的容量,容許在一秒鐘內完成的最大請求數 // How much bursting do you want to allow? int burstCapacity = routeConfig.getBurstCapacity(); try { List<String> keys = getKeys(id);
// 限流key的名稱(request_rate_limiter.{/login}.timestamp,request_rate_limiter.{/login}.tokens) // The arguments to the LUA script. time() returns unixtime in seconds. List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1");
// 執行LUA腳本 // allowed, tokens_left = redis.eval(SCRIPT, keys, args) Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); // .log("redisratelimiter", Level.FINER); return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }).map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; }); } catch (Exception e) { /* * We don't want a hard dependency on Redis to allow traffic. Make sure to set * an alert so you know if this is happening too much. Stripe's observed * failure rate is 0.01%. */ log.error("Error determining if user allowed from redis", e); } return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); } /* for testing */ Config loadConfiguration(String routeId) { Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig); if (routeConfig == null) { routeConfig = getConfig().get(RouteDefinitionRouteLocator.DEFAULT_FILTERS); } if (routeConfig == null) { throw new IllegalArgumentException( "No Configuration found for route " + routeId + " or defaultFilters"); } return routeConfig; } @NotNull public Map<String, String> getHeaders(Config config, Long tokensLeft) { Map<String, String> headers = new HashMap<>(); if (isIncludeHeaders()) { headers.put(this.remainingHeader, tokensLeft.toString()); headers.put(this.replenishRateHeader, String.valueOf(config.getReplenishRate())); headers.put(this.burstCapacityHeader, String.valueOf(config.getBurstCapacity())); } return headers; } @Validated public static class Config { @Min(1) private int replenishRate; @Min(1) private int burstCapacity = 1; public int getReplenishRate() { return replenishRate; } public Config setReplenishRate(int replenishRate) { this.replenishRate = replenishRate; return this; } public int getBurstCapacity() { return burstCapacity; } public Config setBurstCapacity(int burstCapacity) { this.burstCapacity = burstCapacity; return this; } @Override public String toString() { return "Config{" + "replenishRate=" + replenishRate + ", burstCapacity=" + burstCapacity + '}'; } } }