Sentinel之實時數據獲取

微信公衆號: 房東的小黑黑
路途隨遙遠,未來更美好
學海無涯,你們一塊兒加油!node

Sentinel有一個重要的功能,即實時數據統計分析,咱們能夠得到在每1秒或者每1分鐘下的每一個上下文調用鏈路中的某一資源的請求數、阻塞數或響應時間;也能夠得到某一資源全局的請求數、阻塞數或者響應時間。 主要實現邏輯是在StatisticSlot中。web

Statisticslot處於調用鏈slotchain中的第三個,負責統計資源的實時狀態,調用到slotchain中的任意一個slot時,都會觸發該slot的entry方法。數組

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    try {
        // 觸發下一個Slot的entry方法
        fireEntry(context, resourceWrapper, node, count, args);
        // 若是能經過SlotChain中後面的Slot的entry方法,說明沒有被限流或降級
        // 統計信息
        node.increaseThreadNum();
        node.addPassRequest();
        // 省略部分代碼
    } catch (BlockException e) {
        context.getCurEntry().setError(e);
        // Add block count.
        node.increaseBlockedQps();
        // 省略部分代碼
        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        // Should not happen
        node.increaseExceptionQps();
        // 省略部分代碼
        throw e;
    }
}
複製代碼

entry()主要有三個部分:
1) 首先會觸發後續slot的entry方法,如SystemSlot、FlowSlot、DegradeSlot等的規則。 2)當後續的slot經過,沒有拋出BlockException異常,說明該資源被成功調用,則增長執行線程數和經過的請求數。
3)當後續的slot中某一沒有經過,則會拋出BlockException等異常,若是捕獲的是BlockException異常,則主要是增長阻塞的數量;若是是系統異常,則增長異常數量。微信

當退出的時候會執行exit()方法:數據結構

 public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        DefaultNode node = (DefaultNode)context.getCurNode();
        if (context.getCurEntry().getError() == null) {
            //計算響應時間,經過當前時間-CurEntry的建立時間取毫秒值
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            if (rt > Constants.TIME_DROP_VALVE) {
                rt = Constants.TIME_DROP_VALVE;
            }
            //新增響應時間和成功數
            node.addRtAndSuccess(rt, count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
            }
            //線程數減1
            node.decreaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }
            //全局線程數-1
            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        } else {
            // Error may happen.
        }
        ***其餘邏輯***
        fireExit(context, resourceWrapper, count);
    }
複製代碼

當退出時,重點關注響應時間,將本次響應時間收集到Node中,並將當前活躍線程數減1。併發

總體流程如上所述,可是具體的操做咱們還不清楚,接下來我將分析其中的Qps數是如何統計的。app

在上述的entry()方法中在統計Qps數量時會調用node.addPassRequest();方法。框架

@Override
public void addPassRequest(int count) {
     # DefaultNode類型  
     # 統計某個resource在某個context中的實時指標
     super.addPassRequest(count);
     # ClusterNode類型
     # 統計某個resource在全部的context中實時指標總和
     this.clusterNode.addPassRequest(count);
}
複製代碼

這兩個Node都是StatisticNode的子類,最終會調用StatisticNode中的方法。編輯器

@Override
public void addPassRequest(int count) {
     # 秒級統計
     rollingCounterInSecond.addPass(count);
     # 分鐘統計
     rollingCounterInMinute.addPass(count);
}
複製代碼

秒級統計和分鐘統計的底層原理都是同樣的,下面將對秒級統計進行分析。ide

public class ArrayMetric implements Metric {
    private final LeapArray<MetricBucket> data;
    
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
    
    @Override
    public void addPass(int count) {
           WindowWrap<MetricBucket> wrap = data.currentWindow();
           wrap.value().addPass(count);
    }
複製代碼

在上面代碼中,有幾個重要的類。ArrayMetricBucketLeapArrayMetricBucketWindowWrap

WindowWrap

每個滑動窗口的包裝類,其內部的數據結構T是用MetricBucket表示的。

public class WindowWrap<T{
    //一個窗口時段的時間長度(以毫秒爲單位)
    private final long windowLengthInMs;
    //窗口的開始時間戳(以毫秒爲單位)
    private long windowStart;
    //統計數據,MetricBucket
    private T value;
複製代碼

MetricBucket

表示一段時間內的指標數據,存放在LongAdder類型的數組裏。有經過數量、阻塞數量、異常數量、成功數量、響應時間、已經過將來配額。相對於AtomicLongLongAddr在高併發下有更好的吞吐量,代價是花費了更多的空間。

public class MetricBucket {
    private final LongAdder[] counters;
    private volatile long minRt;
 public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }
}

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}
複製代碼

LeapArray

Sentinel中統計指標的基本數據結構。

public LeapArray(int sampleCount, int intervalInMs) {
    # 時間窗口的長度
    this.windowLengthInMs = intervalInMs / sampleCount;
    # 以毫秒爲單位的時間間隔,
    this.intervalInMs = intervalInMs;
    # 採樣窗口的個數,即數組長度
    this.sampleCount = sampleCount;
    this.array = new AtomicReferenceArray<>(sampleCount);
}
複製代碼

