Sentinel 原理-滑動窗口

逅弈 轉載請註明原創出處,謝謝!java

系列文章

Sentinel 原理-全解析node

Sentinel 原理-調用鏈數組

Sentinel 原理-實體類緩存

Sentinel 實戰-限流篇併發

Sentinel 實戰-控制檯篇app

Sentinel 實戰-規則持久化框架

上篇文章中,咱們瞭解了sentinel是如何構造資源調用鏈的,以及每種Slot的具體做用,其中最重要的一個Slot非StatisticSlot莫屬,由於他作的事是其餘全部的Slot的基礎。包括各類限流,熔斷的規則,都是基於StatisticSlot統計出來的結果進行規則校驗的。本篇文章我將深刻研究下sentinel是如何進行qps等指標的統計的,首先要肯定的一點是,sentinel是基於滑動時間窗口來實現的。ide

化整爲零

咱們已經知道了Slot是從第一個日後一直傳遞到最後一個的,且當信息傳遞到StatisticSlot時,這裏就開始進行統計了,統計的結果又會被後續的Slot所採用,做爲規則校驗的依據。咱們先來看一段很是熟悉的代碼,就是StatisticSlot中的entry方法:高併發

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    try {
        // 觸發下一個Slot的entry方法
        fireEntry(context, resourceWrapper, node, count, args);
        // 若是能經過SlotChain中後面的Slot的entry方法,說明沒有被限流或降級
        // 統計信息
        node.increaseThreadNum();
        node.addPassRequest();
        // 省略部分代碼
    } catch (BlockException e) {
        context.getCurEntry().setError(e);
        // Add block count.
        node.increaseBlockedQps();
        // 省略部分代碼
        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        // Should not happen
        node.increaseExceptionQps();
        // 省略部分代碼
        throw e;
    }
}
複製代碼

上面的代碼註釋寫的已經很清晰了,簡單的來講,StatisticSlot中就是作了三件事:post

  • 1.經過node中的當前的實時統計指標信息進行規則校驗
  • 2.若是經過了校驗,則從新更新node中的實時指標數據
  • 3.若是被block或出現了異常了,則從新更新node中block的指標或異常指標

從上面的代碼中能夠很清晰的看到,全部的實時指標的統計都是在node中進行的。這裏咱們拿qps的指標進行分析,看sentinel是怎麼統計出qps的,這裏能夠事先透露下他是經過滑動時間窗口來統計的,而滑動窗口就是本篇文章的重點。

DefaultNode和ClusterNode

咱們能夠看到 node.addPassRequest() 這段代碼是在fireEntry執行以後執行的,這意味着,當前請求經過了sentinel的流控等規則,此時須要將當次請求記錄下來,也就是執行 node.addPassRequest() 這行代碼,如今咱們進入這個代碼看看。具體的代碼以下所示:

@Override
public void addPassRequest() {
	super.addPassRequest();
	this.clusterNode.addPassRequest();
}
複製代碼

首先咱們知道這裏的node是一個 DefaultNode 實例,這裏特別補充一個 DefaultNodeClusterNode 的區別:

DefaultNode:保存着某個resource在某個context中的實時指標,每一個DefaultNode都指向一個ClusterNode

ClusterNode:保存着某個resource在全部的context中實時指標的總和,一樣的resource會共享同一個ClusterNode,無論他在哪一個context中

StatisticNode

好了,知道了他們的區別後,咱們再來看上面的代碼,其實都是執行的 StatisticNode 對象的 addPassRequest 方法。進入這個方法中看下具體的代碼:

private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL);

private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);

@Override
public void addPassRequest() {
    rollingCounterInSecond.addPass();
    rollingCounterInMinute.addPass();
}
複製代碼

Metric

從代碼中咱們能夠看到,具體的增長pass指標是經過一個叫 Metric 的接口進行操做的,而且是經過 ArrayMetric 這種實現類,如今咱們在進入 ArrayMetric 中看一下。具體的代碼以下所示:

private final WindowLeapArray data;

public ArrayMetric(int windowLength, int interval) {
    this.data = new WindowLeapArray(windowLength, interval);
}

@Override
public void addPass() {
    WindowWrap<Window> wrap = data.currentWindow();
    wrap.value().addPass();
}
複製代碼

LeapArray和Window

本覺得在ArrayMetric中應該能夠看到具體的統計操做了,誰知道又出現了一個叫 WindowLeapArray 的類,不過從名字上看有點 「窗口」 的意思了。繼續跟代碼,發現 wrap.value().addPass() 是執行的 wrap 對象所包裝的 Window 對象的 addPass 方法,這裏就是最終的增長qps中q的值的地方了。進入 Window 類中看一下,具體的代碼以下:

