sentinel 滑動窗口統計機制

sentinel的滑動窗口統計機制就是根據當前時間,獲取對應的時間窗口,並更新該時間窗口中的各項統計指標(pass/block/rt等),這些指標被用來進行後續判斷,好比限流、降級等;隨着時間的推移,當前時間點對應的時間窗口是變化的,這時會涉及到時間窗口的初始化、複用等。能夠說,sentinel上的功能所用到的數據幾乎都是滑動窗口統計機制來維護和更新的。html

 

sentinel 處理流程是基於slot鏈(ProcessorSlotChain)來完成的,好比限流、熔斷等,其中重要的一個slot就是StatisticSlot,它是作各類數據統計的,而限流/熔斷的數據判斷來源就是StatisticSlot,StatisticSlot的各類數據統計都是基於滑動窗口來完成的,所以本文就重點分析StatisticSlot的滑動窗口統計機制。node

 

sentinel 的slot鏈(ProcessorSlotChain)是責任鏈模式的體現,那SlotChain是在哪建立的呢?是在 CtSph.lookProcessChain()方法中建立的,而且該方法會根據當前請求的資源先去一個靜態的HashMap中獲取,若是獲取不到纔會建立,建立後會保存到HashMap中。這就意味着,同一個資源會全局共享一個SlotChain。默認生成ProcessorSlotChain邏輯爲:安全

 1 // DefaultSlotChainBuilder
 2 public ProcessorSlotChain build() {  3    ProcessorSlotChain chain = new DefaultProcessorSlotChain();  4    chain.addLast(new NodeSelectorSlot());  5    chain.addLast(new ClusterBuilderSlot());  6    chain.addLast(new LogSlot());  7    chain.addLast(new StatisticSlot());  8    chain.addLast(new SystemSlot());  9    chain.addLast(new AuthoritySlot()); 10    chain.addLast(new FlowSlot()); 11    chain.addLast(new DegradeSlot()); 12 
13    return chain; 14 }

 

整個處理過程從第一個slot日後一直傳遞到最後一個的,當到達StatisticSlot時,開始統計各項指標,統計的結果又會被後續的Slot所採用,做爲各類規則校驗的依據。各類指標以下:數據結構

public enum MetricEvent { PASS, // Normal pass.
   BLOCK, // Normal block.
   EXCEPTION, // 異常統計
 SUCCESS, RT, // rt統計
 OCCUPIED_PASS }

 

StatisticSlot.entry流程

處理流程走到StatisticSlot時,首先觸發後續slot.entry方法,而後統計各項指標,後續slot中數據判斷來源就是這裏統計的各項指標。StatisticSlot.entry 邏輯以下:併發

 1 @Override  2 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNodenode, int count, Object... args) throws Throwable {  3    try {  4        // 觸發下一個Slot的entry方法
 5  fireEntry(context, resourceWrapper, node, count, args);  6        // 若是能經過SlotChain中後面的Slot的entry方法,說明沒有被限流或降級  7        // 統計信息
 8  node.increaseThreadNum();  9  node.addPassRequest(); 10        // 省略部分代碼
11   } catch (BlockException e) { 12  context.getCurEntry().setError(e); 13        // Add block count.
14  node.increaseBlockedQps(); 15        // 省略部分代碼
16        throw e; 17   } catch (Throwable e) { 18  context.getCurEntry().setError(e); 19        // Should not happen
20  node.increaseExceptionQps(); 21        // 省略部分代碼
22        throw e; 23  } 24 }

由以上代碼可知,StatisticSlot主要就作了3件事:app

  1. 觸發後續slot的entry方法,進行規則校驗分佈式

  2. 校驗經過則更新node實時指標數據ide

  3. 校驗不經過則更新node異常指標數據高併發

注意:因爲後續的fireEntry操做和更新本次統計信息是兩個操做,不是原子的,會形成限流不許的小問題,好比設置的FlowRule count爲20,併發狀況下可能稍大於20,不過針對大部分場景來講,這點誤差是能夠容忍的,畢竟咱們要的是限流效果,而不是必須精確的限流操做。性能

 

更新node實時指標數據

咱們能夠看到 node.addPassRequest() 這段代碼是在fireEntry執行以後執行的,這意味着,當前請求經過了sentinel的流控等規則,此時須要將當次請求記錄下來,也就是執行 node.addPassRequest() 這行代碼,具體的代碼以下所示:

1 // DefaultNode
2 public void addPassRequest() { 3    super.addPassRequest(); 4    this.clusterNode.addPassRequest(); 5 }

這裏的node是一個 DefaultNode 實例,這裏特別補充一個 DefaultNode 和 ClusterNode 的區別:

  • DefaultNode:保存着某個resource在某個context中的實時指標,每一個DefaultNode都指向一個ClusterNode。

  • ClusterNode:保存着某個resource在全部的context中實時指標的總和,一樣的resource會共享同一個ClusterNode,無論他在哪一個context中。

 

上面代碼無論是 DefaultNode 仍是 ClusterNode ,走的都是StatisticNode 對象的 addPassRequest 方法:

1 private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2, 1000); 2 private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000); 3 
4 public void addPassRequest(int count) { 5    rollingCounterInSecond.addPass(count); // 對每秒指標統計
6    rollingCounterInMinute.addPass(count); // 每分鐘指標統計
7 }

 

