分佈式環境下限流方案的實現redis RateLimiter Guava,Token Bucket, Leaky Bucket

Redis RateLimiter Guava

場景描述

    對於web應用的限流,光看標題,彷佛過於抽象,難以理解,那咱們仍是以具體的某一個應用場景來引入這個話題吧。 
在平常生活中,咱們確定收到過很多很多這樣的短信,「雙11約嗎?,千款….」,「您有幸得到唱讀卡,趕快戳連接…」。這種類型的短信是屬於推廣性質的短信。爲何我要說這個呢?聽我慢慢道來。 
通常而言,對於推廣營銷類短信,它們針對某一羣體(譬如註冊會員)進行定點推送,有時這個羣體的成員量比較大,譬如京東的會員,能夠達到千萬級別。所以相應的,發送推廣短信的量也會增大。然而,要完成這些短信發送,咱們是須要調用服務商的接口來完成的。假若一次發送的量在200萬條,而咱們的服務商接口每秒能處理的短信發送量有限,只能達到200條每秒。那麼這個時候就會產生問題了,咱們如何能控制好程序發送短信時的速度暱?因而限流這個功能就得加上了前端

生產環境背景 

  1. 服務商接口所能提供的服務上限是400條/s 
  2. 業務方調用短信發送接口的速度未知,QPS可能達到800/s,1200/s,或者更高 
  3. 當服務商接口訪問頻率超過400/s時,超過的量將拒絕服務,多出的信息將會丟失 
  4. 線上爲多節點佈置,但調用的是同一個服務商接口

需求分析 

鑑於業務方對短信發送接口的調用頻率未知,而服務商的接口服務有上限,爲保證服務的可用性,業務層須要對接口調用方的流量進行限制—–接口限流java

需求設計

方案1、在提供給業務方的Controller層進行控制

  • 使用guava提供工具庫裏的RateLimiter類(內部採用令牌捅算法實現)進行限流
/**核心代碼片斷*/
private RateLimiter rateLimiter = RateLimiter.create(400);//400表示每秒容許處理的量是400
if(rateLimiter.tryAcquire()) {
    //短信發送邏輯能夠在此處
}
  • 使用Java自帶delayqueue的延遲隊列實現(編碼過程相對麻煩,此處省略代碼)
  • 使用Redis實現,存儲兩個key,一個用於計時,一個用於計數。請求每調用一次,計數器增長1,若在計時器時間內計數器未超過閾值,則能夠處理任務
if (!cacheDao.hasKey(API_WEB_TIME_KEY)) {
    cacheDao.putToValue(API_WEB_TIME_KEY, 0, (long) 1, TimeUnit.SECONDS);
}
if (cacheDao.hasKey(API_WEB_TIME_KEY) && cacheDao.incrBy(API_WEB_COUNTER_KEY, (long) 1) > (long) 400) {
    LOGGER.info("調用頻率過快");
}
//短信發送邏輯

方案2、在短信發送至服務商時作限流處理 

方案3、同時使用方案一和方案二

  • 可行性分析 
    最快捷且有效的方式是使用RateLimiter實現,可是這很容易踩到一個坑,單節點模式下,使用RateLimiter進行限流一點問題都沒有。可是…線上是分佈式系統,佈署了多個節點,並且多個節點最終調用的是同一個短信服務商接口。雖然咱們對單個節點能作到將QPS限制在400/s,可是多節點條件下,若是每一個節點均是400/s,那麼到服務商那邊的總請求就是節點數x400/s,因而限流效果失效。使用該方案對單節點的閾值控制是難以適應分佈式環境的,至少目前我還沒想到更爲合適的方式。 
    對於第二種,使用delayqueue方式。其實主要存在兩個問題,1:短信系統自己就用了一層消息隊列,有用kafka,或者rabitmq,若是再加一層延遲隊列,從設計上來講是不太合適的。2:實現delayqueue的過程相對較麻煩,耗時可能比較長,並且達不到精準限流的效果 
    對於第三種,使用redis進行限流,其很好地解決了分佈式環境下多實例所致使的併發問題。由於使用redis設置的計時器和計數器均是全局惟一的,無論多少個節點,它們使用的都是一樣的計時器和計數器,所以能夠作到很是精準的流控。同時,這種方案編碼並不複雜,可能須要的代碼不超過10行。nginx

  • 實施方案 
    根據可行性分析可知,整個系統採起redis限流處理是成本最低且最高效的。 
    具體實現web

