Sentinel 原理-全解析

逅弈 轉載請註明原創出處,謝謝!java

系列文章

Sentinel 原理-調用鏈node

Sentinel 原理-滑動窗口git

Sentinel 原理-實體類github

Sentinel 實戰-限流篇web

Sentinel 實戰-控制檯篇spring

Sentinel 實戰-規則持久化數據庫

Sentinel 是阿里中間件團隊開源的,面向分佈式服務架構的輕量級高可用流量控制組件,主要以流量爲切入點,從流量控制、熔斷降級、系統負載保護等多個維度來幫助用戶保護服務的穩定性。api

你們可能會問:Sentinel 和以前經常使用的熔斷降級庫 Netflix Hystrix 有什麼異同呢?Sentinel官網有一個對比的文章,這裏摘抄一個總結的表格,具體的對比能夠點此 連接 查看。緩存

對比內容 Sentinel Hystrix
隔離策略 信號量隔離 線程池隔離/信號量隔離
熔斷降級策略 基於響應時間或失敗比率 基於失敗比率
實時指標實現 滑動窗口 滑動窗口(基於 RxJava)
規則配置 支持多種數據源 支持多種數據源
擴展性 多個擴展點 插件的形式
基於註解的支持 支持 支持
限流 基於 QPS,支持基於調用關係的限流 不支持
流量整形 支持慢啓動、勻速器模式 不支持
系統負載保護 支持 不支持
控制檯 開箱即用,可配置規則、查看秒級監控、機器發現等 不完善
常見框架的適配 Servlet、Spring Cloud、Dubbo、gRPC 等 Servlet、Spring Cloud Netflix

從對比的表格能夠看到,Sentinel比Hystrix在功能性上還要強大一些,本文讓咱們一塊兒來了解下Sentinel的源碼,揭開Sentinel的神祕面紗。bash

項目結構

將Sentinel的源碼fork到本身的github庫中,接着把源碼clone到本地,而後開始源碼閱讀之旅吧。

首先咱們看一下Sentinel項目的整個結構:

sentinel-project-structure.png

  • sentinel-core 核心模塊,限流、降級、系統保護等都在這裏實現
  • sentinel-dashboard 控制檯模塊,能夠對鏈接上的sentinel客戶端實現可視化的管理
  • sentinel-transport 傳輸模塊,提供了基本的監控服務端和客戶端的API接口,以及一些基於不一樣庫的實現
  • sentinel-extension 擴展模塊,主要對DataSource進行了部分擴展實現
  • sentinel-adapter 適配器模塊,主要實現了對一些常見框架的適配
  • sentinel-demo 樣例模塊,可參考怎麼使用sentinel進行限流、降級等
  • sentinel-benchmark 基準測試模塊,對核心代碼的精確性提供基準測試

運行樣例

基本上每一個框架都會帶有樣例模塊,有的叫example,有的叫demo,sentinel也不例外。

那咱們從sentinel的demo中找一個例子運行下看看大體的狀況吧,上面說過了sentinel主要的核心功能是作限流、降級和系統保護,那咱們就從「限流」開始看sentinel的實現原理吧。

sentinel-basic-demo-flow-qps.png

能夠看到sentinel-demo模塊中有不少不一樣的樣例,咱們找到basic模塊下的flow包,這個包下面就是對應的限流的樣例,可是限流也有不少種類型的限流,咱們就找根據qps限流的類看吧,其餘的限流方式原理上都大差不差。

public class FlowQpsDemo {

    private static final String KEY = "abc";

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();

    private static volatile boolean stop = false;

    private static final int threadCount = 32;

    private static int seconds = 30;

    public static void main(String[] args) throws Exception {
        initFlowQpsRule();

        tick();
        // first make the system run on a very low condition
        simulateTraffic();

        System.out.println("===== begin to do flow control");
        System.out.println("only 20 requests per second can pass");

    }

