2. Sentinel源碼分析—Sentinel是如何進行流量統計的?

原文連接:www.cnblogs.com/luozhiyun/p…html

這一篇我仍是繼續上一篇沒有講完的內容,先上一個例子:java

private static final int threadCount = 100;

public static void main(String[] args) {
    initFlowRule();

    for (int i = 0; i < threadCount; i++) {
        Thread entryThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    Entry methodA = null;
                    try {
                        TimeUnit.MILLISECONDS.sleep(5);
                        methodA = SphU.entry("methodA");   
                    } catch (BlockException e1) {
                        // Block exception
                    } catch (Exception e2) {
                        // biz exception
                    } finally { 
                        if (methodA != null) {
                            methodA.exit(); 
                        }
                    }
                }
            }
        });
        entryThread.setName("working thread");
        entryThread.start();
    }
}


private static void initFlowRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource("methodA");
    // set limit concurrent thread for 'methodA' to 20
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
    rule1.setLimitApp("default");

    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

複製代碼

SphU#entry

我先把例子放上來node

Entry methodA = null;
try { 
    methodA = SphU.entry("methodA");
	  // dosomething 
} catch (BlockException e1) {
    block.incrementAndGet();
} catch (Exception e2) {
    // biz exception
} finally {
    total.incrementAndGet();
    if (methodA != null) {
        methodA.exit(); 
    }
}
複製代碼

咱們先進入到entry方法裏面: SphU#entryc#

public static Entry entry(String name) throws BlockException {
    return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
複製代碼

這個方法裏面會調用Env的sph靜態方法,咱們進入到Env裏面看看api

public class Env {

    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();
    }
}
複製代碼

這個方法初始化的時候會調用InitExecutor.doInit() InitExecutor#doInit數組

public static void doInit() {
    //InitExecutor只會初始化一次,而且初始化失敗會退出
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    try {
        //經過spi加載InitFunc子類,默認是MetricCallbackInit
        ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
        List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
        for (InitFunc initFunc : loader) {
            RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
            //因爲這裏只有一個loader裏面只有一個子類,那麼直接就返回initList裏面包含一個元素的集合
            insertSorted(initList, initFunc);
        }
        for (OrderWrapper w : initList) {
            //這裏調用MetricCallbackInit的init方法
            w.func.init();
            RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
                w.func.getClass().getCanonicalName(), w.order));
        }
    } catch (Exception ex) {
        RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
        ex.printStackTrace();
    } catch (Error error) {
        RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
        error.printStackTrace();
    }
}
複製代碼

這個方法主要是經過spi加載InitFunc 的子類,默認是MetricCallbackInit。 而後會將MetricCallbackInit封裝成OrderWrapper實例,而後遍歷,調用 MetricCallbackInit的init方法:安全

MetricCallbackInit#initbash

public void init() throws Exception {
    //添加回調函數
    //key是com.alibaba.csp.sentinel.metric.extension.callback.MetricEntryCallback
    StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
            new MetricEntryCallback());
    //key是com.alibaba.csp.sentinel.metric.extension.callback.MetricExitCallback
StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
            new MetricExitCallback());
} 
複製代碼

這個init方法就是註冊了兩個回調實例MetricEntryCallback和MetricExitCallback。併發

而後會經過調用Env.sph.entry會最後調用到CtSph的entry方法:app

public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
    //這裏name是Resource,type是out
    StringResourceWrapper resource = new StringResourceWrapper(name, type);
    //count是1 ,args是一個空數組
    return entry(resource, count, args);
}
複製代碼

這個方法會將resource和type封裝成StringResourceWrapper實例,而後調用entry重載方法追蹤到CtSph的entryWithPriority方法。

//這裏傳入得參數count是1,prioritized=false,args是容量爲1的空數組
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    //獲取當前線程的上下文
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }
    //爲空的話,建立一個默認的context
    if (context == null) { //1
        // Using default context.
        context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    }

    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {//這裏會返回false
        return new CtEntry(resourceWrapper, null, context);
    }
	  //2
    //建立一系列功能插槽
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */
    //若是超過了插槽的最大數量,那麼會返回null
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
		  //3
        //調用責任鏈
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
複製代碼

這個方法是最核心的方法,主要作了三件事:

  1. 若是context爲null則建立一個新的
  2. 經過責任鏈方式建立功能插槽
  3. 調用責任鏈插槽

在講建立context以前咱們先看一下ContextUtil這個類初始化的時候會作什麼

ContextUtil

/** * Holds all {@link EntranceNode}. Each {@link EntranceNode} is associated with a distinct context name. */
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>(); 
static {
    // Cache the entrance node for default context.
    initDefaultContext();
}