一、在Controller層設置兩個全局key,一個用於計數,另外一個用於計時redis

private static final String API_WEB_TIME_KEY = "time_key";
private static final String API_WEB_COUNTER_KEY = "counter_key";

二、對時間key的存在與否進行判斷,並對計數器是否超過閾值進行判斷算法

if (!cacheDao.hasKey(API_WEB_TIME_KEY)) {
    cacheDao.putToValue(API_WEB_TIME_KEY, 0, (long) 1, TimeUnit.SECONDS);
    cacheDao.putToValue(API_WEB_COUNTER_KEY, 0, (long) 2, TimeUnit.SECONDS);//時間到就從新初始化爲
}
if (cacheDao.hasKey(API_WEB_TIME_KEY) && cacheDao.incrBy(API_WEB_COUNTER_KEY, (long) 1) > (long) 400) {
    LOGGER.info("調用頻率過快");
}
//短信發送邏輯

Token Bucket/Leaky Bucket

場景描述

     不少作服務接口的人或多或少的遇到這樣的場景,因爲業務應用系統的負載能力有限,爲了防止非預期的請求對系統壓力過大而拖垮業務應用系統。spring

    也就是面對大流量時,如何進行流量控制?sql

    服務接口的流量控制策略:分流、降級、限流等。本文討論下限流策略,雖然下降了服務接口的訪問頻率和併發量,卻換取服務接口和業務應用系統的高可用。數據庫

     實際場景中經常使用的限流策略:緩存

  • Nginx前端限流

         按照必定的規則如賬號、IP、系統調用邏輯等在Nginx層面作限流

  • 業務應用系統限流

        一、客戶端限流

        二、服務端限流

  • 數據庫限流

        紅線區,力保數據庫

經常使用的限流算法

漏桶算法(Leaky Bucket)

         算法思路很簡單,水(請求)先進入到漏桶裏,漏桶以必定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),而後就拒絕請求,能夠看出漏桶算法能強行限制數據的傳輸速率.

         示意圖以下:

        

         可見這裏有兩個變量,一個是桶的大小,支持流量突發增多時能夠存多少的水(burst),另外一個是水桶漏洞的大小(rate)。

         由於漏桶的漏出速率是固定的參數,因此,即便網絡中不存在資源衝突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.所以,漏桶算法對於存在突發特性的流量來講缺少效率.

令牌桶算法(Token Bucket)

         漏桶(Leaky Bucket) 效果同樣但方向相反的算法,更加容易理解.隨着時間流逝,系統會按恆定1/QPS時間間隔(若是QPS=100,則間隔是10ms)往桶裏加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),若是桶已經滿了就再也不加了.新請求來臨時,會各自拿走一個Token,若是沒有Token可拿了就阻塞或者拒絕服務.

        

  令牌桶的另一個好處是能夠方便的改變速度. 一旦須要提升速率,則按需提升放入桶中的令牌的速率. 通常會定時(好比100毫秒)往桶中增長必定數量的令牌, 有些變種算法則實時的計算應該增長的令牌的數量.

基於Redis功能的實現

       簡陋的設計思路:假設一個用戶(用IP判斷)每分鐘訪問某一個服務接口的次數不能超過10次,那麼咱們能夠在Redis中建立一個鍵,並此時咱們就設置鍵的過時時間爲60秒,每個用戶對此服務接口的訪問就把鍵值加1,在60秒內當鍵值增長到10的時候,就禁止訪問服務接口。在某種場景中添加訪問時間間隔仍是頗有必要的。

      1)使用Redis的incr命令,將計數器做爲Lua腳本 

local current = redis.call("incr",KEYS[1])
if tonumber(current) == 1 then
  redis.call("expire",KEYS[1],1)