    private static void initFlowQpsRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(KEY);
        // set limit qps to 20
        rule1.setCount(20);
        // 設置限流類型:根據qps
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rules.add(rule1);
        // 加載限流的規則
        FlowRuleManager.loadRules(rules);
    }

    private static void simulateTraffic() {
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        }
    }

    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + " send qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                    + ", pass:" + oneSecondPass
                    + ", block:" + oneSecondBlock);

                if (seconds-- <= 0) {
                    stop = true;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                + ", block:" + block.get());
            System.exit(0);
        }
    }

    static class RunTask implements Runnable {
        @Override
        public void run() {
            while (!stop) {
                Entry entry = null;

                try {
                    entry = SphU.entry(KEY);
                    // token acquired, means pass
                    pass.addAndGet(1);
                } catch (BlockException e1) {
                    block.incrementAndGet();
                } catch (Exception e2) {
                    // biz exception
                } finally {
                    total.incrementAndGet();
                    if (entry != null) {
                        entry.exit();
                    }
                }

                Random random2 = new Random();
                try {
                    TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
    }
}
複製代碼

執行上面的代碼後,打印出以下的結果:

sentinel-basic-demo-flow-qps-result.png

能夠看到,上面的結果中,pass的數量和咱們的預期並不相同,咱們預期的是每秒容許pass的請求數是20個,可是目前有不少pass的請求數是超過20個的。

緣由是,咱們這裏測試的代碼使用了多線程,注意看 threadCount 的值,一共有32個線程來模擬,而在RunTask的run方法中執行資源保護時,即在 SphU.entry 的內部是沒有加鎖的,因此就會致使在高併發下,pass的數量會高於20。

能夠用下面這個模型來描述下,有一個TimeTicker線程在作統計,每1秒鐘作一次。有N個RunTask線程在模擬請求,被訪問的business code被資源key保護着,根據規則,每秒只容許20個請求經過。

因爲pass、block、total等計數器是全局共享的,而多個RunTask線程在執行SphU.entry申請獲取entry時,內部沒有鎖保護,因此會存在pass的個數超過設定的閾值。

sentinel-basic-demo-flow-qps-module.png

那爲了證實在單線程下限流的正確性與可靠性,那咱們的模型就應該變成了這樣:

sentinel-basic-demo-flow-qps-single-thread-module.png

那接下來我把 threadCount 的值改成1,只有一個線程來執行這個方法,看下具體的限流結果,執行上面的代碼後打印的結果以下:

sentinel-basic-demo-single-thread-flow-qps-result.png

能夠看到pass數基本上維持在20,可是第一次統計的pass值仍是超過了20。這又是什麼緣由致使的呢?

其實仔細看下Demo中的代碼能夠發現,模擬請求是用的一個線程,統計結果是用的另一個線程,統計線程每1秒鐘統計一次結果,這兩個線程之間是有時間上的偏差的。從TimeTicker線程打印出來的時間戳能夠看出來,雖然每隔一秒進行統計,可是當前打印時的時間和上一次的時間仍是有偏差的,不徹底是1000ms的間隔。

要真正驗證每秒限制20個請求,保證數據的精準性,須要作基準測試,這個不是本篇文章的重點,有興趣的同窗能夠去了解下jmh,sentinel中的基準測試也是經過jmh作的。

深刻原理

經過一個簡單的示例程序,咱們瞭解了sentinel能夠對請求進行限流,除了限流外,還有降級和系統保護等功能。那如今咱們就撥開雲霧,深刻源碼內部去一窺sentinel的實現原理吧。

首先從入口開始:SphU.entry() 。這個方法會去申請一個entry,若是可以申請成功,則說明沒有被限流,不然會拋出BlockException,表面已經被限流了。

SphU.entry() 方法往下執行會進入到 Sph.entry() ,Sph的默認實現類是 CtSph ,在CtSph中最終會執行到 entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException 這個方法。

咱們來看一下這個方法的具體實現:

public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // Init the entry only. No rule checking will occur.
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    }

    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }

    // 獲取該資源對應的SlotChain
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /* * Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no * rule checking will be done. */
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
    	// 執行Slot的entry方法
        chain.entry(context, resourceWrapper, null, count, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        // 拋出BlockExecption
        throw e1;
    } catch (Throwable e1) {
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
複製代碼

這個方法能夠分爲如下幾個部分:

  • 1.對參數和全局配置項作檢測,若是不符合要求就直接返回了一個CtEntry對象,不會再進行後面的限流檢測,不然進入下面的檢測流程。
  • 2.根據包裝過的資源對象獲取對應的SlotChain
  • 3.執行SlotChain的entry方法
    • 3.1.若是SlotChain的entry方法拋出了BlockException,則將該異常繼續向上拋出
    • 3.2.若是SlotChain的entry方法正常執行了,則最後會將該entry對象返回
  • 4.若是上層方法捕獲了BlockException,則說明請求被限流了,不然請求能正常執行

其中比較重要的是第二、3兩個步驟,咱們來分解一下這兩個步驟。

建立SlotChain

首先看一下lookProcessChain的方法實現:

private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                // 具體構造chain的方法
                chain = Env.slotsChainbuilder.build();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
複製代碼

該方法使用了一個HashMap作了緩存,key是資源對象。這裏加了鎖,而且作了 double check 。具體構造chain的方法是經過: Env.slotsChainbuilder.build() 這句代碼建立的。那就進入這個方法看看吧。

public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    chain.addLast(new NodeSelectorSlot());
    chain.addLast(new ClusterBuilderSlot());
    chain.addLast(new LogSlot());
    chain.addLast(new StatisticSlot());
    chain.addLast(new SystemSlot());
    chain.addLast(new AuthoritySlot());
    chain.addLast(new FlowSlot());
    chain.addLast(new DegradeSlot());

    return chain;
}
複製代碼

