要實現限流、熔斷等功能,首先要解決的問題是如何實時採集服務(資源)調用信息。例如將某一個接口設置的限流闊值 1W/tps,那首先如何判斷當前的 TPS 是多少?Alibaba Sentinel 採用滑動窗口來實現實時數據的統計。java
舒適提示:若是對源碼不太感興趣,能夠先跳到文末,看一下滑動窗口的設計原理圖,再決定是否須要閱讀源碼。算法
@數組
咱們先對上述核心類作一個簡單的介紹,重點關注核心類的做用與核心屬性(重點須要探究其核心數據結構)。數據結構
滑動窗口的入口類爲 ArrayMetric ,咱們先來看一下其核心代碼。架構
private final LeapArray<MetricBucket> data; // @1 public ArrayMetric(int sampleCount, int intervalInMs) { // @2 this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) { // @3 if (enableOccupy) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } else { this.data = new BucketLeapArray(sampleCount, intervalInMs); } }
代碼@1:ArrayMetric 類惟一的屬性,用來存儲各個窗口的數據,這個是接下來咱們探究的重點。併發
代碼@2,代碼@3 該類提供了兩個構造方法,其核心參數爲:分佈式
注意,LeapArray 的泛型類爲 MetricBucket,意思就是指標桶,能夠認爲一個 MetricBucket 對象能夠存儲一個抽樣時間段內全部的指標,例如一個抽象時間段中經過數量、阻塞數量、異常數量、成功數量、響應時間,其實現的奧祕在 LongAdder 中,本文先不對該類進行詳細介紹,後續文章會單獨來探究其實現原理。ide
此次,咱們先不去看子類,反其道而行,先去看看其父類。函數
LeapArray 的核心屬性以下:高併發
上面的各個屬性的含義是從其構造函數得出來的,請其看構造函數。
public LeapArray(int sampleCount, int intervalInMs) { AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount); AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive"); AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); }
那咱們繼續來看 LeapArray 中的方法,深刻探究滑動窗口的實現細節。
該方法主要是根據當前時間來肯定處於哪個滑動窗口中,即找到上圖中的 WindowWrap,該方法內部就是調用其重載方法,參數爲系統的當前時間,故咱們重點來看一下重載方法的實現。
public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis); // @1 long windowStart = calculateWindowStart(timeMillis); // @2 while (true) { // 死循環查找當前的時間窗口,這裏之全部須要循環,是由於可能多個線程都在獲取當前時間窗口。 WindowWrap<T> old = array.get(idx); // @3 if (old == null) { // @4 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // @5 return window; } else { Thread.yield(); } } else if (windowStart == old.windowStart()) { // @6 return old; } else if (windowStart > old.windowStart()) { // @7 if (updateLock.tryLock()) { try { return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { Thread.yield(); } } else if (windowStart < old.windowStart()) { // @8 return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
代碼@1:計算當前時間會落在一個採集間隔 ( LeapArray ) 中哪個時間窗口中,即在 LeapArray 中屬性 AtomicReferenceArray <WindowWrap< T>> array 的下標。其實現算法以下:
代碼@2:計算當前時間戳所在的時間窗口的開始時間,即要計算出 WindowWrap 中 windowStart 的值,其實就是要算出小於當前時間戳,而且是 windowLengthInMs 的整數倍最大的數字,Sentinel 給出是算法爲 ( timeMillis - timeMillis % windowLengthInMs )。
代碼@3:嘗試從 LeapArray 中的 WindowWrap 數組查找指定下標的元素。
代碼@4:若是指定下標的元素爲空,則須要建立一個 WindowWrap 。 其中 WindowWrap 中的 MetricBucket 是調用其抽象方法 newEmptyBucket (timeMillis),由不一樣的子類去實現。
代碼@5:這裏使用了 CAS 機制來更新 LeapArray 數組中的 元素,由於同一時間戳,可能有多個線程都在獲取當前時間窗口對象,但該時間窗口對象還未建立,這裏就是避免建立多個,致使統計數據被覆蓋,若是用 CAS 更新成功的線程,則返回新建好的 WindowWrap ,CAS 設置不成功的線程繼續跑這個流程,而後會進入到代碼@6。
代碼@6:若是指定索引下的時間窗口對象不爲空並判斷起始時間相等,則返回。
代碼@7:若是原先存在的窗口開始時間小於當前時間戳計算出來的開始時間,則表示 bucket 已被棄用。則須要將開始時間重置到新時間戳對應的開始時間戳,重置的邏輯將在下文詳細介紹。
代碼@8:應該不會進入到該分支,由於當前時間算出來時間窗口不會比以前的小。
接下來咱們來看一下窗口的過時機制。
public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) { return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap); } public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) { return time - windowWrap.windowStart() > intervalInMs; }
判斷滑動窗口是否生效的依據是當系統時間與滑動窗口的開始時間戳的間隔大於一個採集時間,即表示過時。即從當前窗口開始,一般包含的有效窗口爲 sampleCount 個有效滑動窗口。
根據當前時間獲取前一個有效滑動窗口,其代碼以下:
public WindowWrap<T> getPreviousWindow(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis - windowLengthInMs); // @1 timeMillis = timeMillis - windowLengthInMs; WindowWrap<T> wrap = array.get(idx); if (wrap == null || isWindowDeprecated(wrap)) { // @2 return null; } if (wrap.windowStart() + windowLengthInMs < (timeMillis)) { // @3 return null; } return wrap; }
其實現的關鍵點以下:
代碼@1:用當前時間減去一個時間窗口間隔,而後去定位所在 LeapArray 中 數組的下標。
代碼@2:若是爲空或已過時,則返回 null。
代碼@3:若是定位的窗口的開始時間再加上 windowLengthInMs 小於 timeMills ,說明失效,則返回 null,一般是不會走到該分支。
通過上面的分析,雖然還有一個核心方法 (resetWindowTo) 未進行分析,但咱們應該能夠畫出滑動窗口的實現的實現原理圖了。
接下來對上面的圖進行一個簡單的說明:下面的示例以採集間隔爲 1 s,抽樣次數爲 2。
首先會建立一個 LeapArray,內部持有一個數組,元素爲 2,一開始進行採集時,數組的第一個,第二個下標都會 null,例如當前時間通過 calculateTimeIdx 定位到下標爲 0,此時沒有滑動窗口,會建立一個滑動窗口,而後該滑動窗口會採集指標,隨着進入 1s 的後500ms,後會建立第二個抽樣窗口。
而後時間前進 1s,又會定位到下標爲 0 的地方,但此時不會爲空,由於有上一秒的採集數據,故須要將這些採集數據丟棄 ( MetricBucket value ),而後重置該窗口的 windowStart,這就是 resetWindowTo 方法的做用。
在 ArrayMetric 的構造函數出現了 LeapArray 的兩個實現類型 BucketLeapArray 與 OccupiableBucketLeapArray。
其中 BucketLeapArray 比較簡單,在這裏就不深刻研究了, 咱們接下來將重點探討一下 OccupiableBucketLeapArray 的實現原理,即支持搶佔將來的「令牌」。
所謂的 OccupiableBucketLeapArray ,實現的思想是當前抽樣統計中的「令牌」已耗盡,即達到用戶設定的相關指標的闊值後,能夠向下一個時間窗口,即借用將來一個採樣區間。接下來咱們詳細來探討一下它的核心實現原理。
3.1 類圖
咱們重點關注一下 OccupiableBucketLeapArray 引入了一個 FutureBucketLeapArray 的成員變量,其命名叫 borrowArray,即爲借用的意思。
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) { super(sampleCount, intervalInMs); this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs); }
從構造函數能夠看出,不只建立了一個常規的 LeapArray,對應一個採集週期,還會建立一個 borrowArray ,也會包含一個採集週期。
public MetricBucket newEmptyBucket(long time) { MetricBucket newBucket = new MetricBucket(); // @1 MetricBucket borrowBucket = borrowArray.getWindowValue(time); // @2 if (borrowBucket != null) { newBucket.reset(borrowBucket); } return newBucket; }
咱們知道 newEmptyBucket 是在獲取當前窗口時,對應的數組下標爲空的時會建立。
代碼@1:首先新建一個 MetricBucket。
代碼@2:在新建的時候,若是曾經有借用過將來的滑動窗口,則將將來的滑動窗口上收集的數據 copy 到新建立的採集指標上,再返回。
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) { w.resetTo(time); MetricBucket borrowBucket = borrowArray.getWindowValue(time); if (borrowBucket != null) { w.value().reset(); w.value().addPass((int)borrowBucket.pass()); } else { w.value().reset(); } return w; }
遇到過時的滑動窗口時,須要對滑動窗口進行重置,這裏的思路和 newEmptyBucket 的核心思想是同樣的,即若是存在已借用的狀況,在重置後須要加上在將來已使用過的許可,就不一一展開了。
public void addWaiting(long time, int acquireCount) { WindowWrap<MetricBucket> window = borrowArray.currentWindow(time); window.value().add(MetricEvent.PASS, acquireCount); }
通過上面的分析,先作一個大膽的猜想,該方法應該是當前滑動窗口中的「令牌」已使用完成,借用將來的令牌。將在下文給出證實。
滑動窗口的實現原理就介紹到這裏了。你們能夠按照上面的代碼結合下圖作一個理解。
思考題,你們能夠畫一下 OccupiableBucketLeapArray 滑動窗口的圖示。這部份內容也將在個人【中間件知識星球】中與各位星友一塊兒探討,歡迎你們的加入。
推薦閱讀:源碼分析 Alibaba Sentinel 專欄。
一、Alibaba Sentinel 限流與熔斷初探(技巧篇)
二、源碼分析 Sentinel 之 Dubbo 適配原理
做者信息:丁威,《RocketMQ技術內幕》做者,目前擔任中通科技技術平臺部資深架構師,維護 中間件興趣圈公衆號,目前主要發表了源碼閱讀java集合、JUC(java併發包)、Netty、ElasticJob、Mycat、Dubbo、RocketMQ、mybaits等系列源碼。點擊連接:加入筆者的知識星球,一塊兒探討高併發、分佈式服務架構,分享閱讀源碼心得。