end

        Lua腳本在Redis中運行,保證了incr和expire兩個操做的原子性。

       2)使用Reids的列表結構代替incr命令

FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > 10 THEN
  ERROR "too many requests per second"
ELSE
  IF EXISTS(ip) == FALSE
      MULTI
          RPUSH(ip,ip)
          EXPIRE(ip,1)
      EXEC
  ELSE
      RPUSHX(ip,ip)
  END
  PERFORM_API_CALL()
END

         Rate Limit使用Redis的列表做爲容器,LLEN用於對訪問次數的檢查,一個事物中包含了RPUSH和EXPIRE兩個命令,用於在第一次執行計數是建立列表並設置過時時間,

         RPUSHX在後續的計數操做中進行增長操做。

基於令牌桶算法的實現

       令牌桶算法能夠很好的支撐忽然額流量的變化即滿令牌桶數的峯值。

import com.google.common.base.Preconditions;
import org.springframework.context.Lifecycle;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TokenBucket implements Lifecycle {
    // 默認桶大小個數 即最大瞬間流量是64M
    private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
    // 一個桶的單位是1字節
    private int everyTokenSize = 1;
    // 瞬間最大流量
    private int maxFlowRate;
    // 平均流量
    private int avgFlowRate;
    // 隊列來緩存桶數量:最大的流量峯值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
    private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<>(DEFAULT_BUCKET_SIZE);

    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    private volatile boolean isStart = false;

    private ReentrantLock lock = new ReentrantLock(true);

    private static final byte A_CHAR = 'a';

    public TokenBucket() {
    }

    public TokenBucket(int maxFlowRate, int avgFlowRate) {
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
        this.everyTokenSize = everyTokenSize;
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public void addTokens(Integer tokenNum) {
        // 如果桶已經滿了,就再也不家如新的令牌
        for (int i = 0; i < tokenNum; i++) {
            tokenQueue.offer(A_CHAR);
        }
    }

    public TokenBucket build() {
        start();
        return this;
    }

    /**
     * 獲取足夠的令牌個數
     */
    public boolean getTokens(byte[] dataSize) {
        Preconditions.checkNotNull(dataSize);
        Preconditions.checkArgument(isStart, "please invoke start method first !");

        int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內容大小對應的桶個數

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量
            if (!result) {
                return false;
            }
            int tokenCount = 0;
            for (int i = 0; i < needTokenNum; i++) {
                Byte poll = tokenQueue.poll();
                if (poll != null) {
                    tokenCount++;
                }
            }
            return tokenCount == needTokenNum;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void start() {
        // 初始化桶隊列大小
        if (maxFlowRate != 0) {
            tokenQueue = new ArrayBlockingQueue<>(maxFlowRate);
        }
        // 初始化令牌生產者
        TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
        scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
        isStart = true;
    }

    @Override
    public void stop() {
        isStart = false;
        scheduledExecutorService.shutdown();
    }

    @Override
    public boolean isRunning() {
        return isStart;
    }

    class TokenProducer implements Runnable {
        private int avgFlowRate;
        private TokenBucket tokenBucket;

        public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
            this.avgFlowRate = avgFlowRate;
            this.tokenBucket = tokenBucket;
        }

        @Override
        public void run() {
            tokenBucket.addTokens(avgFlowRate);
        }
    }

    public static TokenBucket newBuilder() {
        return new TokenBucket();
    }

    public TokenBucket everyTokenSize(int everyTokenSize) {
        this.everyTokenSize = everyTokenSize;
        return this;
    }

    public TokenBucket maxFlowRate(int maxFlowRate) {
        this.maxFlowRate = maxFlowRate;
        return this;
    }

    public TokenBucket avgFlowRate(int avgFlowRate) {
        this.avgFlowRate = avgFlowRate;
        return this;
    }

    private String stringCopy(String data, int copyNum) {
        StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
        for (int i = 0; i < copyNum; i++) {
            sbuilder.append(data);
        }
        return sbuilder.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        tokenTest();
    }

    private static void arrayTest() {
        ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<>(10);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        System.out.println(tokenQueue.size());
        System.out.println(tokenQueue.remainingCapacity());
    }

    private static void tokenTest() throws InterruptedException {
        TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
        String data = "xxxx";// 四個字節
        for (int i = 1; i <= 1000; i++) {
            Random random = new Random();
            int i1 = random.nextInt(100);
            boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
            TimeUnit.MILLISECONDS.sleep(100);
            if (tokens) {
                System.out.println("token pass --- index:" + i1);
            } else {
                System.out.println("token rejuect --- index" + i1);
            }
        }
    }
}

在開發高併發系統時有三把利器用來保護系統:緩存、降級和限流。

  1. 緩存的目的是提高系統訪問速度和增大系統能處理的容量,可謂是抗高併發流量的銀彈;
  2. 而降級是當服務出問題或者影響到核心流程的性能則須要暫時屏蔽掉,待高峯或者問題解決後再打開;
  3. 而有些場景並不能用緩存和降級來解決,好比稀缺資源(秒殺、搶購)、寫服務(如評論、下單)、頻繁的複雜查詢(評論的最後幾頁),所以需有一種手段來限制這些場景的併發/請求量,即限流。

         限流的目的是經過對併發訪問/請求進行限速或者一個時間窗口內的的請求進行限速來保護系統,一旦達到限制速率則能夠拒絕服務(定向到錯誤頁或告知資源沒有了)、排隊或等待(好比秒殺、評論、下單)、降級(返回兜底數據或默認數據,如商品詳情頁庫存默認有貨)。

         通常開發高併發系統常見的限流有:限制總併發數(好比數據庫鏈接池、線程池)、限制瞬時併發數(如nginx的limit_conn模塊,用來限制瞬時併發鏈接數)、限制時間窗口內的平均速率(如Guava的RateLimiter、nginx的limit_req模塊,限制每秒的平均速率);其餘還有如限制遠程接口調用速率、限制MQ的消費速率。另外還能夠根據網絡鏈接數、網絡流量、CPU或內存負載等來限流。

          先有緩存這個銀彈,後有限流來應對61八、雙十一高併發流量,在處理高併發問題上能夠說是如虎添翼,不用擔憂瞬間流量致使系統掛掉或雪崩,最終作到有損服務而不是不服務;限流須要評估好,不可亂用,不然會正常流量出現一些奇怪的問題而致使用戶抱怨。

          在實際應用時也不要太糾結算法問題,由於一些限流算法實現是同樣的只是描述不同;具體使用哪一種限流技術仍是要根據實際場景來選擇,不要一味去找最佳模式,白貓黑貓能解決問題的就是好貓。

          因在實際工做中遇到過許多人來問如何進行限流,所以本文會詳細介紹各類限流手段。那麼接下來咱們從限流算法、應用級限流、分佈式限流、接入層限流來詳細學習下限流技術手段。

限流算法

          常見的限流算法有:令牌桶、漏桶。計數器也能夠進行粗暴限流實現。

令牌桶和漏桶對比

  • 令牌桶是按照固定速率往桶中添加令牌,請求是否被處理須要看桶中令牌是否足夠,當令牌數減爲零時則拒絕新的請求;

  • 漏桶則是按照常量固定速率流出請求,流入請求速率任意,當流入的請求數累積到漏桶容量時,則新流入的請求被拒絕;

  • 令牌桶限制的是平均流入速率(容許突發請求,只要有令牌就能夠處理,支持一次拿3個令牌,4個令牌),並容許必定程度突發流量;

  • 漏桶限制的是常量流出速率(即流出速率是一個固定常量值,好比都是1的速率流出,而不能一次是1,下次又是2),從而平滑突發流入速率;

  • 令牌桶容許必定程度的突發,而漏桶主要目的是平滑流入速率;

  • 兩個算法實現能夠同樣,可是方向是相反的,對於相同的參數獲得的限流效果是同樣的。

另外有時候咱們還使用計數器來進行限流,主要用來限制總併發數,好比數據庫鏈接池、線程池、秒殺的併發數;只要全局總請求數或者必定時間段的總請求數設定的閥值則進行限流,是簡單粗暴的總數量限流,而不是平均速率限流。

應用級限流

限流總併發/鏈接/請求數

對於一個應用系統來講必定會有極限併發/請求數,即總有一個TPS/QPS閥值,若是超了閥值則系統就會不響應用戶請求或響應的很是慢,所以咱們最好進行過載保護,防止大量請求涌入擊垮系統。

若是你使用過Tomcat,其Connector 其中一種配置有以下幾個參數:

  • acceptCount:若是Tomcat的線程都忙於響應,新來的鏈接會進入隊列排隊,若是超出排隊大小,則拒絕鏈接;
  • maxConnections: 瞬時最大鏈接數,超出的會排隊等待;
  • maxThreads:Tomcat能啓動用來處理請求的最大線程數,若是請求處理量一直遠遠大於最大線程數則可能會僵死。

詳細的配置請參考官方文檔。另外如Mysql(如max_connections)、Redis(如tcp-backlog)都會有相似的限制鏈接數的配置。

限流總資源數

          若是有的資源是稀缺資源(如數據庫鏈接、線程),並且可能有多個系統都會去使用它,那麼須要限制應用;可使用池化技術來限制總資源數:鏈接池、線程池。好比分配給每一個應用的數據庫鏈接是100,那麼本應用最多可使用100個資源,超出了能夠等待或者拋異常。

限流某個接口的總併發/請求數

          若是接口可能會有突發訪問狀況,但又擔憂訪問量太大形成崩潰,如搶購業務;這個時候就須要限制這個接口的總併發/請求數總請求數了;由於粒度比較細,能夠爲每一個接口都設置相應的閥值。可使用Java中的AtomicLong進行限流:

try {
    if(atomic.incrementAndGet() > 限流數) {
        //拒絕請求
    }
    //處理請求
} finally {
    atomic.decrementAndGet();
}

          適合對業務無損的服務或者須要過載保護的服務進行限流,如搶購業務,超出了大小要麼讓用戶排隊,要麼告訴用戶沒貨了,對用戶來講是能夠接受的。而一些開放平臺也會限制用戶調用某個接口的試用請求量,也能夠用這種計數器方式實現。這種方式也是簡單粗暴的限流,沒有平滑處理,須要根據實際狀況選擇使用;

限流某個接口的時間窗請求數

          即一個時間窗口內的請求數,如想限制某個接口/服務每秒/每分鐘/天天的請求數/調用量。如一些基礎服務會被不少其餘系統調用,好比商品詳情頁服務會調用基礎商品服務調用,可是怕由於更新量比較大將基礎服務打掛,這時咱們要對每秒/每分鐘的調用量進行限速;一種實現方式以下所示:

LoadingCache<Long, AtomicLong> counter =
        CacheBuilder.newBuilder()
                .expireAfterWrite(2, TimeUnit.SECONDS)
                .build(new CacheLoader<Long, AtomicLong>() {
                    @Override
                    public AtomicLong load(Long seconds) throws Exception {
                        return new AtomicLong(0);
                    }
                });
long limit = 1000;
while (true) {
    //獲得當前秒
    long currentSeconds = System.currentTimeMillis() / 1000;
    if (counter.get(currentSeconds).incrementAndGet() > limit) {
        System.out.println("限流了:" + currentSeconds);
        continue;
    }
    //業務處理
}

          咱們使用Guava的Cache來存儲計數器,過時時間設置爲2秒(保證1秒內的計數器是有的),而後咱們獲取當前時間戳而後取秒數來做爲KEY進行計數統計和限流,這種方式也是簡單粗暴,剛纔說的場景夠用了。

平滑限流某個接口的請求數

          以前的限流方式都不能很好地應對突發請求,即瞬間請求可能都被容許從而致使一些問題;所以在一些場景中須要對突發請求進行整形,整形爲平均速率請求處理(好比5r/s,則每隔200毫秒處理一個請求,平滑了速率)。這個時候有兩種算法知足咱們的場景:令牌桶和漏桶算法。Guava框架提供了令牌桶算法實現,可直接拿來使用。

          Guava RateLimiter提供了令牌桶算法實現:平滑突發限流(SmoothBursty)和平滑預熱限流(SmoothWarmingUp)實現。

SmoothBursty

RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());

 將獲得相似以下的輸出:

