限流算法 - 基本實現

在開發高併發系統時有三把利器用來保護系統:緩存、降級、 限流 , 今天咱們就談談限流java

緩存:緩存的目的是提高系統訪問速度和增大系統處理容量
降級:降級是當服務器壓力劇增的狀況下,根據當前業務狀況及流量對一些服務和頁面有策略的降級,以此釋放服務器資源以保證核心任務的正常運行
限流:限流的目的是經過對併發訪問/請求進行限速,或者對一個時間窗口內的請求進行限速來保護系統,一旦達到限制速率則能夠拒絕服務、排隊或等待、降級等處理算法

​ 我下面算法的實現基本上都用到了定時器Timer , 其實關於時間的也能夠不用定時器, 能夠看看GuavaRateLimiter, 定時器的好處是我不用處理時間邏輯 , 可是須要消耗一個線程去執行邏輯 , 當邏輯算力壓力過大會線程處理不過來,效果很差 , 能夠使用一下 ScheduledThreadPoolExecutor 線程池來執行,下降壓力緩存

​ 同時還大量使用了隊列數據結構 ,是由於生產者消費者模型大多須要隊列, 先進先出的特色服務器

​ 第一節是環境搭建 , 寫出需求 ,和接口要求 , 和測試用例 ,後面四節就是基本算法數據結構

1. 環境搭建

咱們模擬Filter#doFilter 接口進行測試 , 所有實現 AbstractLimiter#limit方法多線程

Filter 實現併發

public interface Filter {

    default public void init() {
    }

    public void doFilter(ServletRequest request, ServletResponse response,
                         FilterChain chain);

    default public void destroy() {
    }
}

FilterChain 實現ide

public interface FilterChain {

    void doFilter(ServletRequest request, ServletResponse response);
}

ServletRequest 實現高併發

public class ServletRequest {

    private String msg;

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "ServletRequest{" +
                "msg='" + msg + '\'' +
                '}';
    }

    public ServletRequest(String msg) {
        this.msg = msg;
    }
}

ServletResponse 實現測試

public class ServletResponse {


}

AbstractLimiter 實現

public abstract class AbstractLimiter {

    /**
     * 最大流量
     */
    protected final int MAX_FlOW;

    /**
     * 構造器 , 輸入每秒最大流量
     * @param MAX_FlOW 最大流量
     */
    public AbstractLimiter(int MAX_FlOW) {
        this.MAX_FlOW = MAX_FlOW;
    }


    /**
     * 具體實現的方法
     * @param request 請求
     * @param response 響應
     * @param chain 執行
     */
    public abstract void limit(ServletRequest request, ServletResponse response, FilterChain chain);

}

Demo 測試類

public class Demo {

    @Test
    public void test() {
        
        // 過濾器
        Filter filter = new Filter() {
            AbstractLimiter limit = null;

            @Override
            public void init() {
                // 入口 ,咱們都是每秒限制 100個請求
                limit = new LeakyBucketLimiter(100);
            }

            @Override
            public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) {
                limit.limit(request, response, chain);
            }
        };

        // 過濾器初始化
        filter.init();

        // 計時器
        long start = System.currentTimeMillis();
        
        // 計數器
        AtomicInteger integer = new AtomicInteger(0);

        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 模擬4000次請求
        IntStream.range(0, 4000).forEach(e -> {
            try {
                // 模擬請求延遲
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e1) {
                //
            }

            // 多線程執行
            pool.execute(()->{
                filter.doFilter(new ServletRequest("" + e), new ServletResponse(), new FilterChain() {
                    @Override
                    public void doFilter(ServletRequest request, ServletResponse response) {
                        // 回調接口
                        integer.incrementAndGet();
                        System.out.println("請求 : "+request.getMsg() + " 經過, 執行線程 "+Thread.currentThread().getName());
                    }
                });
            });
        });

        System.out.println("總耗時" + (System.currentTimeMillis() - start));
        System.out.println("一共經過 : " + integer.get());
    }
}

2. 計數器算法

