Sentinel源碼解析三(滑動窗口流量統計)

前言

Sentinel的核心功能之一是流量統計,例如咱們經常使用的指標QPS,當前線程數等。上一篇文章中咱們已經大體提到了提供數據統計功能的Slot(StatisticSlot)StatisticSlotSentinel的整個體系中扮演了一個很是重要的角色,後續的一系列操做(限流,熔斷)等都依賴於StatisticSlot所統計出的數據。html

本文所要討論的重點就是StatisticSlot是如何作的流量統計?java

其實在以前介紹經常使用限流算法[經常使用限流算法](https://www.jianshu.com/p/9edebaa446d3)的時候已經有提到過一個算法滑動窗口限流,該算法的滑動窗口原理其實跟Sentinel所提供的流量統計原理是同樣的,都是基於時間窗口的滑動統計node

回到StatisticSlot

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,

 boolean prioritized, Object... args) throws Throwable {

...

// 當前請求線程數加一

node.increaseThreadNum();

// 新增請求數

node.addPassRequest(count);

...

}

能夠看到StatisticSlot主要統計了兩種類型的數據git

  1. 線程數github

  2. 請求數(QPS)算法

對於線程數的統計比較簡單,經過內部維護一個LongAdder來進行當前線程數的統計,每進入一個請求加1,每釋放一個請求減1,從而獲得當前的線程數數組

對於請求數QPS的統計則相對比較複雜,其中有用到滑動窗口原理(也是本文的重點),下面根據源碼來深刻的分析app

DefaultNode和StatisticNode

public void addPassRequest(int count) {

  // 調用父類(StatisticNode)來進行統計

 super.addPassRequest(count);

  // 根據clusterNode 彙總統計(背後也是調用父類StatisticNode)

 this.clusterNode.addPassRequest(count);

}

最終都是調用了父類StatisticNodeaddPassRequest方法性能

/**

* 按秒統計,分紅兩個窗口,每一個窗口500ms,用來統計QPS

 */

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,

 IntervalProperty.INTERVAL);

/**

* 按分鐘統計,分紅60個窗口,每一個窗口 1000ms

 */

private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

public void addPassRequest(int count) {

 rollingCounterInSecond.addPass(count);

 rollingCounterInMinute.addPass(count);

}

代碼比較簡單,能夠知道內部是調用了ArrayMetricaddPass方法來統計的,而且統計了兩種不一樣時間維度的數據(秒級和分鐘級)this

ArrayMetric

private final LeapArray<MetricBucket> data;

public ArrayMetric(int sampleCount, int intervalInMs) {

 this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);

}

public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {

 if (enableOccupy) {

 this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);

 } else {

 this.data = new BucketLeapArray(sampleCount, intervalInMs);

 }

}

public void addPass(int count) {

  // 1\. 獲取當前窗口

 WindowWrap<MetricBucket> wrap = data.currentWindow();

  // 2\. 當前窗口加1

 wrap.value().addPass(count);

}

ArrayMetric其實也是一個包裝類,內部經過實例化LeapArray的對應實現類,來實現具體的統計邏輯,LeapArray是一個抽象類,OccupiableBucketLeapArrayBucketLeapArray都是其具體的實現類

OccupiableBucketLeapArray在1.5版本以後才被引入,主要是爲了解決一些高優先級的請求在限流觸發的時候也能經過(經過佔用將來時間窗口的名額來實現) 也是默認使用的LeapArray實現類

而統計的邏輯也比較清楚,分紅了兩步:

  1. 定位到當前窗口

  2. 獲取到當前窗口WindowWrapMetricBucket並執行addPass邏輯

這裏咱們先看下第二步中的MetricBucket類,看看它作了哪些事情

MetricBucket

/**

 * 存放當前窗口各類類型的統計值(類型包括 PASS BLOCK EXCEPTION 等)

 */

private final LongAdder[] counters;

public MetricBucket() {

 MetricEvent[] events = MetricEvent.values();

 this.counters = new LongAdder[events.length];

 for (MetricEvent event : events) {

 counters[event.ordinal()] = new LongAdder();

 }

 initMinRt();

}

// 統計pass數

public void addPass(int n) {

 add(MetricEvent.PASS, n);

}

// 統計可佔用的pass數

public void addOccupiedPass(int n) {

 add(MetricEvent.OCCUPIED_PASS, n);

}

// 統計異常數

public void addException(int n) {

 add(MetricEvent.EXCEPTION, n);

}

// 統計block數

public void addBlock(int n) {

 add(MetricEvent.BLOCK, n);

}

....

MetricBucket經過定義了一個LongAdder數組來存儲不一樣類型的流量統計值,具體的類型則都定義在MetricEvent枚舉中。

執行addPass方法對應LongAdder數組索引下表爲0的值遞增

下面再來看下data.currentWindow()的內部邏輯

OccupiableBucketLeapArray

OccupiableBucketLeapArray繼承了抽象類LeapArray,核心邏輯也是在LeapArray

/**

* 時間窗口大小  單位ms

 */

protected int windowLengthInMs;

/**

* 切分的窗口數

 */

protected int sampleCount;

/**

 * 統計的時間間隔 intervalInMs = windowLengthInMs * sampleCount

 */ 

protected int intervalInMs;

/**

 * 窗口數組 數組大小 = sampleCount

 */

protected final AtomicReferenceArray<WindowWrap<T>> array;

/**

 * update lock 更新窗口時須要上鎖

 */