Chain是鏈條的意思,從build的方法可看出,ProcessorSlotChain是一個鏈表,裏面添加了不少個Slot。具體的實現須要到DefaultProcessorSlotChain中去看。

public class DefaultProcessorSlotChain extends ProcessorSlotChain {

    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, args);
        }
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }
    };
    
    AbstractLinkedProcessorSlot<?> end = first;

    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }

    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }
}
複製代碼

DefaultProcessorSlotChain中有兩個AbstractLinkedProcessorSlot類型的變量:first和end,這就是鏈表的頭結點和尾節點。

建立DefaultProcessorSlotChain對象時,首先建立了首節點,而後把首節點賦值給了尾節點,能夠用下圖表示:

slot-chain-1.png

將第一個節點添加到鏈表中後,整個鏈表的結構變成了以下圖這樣:

slot-chain-2.png

將全部的節點都加入到鏈表中後,整個鏈表的結構變成了以下圖所示:

slot-chain-3.png

這樣就將全部的Slot對象添加到了鏈表中去了,每個Slot都是繼承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一種責任鏈的設計,每一個對象中都有一個next屬性,指向的是另外一個AbstractLinkedProcessorSlot對象。其實責任鏈模式在不少框架中都有,好比Netty中是經過pipeline來實現的。

知道了SlotChain是如何建立的了,那接下來就要看下是如何執行Slot的entry方法的了。

執行SlotChain的entry方法

lookProcessChain方法得到的ProcessorSlotChain的實例是DefaultProcessorSlotChain,那麼執行chain.entry方法,就會執行DefaultProcessorSlotChain的entry方法,而DefaultProcessorSlotChain的entry方法是這樣的:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) throws Throwable {
    first.transformEntry(context, resourceWrapper, t, count, args);
}
複製代碼

也就是說,DefaultProcessorSlotChain的entry實際是執行的first屬性的transformEntry方法。

而transformEntry方法會執行當前節點的entry方法,在DefaultProcessorSlotChain中first節點重寫了entry方法,具體以下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) throws Throwable {
    super.fireEntry(context, resourceWrapper, t, count, args);
}
複製代碼

first節點的entry方法,實際又是執行的super的fireEntry方法,那繼續把目光轉移到fireEntry方法,具體以下:

@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) throws Throwable {
    if (next != null) {
        next.transformEntry(context, resourceWrapper, obj, count, args);
    }
}
複製代碼