在按秒統計時,默認的時間窗口數組長度爲2,每一個時間窗口的長度爲500ms。

在統計QPS時,第一步是調用data.currentWindow(),獲取當前時間窗口。

public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
}
複製代碼

Qps添加第一大步

下面對currentTimeMills()方法進行拆開分析。

public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        # 計算給定的時間映射在數組中的下標(默認數組長度爲2
        # 則idx能夠是0或者1
        int idx = calculateTimeIdx(timeMillis);
        # 根據當前時間計算出所在窗口應該對用的開始時間
        long windowStart = calculateWindowStart(timeMillis);
複製代碼
private int calculateTimeIdx(long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        return (int)(timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
}
複製代碼

爲何默認要用兩個採樣窗口,由於sentinel設定的是比較輕量的框架。時間窗口保存着不少統計數據,若是時間窗口過多的話,一方面會佔用過多的內存,另外一方面時間窗口過多意味着時間窗口的長度會變小,若是時間窗口長度變小,就會致使時間窗口過於頻繁的滑動。

while (true) {
      # 獲取存儲的該索引位置下的舊的時間窗口
      WindowWrap<T> old = array.get(idx);
      if (old == null) {
          # 沒有則建立一個
          WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
          # 經過CAS進行設置
          if (array.compareAndSet(idx, null, window)) {
                 return window;
           } else {
                //不然當前線程讓出時間片,再進行線程競爭
                Thread.yield();
           }
     # 若是實際應當的開始時間和原來的窗口的開始時間相等,則說明沒有失效,直接返回
     } else if (windowStart == old.windowStart()) {
            return old;
     # 讓應當的開始時間大於原來old窗口的開始時間,則說明該窗口失效
     } else if (windowStart > old.windowStart()) {
            if (updateLock.tryLock()) {
               try {
                   # 將舊的時間窗口的開始時間設置爲實際應該的開始時間,
                   # 並重置該窗口的統計數據爲0
                    return resetWindowTo(old, windowStart);
               } finally {
                   updateLock.unlock();
               }
            }  else {
                 Thread.yield();
                }
    # 這種狀況不可能存在,會拋出異常
    } else if (windowStart < old.windowStart()) {
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
           }
}
複製代碼
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        # w.value() 即 MetricBucket 
        w.value().reset();
        return w;
}
#從新設置它的開始時間
public WindowWrap<T> resetTo(long startTime) {
        this.windowStart = startTime;
        return this;

# 將MetricBucket的統計數據都重置爲0
public void reset() {
        internalReset(0L);
}
複製代碼

Qps添加第二大步

至此,第一大步已經介紹完了,下面是第二大步wrap.value().addPass(count)。 這一步很簡單,就是在第一步後會得到所處的時間窗口WindowWrap,而後獲得該類裏面的MetricBucket,它統計了該事件窗口下的數據統計,最後進行原子增長操做。

private T value;
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
}
public T value() {
        return value;
}

public void addPass(int n) {
        add(MetricEvent.PASS, n);
}
複製代碼
public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
}
複製代碼

以上就是增長Qps的總體流程。

Qps數據獲取

那咱們將數據添加上了,那怎麼查詢得到呢?

通過學習瞭解後,咱們能夠知道資源的數據統計存放在 DefaultNodeClsterNode中,它們都是 StatisticNode的子類, StatisticNode實現了 NOde接口的不少關於統計數據的方法,其中有統計Qps的方法。

@Override
public double passQps() {
        # 先獲取如今的時間窗口數組的Qps總量 @(1)
        # 而後獲取時間 @(2)
        return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
複製代碼

代碼@(1)解析

@Override
public long pass() {
        # 與前面方法一致,過濾掉過時窗口
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
}

public List<T> values() {
        return values(TimeUtil.currentTimeMillis());
}

public List<T> values(long timeMillis) {
        if (timeMillis < 0) {
            return new ArrayList<T>();
        }
        int size = array.length();
        List<T> result = new ArrayList<T>(size);

        for (int i = 0; i < size; i++) {
            WindowWrap<T> windowWrap = array.get(i);
            if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                continue;
            }
            # 即 MetricBucket
            result.add(windowWrap.value());
        }
        return result;
    }
複製代碼

當前時間減去某一窗口的開始時間,超過了事件間隔(按秒統計的話,就是1s),就說明該窗口過時,不添加。

public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
        return time - windowWrap.windowStart() > intervalInMs;
}
複製代碼

代碼@(2)解析

由於以前的時間單位是毫秒,如今計算的是每秒,因此轉化爲秒。

@Override
public double getWindowIntervalInSec() {
        return data.getIntervalInSecond();
}

public double getIntervalInSecond() {
        return intervalInMs / 1000.0;
}
複製代碼

至此,關於實時統計的模塊就講完了,大部分是參考幾個大神的文章,圖文並茂,很好理解,你們能夠閱讀以下:

Sentinel 原理-滑動窗口
Alibaba Seninel 滑動窗口實現原理(文末附原理圖) 源碼分析 Sentinel 實時數據採集實現原理

本文使用 mdnice 排版

相關文章
相關標籤/搜索