0.0
0.198239
0.196083
0.200609
0.199599
0.19961

          RateLimiter.create(5) 表示桶容量爲5且每秒新增5個令牌,即每隔200毫秒新增一個令牌;

          limiter.acquire()表示消費一個令牌,若是當前桶中有足夠令牌則成功(返回值爲0),若是桶中沒有令牌則暫停一段時間,好比發令牌間隔是200毫秒,則等待200毫秒後再去消費令牌(如上測試用例返回的爲0.198239,差很少等待了200毫秒桶中才有令牌可用),這種實現將突發請求速率平均爲了固定請求速率。

再看一個突發示例:

RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire(5));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));

將獲得相似以下的輸出:

 

0.0
0.98745
0.183553
0.199909

          limiter.acquire(5)表示桶的容量爲5且每秒新增5個令牌,令牌桶算法容許必定程度的突發,因此能夠一次性消費5個令牌,但接下來的limiter.acquire(1)將等待差很少1秒桶中才能有令牌,且接下來的請求也整形爲固定速率了。

RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire(10));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));

將獲得相似以下的輸出:

0.0
1.997428
0.192273
0.200616

          同上邊的例子相似,第一秒突發了10個請求,令牌桶算法也容許了這種突發(容許消費將來的令牌),但接下來的limiter.acquire(1)將等待差很少2秒桶中才能有令牌,且接下來的請求也整形爲固定速率了。