private final ReentrantLock updateLock = new ReentrantLock();

/**

 * @param sampleCount 須要劃分的窗口數

 * @param intervalInMs 間隔的統計時間

 */

public LeapArray(int sampleCount, int intervalInMs) {

 this.windowLengthInMs = intervalInMs / sampleCount;

 this.intervalInMs = intervalInMs;

 this.sampleCount = sampleCount;

 this.array = new AtomicReferenceArray<>(sampleCount);

}

/**

* 獲取當前窗口

 */

public WindowWrap<T> currentWindow() {

 return currentWindow(TimeUtil.currentTimeMillis());

}

以上須要着重理解的是幾個參數的含義:

  1. sampleCount 定義的窗口的數

  2. intervalInMs 統計的時間間隔

  3. windowLengthInMs 每一個窗口的時間大小 = intervalInMs / sampleCount

sampleCount 比較好理解,就是須要定義幾個窗口(默認秒級統計維度的話是兩個窗口),intervalInMs 指的就是咱們須要統計的時間間隔,例如咱們統計QPS的話那就是1000ms,windowLengthInMs 指的每一個窗口的大小,是由intervalInMs除以sampleCount得來

相似下圖

理解了上訴幾個參數的含義後,咱們直接進入到LeapArraycurrentWindow(long time)方法中去看看具體的實現

public WindowWrap<T> currentWindow(long timeMillis) {

 if (timeMillis < 0) {

 return null;

 }

  // 根據當前時間戳計算當前所屬的窗口數組索引下標

 int idx = calculateTimeIdx(timeMillis);

  // 計算當前窗口的開始時間戳

 long windowStart = calculateWindowStart(timeMillis);

 /*

 * 從窗口數組中獲取當前窗口項,分爲三種狀況

 *

 * (1) 當前窗口爲空還未建立,則初始化一個

 * (2) 當前窗口的開始時間和上面計算出的窗口開始時間一致,代表當前窗口還未過時,直接返回當前窗口

 * (3) 當前窗口的開始時間  小於  上面計算出的窗口開始時間,代表當前窗口已過時,須要替換當前窗口

 */

 while (true) {

 WindowWrap<T> old = array.get(idx);

 if (old == null) {

 /*

 * 第一種狀況,新建一個窗口項

 */

 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

 if (array.compareAndSet(idx, null, window)) {

  // Successfully updated, return the created bucket.

 return window;

 } else {

  // Contention failed, the thread will yield its time slice to wait for bucket available.

 Thread.yield();

 }

 } else if (windowStart == old.windowStart()) {

 /*

 * 第二種狀況 直接返回

 */

 return old;

 } else if (windowStart > old.windowStart()) {

 /*

 * 第三種狀況 替換窗口

 */

 if (updateLock.tryLock()) {

 try {

  // Successfully get the update lock, now we reset the bucket.

 return resetWindowTo(old, windowStart);

 } finally {

 updateLock.unlock();

 }

 } else {

  // Contention failed, the thread will yield its time slice to wait for bucket available.

 Thread.yield();

 }

 } else if (windowStart < old.windowStart()) {

  // 第四種狀況,講道理不會走到這裏

 return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

 }

 }

}

/**

* 根據當前時間戳計算當前所屬的窗口數組索引下標

 */

private int calculateTimeIdx(/*@Valid*/ long timeMillis) {

 long timeId = timeMillis / windowLengthInMs;

 return (int)(timeId % array.length());

}

/**

* 計算當前窗口的開始時間戳

 */

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {

 return timeMillis - timeMillis % windowLengthInMs;

}

上面的方法就是整個滑動窗口邏輯的核心代碼,註釋其實也寫的比較清晰了,簡單歸納下能夠分爲如下幾步:

  1. 根據當前時間戳 和 窗口數組大小 獲取到當前的窗口數組索引下標idx,若是窗口數是2,那其實idx只有兩種值(0 或 1)

  2. 根據當前時間戳(windowStart) 計算獲得當前窗口的開始時間戳值。經過calculateWindowStart計算來獲得,這個方法還蠻有意思的,經過當前時間戳和窗口時間大小取餘來獲得 與當前窗口開始時間的 偏移量。比我用定時任務實現高級多了 ... 😆 能夠去對比一下我以前文章中的蠢實現 [滑動窗口算法定時任務實現](https://github.com/WangJunnan/learn/blob/master/algorithm/src/main/java/com/walm/learn/algorithm/ratelimit/SlidingWindowRateLimit.java)

  3. 而後就是根據上面獲得的兩個值 來獲取當前時間窗口,這裏其實又分爲三種狀況

  • 當前窗口爲空還未建立,則初始化一個

  • 當前窗口的開始時間和上面計算出的窗口開始時間(windowStart)一致,代表當前窗口還未過時,直接返回當前窗口

  • 當前窗口的開始時間 小於 上面計算出的窗口(windowStart)開始時間,代表當前窗口已過時,須要替換當前窗口

總結

總的來講,currentWindow方法的實現仍是很是巧妙的,由於我在看Sentinel的源碼前也寫過一篇限流算法的文章,剛好其中也實現過一個滑動窗口限流算法,不過相比於Sentinel的實現,我用了定時任務去作窗口的切換更新,顯然性能上更差,實現的也不優雅,你們也能夠去對比一下。[經常使用限流算法](https://www.jianshu.com/p/9edebaa446d3)

Sentinel系列

相關文章
相關標籤/搜索