private static void initDefaultContext() {
    String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
    //初始化一個sentinel_default_context,type爲in的隊形
    EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
    //Constants.ROOT會初始化一個name是machine-root,type=IN的對象
    Constants.ROOT.addChild(node);
    //因此如今map裏面有一個key=CONTEXT_DEFAULT_NAME的對象
    contextNameNodeMap.put(defaultContextName, node);
} 
複製代碼

ContextUtil在初始化的時候會先調用initDefaultContext方法。經過Constants.ROOT建立一個root節點,而後將建立的node做爲root的子節點入隊,而後將node節點put到contextNameNodeMap中 結構以下:

Constants.ROOT:
					machine-root(EntryType#IN)
						/
					  /
			sentinel_default_context(EntryType#IN)
複製代碼

如今咱們再回到entryWithPriority方法中:

if (context == null) {//1
    // Using default context.
    context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}
複製代碼

若是context爲空,那麼會調用MyContextUtil.myEnter建立一個新的context,這個方法最後會調用到ContextUtil.trueEnter方法中進行建立。

protected static Context trueEnter(String name, String origin) {
    Context context = contextHolder.get();
    if (context == null) {
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
            //若是爲null的話,檢查contextNameNodeMap的size是否是超過2000
            if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                setNullContext();
                return NULL_CONTEXT;
            } else {
                // 重複initDefaultContext方法的內容
                try {
                    LOCK.lock();
                    node = contextNameNodeMap.get(name);
                    if (node == null) {
                        if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                            setNullContext();
                            return NULL_CONTEXT;
                        } else {
                            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                            // Add entrance node.
                            Constants.ROOT.addChild(node);

                            Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                            newMap.putAll(contextNameNodeMap);
                            newMap.put(name, node);
                            contextNameNodeMap = newMap;
                        }
                    }
                } finally {
                    LOCK.unlock();
                }
            }
        }
        context = new Context(node, name);
        context.setOrigin(origin);
        contextHolder.set(context);
    }

    return context;
}
複製代碼

在trueEnter方法中會作一個校驗,若是contextNameNodeMap中的數量已經超過了2000,那麼會返回一個NULL_CONTEXT。因爲咱們在initDefaultContext中已經初始化過了node節點,因此這個時候直接根據name獲取node節點放入到contextHolder中。

建立完了context以後咱們再回到entryWithPriority方法中繼續往下走:

//建立一系列功能插槽
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
複製代碼

經過調用lookProcessChain方法會建立功能插槽

CtSph#lookProcessChain

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    //根據resourceWrapper初始化插槽
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.最大插槽數量爲6000
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                //初始化新的插槽
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
複製代碼

這裏會調用SlotChainProvider.newSlotChain進行插槽的初始化。

SlotChainProvider#newSlotChain

public static ProcessorSlotChain newSlotChain() {
    if (slotChainBuilder != null) {
        return slotChainBuilder.build();
    }
    //根據spi初始化slotChainBuilder,默認是DefaultSlotChainBuilder
    resolveSlotChainBuilder();

    if (slotChainBuilder == null) {
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        slotChainBuilder = new DefaultSlotChainBuilder();
    }
    return slotChainBuilder.build();
}
複製代碼

默認調用DefaultSlotChainBuilder的build方法進行初始化

DefaultSlotChainBuilder#build

public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    //建立Node節點
    chain.addLast(new NodeSelectorSlot());
    //用於構建資源的 ClusterNode 
    chain.addLast(new ClusterBuilderSlot());
    chain.addLast(new LogSlot());
    //用於統計實時的調用數據
    chain.addLast(new StatisticSlot());
    //用於對入口的資源進行調配
    chain.addLast(new SystemSlot());
    chain.addLast(new AuthoritySlot());
    //用於限流
    chain.addLast(new FlowSlot());
    //用於降級
    chain.addLast(new DegradeSlot());
    return chain;
}
複製代碼

DefaultProcessorSlotChain裏面會建立一個頭節點,而後把其餘節點經過addLast串成一個鏈表:

最後咱們再回到CtSph的entryWithPriority方法中,往下走調用chain.entry方法觸發調用鏈。

Context

在往下看Slot插槽以前,咱們先總結一下Context是怎樣的一個結構:

在Sentinel中,全部的統計操做都是基於context來進行的。context會經過ContextUtil的trueEnter方法進行建立,會根據context的不一樣的name來組裝不一樣的Node來實現數據的統計。

在通過NodeSelectorSlot的時候會根據傳入的不一樣的context的name字段來獲取不一樣的DefaultNode對象,而後設置到context的curEntry實例的curNode屬性中。

NodeSelectorSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
    DefaultNode node = map.get(context.getName());
    if (node == null) {
        synchronized (this) {
            node = map.get(context.getName());
            if (node == null) {
                node = new DefaultNode(resourceWrapper, null);
                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                cacheMap.putAll(map);
                cacheMap.put(context.getName(), node);
                map = cacheMap;
                // Build invocation tree
                ((DefaultNode) context.getLastNode()).addChild(node);
            }

        }
    }
	 //設置到context的curEntry實例的curNode屬性中
    context.setCurNode(node);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
複製代碼

而後再通過ClusterBuilderSlot槽位在初始化的時候會初始化一個靜態的全局clusterNodeMap用來記錄全部的ClusterNode,維度是ResourceWrapper。每次調用entry方法的時候會先去全局的clusterNodeMap,找不到就會建立一個新的clusterNode,放入到node的ClusterNode屬性中,用來統計ResourceWrapper維度下面的全部數據。

//此變量是靜態的,因此只會初始化一次,存有全部的ResourceWrapper維度下的數據
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    if (clusterNode == null) {
        synchronized (lock) {
            if (clusterNode == null) {
                // Create the cluster node.
                clusterNode = new ClusterNode();
                HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                newMap.putAll(clusterNodeMap);
                newMap.put(node.getId(), clusterNode);

                clusterNodeMap = newMap;
            }
        }
    }
    node.setClusterNode(clusterNode);
 
    if (!"".equals(context.getOrigin())) {
        Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
        context.getCurEntry().setOriginNode(originNode);
    } 
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
複製代碼

StatisticSlot

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    try {
        //先直接往下調用,若是沒有報錯則進行統計
        // Do some checking.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        //當前線程數加1
        // Request passed, add thread count and pass count.
        node.increaseThreadNum();
        //經過的請求加上count
        node.addPassRequest(count); 
 			...
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum(); 
			...
    } catch (BlockException e) {
        //設置錯誤信息
        // Blocked, set block exception to current entry.
        context.getCurEntry().setError(e);
			...
        //設置被阻塞的次數
        // Add block count.
        node.increaseBlockQps(count); 
			...
        throw e;
    } catch (Throwable e) {
        // Unexpected error, set error to current entry.
        context.getCurEntry().setError(e);

        //設置異常的次數
        // This should not happen.
        node.increaseExceptionQps(count); 
			...
        throw e;
    }
}
複製代碼

這段代碼中,我把不相關的代碼都省略了,不影響咱們的主流程。 在entry方法裏面,首先是往下繼續調用,根據其餘的節點的狀況來進行統計,好比拋出異常,那麼就統計ExceptionQps,被阻塞那麼就統計BlockQps,直接經過,那麼就統計PassRequest。

咱們先看一下線程數是如何統計的:node.increaseThreadNum()

DefaultNode#increaseThreadNum 咱們先看一下DefaultNode的繼承關係:

public void increaseThreadNum() {
    super.increaseThreadNum();
    this.clusterNode.increaseThreadNum();
}
複製代碼

因此super.increaseThreadNum是調用到了父類的increaseThreadNum方法。

this.clusterNode.increaseThreadNum()這句代碼和super.increaseThreadNum是同樣的使用方式,因此看看StatisticNode的increaseThreadNum方法就行了

StatisticNode#increaseThreadNum

private LongAdder curThreadNum = new LongAdder();

public void decreaseThreadNum() {
    curThreadNum.increment();
}
複製代碼

這個方法很簡單,每次都直接使用LongAdder的api加1就行了,最後會在退出的時候減1,使用LongAdder也保證了原子性。

若是請求經過的時候會繼續往下調用node.addPassRequest

DefaultNode#addPassRequest

public void addPassRequest(int count) {
    super.addPassRequest(count);
    this.clusterNode.addPassRequest(count);
}
複製代碼

這句代碼也是調用了StatisticNode的addPassRequest方法進行統計的。

StatisticNode#addPassRequest

public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}
複製代碼

這段代碼裏面有兩個調用,一個是按分鐘統計的,一個是按秒統計的。由於咱們這裏是使用的FlowRuleManager因此是會記錄按分鐘統計的。具體是怎麼初始化,以及怎麼打印統計日誌的能夠看看我上一篇分析:1.Sentinel源碼分析—FlowRuleManager加載規則作了什麼?,我這裏再也不贅述。

因此咱們直接看看rollingCounterInMinute.addPass(count)這句代碼就行了,這句代碼會直接調用ArrayMetric的addPass方法。

ArrayMetric#addPass

public void addPass(int count) {
    //獲取當前的時間窗口
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    //窗口內的pass加1
    wrap.value().addPass(count);
}
複製代碼

這裏會首先調用currentWindow獲取當前的時間窗口WindowWrap,而後調用調用窗口內的MetricBucket的addPass方法加1,我繼續拿我上一篇文章的圖過來講明:

我面來到MetricBucket的addPass方法: MetricBucket#addPass

public void addPass(int n) {
    add(MetricEvent.PASS, n);
}