private final LongAdder pass = new LongAdder();
private final LongAdder block = new LongAdder();
private final LongAdder exception = new LongAdder();
private final LongAdder rt = new LongAdder();
private final LongAdder success = new LongAdder();

public void addPass() {
    pass.add(1L);
}
public void addException() {
    exception.add(1L);
}
public void addBlock() {
    block.add(1L);
}
public void addSuccess() {
    success.add(1L);
}
public void addRT(long rt) {
    this.rt.add(rt);

    // Not thread-safe, but it's okay.
    if (rt < minRt) {
        minRt = rt;
    }
}
複製代碼

看到這裏是否是就放心了,原來 Window 是經過 LongAdder 來保存各類指標的值的,看到 LongAdder 是否是馬上就想到 AtomicLong 了?可是這裏爲何不用 AtomicLong ,而是用 LongAdder 呢?主要是 LongAdder 在高併發下有更好的吞吐量,代價是花費了更多的空間,典型的以空間換時間。

完整的流程

分析到這裏咱們已經把指標統計的完整鏈路理清楚了,能夠用下面這張圖來表示整個過程:

add-pass-request.png

有人可能會問了,你不是要分析滑動窗口的嗎?搞了半天只畫了一張圖,並且圖上還多了一個 timeId 之類的東西,這個根本沒在上面出現過。

好了,如今咱們就能夠來分析具體的滑動窗口了,這裏的 timeId 是用來表示一個 WindowWrap 對象的時間id。爲何要用 timeId 來表示呢?咱們能夠看到每個 WindowWrap 對象由三個部分組成:

  • windowStart: 時間窗口的開始時間,單位是毫秒
  • windowLength: 時間窗口的長度,單位是毫秒
  • value: 時間窗口的內容,在 WindowWrap 中是用泛型表示這個值的,但實際上就是 Window

咱們先大體的瞭解下時間窗口的構成,後面會再來分析 timeId 的做用。首先一個時間窗口是用來在某個固定時間長度內保存一些統計值的虛擬概念。有了這個概念後,咱們就能夠經過時間窗口來計算統計一段時間內的諸如:qps,rt,threadNum等指標了。

繼續深刻

咱們再回到 ArrayMetric 中看一下:

private final WindowLeapArray data;

public ArrayMetric(int windowLength, int interval) {
    this.data = new WindowLeapArray(windowLength, interval);
}
複製代碼

首先建立了一個 WindowLeapArray 對象,看一下 WindowLeapArray 類的代碼:

public class WindowLeapArray extends LeapArray<Window> {
	public WindowLeapArray(int windowLengthInMs, int intervalInSec) {
	    super(windowLengthInMs, intervalInSec);
	}
}  
複製代碼

該對象的構造方法有兩個參數:

  • windowLengthInMs :一個用毫秒作單位的時間窗口的長度
  • intervalInSec ,一個用秒作單位的時間間隔,這個時間間隔具體是作什麼的,下面會分析。

而後 WindowLeapArray 繼承自 LeapArray ,在初始化 WindowLeapArray 的時候,直接調用了父類的構造方法,再來看一下父類 LeapArray 的代碼:

public abstract class LeapArray<T> {

    // 時間窗口的長度
    protected int windowLength;
    // 採樣窗口的個數
    protected int sampleCount;
    // 以毫秒爲單位的時間間隔
    protected int intervalInMs;

    // 採樣的時間窗口數組
    protected AtomicReferenceArray<WindowWrap<T>> array;

    /** * LeapArray對象 * @param windowLength 時間窗口的長度,單位:毫秒 * @param intervalInSec 統計的間隔,單位:秒 */
    public LeapArray(int windowLength, int intervalInSec) {
        this.windowLength = windowLength;
        // 時間窗口的採樣個數,默認爲2個採樣窗口
        this.sampleCount = intervalInSec * 1000 / windowLength;
        this.intervalInMs = intervalInSec * 1000;

        this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
    }
}
複製代碼

能夠很清晰的看出來在 LeapArray 中建立了一個 AtomicReferenceArray 數組,用來對時間窗口中的統計值進行採樣。經過採樣的統計值再計算出平均值,就是咱們須要的最終的實時指標的值了。

能夠看到我在上面的代碼中經過註釋,標明瞭默認採樣的時間窗口的個數是2個,這個值是怎麼獲得的呢?咱們回憶一下 LeapArray 對象建立,是經過在 StatisticNode 中,new了一個 ArrayMetric ,而後將參數一路往上傳遞後建立的:

private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount,IntervalProperty.INTERVAL);
複製代碼

SampleCountProperty.sampleCount 的默認值是2,因此第一個參數 windowLengthInMs 的值是 500ms,那麼1秒鐘是1000ms,每一個時間窗口的長度是500ms,也就是說總共分了兩個採樣的時間窗口。