從這裏能夠看到,從fireEntry方法中就開始傳遞執行entry了,這裏會執行當前節點的下一個節點transformEntry方法,上面已經分析過了,transformEntry方法會觸發當前節點的entry,也就是說fireEntry方法實際是觸發了下一個節點的entry方法。具體的流程以下圖所示:

slot-chain-entry-process.png

從圖中能夠看出,從最初的調用Chain的entry()方法,轉變成了調用SlotChain中Slot的entry()方法。從上面的分析能夠知道,SlotChain中的第一個Slot節點是NodeSelectorSlot。

執行Slot的entry方法

如今能夠把目光轉移到SlotChain中的第一個節點NodeSelectorSlot的entry方法中去了,具體的代碼以下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) throws Throwable {
    
    DefaultNode node = map.get(context.getName());
    if (node == null) {
        synchronized (this) {
            node = map.get(context.getName());
            if (node == null) {
                node = Env.nodeBuilder.buildTreeNode(resourceWrapper, null);
                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                cacheMap.putAll(map);
                cacheMap.put(context.getName(), node);
                map = cacheMap;
            }
            // Build invocation tree
            ((DefaultNode)context.getLastNode()).addChild(node);
        }
    }

    context.setCurNode(node);
    // 由此觸發下一個節點的entry方法
    fireEntry(context, resourceWrapper, node, count, args);
}
複製代碼

從代碼中能夠看到,NodeSelectorSlot節點作了一些本身的業務邏輯處理,具體的你們能夠深刻源碼繼續追蹤,這裏大概的介紹下每種Slot的功能職責:

  • NodeSelectorSlot 負責收集資源的路徑,並將這些資源的調用路徑,以樹狀結構存儲起來,用於根據調用路徑來限流降級;
  • ClusterBuilderSlot 則用於存儲資源的統計信息以及調用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用做爲多維度限流,降級的依據;
  • StatistcSlot 則用於記錄,統計不一樣緯度的 runtime 信息;
  • FlowSlot 則用於根據預設的限流規則,以及前面 slot 統計的狀態,來進行限流;
  • AuthorizationSlot 則根據黑白名單,來作黑白名單控制;
  • DegradeSlot 則經過統計信息,以及預設的規則,來作熔斷降級;
  • SystemSlot 則經過系統的狀態,例如 load1 等,來控制總的入口流量;

執行完業務邏輯處理後,調用了fireEntry()方法,由此觸發了下一個節點的entry方法。此時咱們就知道了sentinel的責任鏈就是這樣傳遞的:每一個Slot節點執行完本身的業務後,會調用fireEntry來觸發下一個節點的entry方法。

因此能夠將上面的圖完整了,具體以下:

slot-chain-entry-whole-process.png

至此就經過SlotChain完成了對每一個節點的entry()方法的調用,每一個節點會根據建立的規則,進行本身的邏輯處理,當統計的結果達到設置的閾值時,就會觸發限流、降級等事件,具體是拋出BlockException異常。

總結

sentinel主要是基於7種不一樣的Slot造成了一個鏈表,每一個Slot都各司其職,本身作完份內的事以後,會把請求傳遞給下一個Slot,直到在某一個Slot中命中規則後拋出BlockException而終止。

前三個Slot負責作統計,後面的Slot負責根據統計的結果結合配置的規則進行具體的控制,是Block該請求仍是放行。

控制的類型也有不少可選項:根據qps、線程數、冷啓動等等。

而後基於這個核心的方法,衍生出了不少其餘的功能:

  • 一、dashboard控制檯,能夠可視化的對每一個鏈接過來的sentinel客戶端 (經過發送heartbeat消息)進行控制,dashboard和客戶端之間經過http協議進行通信。
  • 二、規則的持久化,經過實現DataSource接口,能夠經過不一樣的方式對配置的規則進行持久化,默認規則是在內存中的
  • 三、對主流的框架進行適配,包括servlet,dubbo,rRpc等

Dashboard控制檯

sentinel-dashboard是一個單獨的應用,經過spring-boot進行啓動,主要提供一個輕量級的控制檯,它提供機器發現、單機資源實時監控、集羣資源彙總,以及規則管理的功能。