計數器算法(Counter) 顧明思議就是一個計數器 , 好比我每秒能夠經過100個請求 , 我呢每進來一個請求, 我就將計數器+1 , 當計數器到達了100,此時我就不讓請求過去 , 可是他存在一個問題 : 好比我第999ms 的時候過來100個請求 , 當剛剛過了1000ms的時候初始化了,可是又來了100個請求 , 此時就會發生實際上在這0.1S的時候處理了200個請求 , 嚴重超載了 , 此時服務器處理不了而所有都請求超時了....

public class CounterLimiter extends AbstractLimiter {

    private static final Integer initFlow = 0;

    private final AtomicInteger flow;

    public CounterLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
        
        // 初始化計數器
        flow = new AtomicInteger(initFlow);

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                // 每1000ms初始化一次
                flow.set(initFlow);
            }
        }, 0, 1000);
    }

    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        // 比較是否超載
        if (flow.get() < MAX_FlOW) {
        // 經過 : 計數器+1
            flow.incrementAndGet();
            chain.doFilter(request, response);
        }
    }
}

3. 滑動窗口算法

滑動窗口算法(Rolling - Window) 能夠說是計數器算法的一種改進 , 他呢 , 將計算器細分了, 好比我將1S的 1000ms 細分爲10個 100ms , 咱們就有10個計數器 , 好比上面的問題 , 999ms和1000ms的問題, 因爲咱們是連續的, 此時1000ms進來的我也算進去了, 此時就不會出現那種狀況 ,

​ 當咱們的顆粒度越高 , 此時所計算的資源會越多,也會越精確 , 其實對比 Hystrixsentinel 都是這種思想, 滑動窗口算法 , 主要是考慮的計算資源少的問題 ,

​ 個人算法並非最優 ,其實不須要使用ArrayBlockingQueue去維護滑塊 , 因爲咱們是單個線程去執行並不會出現多線程問題, 其實能夠使用 LinkedList 來模擬隊列 , 還有其餘點也能夠看一下

public class RollingWindowFilter extends AbstractLimiter {

    /**
     * 咱們的滑動窗口對象,包含多個窗口
     */
    private final Slider slider;

    /**
     * 程序中暴露的惟一一個計數器,能夠稱之爲當前窗口
     */
    private AtomicInteger counter;

    /**
     * 計數器初始化大小
     */
    private static final int INIT_SIZE = 0;

    /**
     * 好比窗口分爲10塊,這個表明先進入9塊窗口的計算值 , 爲何要引入是由於不浪費計算資源, 好多都是重複計算
     */
    private final AtomicInteger preCount;


    /**
     * 咱們默認隊列大小是 20 ,其實顆粒度很高了50ms計算一次, 能夠重載構造參數調整
     *
     * @param MAX_FlOW 最大流量
     */
    public RollingWindowFilter(int MAX_FlOW) {

        super(MAX_FlOW);

        // 初始化窗口,感受更名字叫作Windows比較好 ....
        slider = new Slider(20);


        // 初始化對象
        preCount = new AtomicInteger(INIT_SIZE);

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {

                ArrayBlockingQueue<AtomicInteger> queue = slider.blocks;

                // 當前窗口大小
                int size = queue.size();

                /**
                 * 初始化窗口長度
                 */
                if (size < slider.capacity) {
                    try {

                        /**
                         * 計算前面窗口的計數器總和
                         * 
                         * 這裏其實由多線程的併發問題 ,其實能夠設置一個標識符來表示完成與否 .. 我懶得改了 ,或者你就大量實例化對象,不用我這個單一對象
                         */
                        preCount.set(INIT_SIZE);
                        if (size > 0) {
                            queue.forEach(e -> preCount.addAndGet(e.get()));
                        }

                        // 新建一個計數器, 放入對應的滑塊 ,其實就是隊尾
                        counter = new AtomicInteger(INIT_SIZE);
                        queue.put(counter);
                    } catch (InterruptedException e) {
                        //
                    }

                }

                /**
                 * 當窗口長度初始化完成
                 */
                if (size == slider.capacity) {

                    try {
                        // 出局最早進來的那個
                        queue.take();

                        // 計算前面窗口的計數器總和 , 有多線程併發問題
                        preCount.set(INIT_SIZE);
                        queue.forEach(e -> preCount.addAndGet(e.get()));

                        
                        // 新建一個計數器, 放入對應的滑塊 ,其實就是隊尾
                        counter = new AtomicInteger(INIT_SIZE);
                        queue.put(counter);
                    } catch (InterruptedException e) {
                        //
                    }
                }
            }
        }, 0, 1000 / slider.capacity);
    }


    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {

        int cur = counter.get();
        int pre = preCount.get();
        int sum = cur + pre;

        if (sum < MAX_FlOW) {
            counter.incrementAndGet();
            chain.doFilter(request, response);
        }
    }


    /**
     * 滑塊組成 , 一個隊列維護一個塊 , 其實能夠用LinkedList來維護 , 我是懶得改
     * <p>
     * 通常內部類來講看JDK源碼你會發現都會用private static修飾 ,由於反射不是靜態內部類,沒法實例化 , 和構造器不加修飾
     */
    private static class Slider {
        // 多少個計數器
        private final int capacity;
        // 放置計數器
        private final ArrayBlockingQueue<AtomicInteger> blocks;

        Slider(int capacity) {
            this.blocks = new ArrayBlockingQueue<>(capacity);
            this.capacity = capacity;
        }
    }
}