每個經過的指標(pass)都是調用Metric 的接口進行操做的,而且是經過 ArrayMetric 這種實現類,代碼以下:

public ArrayMetric(int windowLength, int interval) { this.data = new WindowLeapArray(windowLength, interval); } public void addPass(int count) { // 獲取當前時間窗口
   WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count); }

 

首先經過 currentWindow() 獲取當前時間窗口,而後更新當前時間窗口對應的統計指標,如下代碼重點關注幾個判斷邏輯:

 1 // LeapArray
 2 public WindowWrap<T> currentWindow() {  3    return currentWindow(TimeUtil.currentTimeMillis());  4 }  5 // TimeUtil
 6 public static long currentTimeMillis() {  7    // currentTimeMillis是由一個tick線程每一個1ms更新一次,具體邏輯在TimeUtil類中
 8    return currentTimeMillis;  9 } 10 // LeapArray
11 public WindowWrap<T> currentWindow(long timeMillis) { 12    // 計算當前時間點落在滑動窗口的下標
13    int idx = calculateTimeIdx(timeMillis); 14    // Calculate current bucket start time.
15    long windowStart = calculateWindowStart(timeMillis); 16 
17    // 獲取當前時間點對應的windowWrap,array爲AtomicReferenceArray
18    while (true) { 19        WindowWrap<T> old = array.get(idx); 20        if (old == null) { 21            // 1.爲空表示當前時間窗口爲初始化過,建立WindowWrap並cas設置到array中
22            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs,windowStart, newEmptyBucket()); 23            if (array.compareAndSet(idx, null, window)) { 24                return window; 25           } else { 26  Thread.yield(); 27  } 28       } else if (windowStart == old.windowStart()) { 29            // 2.獲取的時間窗口正好對應當前時間,直接返回
30            return old; 31       } else if (windowStart > old.windowStart()) { 32            // 3.獲取的時間窗口爲老的,進行reset操做複用
33            if (updateLock.tryLock()) { 34                try { 35                    return resetWindowTo(old, windowStart); 36               } finally { 37  updateLock.unlock(); 38  } 39           } else { 40  Thread.yield(); 41  } 42       } else if (windowStart < old.windowStart()) { 43            // 4.時間回撥了,正常狀況下不會走到這裏
44            return new WindowWrap<T>(windowLengthInMs, windowStart,newEmptyBucket()); 45  } 46  } 47 }

 

獲取當前時間窗口對應的WindowWrap以後,就能夠進行更新操做了。

// wrap.value().addPass(count);
public void addPass(int n) { add(MetricEvent.PASS, n); } // MetricBucket
public MetricBucket add(MetricEvent event, long n) { // 對應MetricEvent枚舉中值
 counters[event.ordinal()].add(n); return this; }

到這裏爲止,整個指標統計流程就完成了,下面重點看下滑動窗口機制。

 

滑動窗口機制

時間窗口是用WindowWrap對象表示的,其屬性以下:

private final long windowLengthInMs;  // 時間窗口的長度
private long windowStart; // 時間窗口開始時間
private T value; // MetricBucket對象,保存各個指標數據

 

sentinel時間基準由tick線程來作,每1ms更新一次時間基準,邏輯以下:

currentTimeMillis = System.currentTimeMillis(); Thread daemon = new Thread(new Runnable() { @Override public void run() { while (true) { currentTimeMillis = System.currentTimeMillis(); try { TimeUnit.MILLISECONDS.sleep(1); } catch (Throwable e) { } } } }); daemon.setDaemon(true); daemon.setName("sentinel-time-tick-thread"); daemon.start();

 

sentinel默認有每秒和每分鐘的滑動窗口,對應的LeapArray類型,它們的初始化邏輯是:

protected int windowLengthInMs; // 單個滑動窗口時間值
protected int sampleCount; // 滑動窗口個數
protected int intervalInMs; // 週期值(至關於全部滑動窗口時間值之和)

public LeapArray(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount); }

針對每秒滑動窗口,windowLengthInMs=500,sampleCount=2,intervalInMs=1000,針對每分鐘滑動窗口,windowLengthInMs=1000,sampleCount=60,intervalInMs=60000,對應代碼:

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2, 1000); private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);

currentTimeMillis時間基準(tick線程)每1ms更新一次,經過currentWindow(timeMillis)方法獲取當前時間點對應的WindowWrap對象,而後更新對應的各類指標,用於作限流、降級時使用。注意,當前時間基準對應的事件窗口初始化時lazy模式,而且會複用的。

 

Sentinel 底層採用高性能的滑動窗口數據結構 LeapArray 來統計實時的秒級指標數據,能夠很好地支撐寫多於讀的高併發場景。最後以一張圖結束吧:

 

 

 往期精選 

以爲文章不錯,對你有所啓發和幫助,但願能轉發給更多的小夥伴。若是有問題,請關注下面公衆號,發送問題給我,多謝。
歡迎小夥伴 關注【TopCoder】閱讀更多精彩好文。

 

原文出處:https://www.cnblogs.com/luoxn28/p/11109144.html

相關文章
相關標籤/搜索