RateLimiter limiter = RateLimiter.create(2);
System.out.println(limiter.acquire());
Thread.sleep(2000L);
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());

將獲得相似以下的輸出:

0.0
0.0
0.0
0.0
0.499876
0.495799

          建立了一個桶容量爲2且每秒新增2個令牌;

          首先調用limiter.acquire()消費一個令牌,此時令牌桶能夠知足(返回值爲0);

          而後線程暫停2秒,接下來的兩個limiter.acquire()都能消費到令牌,第三個limiter.acquire()也一樣消費到了令牌,到第四個時就須要等待500毫秒了。

          此處能夠看到咱們設置的桶容量爲2(即容許的突發量),這是由於SmoothBursty中有一個參數:最大突發秒數(maxBurstSeconds)默認值是1s,突發量/桶容量=速率*maxBurstSeconds,因此本示例桶容量/突發量爲2,例子中前兩個是消費了以前積攢的突發量,而第三個開始就是正常計算的了。令牌桶算法容許將一段時間內沒有消費的令牌暫存到令牌桶中,留待將來使用,並容許將來請求的這種突發。

          SmoothBursty經過平均速率和最後一次新增令牌的時間計算出下次新增令牌的時間的,另外須要一個桶暫存一段時間內沒有使用的令牌(便可以突發的令牌數)。另外RateLimiter還提供了tryAcquire方法來進行無阻塞或可超時的令牌消費。

          由於SmoothBursty容許必定程度的突發,會有人擔憂若是容許這種突發,假設忽然間來了很大的流量,那麼系統極可能扛不住這種突發。所以須要一種平滑速率的限流工具,從而系統冷啓動後慢慢的趨於平均固定速率(即剛開始速率小一些,而後慢慢趨於咱們設置的固定速率)。Guava也提供了SmoothWarmingUp來實現這種需求,其能夠認爲是漏桶算法,可是在某些特殊場景又不太同樣。

          SmoothWarmingUp建立方式:RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit)

          permitsPerSecond表示每秒新增的令牌數,warmupPeriod表示在從冷啓動速率過渡到平均速率的時間間隔。