如今繼續回到 ArrayMetric.addPass() 方法:

@Override
public void addPass() {
    WindowWrap<Window> wrap = data.currentWindow();
    wrap.value().addPass();
}
複製代碼

獲取當前Window

咱們已經分析了 wrap.value().addPass() ,如今只須要分析清楚 data.currentWindow() 具體作了什麼,拿到了當前時間窗口就能夠 了。繼續深刻代碼,最終定位到下面的代碼:

@Override
public WindowWrap<Window> currentWindow(long time) {
    long timeId = time / windowLength;
    // Calculate current index.
    int idx = (int)(timeId % array.length());

    // Cut the time to current window start.
    long time = time - time % windowLength;

    while (true) {
        WindowWrap<Window> old = array.get(idx);
        if (old == null) {
            WindowWrap<Window> window = new WindowWrap<Window>(windowLength, time, new Window());
            if (array.compareAndSet(idx, null, window)) {
                return window;
            } else {
                Thread.yield();
            }
        } else if (time == old.windowStart()) {
            return old;
        } else if (time > old.windowStart()) {
            if (addLock.tryLock()) {
                try {
                    // if (old is deprecated) then [LOCK] resetTo currentTime.
                    return resetWindowTo(old, time);
                } finally {
                    addLock.unlock();
                }
            } else {
                Thread.yield();
            }
        } else if (time < old.windowStart()) {
            // Cannot go through here.
            return new WindowWrap<Window>(windowLength, time, new Window());
        }
    }
}
複製代碼

初次看到這段代碼時,可能會以爲有點懵,可是細細的分析一下,實際能夠把他分紅如下幾步:

  • 1.根據當前時間,算出該時間的timeId,並根據timeId算出當前窗口在採樣窗口數組中的索引idx
  • 2.根據當前時間算出當前窗口的應該對應的開始時間time,以毫秒爲單位
  • 3.根據索引idx,在採樣窗口數組中取得一個時間窗口old
  • 4.循環判斷知道獲取到一個當前時間窗口
    • 4.1.若是old爲空,則建立一個時間窗口,並將它插入到array的第idx個位置,array上面已經分析過了,是一個 AtomicReferenceArray
    • 4.2.若是當前窗口的開始時間time與old的開始時間相等,那麼說明old就是當前時間窗口,直接返回old
    • 4.3.若是當前窗口的開始時間time大於old的開始時間,則說明old窗口已通過時了,將old的開始時間更新爲最新值:time,下個循環中會在步驟4.2中返回
    • 4.4.若是當前窗口的開始時間time小於old的開始時間,實際上這種狀況是不可能存在的,由於time是當前時間,old是過去的一個時間

上面的代碼有個比較容易混淆的地方,就是計算出來的當前時間窗口的開始時間,沒有使用一個新的變量來表示,而是直接用time來表示。

另外timeId是會隨着時間的增加而增長,當前時間每增加一個windowLength的長度,timeId就加1。可是idx不會增加,只會在0和1之間變換,由於array數組的長度是2,只有兩個採樣時間窗口。至於爲何默認只有兩個採樣窗口,我的以爲由於sentinel是比較輕量的框架。時間窗口中保存着不少統計數據,若是時間窗口過多的話,一方面會佔用過多內存,另外一方面時間窗口過多就意味着時間窗口的長度會變小,若是時間窗口長度變小,就會致使時間窗口過於頻繁的滑動。

通過分析,加上註釋,並將表示當前窗口開始時間的time變量,重命名成其餘變量,使得代碼更具可讀性,調整後的代碼以下:

@Override
public WindowWrap<Window> currentWindow(long time) {
    // time每增長一個windowLength的長度,timeId就會增長1,時間窗口就會往前滑動一個
    long timeId = time / windowLength;
    // Calculate current index.
    // idx被分紅[0,arrayLength-1]中的某一個數,做爲array數組中的索引
    int idx = (int)(timeId % array.length());

    // Cut the time to current window start.
    long currentWindowStart = time - time % windowLength;

    while (true) {
        // 從採樣數組中根據索引獲取緩存的時間窗口
        WindowWrap<Window> old = array.get(idx);
        // array數組長度不宜過大,不然old不少狀況下都命中不了,就會建立不少個WindowWrap對象
        if (old == null) {
            // 若是沒有獲取到,則建立一個新的
            WindowWrap<Window> window = new WindowWrap<Window>(windowLength, currentWindowStart, new Window());
            // 經過CAS將新窗口設置到數組中去
            if (array.compareAndSet(idx, null, window)) {
                // 若是能設置成功,則將該窗口返回
                return window;
            } else {
                // 不然當前線程讓出時間片,等待
                Thread.yield();
            }
        // 若是當前窗口的開始時間與old的開始時間相等,則直接返回old窗口
        } else if (currentWindowStart == old.windowStart()) {
            return old;
        // 若是當前時間窗口的開始時間已經超過了old窗口的開始時間,則放棄old窗口
        // 並將time設置爲新的時間窗口的開始時間,此時窗口向前滑動
        } else if (currentWindowStart > old.windowStart()) {
            if (addLock.tryLock()) {
                try {
                    // if (old is deprecated) then [LOCK] resetTo currentTime.
                    return resetWindowTo(old, currentWindowStart);
                } finally {
                    addLock.unlock();
                }
            } else {
                Thread.yield();
            }
        // 這個條件不可能存在
        } else if (currentWindowStart < old.windowStart()) {
            // Cannot go through here.
            return new WindowWrap<Window>(windowLength, currentWindowStart, new Window());
        }
    }
}
複製代碼