4. 漏桶算法

​ 其實所謂的漏桶算法(Leaky Bucket),咱們想一下 , 有一個入水口和一個出水口 , 咱們這倆口控制權在誰那 ,入水口無非就是大量的請求, 出水口就是咱們放過的請求 , 因此他是一個生產者 - 消費者模型 , 生產者就是請求 , 消費者就是以必定速度咱們消費請求 ,

​ 漏桶算法能夠使請求流出的速率是均勻的, 無論你多少請求 , 我流出的速率是均勻的 , 當桶滿了就溢出 ,沒有滿加進來就等着被流出去

​ 當你看懂我上面的兩段話 , 你就理解了下面的代碼 , 個人註釋十分清晰

public class LeakyBucketLimiter extends AbstractLimiter {

    /**
     * 咱們的漏斗
     */
    private final LeakyBucket leakyBucket;

    /**
     * 構造器 , 輸入每秒最大流量
     *
     * @param MAX_FlOW 最大流量
     */
    public LeakyBucketLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
        this.leakyBucket = new LeakyBucket(MAX_FlOW);
    }

    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        try {
            // 1. 獲取桶當前水的大小
            int size = leakyBucket.bucket.size();

            // 2. 比較桶裏的水是否滿了
            if (size < leakyBucket.waterSize) {

                // 沒有滿咱們就將水放進去,其實這裏put也行 , offer也行 , 看需求
                leakyBucket.bucket.put(new Water(request, response, chain));
            }
        } catch (InterruptedException e) {
            //
        }
    }

    static class LeakyBucket {

        /**
         * 能放多少水,其實就是隊列大小
         */
        final int waterSize;

        /**
         * 咱們的放水的桶
         */
        final ArrayBlockingQueue<Water> bucket;

        public LeakyBucket(int MAX_FlOW) {
            this.waterSize = MAX_FlOW;
            bucket = new ArrayBlockingQueue<>(this.waterSize);

            /**
             * 模擬消費 , 1S只能過去100個 ,說明 100ms 能夠消耗10個, 看你的顆粒度
             */
            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    // 100ms 流出去10個
                    for (int i = 0; i < (waterSize / 10); i++) {
                        try {
                            // 流出的水
                            Water water = bucket.take();

                            // 執行掉
                            water.chain.doFilter(water.request, water.response);
                        } catch (InterruptedException e) {
                            //
                        }
                    }
                }
            }, 0, 100);
        }
    }


    /**
     * 咱們的節點對象, 其實能夠稱之爲 成功注入的水 , 等着被漏桶流出去
     */
    static class Water {

        private ServletRequest request;

        private ServletResponse response;

        private FilterChain chain;

        public Water(ServletRequest request, ServletResponse response, FilterChain chain) {
            this.request = request;
            this.response = response;
            this.chain = chain;
        }
    }
}

5. 令牌桶算法