示例以下:

RateLimiter limiter = RateLimiter.create(5, 1000, TimeUnit.MILLISECONDS);
for(int i = 1; i < 5;i++) {
    System.out.println(limiter.acquire());
}
Thread.sleep(1000L);
for(int i = 1; i < 5;i++) {
    System.out.println(limiter.acquire());
}

將獲得相似以下的輸出:

0.0
0.51767
0.357814
0.219992
0.199984
0.0
0.360826
0.220166
0.199723
0.199555

          速率是梯形上升速率的,也就是說冷啓動時會以一個比較大的速率慢慢到平均速率;而後趨於平均速率(梯形降低到平均速率)。能夠經過調節warmupPeriod參數實現一開始就是平滑固定速率。

          到此應用級限流的一些方法就介紹完了。假設將應用部署到多臺機器,應用級限流方式只是單應用內的請求限流,不能進行全侷限流。所以咱們須要分佈式限流和接入層限流來解決這個問題。

分佈式限流

redis+lua實現中的lua腳本:

local key = KEYS[1] --限流KEY(一秒一個)
local limit = tonumber(ARGV[1]) --限流大小
local current = tonumber(redis.call("INCRBY", key, "1")) --請求數+1
if current > limit then --若是超出限流大小
        return 0
elseif
        current == 1
