微信公衆號: 房東的小黑黑
路途隨遙遠,未來更美好
學海無涯,你們一塊兒加油!node
Sentinel有一個重要的功能,即實時數據統計分析,咱們能夠得到在每1秒或者每1分鐘下的每一個上下文調用鏈路中的某一資源的請求數、阻塞數或響應時間;也能夠得到某一資源全局的請求數、阻塞數或者響應時間。 主要實現邏輯是在StatisticSlot
中。web
Statisticslot
處於調用鏈slotchain中的第三個,負責統計資源的實時狀態,調用到slotchain中的任意一個slot時,都會觸發該slot的entry方法。數組
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
try {
// 觸發下一個Slot的entry方法
fireEntry(context, resourceWrapper, node, count, args);
// 若是能經過SlotChain中後面的Slot的entry方法,說明沒有被限流或降級
// 統計信息
node.increaseThreadNum();
node.addPassRequest();
// 省略部分代碼
} catch (BlockException e) {
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockedQps();
// 省略部分代碼
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
// Should not happen
node.increaseExceptionQps();
// 省略部分代碼
throw e;
}
}
複製代碼
entry()
主要有三個部分:
1) 首先會觸發後續slot的entry方法,如SystemSlot、FlowSlot、DegradeSlot等的規則。 2)當後續的slot經過,沒有拋出BlockException異常,說明該資源被成功調用,則增長執行線程數和經過的請求數。
3)當後續的slot中某一沒有經過,則會拋出BlockException等異常,若是捕獲的是BlockException異常,則主要是增長阻塞的數量;若是是系統異常,則增長異常數量。微信
當退出的時候會執行exit()
方法:數據結構
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
//計算響應時間,經過當前時間-CurEntry的建立時間取毫秒值
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
//新增響應時間和成功數
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
//線程數減1
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
//全局線程數-1
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
***其餘邏輯***
fireExit(context, resourceWrapper, count);
}
複製代碼
當退出時,重點關注響應時間,將本次響應時間收集到Node中,並將當前活躍線程數減1。併發
總體流程如上所述,可是具體的操做咱們還不清楚,接下來我將分析其中的Qps數是如何統計的。app
在上述的entry()
方法中在統計Qps數量時會調用node.addPassRequest();
方法。框架
@Override
public void addPassRequest(int count) {
# DefaultNode類型
# 統計某個resource在某個context中的實時指標
super.addPassRequest(count);
# ClusterNode類型
# 統計某個resource在全部的context中實時指標總和
this.clusterNode.addPassRequest(count);
}
複製代碼
這兩個Node都是StatisticNode
的子類,最終會調用StatisticNode
中的方法。編輯器
@Override
public void addPassRequest(int count) {
# 秒級統計
rollingCounterInSecond.addPass(count);
# 分鐘統計
rollingCounterInMinute.addPass(count);
}
複製代碼
秒級統計和分鐘統計的底層原理都是同樣的,下面將對秒級統計進行分析。ide
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
@Override
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
複製代碼
在上面代碼中,有幾個重要的類。ArrayMetric
、BucketLeapArray
、MetricBucket
、WindowWrap
。
WindowWrap
每個滑動窗口的包裝類,其內部的數據結構T是用MetricBucket表示的。
public class WindowWrap<T> {
//一個窗口時段的時間長度(以毫秒爲單位)
private final long windowLengthInMs;
//窗口的開始時間戳(以毫秒爲單位)
private long windowStart;
//統計數據,MetricBucket
private T value;
複製代碼
MetricBucket
表示一段時間內的指標數據,存放在LongAdder
類型的數組裏。有經過數量、阻塞數量、異常數量、成功數量、響應時間、已經過將來配額。相對於AtomicLong
,LongAddr
在高併發下有更好的吞吐量,代價是花費了更多的空間。
public class MetricBucket {
private final LongAdder[] counters;
private volatile long minRt;
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
}
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
複製代碼
LeapArray
Sentinel中統計指標的基本數據結構。
public LeapArray(int sampleCount, int intervalInMs) {
# 時間窗口的長度
this.windowLengthInMs = intervalInMs / sampleCount;
# 以毫秒爲單位的時間間隔,
this.intervalInMs = intervalInMs;
# 採樣窗口的個數,即數組長度
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
複製代碼
在按秒統計時,默認的時間窗口數組長度爲2,每一個時間窗口的長度爲500ms。
在統計QPS時,第一步是調用data.currentWindow()
,獲取當前時間窗口。
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
複製代碼
下面對currentTimeMills()
方法進行拆開分析。
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
# 計算給定的時間映射在數組中的下標(默認數組長度爲2)
# 則idx能夠是0或者1
int idx = calculateTimeIdx(timeMillis);
# 根據當前時間計算出所在窗口應該對用的開始時間
long windowStart = calculateWindowStart(timeMillis);
複製代碼
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
複製代碼
爲何默認要用兩個採樣窗口,由於sentinel設定的是比較輕量的框架。時間窗口保存着不少統計數據,若是時間窗口過多的話,一方面會佔用過多的內存,另外一方面時間窗口過多意味着時間窗口的長度會變小,若是時間窗口長度變小,就會致使時間窗口過於頻繁的滑動。
while (true) {
# 獲取存儲的該索引位置下的舊的時間窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
# 沒有則建立一個
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
# 經過CAS進行設置
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
//不然當前線程讓出時間片,再進行線程競爭
Thread.yield();
}
# 若是實際應當的開始時間和原來的窗口的開始時間相等,則說明沒有失效,直接返回
} else if (windowStart == old.windowStart()) {
return old;
# 讓應當的開始時間大於原來old窗口的開始時間,則說明該窗口失效
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
# 將舊的時間窗口的開始時間設置爲實際應該的開始時間,
# 並重置該窗口的統計數據爲0
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
# 這種狀況不可能存在,會拋出異常
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
複製代碼
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
# w.value() 即 MetricBucket
w.value().reset();
return w;
}
#從新設置它的開始時間
public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}
# 將MetricBucket的統計數據都重置爲0
public void reset() {
internalReset(0L);
}
複製代碼
至此,第一大步已經介紹完了,下面是第二大步wrap.value().addPass(count)
。 這一步很簡單,就是在第一步後會得到所處的時間窗口WindowWrap
,而後獲得該類裏面的MetricBucket
,它統計了該事件窗口下的數據統計,最後進行原子增長操做。
private T value;
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
public T value() {
return value;
}
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
複製代碼
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
複製代碼
以上就是增長Qps的總體流程。
那咱們將數據添加上了,那怎麼查詢得到呢?
DefaultNode
和
ClsterNode
中,它們都是
StatisticNode
的子類,
StatisticNode
實現了
NOde
接口的不少關於統計數據的方法,其中有統計Qps的方法。
@Override
public double passQps() {
# 先獲取如今的時間窗口數組的Qps總量 @(1)
# 而後獲取時間 @(2)
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
複製代碼
@Override
public long pass() {
# 與前面方法一致,過濾掉過時窗口
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
public List<T> values() {
return values(TimeUtil.currentTimeMillis());
}
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
# 即 MetricBucket
result.add(windowWrap.value());
}
return result;
}
複製代碼
當前時間減去某一窗口的開始時間,超過了事件間隔(按秒統計的話,就是1s),就說明該窗口過時,不添加。
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
複製代碼
由於以前的時間單位是毫秒,如今計算的是每秒,因此轉化爲秒。
@Override
public double getWindowIntervalInSec() {
return data.getIntervalInSecond();
}
public double getIntervalInSecond() {
return intervalInMs / 1000.0;
}
複製代碼
至此,關於實時統計的模塊就講完了,大部分是參考幾個大神的文章,圖文並茂,很好理解,你們能夠閱讀以下:
Sentinel 原理-滑動窗口
Alibaba Seninel 滑動窗口實現原理(文末附原理圖) 源碼分析 Sentinel 實時數據採集實現原理
本文使用 mdnice 排版