咱們只須要對應用進行簡單的配置,就可使用這些功能。

1 啓動控制檯

1.1 下載代碼並編譯控制檯

  • 下載 控制檯 工程
  • 使用如下命令將代碼打包成一個 fat jar: mvn clean package

1.2 啓動

使用以下命令啓動編譯後的控制檯:

$ java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -jar target/sentinel-dashboard.jar
複製代碼

上述命令中咱們指定了一個JVM參數,-Dserver.port=8080 用於指定 Spring Boot 啓動端口爲 8080

2 客戶端接入控制檯

控制檯啓動後,客戶端須要按照如下步驟接入到控制檯。

2.1 引入客戶端jar包

經過 pom.xml 引入 jar 包:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-transport-simple-http</artifactId>
    <version>x.y.z</version>
</dependency>
複製代碼

2.2 配置啓動參數

啓動時加入 JVM 參數 -Dcsp.sentinel.dashboard.server=consoleIp:port 指定控制檯地址和端口。若啓動多個應用,則須要經過 -Dcsp.sentinel.api.port=xxxx 指定客戶端監控 API 的端口(默認是 8719)。

除了修改 JVM 參數,也能夠經過配置文件取得一樣的效果。更詳細的信息能夠參考 啓動配置項

2.3 觸發客戶端初始化

確保客戶端有訪問量,Sentinel 會在客戶端首次調用的時候進行初始化,開始向控制檯發送心跳包。

sentinel-dashboard是一個獨立的web應用,能夠接受客戶端的鏈接,而後與客戶端之間進行通信,他們之間使用http協議進行通信。他們之間的關係以下圖所示:

dashboard-client-transport.png

dashboard

dashboard啓動後會等待客戶端的鏈接,具體的作法是在 MachineRegistryController 中有一個 receiveHeartBeat 的方法,客戶端發送心跳消息,就是經過http請求這個方法。

dashboard接收到客戶端的心跳消息後,會把客戶端的傳遞過來的ip、port等信息封裝成一個 MachineInfo 對象,而後將該對象經過 MachineDiscovery 接口的 addMachine 方法添加到一個ConcurrentHashMap中保存起來。

這裏會有問題,由於客戶端的信息是保存在dashboard的內存中的,因此當dashboard應用重啓後,以前已經發送過來的客戶端信息都會丟失掉。

client

client在啓動時,會經過CommandCenterInitFunc選擇一個,而且只選擇一個CommandCenter進行啓動。

啓動以前會經過spi的方式掃描獲取到全部的CommandHandler的實現類,而後將全部的CommandHandler註冊到一個HashMap中去,待後期使用。

PS:考慮一下,爲何CommandHandler不須要作持久化,而是直接保存在內存中。

註冊完CommandHandler以後,緊接着就啓動CommandCenter了,目前CommandCenter有兩個實現類:

  • SimpleHttpCommandCenter 經過ServerSocket啓動一個服務端,接受socket鏈接
  • NettyHttpCommandCenter 經過Netty啓動一個服務端,接受channel鏈接

CommandCenter啓動後,就等待dashboard發送消息過來了,當接收到消息後,會把消息經過具體的CommandHandler進行處理,而後將處理的結果返回給dashboard。

這裏須要注意的是,dashboard給client發送消息是經過異步的httpClient進行發送的,在HttpHelper類中。

可是詭異的是,既然經過異步發送了,又經過一個CountDownLatch來等待消息的返回,而後獲取結果,那這樣不就失去了異步的意義的嗎?具體的代碼以下:

private String httpGetContent(String url) {
    final HttpGet httpGet = new HttpGet(url);
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<String> reference = new AtomicReference<>();
    httpclient.execute(httpGet, new FutureCallback<HttpResponse>() {
        @Override
        public void completed(final HttpResponse response) {
            try {
                reference.set(getBody(response));
            } catch (Exception e) {
                logger.info("httpGetContent " + url + " error:", e);
            } finally {
                latch.countDown();
            }
        }

        @Override
        public void failed(final Exception ex) {
            latch.countDown();
            logger.info("httpGetContent " + url + " failed:", ex);
        }

        @Override
        public void cancelled() {
            latch.countDown();
        }
    });
    try {
        latch.await(5, TimeUnit.SECONDS);
    } catch (Exception e) {
        logger.info("wait http client error:", e);
    }
    return reference.get();
}