public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}
複製代碼

addPass方法會使用枚舉類而後將counters數組內的pass槽位的值加n;counters數組是LongAdder數組,因此也不會有線程安全問題。

node.increaseBlockQps和node.increaseExceptionQps代碼也是同樣的,你們能夠自行去看看。

FlowSlot

FlowSlot能夠根據預先設置的規則來判斷一個請求是否應該被經過。

FlowSlot

private final FlowRuleChecker checker;

public FlowSlot() {
    this(new FlowRuleChecker());
}

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
複製代碼

FlowSlot在實例化的時候會設置一個規則檢查器,而後在調用entry方法的時候會調用規則檢查器的checkFlow方法

咱們進入到FlowRuleChecker的checkFlow 方法中: FlowRuleChecker#checkFlow

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    //返回FlowRuleManager裏面註冊的全部規則
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            //若是當前的請求不能經過,那麼就拋出FlowException異常
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        return flowRules.get(resource);
    }
};
複製代碼

checkFlow這個方法就是過去全部的規則而後根據規則進行過濾。主要的過濾操做是在canPassCheck中進行的。

FlowRuleChecker#canPassCheck

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    //若是沒有設置limitapp,那麼不進行校驗,默認會給個defualt
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
    //集羣模式
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    //本地模式
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
複製代碼

這個方法首先會校驗limitApp,而後判斷是集羣模式仍是本地模式,咱們這裏暫時分析本地模式。

FlowRuleChecker#passLocalCheck

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    //節點選擇
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    //根據設置的規則來攔截
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
複製代碼

本地模式中,首先會調用selectNodeByRequesterAndStrategy進行節點選擇,根據不一樣的模式選擇不一樣的節點,而後調用規則控制器的canPass方法進行攔截。

FlowRuleChecker#selectNodeByRequesterAndStrategy

static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // The limit app should not be empty.
    String limitApp = rule.getLimitApp();
    //關係限流策略
    int strategy = rule.getStrategy();

    String origin = context.getOrigin();
    //origin不爲`default` or `other`,而且limitApp和origin相等
    if (limitApp.equals(origin) && filterOrigin(origin)) {//1
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        //關係限流策略爲關聯或者鏈路的處理
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {//2
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            //這裏返回ClusterNode,表示全部應用對該資源的全部請求狀況
            // Return the cluster node.
            return node.getClusterNode();
        }
        //關係限流策略爲關聯或者鏈路的處理
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {//3
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        //關係限流策略爲關聯或者鏈路的處理
        return selectReferenceNode(rule, context, node);
    }

    return null;
}
複製代碼

這個方法主要是用來根據控制根據不一樣的規則,獲取不一樣的node進行數據的統計。

  • 在標記1中表示,若是流控規則配置了來源應用且不是"default"或者"other"這種特殊值,那麼這種時候該規則就只對配置的來源應用生效。
  • 在標記2中表示,limitApp是"default",表明針對全部應用進行統計。
  • 標記7中,這個是"other"值的處理,假設當前請求來源不在當前規則的limitApp中,則進行下面的處理。

我這裏引用官方文檔的一段話進行解釋:

default:表示不區分調用者,來自任何調用者的請求都將進行限流統計。若是這個資源名的調用總和超過了這條規則定義的閾值,則觸發限流。

{some_origin_name}:表示針對特定的調用者,只有來自這個調用者的請求才會進行流量控制。例如 NodeA 配置了一條針對調用者caller1的規則,那麼當且僅當來自 caller1 對 NodeA 的請求才會觸發流量控制。

other:表示針對除 {some_origin_name} 之外的其他調用方的流量進行流量控制。例如,資源NodeA配置了一條針對調用者 caller1 的限流規則,同時又配置了一條調用者爲 other 的規則,那麼任意來自非 caller1 對 NodeA 的調用,都不能超過 other 這條規則定義的閾值

同一個資源名能夠配置多條規則,規則的生效順序爲:{some_origin_name} > other > default

複製代碼

而後返回到passLocalCheck方法中,繼續往下走,調用rule.getRater(),咱們這裏沒有指定特殊的rater,因此返回的是DefaultController。

DefaultController#canPass

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    //判斷是限流仍是限制併發數量,而後獲取流量或併發數量
    int curCount = avgUsedTokens(node);
    //若是二者相加大於限定的併發數
    if (curCount + acquireCount > count) {
        ... 
        return false;
    }
    return true;
}
複製代碼

這裏首先調用avgUsedTokens,根據grade判斷當前的規則是QPS限流仍是線程數限流,若是二者之和大於count,那麼返回false。

返回false以後會回到FlowRuleChecker的checkFlow方法,拋出FlowException異常。

到這裏Sentinel的主流程就分析完畢了。

相關文章
相關標籤/搜索