限流?python+redis就能搞定!

最近在瀏覽高併發相關內容並試圖點亮技能樹的時候,老是能看到一句話: 保護高併發系統的三大利器:緩存、降級和限流。那什麼是 限流呢?用我沒讀過太多書的話來說, 限流就是限制流量。咱們都知道服務器的處理能力是有上限的,若是超過了上限繼續聽任請求進來的話,可能會發生不可控的後果。而經過 限流,在請求數量超出閾值的時候就排隊等待甚至拒絕服務,就可使系統在扛不住太高併發的狀況下作到 有損服務而不是不服務。

舉個栗子🌰,最近新型肺炎肆虐,各地都出現口罩緊缺的狀況,廣州政府爲了緩解市民買不到口罩的情況,上線了預定服務,只有預定到的市民才能到指定的藥店購買少許口罩。這就是生活中限流的狀況,說這個也是但願你們這段時間保護好本身,注意防禦 :)
html

接下來就跟你們分享下接口限流的常見玩法吧,部分算法用python + redis粗略實現了一下,關鍵是圖解啊!你品,你細品~python

固定窗口法

固定窗口法是限流算法裏面最簡單的,好比我想限制1分鐘之內請求爲100個,從如今算起的一分鐘內,請求就最多就是100個,這分鐘過完的那一刻把計數器歸零,從新計算,周而復始。
git

固定窗口法
固定窗口法

僞代碼實現

def can_pass_fixed_window(user, action, time_zone=60, times=30):
    """
    :param user: 用戶惟一標識
    :param action: 用戶訪問的接口標識(即用戶在客戶端進行的動做)
    :param time_zone: 接口限制的時間段
    :param time_zone: 限制的時間段內容許多少請求經過
    """

    key = '{}:{}'.format(user, action)
    # redis_conn 表示redis鏈接對象
    count = redis_conn.get(key)
    if not count:
        count = 1
        redis_conn.setex(key, time_zone, count)
    if count < times:
        redis_conn.incr(key)
        return True

    return False
複製代碼

這個方法雖然簡單,但有個大問題是沒法應對兩個時間邊界內的突發流量。如上圖所示,若是在計數器清零的前1秒以及清零的後1秒都進來了100個請求,那麼在短期內服務器就接收到了兩倍的(200個)請求,這樣就有可能壓垮系統。會致使上面的問題是由於咱們的統計精度還不夠,爲了將臨界問題的影響下降,咱們可使用滑動窗口法。github

滑動窗口法

滑動窗口法,簡單來講就是隨着時間的推移,時間窗口也會持續移動,有一個計數器不斷維護着窗口內的請求數量,這樣就能夠保證任意時間段內,都不會超過最大容許的請求數。例如當前時間窗口是0s~60s,請求數是40,10s後時間窗口就變成了10s~70s,請求數是60。web

時間窗口的滑動和計數器可使用redis的有序集合(sorted set)來實現。score的值用毫秒時間戳來表示,能夠利用 當前時間戳 - 時間窗口的大小 來計算出窗口的邊界,而後根據score的值作一個範圍篩選就能夠圈出一個窗口;value的值僅做爲用戶行爲的惟一標識,也用毫秒時間戳就好。最後統計一下窗口內的請求數再作判斷便可。
redis

滑動窗口法
滑動窗口法

僞代碼實現

def can_pass_slide_window(user, action, time_zone=60, times=30):
    """
    :param user: 用戶惟一標識
    :param action: 用戶訪問的接口標識(即用戶在客戶端進行的動做)
    :param time_zone: 接口限制的時間段
    :param time_zone: 限制的時間段內容許多少請求經過
    """

    key = '{}:{}'.format(user, action)
    now_ts = time.time() * 1000
    # value是什麼在這裏並不重要,只要保證value的惟一性便可,這裏使用毫秒時間戳做爲惟一值
    value = now_ts 
    # 時間窗口左邊界
    old_ts = now_ts - (time_zone * 1000)
    # 記錄行爲
    redis_conn.zadd(key, value, now_ts)
    # 刪除時間窗口以前的數據
    redis_conn.zremrangebyscore(key, 0, old_ts)
    # 獲取窗口內的行爲數量
    count = redis_conn.zcard(key)
    # 設置一個過時時間省得佔空間
    redis_conn.expire(key, time_zone + 1)
    if not count or count < times:
        return True
    return False