令牌桶算法(Token Bucket) 是與漏桶算法相反的思想, 他也是生產者消費者模型 ,只是角色的互換, 他呢是咱們去控制生成 , 請求去執行消費 , 舉個栗子 : 好比咱們限流100 , 此時咱們就每100ms生成10個令牌 , 當令牌數達到100 咱們就不生產 了, 當一個請求過來 , 就會去拿掉一個令牌 , 若是拿到了就經過了, 拿不到就拒絕

根據這個咱們能夠和漏桶算法作比較 ,假設都是剛剛開始 , 此時都是100個請求過來 , 令牌桶可能會拒絕掉90個,由於我只生產了10個令牌 ,可是漏桶呢他不會, 他會將100個請求所有放進去慢慢消費 , 是由於個人桶容量是100,能夠放進去這麼多請求 , 這就是這倆的區別 .... 其實穩定了幾乎麼區別

生產者消費者模型 的思想轉換能夠更加理清思路 , 模型的選擇有時候是解決問題的一個合適的方式

​ 令牌桶算法 網上大多都是採用的 GuavaRateLimiter實現的 , 這裏我就實現兩種 一種是本身實現, 一種是使用RateLimiter,

1. 本身實現的令牌桶

public class TokenBucketLimiter extends AbstractLimiter {

    /**
     * 令牌桶
     */
    private final TokenBucket tokenBucket;

    /**
     * 構造器 , 輸入每秒最大流量
     *
     * @param MAX_FlOW 最大流量
     */
    public TokenBucketLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
        this.tokenBucket = new TokenBucket(MAX_FlOW);
    }


    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        /**
         * 這裏咱們就不使用 take的阻塞思想了 ,直接poll去拉去 ,而後等待5mS ,  若是拉去不到直接返回失敗 , 其實等待的長了點
         */
        try {
            // 嘗試去獲取一個令牌
            Token token = tokenBucket.bucket.poll(5, TimeUnit.MILLISECONDS);
            
            // 拿到經過
            if (null != token) {
                chain.doFilter(request, response);
            }

        } catch (InterruptedException e) {
            //
        }

    }


    /**
     * 令牌桶
     */
    private static class TokenBucket {
        /**
         * 令牌存放的位置 , 用一個隊列維護
         */
        private final ArrayBlockingQueue<Token> bucket;

        /**
         * 桶最多存放多少個令牌
         */
        private final int tokenSize;

        public TokenBucket(int MAX_FlOW) {
            this.tokenSize = MAX_FlOW;
            this.bucket = new ArrayBlockingQueue<>(this.tokenSize);

            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    for (int x = 0; x < (tokenSize / 10); x++) {
                        try {
                            if (bucket.size() < tokenSize) {
                                // 定時放入令牌
                                bucket.put(new Token());
                            }
                        } catch (InterruptedException e) {
                            //
                        }
                    }
                }
            }, 0, 100);
        }
    }

    /**
     * 令牌
     */
    private static class Token {

    }
}

2. 基於Guava 的 RateLimiter實現令牌桶

public class GuavaRateLimiter extends AbstractLimiter {

    /**
     * 令牌桶
     */
    private final RateLimiter limiter;

    /**
     * 每次須要的令牌個數
     */
    private static final int ACQUIRE_NUM = 1;
    /**
     * 最長等待時間
     */
    private static final int WAIT_TIME_PER_MILLISECONDS = 5;

    /**
     * 構造器 , 輸入每秒最大流量
     *
     * @param MAX_FlOW 最大流量
     */
    public GuavaRateLimiter(final int MAX_FlOW) {
        super(MAX_FlOW);
        limiter = RateLimiter.create(MAX_FlOW);
    }


    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        /**
         * 意思就是 我嘗試去獲取1個令牌 ,最大等待時間是 5 ms , 其實太長了, 真是開發也就1ms不到
         */
        boolean flag = limiter.tryAcquire(ACQUIRE_NUM, WAIT_TIME_PER_MILLISECONDS, TimeUnit.MILLISECONDS);
        if (flag) {
            chain.doFilter(request, response);
        }
    }
}
相關文章
相關標籤/搜索