最近在參與一個識別熱點數據的需求開發。其中涉及了限流算法相關的內容。因此這裏記錄一下本身瞭解的各類限流算法,以及各個限流算法的實現。java
限流算法的應用場景很是普遍,好比經過限流來確保下游配置較差的應用不會被上游應用的大量請求擊穿,不管是HTTP請求仍是RPC請求,從而使得服務保持穩定。限流也一樣能夠用於客戶端,好比當咱們須要從微博上爬取數據時,咱們須要在請求中攜帶token從而經過微博的網關驗證。可是微博爲了防止服務被單個客戶端大量訪問,每每會在服務端進行限流,好比多是一個token一個小時只能發起1000次請求。可是爬蟲發出的請求一般遠遠不止這個量級。因此在客戶端進行限流能夠確保咱們的token不會失效或是查封。面試
限流算法能夠從多種角度分類,好比按照處理方式分爲兩種,一種是在超出限定流量以後會拒絕多餘的訪問,另外一種是超出限定流量以後,只是報警或者是記錄日誌,訪問仍然正常進行。算法
目前比較常見的限流算法有如下幾種:api
本文主要記錄一下固定窗口和滑動窗口。令牌桶算法在谷歌的開源guava包中有實現,下次再開一篇文章分享一下。文中錯誤的地方歡迎指出!若是guava中實現了滑動窗口算法也請告訴我,急需,目前沒有找到orz。安全
這是限流算法中最暴力的一種想法。既然咱們但願某個API在一分鐘內只能固定被訪問N次(多是出於安全考慮,也多是出於服務器資源的考慮),那麼咱們就能夠直接統計這一分鐘開始對API的訪問次數,若是訪問次數超過了限定值,則拋棄後續的訪問。直到下一分鐘開始,再開放對API的訪問。服務器
全部的暴力算法的共同點都是容易實現,而固定窗口限流的缺點也一樣很明顯。假設如今有一個惡意用戶在上一分鐘的最後一秒和下一分鐘的第一秒瘋狂的衝擊API。按照固定窗口的限流規則,這些請求都可以訪問成功,可是在這一秒內,服務將承受超過規定值的訪問衝擊(這個規定值極可能是服務器可以承受的最大負載),從而致使服務沒法穩定提供。並且由於用戶在這一秒內耗光了上一分鐘和下一分鐘的訪問定額,從而致使別的用戶沒法享受正常的服務,對於服務提供方來講是徹底不能接收的。微信
這裏本身作了一個簡單的實現:多線程
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class FixedWindowRateLimiter implements RateLimiter, Runnable { private static final int DEFAULT_ALLOWED_VISIT_PER_SECOND = 5; private final int maxVisitPerSecond; private AtomicInteger count; FixedWindowRateLimiter(){ this.maxVisitPerSecond = DEFAULT_ALLOWED_VISIT_PER_SECOND; this.count = new AtomicInteger(); } FixedWindowRateLimiter(int maxVisitPerSecond) { this.maxVisitPerSecond = maxVisitPerSecond; this.count = new AtomicInteger(); } @Override public boolean isOverLimit() { return currentQPS() > maxVisitPerSecond; } @Override public int currentQPS() { return count.get(); } @Override public boolean visit() { count.incrementAndGet(); System.out.print(isOverLimit()); return isOverLimit(); } @Override public void run() { System.out.println(this.currentQPS()); count.set(0); } public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); FixedWindowRateLimiter rateLimiter = new FixedWindowRateLimiter(); scheduledExecutorService.scheduleAtFixedRate(rateLimiter, 0, 1, TimeUnit.SECONDS); new Thread(new Runnable() { @Override public void run() { while(true) { rateLimiter.visit(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while(true) { rateLimiter.visit(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
其中RateLimiter是一個通用的接口,後面的其它限流算法也會實現該接口:dom
public interface RateLimiter { boolean isOverLimit(); int currentQPS(); boolean visit(); }
也能夠不使用多線程的方式實現,更加簡單高效:ide
public class FixedWindowRateLimiterWithoutMultiThread implements RateLimiter { private Long lastVisitAt = System.currentTimeMillis(); private static final int DEFAULT_ALLOWED_VISIT_PER_SECOND = 5; private final int maxVisitPerSecond; private AtomicInteger count; public FixedWindowRateLimiterWithoutMultiThread(int maxVisitPerSecond){ this.maxVisitPerSecond = maxVisitPerSecond; this.count = new AtomicInteger(); } public FixedWindowRateLimiterWithoutMultiThread() { this(DEFAULT_ALLOWED_VISIT_PER_SECOND); } @Override public boolean isOverLimit() { return count.get() > maxVisitPerSecond; } @Override public int currentQPS() { return count.get(); } @Override public boolean visit() { long now = System.currentTimeMillis(); synchronized (lastVisitAt) { if (now - lastVisitAt > 1000) { lastVisitAt = now; System.out.println(currentQPS()); count.set(1); } } count.incrementAndGet(); return isOverLimit(); } public static void main(String[] args) { RateLimiter rateLimiter = new FixedWindowRateLimiterWithoutMultiThread(); new Thread(new Runnable() { @Override public void run() { while(true) { rateLimiter.visit(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while(true) { rateLimiter.visit(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
固定窗口就像是滑動窗口的一個特例。滑動窗口將固定窗口再等分爲多個小的窗口,每一次對一個小的窗口進行流量控制。這種方法能夠很好的解決以前的臨界問題。
這裏找的網上一個圖,假設咱們將1s劃分爲4個窗口,則每一個窗口對應250ms。假設惡意用戶仍是在上一秒的最後一刻和下一秒的第一刻衝擊服務,按照滑動窗口的原理,此時統計上一秒的最後750毫秒和下一秒的前250毫秒,這種方式可以判斷出用戶的訪問依舊超過了1s的訪問數量,所以依然會阻攔用戶的訪問。
使用定時任務實現的滑動窗口代碼以下:
public class SlidingWindowRateLimiter implements RateLimiter, Runnable{ private final long maxVisitPerSecond; private static final int DEFAULT_BLOCK = 10; private final int block; private final AtomicLong[] countPerBlock; private AtomicLong count; private volatile int index; public SlidingWindowRateLimiter(int block, long maxVisitPerSecond) { this.block = block; this.maxVisitPerSecond = maxVisitPerSecond; countPerBlock = new AtomicLong[block]; for (int i = 0 ; i< block ; i++) { countPerBlock[i] = new AtomicLong(); } count = new AtomicLong(0); } public SlidingWindowRateLimiter() { this(DEFAULT_BLOCK, DEFAULT_ALLOWED_VISIT_PER_SECOND); } @Override public boolean isOverLimit() { return currentQPS() > maxVisitPerSecond; } @Override public long currentQPS() { return count.get(); } @Override public boolean visit() { countPerBlock[index].incrementAndGet(); count.incrementAndGet(); return isOverLimit(); } @Override public void run() { System.out.println(isOverLimit()); System.out.println(currentQPS()); System.out.println("index:" + index); index = (index + 1) % block; long val = countPerBlock[index].getAndSet(0); count.addAndGet(-val); } public static void main(String[] args) { SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(10, 1000); ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleAtFixedRate(slidingWindowRateLimiter, 100, 100, TimeUnit.MILLISECONDS); new Thread(new Runnable() { @Override public void run() { while (true) { slidingWindowRateLimiter.visit(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while (true) { slidingWindowRateLimiter.visit(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
Protect Your API Resources with Rate Limiting
想要了解更多開發技術,面試教程以及互聯網公司內推,歡迎關注個人微信公衆號!將會不按期的發放福利哦~