複製代碼

雖然滑動窗口法避免了時間界限的問題,可是依然沒法很好解決細時間粒度上面請求過於集中的問題,就例如限制了1分鐘請求不能超過60次,請求都集中在59s時發送過來,這樣滑動窗口的效果就大打折扣。
爲了使流量更加平滑,咱們可使用更加高級的令牌桶算法和漏桶算法。算法

令牌桶法

令牌桶算法的思路不復雜,它先以固定的速率生成令牌,把令牌放到固定容量的桶裏,超過桶容量的令牌則丟棄,每來一個請求則獲取一次令牌,規定只有得到令牌的請求才能放行,沒有得到令牌的請求則丟棄。
編程


僞代碼實現

# 令牌桶法,具體步驟:
# 請求來了就計算生成的令牌數,生成的速率有限制
# 若是生成的令牌太多,則丟棄令牌
# 有令牌的請求才能經過,不然拒絕
def can_pass_token_bucket(user, action, time_zone=60, times=30):
    """
    :param user: 用戶惟一標識
    :param action: 用戶訪問的接口標識(即用戶在客戶端進行的動做)
    :param time_zone: 接口限制的時間段
    :param time_zone: 限制的時間段內容許多少請求經過
    """

    # 請求來了就倒水,倒水速率有限制
    key = '{}:{}'.format(user, action)
    rate = times / time_zone # 令牌生成速度
    capacity = times # 桶容量
    tokens = redis_conn.hget(key, 'tokens'# 看桶中有多少令牌
    last_time = redis_conn.hget(key, 'last_time'# 上次令牌生成時間
    now = time.time()
    tokens = int(tokens) if tokens else capacity
    last_time = int(last_time) if last_time else now
    delta_tokens = (now - last_time) * rate # 通過一段時間後生成的令牌
    if delta_tokens > 1:
        tokens = tokens + tokens # 增長令牌
        if tokens > tokens:
            tokens = capacity
        last_time = time.time() # 記錄令牌生成時間
        redis_conn.hset(key, 'last_time', last_time)

    if tokens >= 1:
        tokens -= 1 # 請求進來了,令牌就減小1
        redis_conn.hset(key, 'tokens', tokens)
        return True
    return False
複製代碼

令牌桶法限制的是請求的平均流入速率,優勢是能應對必定程度上的突發請求,也能在必定程度上保持流量的來源特徵,實現難度不高,適用於大多數應用場景。後端

漏桶算法

漏桶算法的思路與令牌桶算法有點相反。你們能夠將請求想象成是水流,水流能夠任意速率流入漏桶中,同時漏桶以固定的速率將水流出。若是流入速度太大會致使水滿溢出,溢出的請求被丟棄。
api


經過上圖能夠看出漏桶法的特色是: 不限制請求流入的速率,可是 限制了請求流出的速率。這樣突發流量能夠被整造成一個穩定的流量,不會發生超頻。

關於漏桶算法的實現方式有一點值得注意,我在瀏覽相關內容時發現網上大多數對於漏桶算法的僞代碼實現,都只是實現了

根據維基百科,漏桶算法的實現理論有兩種,分別是基於 meter 的基於 queue 的,他們實現的具體思路不一樣,我大概介紹一下。

基於meter的漏桶

基於 meter 的實現相對來講比較簡單,其實它就有一個計數器,而後有消息要發送的時候,就看計數器夠不夠,若是計數器沒有滿的話,那麼這個消息就能夠被處理,若是計數器不足以發送消息的話,那麼這個消息將會被丟棄。

那麼這個計數器是怎麼來的呢,基於 meter 的形式的計數器就是發送的頻率,例如你設置得頻率是不超過 5條/s ,那麼計數器就是 5,在一秒內你每發送一條消息就減小一個,當你發第 6 條的時候計時器就不夠了,那麼這條消息就被丟棄了。

這種實現有點相似最開始介紹的固定窗口法,只不過期間粒度再小一些,僞代碼就不上了。

基於queue的漏桶

基於 queue 的實現起來比較複雜,可是原理卻比較簡單,它也存在一個計數器,這個計數器卻不表示速率限制,而是表示 queue 的大小,這裏就是當有消息要發送的時候看 queue 中是否還有位置,若是有,那麼就將消息放進 queue 中,這個 queue 以 FIFO 的形式提供服務;若是 queue 沒有位置了,消息將被拋棄。

在消息被放進 queue 以後,還須要維護一個定時器,這個定時器的週期就是咱們設置的頻率週期,例如咱們設置得頻率是 5條/s,那麼定時器的週期就是 200ms,定時器每 200ms 去 queue 裏獲取一次消息,若是有消息,那麼就發送出去,若是沒有就輪空。

這種實現方式比較複雜,限於篇幅這裏就沒有實現了,可是貼心的我仍是爲你們找來了參考的栗子🌰。

熟悉python的朋友能夠參考aiolimiter的實現 👉🏻 python傳送門
熟悉go的朋友能夠參考uber的ratelimit的實現 👉🏻 go傳送門

注意,網上不少關於漏桶法的僞代碼實現只實現了水流入桶的部分,沒有實現關鍵的水從桶中漏出的部分。若是隻實現了前半部分,其實跟令牌桶沒有大的區別噢😯

若是以爲上面的都太難,很差實現,那麼我牆裂建議你嘗試一下redis-cell這個模塊!

redis-cell

Redis 4.0 提供了一個限流 Redis 模塊,它叫 redis-cell。該模塊也使用了漏斗算法,並提供了原子的限流指令。有了這個模塊,限流問題就很是簡單了。
這個模塊須要單獨安裝,安裝教程網上不少,它只有一個指令:CL.THROTTLE

CL.THROTTLE user123 15 30 60 1
                ▲    ▲  ▲  ▲ ▲
                |    |  |  | └───── apply 1 operation (default if omitted) 每次請求消耗的水滴
                |    |  └──┴─────── 30 operations / 60 seconds 漏水的速率
                |    └───────────── 15 max_burst 漏桶的容量
                └─────────────────── key 「user123」 用戶行爲
複製代碼

執行以上命令以後,redis會返回以下信息:

> cl.throttle laoqian:reply 15 30 60
1) (integer) 0   # 0 表示容許,1表示拒絕
2) (integer) 16  # 漏桶容量
3) (integer) 15  # 漏桶剩餘空間left_quota
4) (integer) -1  # 若是拒絕了,須要多長時間後再試(漏桶有空間了,單位秒)
5) (integer) 2   # 多長時間後,漏桶徹底空出來(單位秒)
複製代碼

有了上面的redis模塊,就能夠輕鬆對付大多數的限流場景了,簡直太方便了有木有!

後記&引用

限流算法就大概介紹到這裏了,這些算法圖雖然是參照了好幾篇文章所畫,但也是花了精力在上面的,但願能對你們產生幫助吧。而後我還將上面的代碼整理到了github,須要的朋友請戳 👉🏻 這裏


參考文章:

讀後三連⭐️

若是你以爲這篇內容對你有幫助,我想邀請你幫我三個小忙:

  1. 點贊/在看,讓更多的人也能看到這篇內容(這對我真的很重要)
  2. 關注公衆號「py一下」,不按期分享原創知識,python編程/後端/數據結構與算法/我的成長等
  3. 也多看看其餘文章
相關文章
相關標籤/搜索