then --只有第一次訪問須要設置2秒的過時時間
redis.call("expire", key,"2")
end
return 1

          如上操做因是在一個lua腳本中,又因Redis是單線程模型,所以是線程安全的。如上方式有一個缺點就是當達到限流大小後仍是會遞增的,能夠改形成以下方式實現:

local key = KEYS[1] --限流KEY(一秒一個)
local limit = tonumber(ARGV[1])        --限流大小
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then --若是超出限流大小
        return 0
else  --請求數+1,並設置2秒過時
        redis.call("INCRBY", key,"1")
        redis.call("expire", key,"2")
        return 1
end

Java中判斷是否須要限流的代碼:

public static boolean acquire() throws Exception {
    String luaScript = Files.toString(new File("limit.lua"), Charset.defaultCharset());
    Jedis jedis = new Jedis("192.168.147.52", 6379);
    String key = "ip:" + System.currentTimeMillis() / 1000; //此處將當前時間戳取秒數
    Stringlimit = "3"; //限流大小
    return (Long) jedis.eval(luaScript, Lists.newArrayList(key), Lists.newArrayList(limit)) == 1;
}

          由於Redis的限制(Lua中有寫操做不能使用帶隨機性質的讀操做,如TIME)不能在Redis Lua中使用TIME獲取時間戳,所以只好從應用獲取而後傳入,在某些極端狀況下(機器時鐘不許的狀況下),限流會存在一些小問題。

使用Nginx+Lua實現的Lua腳本:

local locks = require "resty.lock"
local function acquire()
local lock =locks:new("locks")
local elapsed, err =lock:lock("limit_key") --互斥鎖
local limit_counter =ngx.shared.limit_counter --計數器

local key = "ip:" ..os.time()
local limit = 5 --限流大小
local current =limit_counter:get(key)

if current ~= nil and current + 1> limit then --若是超出限流大小
        lock:unlock()
        return 0
end
if current == nil then
        limit_counter:set(key, 1, 1) --第一次須要設置過時時間,設置key的值爲1,過時時間爲1秒
else
    limit_counter:incr(key, 1) --第二次開始加1便可
end
        lock:unlock()
        return 1
end
ngx.print(acquire())

          實現中咱們須要使用lua-resty-lock互斥鎖模塊來解決原子性問題(在實際工程中使用時請考慮獲取鎖的超時問題),並使用ngx.shared.DICT共享字典來實現計數器。若是須要限流則返回0,不然返回1。使用時須要先定義兩個共享字典(分別用來存放鎖和計數器數據):

http {
    ……
    lua_shared_dict locks 10m;
    lua_shared_dict limit_counter 10m;
}

          有人會糾結若是應用併發量很是大那麼redis或者nginx是否是能抗得住;不過這個問題要從多方面考慮:你的流量是否是真的有這麼大,是否是能夠經過一致性哈希將分佈式限流進行分片,是否是能夠當併發量太大降級爲應用級限流;對策很是多,能夠根據實際狀況調節;像在京東使用Redis+Lua來限流搶購流量,通常流量是沒有問題的。

          對於分佈式限流目前遇到的場景是業務上的限流,而不是流量入口的限流;流量入口限流應該在接入層完成,而接入層筆者通常使用Nginx。

相關文章
相關標籤/搜索