統計一段時間內容許經過的請求數。好比 qps爲100,即1s內容許經過的請求數100,每來一個請求計數器加1,超過100的請求拒絕、時間過1s後計數器清0,從新計數。這樣限流比較暴力,若是前10ms 來了100個請求,那剩下的990ms只能眼睜睜看着請求被過濾掉,並不能平滑處理這些請求,容易出現常說的「突刺現象」。redis
計數器算法流程圖以下
算法
請求到來時先放入漏桶中,漏桶再以勻速放行請求,若是進來請求超出了漏桶的容量時,則拒絕請求,這樣作雖然可以避免「突刺現象」,可是過於平滑並不能應對短時的突發流量。
具體實現可採起將到來的請求放入隊列中,再另起線程從隊列中勻速拿出請求放行。spring
假設有個桶,而且會以必定速率往桶中投放令牌,每次請求來時都要去桶中拿令牌,若是拿到則放行,拿不到則進行等待直至拿到令牌爲止,好比以每秒100的速度往桶中投放令牌,令牌桶初始化一秒事後桶內有100個令牌,若是大量請求來時會當即消耗完100個令牌,其他請求進行等待,最終以勻速方式放行這些請求。此算法的好處在於既能應對短暫瞬時流量,又能夠平滑處理請求。數組
令牌桶限流流程圖
緩存
核心限流類:org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter
核心方法以下:安全
public Mono<Response> isAllowed(String routeId, String id) { if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } Config routeConfig = loadConfiguration(routeId); //令牌桶平均投放速率 int replenishRate = routeConfig.getReplenishRate(); //桶容量 int burstCapacity = routeConfig.getBurstCapacity(); try { //獲取限流key List<String> keys = getKeys(id); //組裝lua腳本執行參數,第一個參數投放令牌速率、第二個參數桶的容量、第三個參數當前時間戳,第四個參數須要獲取的令牌個數 List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); //經過lua腳本與redis交互獲取令牌,返回數組,數組第一個元素表明是否獲取成功(1成功0失敗),第二個參數表明剩餘令牌數 Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); //若是獲取令牌異常,默認設置獲取結果【一、-1】,顧默認獲取令牌成功、剩餘令牌-1,不作限流控制 return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }).map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; }); } catch (Exception e) { log.error("Error determining if user allowed from redis", e); } return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); }
gateway限流lua腳本實現以下:
微信
lua腳本分析及備註以下:多線程
--令牌桶剩餘令牌數key local tokens_key = KEYS[1] --令牌桶最後填充時間key local timestamp_key = KEYS[2] --往令牌桶投放令牌速率 local rate = tonumber(ARGV[1]) --令牌桶大小 local capacity = tonumber(ARGV[2]) --當前數據戳 local now = tonumber(ARGV[3]) --請求獲取令牌數量 local requested = tonumber(ARGV[4]) --計算令牌桶填充滿須要的時間 local fill_time = capacity/rate --保證時間充足 local ttl = math.floor(fill_time*2) --獲取redis中剩餘令牌數 local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end --獲取redis中最後一次更新令牌的時間 local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end local delta = math.max(0, now-last_refreshed) --計算出須要更新redis裏的令牌桶數量(經過 過去的時間間隔內須要投放的令牌數+桶剩餘令牌) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 --消耗令牌後,從新計算出須要更新redis緩存裏的令牌數 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --互斥更新redis 裏的剩餘令牌數 redis.call("setex", tokens_key, ttl, new_tokens) --互斥更新redis 裏的最新更新令牌時間 redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }
spring cloud gateway經過redis實現令牌桶算法的流程圖以下
併發
總結: 經過redis 實現令牌桶算法限流,支持集羣限流、但限速有上限,畢竟和redis交互須要消耗較長時間,限流沒加鎖雖然能夠提高網關吞吐量,但實際並非知足線程安全,且還存在一個問題,例如桶大小10,往桶投放令牌速率爲100/1s,當桶內10令牌消耗完後,這時兩個正常的請求q1 和q2同時進入網關,若是q1恰好拿到產生新的令牌放行,q2則須要再過10ms才能獲取新的令牌,因爲兩個請求間隔很短<10ms,致使q2去桶中拿不到令牌而被攔截爲超速請求,致使緣由gateway未對消耗完桶後的請求進行入隊等待。app
測試
設置令牌桶大小爲5,投放速率爲10/s ,配置以下
server: port: 8081 spring: cloud: gateway: routes: - id: limit_route uri: http://localhost:19090 predicates: - After=2017-01-20T17:42:47.789-07:00[America/Denver] filters: - name: RequestRateLimiter args: key-resolver: '#{@uriKeyResolver}' redis-rate-limiter.replenishRate: 10 redis-rate-limiter.burstCapacity: 5 application: name: gateway-limiter
如今用jmeter模擬10個併發請求,查看可以正常經過的請求數有多少?
運行結果:
經過打印結果發現,10個請求被攔截了5個請求。在實際應該中,10個請求或許都是正常請求,並無超過10qps卻被攔截。
zuul-ratelimt 支持memory、redis限流,經過計數器算法實現限流,即在窗口時間內消耗指定數量的令牌後限流,窗口時間刷新後從新指定消耗令牌數量爲0。
核心代碼以下:
public class RateLimitFilter extends ZuulFilter { public static final String LIMIT_HEADER = "X-RateLimit-Limit"; public static final String REMAINING_HEADER = "X-RateLimit-Remaining"; public static final String RESET_HEADER = "X-RateLimit-Reset"; private static final UrlPathHelper URL_PATH_HELPER = new UrlPathHelper(); private final RateLimiter rateLimiter; private final RateLimitProperties properties; private final RouteLocator routeLocator; private final RateLimitKeyGenerator rateLimitKeyGenerator; @Override public String filterType() { return "pre"; } @Override public int filterOrder() { return -1; } @Override public boolean shouldFilter() { return properties.isEnabled() && policy(route()).isPresent(); } public Object run() { final RequestContext ctx = RequestContext.getCurrentContext(); final HttpServletResponse response = ctx.getResponse(); final HttpServletRequest request = ctx.getRequest(); final Route route = route(); policy(route).ifPresent(policy -> //生成限流key final String key = rateLimitKeyGenerator.key(request, route, policy); //執行核心限流方法,返回剩餘能夠用令牌數,若是rate.remaining<0則已超出流量限制 final Rate rate = rateLimiter.consume(policy, key); response.setHeader(LIMIT_HEADER, policy.getLimit().toString()); response.setHeader(REMAINING_HEADER, String.valueOf(Math.max(rate.getRemaining(), 0))); response.setHeader(RESET_HEADER, rate.getReset().toString()); if (rate.getRemaining() < 0) { ctx.setResponseStatusCode(TOO_MANY_REQUESTS.value()); ctx.put("rateLimitExceeded", "true"); throw new ZuulRuntimeException(new ZuulException(TOO_MANY_REQUESTS.toString(), TOO_MANY_REQUESTS.value(), null)); } }); return null; } }
核心限流方法rateLimiter.consume(policy, key)代碼以下:
public abstract class AbstractCacheRateLimiter implements RateLimiter { @Override public synchronized Rate consume(Policy policy, String key, Long requestTime) { final Long refreshInterval = policy.getRefreshInterval(); final Long quota = policy.getQuota() != null ? SECONDS.toMillis(policy.getQuota()) : null; final Rate rate = new Rate(key, policy.getLimit(), quota, null, null); calcRemainingLimit(policy.getLimit(), refreshInterval, requestTime, key, rate); return rate; } protected abstract void calcRemainingLimit(Long limit, Long refreshInterval, Long requestTime, String key, Rate rate); } @Slf4j @RequiredArgsConstructor @SuppressWarnings("unchecked") public class RedisRateLimiter extends AbstractCacheRateLimiter { private final RateLimiterErrorHandler rateLimiterErrorHandler; private final RedisTemplate redisTemplate; @Override protected void calcRemainingLimit(final Long limit, final Long refreshInterval, final Long requestTime, final String key, final Rate rate) { if (Objects.nonNull(limit)) { long usage = requestTime == null ? 1L : 0L; Long remaining = calcRemaining(limit, refreshInterval, usage, key, rate); rate.setRemaining(remaining); } } private Long calcRemaining(Long limit, Long refreshInterval, long usage, String key, Rate rate) { rate.setReset(SECONDS.toMillis(refreshInterval)); Long current = 0L; try { current = redisTemplate.opsForValue().increment(key, usage); // Redis returns the value of key after the increment, check for the first increment, and the expiration time is set if (current != null && current.equals(usage)) { handleExpiration(key, refreshInterval); } } catch (RuntimeException e) { String msg = "Failed retrieving rate for " + key + ", will return the current value"; rateLimiterErrorHandler.handleError(msg, e); } return Math.max(-1, limit - current); } private void handleExpiration(String key, Long refreshInterval) { try { this.redisTemplate.expire(key, refreshInterval, SECONDS); } catch (RuntimeException e) { String msg = "Failed retrieving expiration for " + key + ", will reset now"; rateLimiterErrorHandler.handleError(msg, e); } } }
總結: 你們有沒有注意到限流方法前面加了synchronized 鎖,雖然保證了線程安全,但這裏會存在一個問題,若是此限流方法執行時間2ms,即持鎖時間過長(主要是和redis交互耗時),會致使整個網關的吞吐量不會超過500qps,因此在用redis作限流時建議作分key鎖,每一個限流key之間互不影響,即保證了限流的安全性,又提升了網關的吞吐量。用memory作限流不須要考慮這個問題,由於本地限流持鎖時間足夠短,便是執行限流方法是串行的,但也能夠擁有很高的吞吐量,zuul-ratelimt限流算法採用計數器限流,顧都有一個通病,避免不了「突刺現象」。
Guava RateLimiter提供了令牌桶算法實現:平滑突發限流(SmoothBursty)和平滑預熱限流(SmoothWarmingUp)實現,代碼實現時序圖以下:
測試
平滑預熱限流,qps爲10,看下去拿10個令牌依次耗時狀況
/** * 平滑預熱限流(SmoothWarmingUp) */ public class SmoothWarmingUp { public static void main(String[] args) { RateLimiter limiter = RateLimiter.create(10, 1000, TimeUnit.MILLISECONDS); for(int i = 0; i < 10;i++) { //獲取一個令牌 System.out.println(limiter.acquire(1)); } } }
運行結果:返回線程等待的時間
平滑突發限流
/* 平滑突發限流(SmoothBursty) */ public class SmoothBurstyRateLimitTest { public static void main(String[] args) { //QPS = 5,每秒容許5個請求 RateLimiter limiter = RateLimiter.create(5); //limiter.acquire() 返回獲取token的耗時,以秒爲單位 System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); } }
運行結果:
總結:Guava限流思路主要是經過計算下一個可用令牌的等待時間,去休眠線程,休眠結束後默認成功得到令牌,平滑預熱算法SmoothWarmingUp也是相似,只是剛開始計算獲取令牌的速率要比設定限流的速率底,最後再慢慢趨於限流速率。SmoothWarmingUp限流不適用中低頻限流,在常規的應用限流中,好比咱們設定guava的限速爲100qps,在同一個時間點來了q一、q二、q3三個正常請求,那麼q1會被迫等待10ms,q2被迫等待20ms,q3被迫等待30ms,在除高併發的應用場景中是常常出現這種狀況的,應用持續高併發情景並很少,只是在較短期內來了多個正常請求,卻被迫等待必定時間,下降了請求的響應速度,在這種場景下算法顯得過於平滑,仍是主要適用高併發應用場景 如秒殺場景等。SmoothBursty限流則不會,它有「桶」的概念,「桶」中令牌沒拿完前是不會限速的,桶大小爲限流速率大小,不支持自動調整桶大小。
自寫限流目標:爲了即保證限流算法線程安全,又能提升網關吞吐量,my-ratelimit網關限流採用分key鎖,不一樣key之間限流互不影響。爲知足多業務場景,my-ratelimit支持了自定義限流維度、多維度限流、多維度自由排列組合限流、支持自選限流算法及倉庫類型。
總結:my-ratelimit令牌桶限流算法核心思想:每來一個請求都會先作投放令牌操做,投放數量根據當前時間距離上次投放時間的時間段佔1s的比例乘以限流速率limit計算而得,可能投放數量爲0,投放完後再去桶中取令牌,若是取到了令牌則請求放行,若沒有令牌則線程進入AQS同步隊列中,直到有令牌產生再依次去喚醒隊列中的線程來獲取令牌。在實際的業務場景中,高頻的時間段其實並很少,大都是低頻的請求,爲了儘量提升請求響應速度,知足低頻「不限流」,高頻平滑限流的指標,剛來的請求不會先入AQS同步隊列中,而是先去拿令牌,當拿不到令牌時說明此時段流量比較大,再進入隊列等待獲取令牌達到平滑限流目的。另外在進來的請求前加了一個判斷,則是若是等待隊列大小已經到達了限流的速率limit大小了,則說明此時段請求已超速,顧直接拒絕請求。
爲了測試限流算法自己耗時狀況,先用單線程來測試,設置每秒產生10w的令牌,桶大小爲1,平滑拿完這些令牌須要多少時間。
測試代碼:
public static void singleCatfishLimit(long permitsRatelimit) throws InterruptedException { RateLimitPermit rateLimitPermit = RateLimitPermit.create(1,permitsRatelimit,1); int hastoken=0; long start =System.nanoTime(); for(int i=0 ; i < permitsRatelimit*1 ; i++){ if(rateLimitPermit.acquire()>=0){ hastoken++; } } System.out.println("catfishLimit use time:"+(NANOSECONDS.toMillis(System.nanoTime()-start-SECONDS.toNanos(1) ) ) + " ms" ); System.out.println("single thread hold Permit:"+hastoken); } public static void main(String[] args) throws Exception { singleCatfishLimit(100000); //guavaLimit(); //multCatfishLimit(2000,10000); }
運行結果:
說明:10w令牌平滑拿完用了115ms,平均每次限流邏輯執行時間1微秒左右,幾乎能夠忽略不計。
接下來測試多線程狀況,設置併發請求2000個,限流qps爲10000,測試用時
public static void multCatfishLimit(int threadCount ,long permitsRatelimit) throws InterruptedException { CountDownLatch countDownLatch=new CountDownLatch(threadCount); AtomicInteger hastoken= new AtomicInteger(0); CyclicBarrier cyclicBarrier= new CyclicBarrier(threadCount); RateLimitPermit rateLimitPermit = RateLimitPermit.create(1,permitsRatelimit,1); AtomicLong startTime= new AtomicLong(0); for (int i = 0; i < threadCount; i++) { Thread thread=new Thread(()->{ try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } startTime.compareAndSet(0,System.nanoTime()); for (int j = 0; j < 1; j++) { if( rateLimitPermit.acquire()>=0){ hastoken.incrementAndGet(); } } countDownLatch.countDown(); },"ratelimit-"+i); thread.start(); } countDownLatch.await(); System.out.println("catfishLimit use time:"+ (long)( NANOSECONDS.toMillis( System.nanoTime()-startTime.get() ) -Math.min(hastoken.get()*1.0/permitsRatelimit*1000L, threadCount*1.0/permitsRatelimit * SECONDS.toMillis(1)) )+" ms"); System.out.println("mult thread hold Permit:"+hastoken.get()); } public static void main(String[] args) throws Exception { singleCatfishLimit(100000); //guavaLimit(); multCatfishLimit(2000,10000); }
運行結果:
說明:看到結果會發現,qps爲1w時,2000個線程去拿令牌用時127ms,這是爲何呢,其實這裏經過cyclicBarrier控制併發請求,請求數未到達2000時 進入wait,到達2000時才signalAll,這裏換醒線程是有時間差的,很難經過程序控制多線程在同一個時間點同時執行,這裏統計出的時間存在偏差。
測qps爲100時,2000個請求同時去拿令牌耗時,能拿到多少令牌
public static void main(String[] args) throws Exception { // singleCatfishLimit(100000); //guavaLimit(); multCatfishLimit(2000,100); }
運行結果:
說明:2000個線程卻拿到了155個令牌,拿令牌操做用時12ms,總用時1550ms+12ms,爲啥沒有精確拿到100個令牌,緣由仍是2000個線程未在同一個時間點執行拿令牌操做,經過打印線程喚醒時間發現,2000個線程被喚醒的最大時間差爲566ms。
總結:該限流算法特別支持少許限流器,高併發限流,由於算法獲取不到令牌會循環往桶投放令牌,若是限流器多致使N個循環投放令牌操做,增長cpu壓力
算法流程圖以下: