Sentinel
的核心功能之一是流量統計,例如咱們經常使用的指標QPS,當前線程數等。上一篇文章中咱們已經大體提到了提供數據統計功能的Slot(StatisticSlot)
,StatisticSlot
在Sentinel
的整個體系中扮演了一個很是重要的角色,後續的一系列操做(限流,熔斷)等都依賴於StatisticSlot
所統計出的數據。html
本文所要討論的重點就是StatisticSlot
是如何作的流量統計?java
其實在以前介紹經常使用限流算法[經常使用限流算法](https://www.jianshu.com/p/9edebaa446d3)的時候已經有提到過一個算法滑動窗口限流
,該算法的滑動窗口原理其實跟Sentinel
所提供的流量統計原理是同樣的,都是基於時間窗口的滑動統計node
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { ... // 當前請求線程數加一 node.increaseThreadNum(); // 新增請求數 node.addPassRequest(count); ... }
能夠看到StatisticSlot
主要統計了兩種類型的數據git
線程數github
請求數(QPS)算法
對於線程數的統計比較簡單,經過內部維護一個LongAdder
來進行當前線程數的統計,每進入一個請求加1,每釋放一個請求減1,從而獲得當前的線程數數組
對於請求數QPS的統計則相對比較複雜,其中有用到滑動窗口原理(也是本文的重點),下面根據源碼來深刻的分析app
public void addPassRequest(int count) { // 調用父類(StatisticNode)來進行統計 super.addPassRequest(count); // 根據clusterNode 彙總統計(背後也是調用父類StatisticNode) this.clusterNode.addPassRequest(count); }
最終都是調用了父類StatisticNode
的addPassRequest
方法性能
/** * 按秒統計,分紅兩個窗口,每一個窗口500ms,用來統計QPS */ private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); /** * 按分鐘統計,分紅60個窗口,每一個窗口 1000ms */ private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }
代碼比較簡單,能夠知道內部是調用了ArrayMetric
的addPass
方法來統計的,而且統計了兩種不一樣時間維度的數據(秒級和分鐘級)this
private final LeapArray<MetricBucket> data; public ArrayMetric(int sampleCount, int intervalInMs) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) { if (enableOccupy) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } else { this.data = new BucketLeapArray(sampleCount, intervalInMs); } } public void addPass(int count) { // 1\. 獲取當前窗口 WindowWrap<MetricBucket> wrap = data.currentWindow(); // 2\. 當前窗口加1 wrap.value().addPass(count); }
ArrayMetric
其實也是一個包裝類,內部經過實例化LeapArray
的對應實現類,來實現具體的統計邏輯,LeapArray
是一個抽象類,OccupiableBucketLeapArray
和BucketLeapArray
都是其具體的實現類
OccupiableBucketLeapArray
在1.5版本以後才被引入,主要是爲了解決一些高優先級的請求在限流觸發的時候也能經過(經過佔用將來時間窗口的名額來實現) 也是默認使用的LeapArray實現類
而統計的邏輯也比較清楚,分紅了兩步:
定位到當前窗口
獲取到當前窗口WindowWrap
的MetricBucket
並執行addPass
邏輯
這裏咱們先看下第二步中的MetricBucket
類,看看它作了哪些事情
/** * 存放當前窗口各類類型的統計值(類型包括 PASS BLOCK EXCEPTION 等) */ private final LongAdder[] counters; public MetricBucket() { MetricEvent[] events = MetricEvent.values(); this.counters = new LongAdder[events.length]; for (MetricEvent event : events) { counters[event.ordinal()] = new LongAdder(); } initMinRt(); } // 統計pass數 public void addPass(int n) { add(MetricEvent.PASS, n); } // 統計可佔用的pass數 public void addOccupiedPass(int n) { add(MetricEvent.OCCUPIED_PASS, n); } // 統計異常數 public void addException(int n) { add(MetricEvent.EXCEPTION, n); } // 統計block數 public void addBlock(int n) { add(MetricEvent.BLOCK, n); } ....
MetricBucket經過定義了一個LongAdder
數組來存儲不一樣類型的流量統計值,具體的類型則都定義在MetricEvent
枚舉中。
執行addPass
方法對應LongAdder
數組索引下表爲0的值遞增
下面再來看下data.currentWindow()
的內部邏輯
OccupiableBucketLeapArray
繼承了抽象類LeapArray
,核心邏輯也是在LeapArray
中
/** * 時間窗口大小 單位ms */ protected int windowLengthInMs; /** * 切分的窗口數 */ protected int sampleCount; /** * 統計的時間間隔 intervalInMs = windowLengthInMs * sampleCount */ protected int intervalInMs; /** * 窗口數組 數組大小 = sampleCount */ protected final AtomicReferenceArray<WindowWrap<T>> array; /** * update lock 更新窗口時須要上鎖 */ private final ReentrantLock updateLock = new ReentrantLock(); /** * @param sampleCount 須要劃分的窗口數 * @param intervalInMs 間隔的統計時間 */ public LeapArray(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); } /** * 獲取當前窗口 */ public WindowWrap<T> currentWindow() { return currentWindow(TimeUtil.currentTimeMillis()); }
以上須要着重理解的是幾個參數的含義:
sampleCount 定義的窗口的數
intervalInMs 統計的時間間隔
windowLengthInMs 每一個窗口的時間大小 = intervalInMs / sampleCount
sampleCount
比較好理解,就是須要定義幾個窗口(默認秒級統計維度的話是兩個窗口),intervalInMs
指的就是咱們須要統計的時間間隔,例如咱們統計QPS的話那就是1000ms,windowLengthInMs
指的每一個窗口的大小,是由intervalInMs
除以sampleCount
得來
相似下圖
理解了上訴幾個參數的含義後,咱們直接進入到LeapArray
的currentWindow(long time)
方法中去看看具體的實現
public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 根據當前時間戳計算當前所屬的窗口數組索引下標 int idx = calculateTimeIdx(timeMillis); // 計算當前窗口的開始時間戳 long windowStart = calculateWindowStart(timeMillis); /* * 從窗口數組中獲取當前窗口項,分爲三種狀況 * * (1) 當前窗口爲空還未建立,則初始化一個 * (2) 當前窗口的開始時間和上面計算出的窗口開始時間一致,代表當前窗口還未過時,直接返回當前窗口 * (3) 當前窗口的開始時間 小於 上面計算出的窗口開始時間,代表當前窗口已過時,須要替換當前窗口 */ while (true) { WindowWrap<T> old = array.get(idx); if (old == null) { /* * 第一種狀況,新建一個窗口項 */ WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { /* * 第二種狀況 直接返回 */ return old; } else if (windowStart > old.windowStart()) { /* * 第三種狀況 替換窗口 */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.windowStart()) { // 第四種狀況,講道理不會走到這裏 return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } /** * 根據當前時間戳計算當前所屬的窗口數組索引下標 */ private int calculateTimeIdx(/*@Valid*/ long timeMillis) { long timeId = timeMillis / windowLengthInMs; return (int)(timeId % array.length()); } /** * 計算當前窗口的開始時間戳 */ protected long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; }
上面的方法就是整個滑動窗口邏輯的核心代碼,註釋其實也寫的比較清晰了,簡單歸納下能夠分爲如下幾步:
根據當前時間戳 和 窗口數組大小 獲取到當前的窗口數組索引下標idx
,若是窗口數是2,那其實idx
只有兩種值(0 或 1)
根據當前時間戳(windowStart
) 計算獲得當前窗口的開始時間戳值。經過calculateWindowStart
計算來獲得,這個方法還蠻有意思的,經過當前時間戳和窗口時間大小取餘來獲得 與當前窗口開始時間的 偏移量。比我用定時任務實現高級多了 ... 😆 能夠去對比一下我以前文章中的蠢實現 [滑動窗口算法定時任務實現](https://github.com/WangJunnan/learn/blob/master/algorithm/src/main/java/com/walm/learn/algorithm/ratelimit/SlidingWindowRateLimit.java)
而後就是根據上面獲得的兩個值 來獲取當前時間窗口,這裏其實又分爲三種狀況
當前窗口爲空還未建立,則初始化一個
當前窗口的開始時間和上面計算出的窗口開始時間(windowStart
)一致,代表當前窗口還未過時,直接返回當前窗口
當前窗口的開始時間 小於 上面計算出的窗口(windowStart
)開始時間,代表當前窗口已過時,須要替換當前窗口
總的來講,currentWindow
方法的實現仍是很是巧妙的,由於我在看Sentinel
的源碼前也寫過一篇限流算法的文章,剛好其中也實現過一個滑動窗口限流算法,不過相比於Sentinel
的實現,我用了定時任務去作窗口的切換更新,顯然性能上更差,實現的也不優雅,你們也能夠去對比一下。[經常使用限流算法](https://www.jianshu.com/p/9edebaa446d3)
Sentinel系列