在開發高併發系統時有三把利器用來保護系統:緩存、降級、 限流 , 今天咱們就談談
限流
java緩存:緩存的目的是提高系統訪問速度和增大系統處理容量
降級:降級是當服務器壓力劇增的狀況下,根據當前業務狀況及流量對一些服務和頁面有策略的降級,以此釋放服務器資源以保證核心任務的正常運行
限流:限流的目的是經過對併發訪問/請求進行限速,或者對一個時間窗口內的請求進行限速來保護系統,一旦達到限制速率則能夠拒絕服務、排隊或等待、降級等處理算法
我下面算法的實現基本上都用到了定時器
Timer
, 其實關於時間的也能夠不用定時器, 能夠看看Guava
的RateLimiter
, 定時器的好處是我不用處理時間邏輯 , 可是須要消耗一個線程去執行邏輯 , 當邏輯算力壓力過大會線程處理不過來,效果很差 , 能夠使用一下ScheduledThreadPoolExecutor
線程池來執行,下降壓力緩存 同時還大量使用了
隊列
數據結構 ,是由於生產者消費者模型大多須要隊列, 先進先出的特色服務器 第一節是環境搭建 , 寫出需求 ,和接口要求 , 和測試用例 ,後面四節就是基本算法數據結構
咱們模擬Filter#doFilter
接口進行測試 , 所有實現 AbstractLimiter#limit
方法多線程
Filter
實現併發
public interface Filter { default public void init() { } public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain); default public void destroy() { } }
FilterChain
實現ide
public interface FilterChain { void doFilter(ServletRequest request, ServletResponse response); }
ServletRequest
實現高併發
public class ServletRequest { private String msg; public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } @Override public String toString() { return "ServletRequest{" + "msg='" + msg + '\'' + '}'; } public ServletRequest(String msg) { this.msg = msg; } }
ServletResponse
實現測試
public class ServletResponse { }
AbstractLimiter
實現
public abstract class AbstractLimiter { /** * 最大流量 */ protected final int MAX_FlOW; /** * 構造器 , 輸入每秒最大流量 * @param MAX_FlOW 最大流量 */ public AbstractLimiter(int MAX_FlOW) { this.MAX_FlOW = MAX_FlOW; } /** * 具體實現的方法 * @param request 請求 * @param response 響應 * @param chain 執行 */ public abstract void limit(ServletRequest request, ServletResponse response, FilterChain chain); }
Demo
測試類
public class Demo { @Test public void test() { // 過濾器 Filter filter = new Filter() { AbstractLimiter limit = null; @Override public void init() { // 入口 ,咱們都是每秒限制 100個請求 limit = new LeakyBucketLimiter(100); } @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) { limit.limit(request, response, chain); } }; // 過濾器初始化 filter.init(); // 計時器 long start = System.currentTimeMillis(); // 計數器 AtomicInteger integer = new AtomicInteger(0); ExecutorService pool = Executors.newFixedThreadPool(10); // 模擬4000次請求 IntStream.range(0, 4000).forEach(e -> { try { // 模擬請求延遲 TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e1) { // } // 多線程執行 pool.execute(()->{ filter.doFilter(new ServletRequest("" + e), new ServletResponse(), new FilterChain() { @Override public void doFilter(ServletRequest request, ServletResponse response) { // 回調接口 integer.incrementAndGet(); System.out.println("請求 : "+request.getMsg() + " 經過, 執行線程 "+Thread.currentThread().getName()); } }); }); }); System.out.println("總耗時" + (System.currentTimeMillis() - start)); System.out.println("一共經過 : " + integer.get()); } }
計數器算法(Counter)
顧明思議就是一個計數器 , 好比我每秒能夠經過100個請求 , 我呢每進來一個請求, 我就將計數器+1 , 當計數器到達了100,此時我就不讓請求過去 , 可是他存在一個問題 : 好比我第999ms 的時候過來100個請求 , 當剛剛過了1000ms的時候初始化了,可是又來了100個請求 , 此時就會發生實際上在這0.1S的時候處理了200個請求 , 嚴重超載了 , 此時服務器處理不了而所有都請求超時了....
public class CounterLimiter extends AbstractLimiter { private static final Integer initFlow = 0; private final AtomicInteger flow; public CounterLimiter(int MAX_FlOW) { super(MAX_FlOW); // 初始化計數器 flow = new AtomicInteger(initFlow); new Timer().schedule(new TimerTask() { @Override public void run() { // 每1000ms初始化一次 flow.set(initFlow); } }, 0, 1000); } public void limit(ServletRequest request, ServletResponse response, FilterChain chain) { // 比較是否超載 if (flow.get() < MAX_FlOW) { // 經過 : 計數器+1 flow.incrementAndGet(); chain.doFilter(request, response); } } }
滑動窗口算法(Rolling - Window)
能夠說是計數器算法的一種改進 , 他呢 , 將計算器細分了, 好比我將1S的 1000ms 細分爲10個 100ms , 咱們就有10個計數器 , 好比上面的問題 , 999ms和1000ms的問題, 因爲咱們是連續的, 此時1000ms進來的我也算進去了, 此時就不會出現那種狀況 , 當咱們的顆粒度越高 , 此時所計算的資源會越多,也會越精確 , 其實對比
Hystrix
和sentinel
都是這種思想, 滑動窗口算法 , 主要是考慮的計算資源少的問題 , 個人算法並非最優 ,其實不須要使用
ArrayBlockingQueue
去維護滑塊 , 因爲咱們是單個線程去執行並不會出現多線程問題, 其實能夠使用LinkedList
來模擬隊列 , 還有其餘點也能夠看一下
public class RollingWindowFilter extends AbstractLimiter { /** * 咱們的滑動窗口對象,包含多個窗口 */ private final Slider slider; /** * 程序中暴露的惟一一個計數器,能夠稱之爲當前窗口 */ private AtomicInteger counter; /** * 計數器初始化大小 */ private static final int INIT_SIZE = 0; /** * 好比窗口分爲10塊,這個表明先進入9塊窗口的計算值 , 爲何要引入是由於不浪費計算資源, 好多都是重複計算 */ private final AtomicInteger preCount; /** * 咱們默認隊列大小是 20 ,其實顆粒度很高了50ms計算一次, 能夠重載構造參數調整 * * @param MAX_FlOW 最大流量 */ public RollingWindowFilter(int MAX_FlOW) { super(MAX_FlOW); // 初始化窗口,感受更名字叫作Windows比較好 .... slider = new Slider(20); // 初始化對象 preCount = new AtomicInteger(INIT_SIZE); new Timer().schedule(new TimerTask() { @Override public void run() { ArrayBlockingQueue<AtomicInteger> queue = slider.blocks; // 當前窗口大小 int size = queue.size(); /** * 初始化窗口長度 */ if (size < slider.capacity) { try { /** * 計算前面窗口的計數器總和 * * 這裏其實由多線程的併發問題 ,其實能夠設置一個標識符來表示完成與否 .. 我懶得改了 ,或者你就大量實例化對象,不用我這個單一對象 */ preCount.set(INIT_SIZE); if (size > 0) { queue.forEach(e -> preCount.addAndGet(e.get())); } // 新建一個計數器, 放入對應的滑塊 ,其實就是隊尾 counter = new AtomicInteger(INIT_SIZE); queue.put(counter); } catch (InterruptedException e) { // } } /** * 當窗口長度初始化完成 */ if (size == slider.capacity) { try { // 出局最早進來的那個 queue.take(); // 計算前面窗口的計數器總和 , 有多線程併發問題 preCount.set(INIT_SIZE); queue.forEach(e -> preCount.addAndGet(e.get())); // 新建一個計數器, 放入對應的滑塊 ,其實就是隊尾 counter = new AtomicInteger(INIT_SIZE); queue.put(counter); } catch (InterruptedException e) { // } } } }, 0, 1000 / slider.capacity); } public void limit(ServletRequest request, ServletResponse response, FilterChain chain) { int cur = counter.get(); int pre = preCount.get(); int sum = cur + pre; if (sum < MAX_FlOW) { counter.incrementAndGet(); chain.doFilter(request, response); } } /** * 滑塊組成 , 一個隊列維護一個塊 , 其實能夠用LinkedList來維護 , 我是懶得改 * <p> * 通常內部類來講看JDK源碼你會發現都會用private static修飾 ,由於反射不是靜態內部類,沒法實例化 , 和構造器不加修飾 */ private static class Slider { // 多少個計數器 private final int capacity; // 放置計數器 private final ArrayBlockingQueue<AtomicInteger> blocks; Slider(int capacity) { this.blocks = new ArrayBlockingQueue<>(capacity); this.capacity = capacity; } } }
其實所謂的
漏桶算法(Leaky Bucket)
,咱們想一下 , 有一個入水口和一個出水口 , 咱們這倆口控制權在誰那 ,入水口無非就是大量的請求, 出水口就是咱們放過的請求 , 因此他是一個生產者 - 消費者模型
, 生產者就是請求 , 消費者就是以必定速度咱們消費請求 , 漏桶算法能夠使
請求流出的速率是均勻的
, 無論你多少請求 , 我流出的速率是均勻的 , 當桶滿了就溢出 ,沒有滿加進來就等着被流出去 當你看懂我上面的兩段話 , 你就理解了下面的代碼 , 個人註釋十分清晰
public class LeakyBucketLimiter extends AbstractLimiter { /** * 咱們的漏斗 */ private final LeakyBucket leakyBucket; /** * 構造器 , 輸入每秒最大流量 * * @param MAX_FlOW 最大流量 */ public LeakyBucketLimiter(int MAX_FlOW) { super(MAX_FlOW); this.leakyBucket = new LeakyBucket(MAX_FlOW); } @Override public void limit(ServletRequest request, ServletResponse response, FilterChain chain) { try { // 1. 獲取桶當前水的大小 int size = leakyBucket.bucket.size(); // 2. 比較桶裏的水是否滿了 if (size < leakyBucket.waterSize) { // 沒有滿咱們就將水放進去,其實這裏put也行 , offer也行 , 看需求 leakyBucket.bucket.put(new Water(request, response, chain)); } } catch (InterruptedException e) { // } } static class LeakyBucket { /** * 能放多少水,其實就是隊列大小 */ final int waterSize; /** * 咱們的放水的桶 */ final ArrayBlockingQueue<Water> bucket; public LeakyBucket(int MAX_FlOW) { this.waterSize = MAX_FlOW; bucket = new ArrayBlockingQueue<>(this.waterSize); /** * 模擬消費 , 1S只能過去100個 ,說明 100ms 能夠消耗10個, 看你的顆粒度 */ new Timer().schedule(new TimerTask() { @Override public void run() { // 100ms 流出去10個 for (int i = 0; i < (waterSize / 10); i++) { try { // 流出的水 Water water = bucket.take(); // 執行掉 water.chain.doFilter(water.request, water.response); } catch (InterruptedException e) { // } } } }, 0, 100); } } /** * 咱們的節點對象, 其實能夠稱之爲 成功注入的水 , 等着被漏桶流出去 */ static class Water { private ServletRequest request; private ServletResponse response; private FilterChain chain; public Water(ServletRequest request, ServletResponse response, FilterChain chain) { this.request = request; this.response = response; this.chain = chain; } } }
令牌桶算法(Token Bucket)
是與漏桶算法相反的思想, 他也是生產者消費者模型
,只是角色的互換, 他呢是咱們去控制生成 , 請求去執行消費 , 舉個栗子 : 好比咱們限流100 , 此時咱們就每100ms生成10個令牌 , 當令牌數達到100 咱們就不生產 了, 當一個請求過來 , 就會去拿掉一個令牌 , 若是拿到了就經過了, 拿不到就拒絕 根據這個咱們能夠和漏桶算法作比較 ,假設都是剛剛開始 , 此時都是100個請求過來 , 令牌桶可能會拒絕掉90個,由於我只生產了10個令牌 ,可是漏桶呢他不會, 他會將100個請求所有放進去慢慢消費 , 是由於個人桶容量是100,能夠放進去這麼多請求 , 這就是這倆的區別 .... 其實穩定了幾乎麼區別
生產者消費者模型
的思想轉換能夠更加理清思路 , 模型的選擇有時候是解決問題的一個合適的方式 令牌桶算法 網上大多都是採用的
Guava
的RateLimiter
實現的 , 這裏我就實現兩種 一種是本身實現, 一種是使用RateLimiter
,
public class TokenBucketLimiter extends AbstractLimiter { /** * 令牌桶 */ private final TokenBucket tokenBucket; /** * 構造器 , 輸入每秒最大流量 * * @param MAX_FlOW 最大流量 */ public TokenBucketLimiter(int MAX_FlOW) { super(MAX_FlOW); this.tokenBucket = new TokenBucket(MAX_FlOW); } @Override public void limit(ServletRequest request, ServletResponse response, FilterChain chain) { /** * 這裏咱們就不使用 take的阻塞思想了 ,直接poll去拉去 ,而後等待5mS , 若是拉去不到直接返回失敗 , 其實等待的長了點 */ try { // 嘗試去獲取一個令牌 Token token = tokenBucket.bucket.poll(5, TimeUnit.MILLISECONDS); // 拿到經過 if (null != token) { chain.doFilter(request, response); } } catch (InterruptedException e) { // } } /** * 令牌桶 */ private static class TokenBucket { /** * 令牌存放的位置 , 用一個隊列維護 */ private final ArrayBlockingQueue<Token> bucket; /** * 桶最多存放多少個令牌 */ private final int tokenSize; public TokenBucket(int MAX_FlOW) { this.tokenSize = MAX_FlOW; this.bucket = new ArrayBlockingQueue<>(this.tokenSize); new Timer().schedule(new TimerTask() { @Override public void run() { for (int x = 0; x < (tokenSize / 10); x++) { try { if (bucket.size() < tokenSize) { // 定時放入令牌 bucket.put(new Token()); } } catch (InterruptedException e) { // } } } }, 0, 100); } } /** * 令牌 */ private static class Token { } }
public class GuavaRateLimiter extends AbstractLimiter { /** * 令牌桶 */ private final RateLimiter limiter; /** * 每次須要的令牌個數 */ private static final int ACQUIRE_NUM = 1; /** * 最長等待時間 */ private static final int WAIT_TIME_PER_MILLISECONDS = 5; /** * 構造器 , 輸入每秒最大流量 * * @param MAX_FlOW 最大流量 */ public GuavaRateLimiter(final int MAX_FlOW) { super(MAX_FlOW); limiter = RateLimiter.create(MAX_FlOW); } @Override public void limit(ServletRequest request, ServletResponse response, FilterChain chain) { /** * 意思就是 我嘗試去獲取1個令牌 ,最大等待時間是 5 ms , 其實太長了, 真是開發也就1ms不到 */ boolean flag = limiter.tryAcquire(ACQUIRE_NUM, WAIT_TIME_PER_MILLISECONDS, TimeUnit.MILLISECONDS); if (flag) { chain.doFilter(request, response); } } }