限流,在一些咱們已知的場景有:nginx
1)在Tcp協議中,Flow Control,算法
流量控制以動態調整發送空間大小(滑動窗口)的形式來反映接收端接收消息的能力,反饋給發送端以調整發送速度,避免發送速度過快致使的丟包或者過慢下降總體性能。api
在Tcp協議中,經過在首部設置window size的值來控制窗口大小。網絡
2) 在Web sever中,使用nginx對請求訪問進行限流,基於nginx擴展模塊,多線程
3) 在如今一些主流的開放平臺,都有針對API調用的限制,好比淘寶開放平臺針對商品查詢接口的限制次數,百度地圖針對開發者「「地點檢索」API是有指定限額的。這些都是針對API的限流。框架
4) 秒殺系統中,因爲庫存量通常是不多的,對應只有少部分的用戶才能秒殺成功,所以咱們要限制絕大部分用戶流量。ide
5) 各類框架使用時的數量限制,如數據鏈接池最大鏈接數、線程池最大線程數、zk最大鏈接等等性能
以上是限流的常見場景。限流,是對流量控制。ui
Flow Control,是在數據通訊中,流量控制是管理兩個節點之間的數據傳輸速率的過程,以防止快速發送者壓倒慢速接收者。
它爲接收機提供了一種控制傳輸速度的機制,使接收節點不會被來自發送節點的數據淹沒。this
Rate Limiting,在計算機網絡中,網絡接口控制器用限流來控制發送和接收的流量速率,以防止Dos攻擊。
能夠看出,限流主要是控制發送和接受雙方的流量速率,保證工做正常進行。
爲何要限流
限流,爲了保護系統在應對流量高峯時,系統可以依然以可控的處理速率對外提供服務,而不至於奔潰或變爲不可服務的。
這也從側面體現了系統服務的穩定性,若是是SOA服務的話,也體現了服務設計原則的自治性。
注:如下算法只作算法演示,確定有不少細節未考慮,包括在多線程下行爲是否正確等
計數器是最簡單的一種,針對資源設置訪問最大總量(上限)Max,以及定義一個計數器Counter,每當須要對資源訪問時,Counter++,當Counter小於Max,訪問能夠經過,不然不可用。
通常這個場景在項目中比較常見,好比咱們使用Semphore的acquire、release來控制多線程對資源的許可,好比Jedis Pool的對象池borrow、return。
基於單位時間的計數器
限制指定時間內的請求數量,好比1秒內最大的請求量爲2個
demo以下:
public class PerTimeUnitCounterFlowControl { private static final long INTERVAL = 5 * 1000;//時間間隔ms private long timestamp; private int counter; private int limit; private long interval; public PerTimeUnitCounterFlowControl(long interval,int limit) { this.interval = interval <= 0? INTERVAL:interval; this.timestamp = SystemClock.now(); this.limit = limit; } /** * * @return */ public boolean acquire(){ long now = SystemClock.now(); if (now < timestamp + interval){ counter++; return counter <= limit; } timestamp = now; counter = 1; return true; } }
該算法的缺陷是,在時間節點重置的時隙裏可能被突發請求超限。
Timed Sliding Window,參照於Tcp滑動窗口,將單位時間T看作是一個窗口,將窗口中的每一個格子設定爲指定時間間隔Duration,Window Size爲格子總數 buckets,那麼單位時間就是buckets * Duration。每一個格子有本身獨立的計數器。當時間每過去Duration時候,窗口就會向右滑動一個格子。
以下:
每當有請求過來時,都會落在指定格子裏,而後獲取當前窗口的全部計數器之和,以此來觸發是否限流。
很明顯格子劃分的越多,滑動窗口的滑動就越平滑,限流統計就越精確。
demo以下:
public class TimedSlidingWindowFlowControl { private static final int LIMIT = 5; private long duration;//每一個格子的時長 private int bucketSize;//總格子數 private final long windowTime; private final ScheduledExecutorService scheduledExecutor; private long startedTimestamp; private volatile int head;//指向第一個格子 private AtomicInteger[] buckets; public TimedSlidingWindowFlowControl(long duration, int bucketSize) { this.duration = duration; this.bucketSize = bucketSize; this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); this.windowTime = duration * bucketSize; buckets = new AtomicInteger[bucketSize]; } /** * 初始化 */ protected void init(){ startedTimestamp = SystemClock.now(); for(int i = 0; i < bucketSize;i++){ buckets[i] = new AtomicInteger(0); } head = 0;//指向第一個格子 scheduledExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { timeRolling(); } },duration/2,duration, TimeUnit.MILLISECONDS); } /** * 獲取許可 * @return */ private boolean acquire(){ long now = SystemClock.now(); long timestampDiff = now - startedTimestamp; long mask = timestampDiff % (windowTime); //相對於head的位置 int idx = getBucketIndex(mask); if(idx == -1){ throw new IllegalStateException("illegalState"); } buckets[idx].incrementAndGet(); int count = getCurrentCount(); System.out.println("當前count:" + count); if(count <= LIMIT){ return true; } return false; } /** * 查找當前的位置 * @param mask * @return */ private int getBucketIndex(long mask){ int cursor = head; int stopIndex = cursor; if(mask <= duration){ return cursor; } long d = duration; while (true){ cursor = (cursor + 1) % bucketSize; if(cursor == stopIndex){ return -1; } d = d + duration; if(mask <= d){ return cursor; } } } /** * 獲取當前計數 * @return int */ private int getCurrentCount(){ return Arrays.stream(buckets).mapToInt(buckets -> buckets.get()).sum(); } /** * 時間滾動 */ private void timeRolling(){ //每次格子移動都會更改head int last = head; head = (head + 1) % bucketSize; System.out.println("時間向前移動一格:" + head); buckets[last].set(0);//reset } /** * 關閉 */ protected void shutdown() throws InterruptedException { scheduledExecutor.shutdown(); scheduledExecutor.awaitTermination(5,TimeUnit.SECONDS); } }
上面的demo可能有些細節未考慮,基本思路是使用定時任務模擬時鐘滾動,循環複用計數器桶,使用head指針始終指向第一個桶,統計全部的桶計數的和 判斷是否觸發限流。
實際上考慮「使用定時任務模擬時鐘滾動」,這種方式有一些缺點:會浪費Cpu資源,並且還依賴時鐘。能夠考慮採用相似Guava的RateLimiter的延遲計算機制。
另更多關於滑動窗口計數能夠參考Storm的RollingCountBolt和Hystrix的Metrics實現。
漏桶算法,即Leaky bucket,是一種很經常使用的限流算法,能夠用來實現流量整形(Traffic Shaping)和流量控制(Traffic Policing)。
如下是wikipedia對Leaky bucket的算法描述:
[A] fixed capacity bucket, associated with each virtual connection or user, leaks at a fixed rate.
If the bucket is empty, it stops leaking.
For a packet to conform, it has to be possible to add a specific amount of water to the bucket: The specific amount added by a conforming packet can be the same for all packets, or can be proportional to the length of the packet.
If this amount of water would cause the bucket to exceed its capacity then the packet does not conform and the water in the bucket is left unchanged.
翻譯過來的意思爲:
這個可使用基於生產者-消費者共享阻塞隊列實現。
demo以下:
public class LeakyBucketFlowControl{ private int capacity; private LinkedBlockingQueue<Integer> bucket; private int flowOutNum;//以恆定的速率流出 private int flowOutTimeUnit;// private static final int VALUE = 1; private Thread thread; private volatile boolean stop = false; public LeakyBucketFlowControl(int capacity, int flowOutNum, int flowOutTimeUnit) { this.capacity = capacity; this.flowOutNum = flowOutNum; this.flowOutTimeUnit = flowOutTimeUnit; this.bucket = new LinkedBlockingQueue<>(capacity); this.thread = new Thread(new Worker()); } /** * init */ public void init(){ thread.start(); } /** * 獲取許可 * @return */ protected boolean acquire(){ boolean of = bucket.offer(VALUE); return of; } /** * shutdown */ public void shutdown(){ stop = true; System.out.println("當前漏桶的容量:" + bucket.size()); } /** * 內部worker */ class Worker implements Runnable{ @Override public void run() { while (!Thread.currentThread().isInterrupted() && !stop){ try { TimeUnit.MILLISECONDS.sleep(flowOutTimeUnit); for(int i = 1;i <= flowOutNum;i++){ bucket.take(); } System.out.println("漏桶流出容量爲:" + bucket.size()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } }
通常狀況下,漏桶對於流量整形有用, 不管什麼樣的請求速率進來,漏桶老是以恆定的速率執行,但對於突發傳輸有必定限制,除非當前漏桶已經爲空。
關於令牌token的使用場景比較多了,好比Auth的access token,計算機網絡中輪轉訪問MAC協議中的Token passing等。
如今是關於token在限流中的算法描述:
[A] token is added to the bucket every 1/r seconds.
The bucket can hold at the most b tokens. If a token arrives when the bucket is full, it is discarded.
When a packet (network layer PDU) of n bytes arrives, n tokens are removed from the bucket, and the packet is sent to the network.
If fewer than n tokens are available, no tokens are removed from the bucket, and the packet is considered to be non-conformant.
算法實現直接使用Guava的RateLimiter
public class RateLimiterTester { public static void main(String[] args) { RateLimiter limiter = RateLimiter.create(2);//發令牌的間隔時間約500ms double x = limiter.acquire(5) * 1000; System.out.println(x + "...."); for (int i = 1;i <= 5;i++){ double y = limiter.acquire() * 1000; System.out.println(y); } } } 輸出 0.0.... 2497.7299999999996 491.842 495.838 497.392 498.442
令牌桶算法能夠應對突發流量,RateLimiter提供了SmoothBursty和SmoothWarmingUp兩種需求。具體區別和實現能夠自行查看下文檔或網上找下相關分析。
推薦關於Hystrix一篇好文。