看圖理解

爲了更好的理解,下面我用幾幅圖來描述下這個過程。

slide-window-1.png

初始的時候arrays數組中只有一個窗口(多是第一個,也多是第二個),每一個時間窗口的長度是500ms,這就意味着只要當前時間與時間窗口的差值在500ms以內,時間窗口就不會向前滑動。例如,假如當前時間走到300或者500時,當前時間窗口仍然是相同的那個:

slide-window-2.png

時間繼續往前走,當超過500ms時,時間窗口就會向前滑動到下一個,這時就會更新當前窗口的開始時間:

slide-window-3.png

時間繼續往前走,只要不超過1000ms,則當前窗口不會發生變化:

slide-window-4.png

當時間繼續往前走,當前時間超過1000ms時,就會再次進入下一個時間窗口,此時arrays數組中的窗口將會有一個失效,會有另外一個新的窗口進行替換:

slide-window-5.png

以此類推隨着時間的流逝,時間窗口也在發生變化,在當前時間點中進入的請求,會被統計到當前時間對應的時間窗口中。計算qps時,會用當前採樣的時間窗口中對應的指標統計值除以時間間隔,就是具體的qps。具體的代碼在StatisticNode中:

@Override
public long totalQps() {
    return passQps() + blockedQps();
}

@Override
public long blockedQps() {
    return rollingCounterInSecond.block() / IntervalProperty.INTERVAL;
}

@Override
public long passQps() {
    return rollingCounterInSecond.pass() / IntervalProperty.INTERVAL;
}
複製代碼

到這裏就基本上把滑動窗口的原理分析清楚了,還有不清楚的地方,最好可以藉助代碼繼續分析下,最好的作法就是debug,這裏貼一下筆者在分析 currentWindow 方法時採起的測試代碼:

public static void main(String[] args) throws InterruptedException {
    int windowLength = 500;
    int arrayLength = 2;
    calculate(windowLength,arrayLength);

    Thread.sleep(100);
    calculate(windowLength,arrayLength);

    Thread.sleep(200);
    calculate(windowLength,arrayLength);

    Thread.sleep(200);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);
}

private static void calculate(int windowLength,int arrayLength){
    long time = System.currentTimeMillis();
    long timeId = time/windowLength;
    long currentWindowStart = time - time % windowLength;
    int idx = (int)(timeId % arrayLength);
	System.out.println("time="+time+",currentWindowStart="+currentWindowStart+",timeId="+timeId+",idx="+idx);
}
複製代碼

這裏假設時間窗口的長度爲500ms,數組的大小爲2,當前時間做爲輸入參數,計算出當前時間窗口的timeId、windowStart、idx等值。執行上面的代碼後,將打印出以下的結果:

time=1540629334619,currentWindowStart=1540629334500,timeId=3081258669,idx=1
time=1540629334721,currentWindowStart=1540629334500,timeId=3081258669,idx=1
time=1540629334924,currentWindowStart=1540629334500,timeId=3081258669,idx=1
time=1540629335129,currentWindowStart=1540629335000,timeId=3081258670,idx=0
time=1540629335633,currentWindowStart=1540629335500,timeId=3081258671,idx=1
time=1540629336137,currentWindowStart=1540629336000,timeId=3081258672,idx=0
time=1540629336641,currentWindowStart=1540629336500,timeId=3081258673,idx=1
time=1540629337145,currentWindowStart=1540629337000,timeId=3081258674,idx=0
time=1540629337649,currentWindowStart=1540629337500,timeId=3081258675,idx=1
複製代碼

能夠看出來,windowStart每增長500ms,timeId就加1,這時就是時間窗口發生滑動的時候。

更多原創好文,請關注「逅弈逐碼」

更多原創好文,請關注公衆號「逅弈逐碼」

相關文章
相關標籤/搜索