複製代碼

主流框架的適配

sentinel也對一些主流的框架進行了適配,使得在使用主流框架時,也能夠享受到sentinel的保護。目前已經支持的適配器包括如下這些:

  • Web Servlet
  • Dubbo
  • Spring Boot / Spring Cloud
  • gRPC
  • Apache RocketMQ

其實作適配就是經過那些主流框架的擴展點,而後在擴展點上加入sentinel限流降級的代碼便可。拿Servlet的適配代碼看一下,具體的代碼是:

public class CommonFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) {

    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest sRequest = (HttpServletRequest)request;
        Entry entry = null;

        try {
        	// 根據請求生成的資源
            String target = FilterUtil.filterTarget(sRequest);
            target = WebCallbackManager.getUrlCleaner().clean(target);

            // 「申請」該資源
            ContextUtil.enter(target);
            entry = SphU.entry(target, EntryType.IN);

            // 若是能成功「申請」到資源,則說明未被限流
            // 則將請求放行
            chain.doFilter(request, response);
        } catch (BlockException e) {
        	// 不然若是捕獲了BlockException異常,說明請求被限流了
        	// 則將請求重定向到一個默認的頁面
            HttpServletResponse sResponse = (HttpServletResponse)response;
            WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse);
        } catch (IOException e2) {
            // 省略部分代碼
        } finally {
            if (entry != null) {
                entry.exit();
            }
            ContextUtil.exit();
        }
    }

    @Override
    public void destroy() {

    }
}
複製代碼

經過Servlet的Filter進行擴展,實現一個Filter,而後在doFilter方法中對請求進行限流控制,若是請求被限流則將請求重定向到一個默認頁面,不然將請求放行給下一個Filter。

規則持久化,動態化

Sentinel 的理念是開發者只須要關注資源的定義,當資源定義成功,能夠動態增長各類流控降級規則。

Sentinel 提供兩種方式修改規則:

  • 經過 API 直接修改 (loadRules)
  • 經過DataSource適配不一樣數據源修改

經過 API 修改比較直觀,能夠經過如下三個 API 修改不一樣的規則:

FlowRuleManager.loadRules(List<FlowRule> rules); // 修改流控規則
DegradeRuleManager.loadRules(List<DegradeRule> rules); // 修改降級規則
SystemRuleManager.loadRules(List<SystemRule> rules); // 修改系統規則
複製代碼

DataSource 擴展

上述 loadRules() 方法只接受內存態的規則對象,但應用重啓後內存中的規則就會丟失,更多的時候規則最好可以存儲在文件、數據庫或者配置中心中。

DataSource 接口給咱們提供了對接任意配置源的能力。相比直接經過 API 修改規則,實現 DataSource 接口是更加可靠的作法。

官方推薦經過控制檯設置規則後將規則推送到統一的規則中心,用戶只須要實現 DataSource 接口,來監聽規則中心的規則變化,以實時獲取變動的規則

DataSource 拓展常見的實現方式有:

  • 拉模式:客戶端主動向某個規則管理中心按期輪詢拉取規則,這個規則中心能夠是 SQL、文件,甚至是 VCS 等。這樣作的方式是簡單,缺點是沒法及時獲取變動;
  • 推模式:規則中心統一推送,客戶端經過註冊監聽器的方式時刻監聽變化,好比使用 Nacos、Zookeeper 等配置中心。這種方式有更好的實時性和一致性保證。

至此,sentinel的基本狀況都已經分析了,更加詳細的內容,能夠繼續閱讀源碼來研究。

更多原創好文,請關注「逅弈逐碼」

更多原創好文,請關注公衆號「逅弈逐碼」

相關文章
相關標籤/搜索