逅弈 轉載請註明原創出處,謝謝!java
《一塊兒學 Sentinel》 原理-實體類github
《一塊兒學 Sentinel》 實戰-控制檯篇spring
Sentinel 系列教程,現已上傳到 github 和 gitee 中:緩存
Sentinel 是阿里中間件團隊開源的,面向分佈式服務架構的輕量級高可用流量控制組件,主要以流量爲切入點,從流量控制、熔斷降級、系統負載保護等多個維度來幫助用戶保護服務的穩定性。多線程
你們可能會問:Sentinel 和以前經常使用的熔斷降級庫 Netflix Hystrix 有什麼異同呢?Sentinel官網有一個對比的文章,這裏摘抄一個總結的表格,具體的對比能夠點此 連接 查看。
對比內容 | Sentinel | Hystrix |
---|---|---|
隔離策略 | 信號量隔離 | 線程池隔離/信號量隔離 |
熔斷降級策略 | 基於響應時間或失敗比率 | 基於失敗比率 |
實時指標實現 | 滑動窗口 | 滑動窗口(基於 RxJava) |
規則配置 | 支持多種數據源 | 支持多種數據源 |
擴展性 | 多個擴展點 | 插件的形式 |
基於註解的支持 | 支持 | 支持 |
限流 | 基於 QPS,支持基於調用關係的限流 | 不支持 |
流量整形 | 支持慢啓動、勻速器模式 | 不支持 |
系統負載保護 | 支持 | 不支持 |
控制檯 | 開箱即用,可配置規則、查看秒級監控、機器發現等 | 不完善 |
常見框架的適配 | Servlet、Spring Cloud、Dubbo、gRPC 等 | Servlet、Spring Cloud Netflix |
從對比的表格能夠看到,Sentinel比Hystrix在功能性上還要強大一些,本文讓咱們一塊兒來了解下Sentinel的源碼,揭開Sentinel的神祕面紗。
將Sentinel的源碼fork到本身的github庫中,接着把源碼clone到本地,而後開始源碼閱讀之旅吧。
首先咱們看一下Sentinel項目的整個結構:
基本上每一個框架都會帶有樣例模塊,有的叫example,有的叫demo,sentinel也不例外。
那咱們從sentinel的demo中找一個例子運行下看看大體的狀況吧,上面說過了sentinel主要的核心功能是作限流、降級和系統保護,那咱們就從「限流」開始看sentinel的實現原理吧。
能夠看到sentinel-demo模塊中有不少不一樣的樣例,咱們找到basic模塊下的flow包,這個包下面就是對應的限流的樣例,可是限流也有不少種類型的限流,咱們就找根據qps限流的類看吧,其餘的限流方式原理上都大差不差。
public class FlowQpsDemo { private static final String KEY = "abc"; private static AtomicInteger pass = new AtomicInteger(); private static AtomicInteger block = new AtomicInteger(); private static AtomicInteger total = new AtomicInteger(); private static volatile boolean stop = false; private static final int threadCount = 32; private static int seconds = 30; public static void main(String[] args) throws Exception { initFlowQpsRule(); tick(); // first make the system run on a very low condition simulateTraffic(); System.out.println("===== begin to do flow control"); System.out.println("only 20 requests per second can pass"); } private static void initFlowQpsRule() { List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new FlowRule(); rule1.setResource(KEY); // set limit qps to 20 rule1.setCount(20); // 設置限流類型:根據qps rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); rule1.setLimitApp("default"); rules.add(rule1); // 加載限流的規則 FlowRuleManager.loadRules(rules); } private static void simulateTraffic() { for (int i = 0; i < threadCount; i++) { Thread t = new Thread(new RunTask()); t.setName("simulate-traffic-Task"); t.start(); } } private static void tick() { Thread timer = new Thread(new TimerTask()); timer.setName("sentinel-timer-task"); timer.start(); } static class TimerTask implements Runnable { @Override public void run() { long start = System.currentTimeMillis(); System.out.println("begin to statistic!!!"); long oldTotal = 0; long oldPass = 0; long oldBlock = 0; while (!stop) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } long globalTotal = total.get(); long oneSecondTotal = globalTotal - oldTotal; oldTotal = globalTotal; long globalPass = pass.get(); long oneSecondPass = globalPass - oldPass; oldPass = globalPass; long globalBlock = block.get(); long oneSecondBlock = globalBlock - oldBlock; oldBlock = globalBlock; System.out.println(seconds + " send qps is: " + oneSecondTotal); System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock); if (seconds-- <= 0) { stop = true; } } long cost = System.currentTimeMillis() - start; System.out.println("time cost: " + cost + " ms"); System.out.println("total:" + total.get() + ", pass:" + pass.get() + ", block:" + block.get()); System.exit(0); } } static class RunTask implements Runnable { @Override public void run() { while (!stop) { Entry entry = null; try { entry = SphU.entry(KEY); // token acquired, means pass pass.addAndGet(1); } catch (BlockException e1) { block.incrementAndGet(); } catch (Exception e2) { // biz exception } finally { total.incrementAndGet(); if (entry != null) { entry.exit(); } } Random random2 = new Random(); try { TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); } catch (InterruptedException e) { // ignore } } } } }
執行上面的代碼後,打印出以下的結果:
能夠看到,上面的結果中,pass的數量和咱們的預期並不相同,咱們預期的是每秒容許pass的請求數是20個,可是目前有不少pass的請求數是超過20個的。
緣由是,咱們這裏測試的代碼使用了多線程,注意看 threadCount
的值,一共有32個線程來模擬,而在RunTask的run方法中執行資源保護時,即在 SphU.entry
的內部是沒有加鎖的,因此就會致使在高併發下,pass的數量會高於20。
能夠用下面這個模型來描述下,有一個TimeTicker線程在作統計,每1秒鐘作一次。有N個RunTask線程在模擬請求,被訪問的business code被資源key保護着,根據規則,每秒只容許20個請求經過。
因爲pass、block、total等計數器是全局共享的,而多個RunTask線程在執行SphU.entry申請獲取entry時,內部沒有鎖保護,因此會存在pass的個數超過設定的閾值。
那爲了證實在單線程下限流的正確性與可靠性,那咱們的模型就應該變成了這樣:
那接下來我把 threadCount
的值改成1,只有一個線程來執行這個方法,看下具體的限流結果,執行上面的代碼後打印的結果以下:
能夠看到pass數基本上維持在20,可是第一次統計的pass值仍是超過了20。這又是什麼緣由致使的呢?
其實仔細看下Demo中的代碼能夠發現,模擬請求是用的一個線程,統計結果是用的另一個線程,統計線程每1秒鐘統計一次結果,這兩個線程之間是有時間上的偏差的。從TimeTicker線程打印出來的時間戳能夠看出來,雖然每隔一秒進行統計,可是當前打印時的時間和上一次的時間仍是有偏差的,不徹底是1000ms的間隔。
要真正驗證每秒限制20個請求,保證數據的精準性,須要作基準測試,這個不是本篇文章的重點,有興趣的同窗能夠去了解下jmh,sentinel中的基準測試也是經過jmh作的。
經過一個簡單的示例程序,咱們瞭解了sentinel能夠對請求進行限流,除了限流外,還有降級和系統保護等功能。那如今咱們就撥開雲霧,深刻源碼內部去一窺sentinel的實現原理吧。
首先從入口開始:SphU.entry()
。這個方法會去申請一個entry,若是可以申請成功,則說明沒有被限流,不然會拋出BlockException,表面已經被限流了。
從 SphU.entry()
方法往下執行會進入到 Sph.entry()
,Sph的默認實現類是 CtSph
,在CtSph中最終會執行到 entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException
這個方法。
咱們來看一下這個方法的具體實現:
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // Init the entry only. No rule checking will occur. return new CtEntry(resourceWrapper, null, context); } if (context == null) { context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); } // Global switch is close, no rule checking will do. if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } // 獲取該資源對應的SlotChain ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no * rule checking will be done. */ if (chain == null) { return new CtEntry(resourceWrapper, null, context); } Entry e = new CtEntry(resourceWrapper, chain, context); try { // 執行Slot的entry方法 chain.entry(context, resourceWrapper, null, count, args); } catch (BlockException e1) { e.exit(count, args); // 拋出BlockExecption throw e1; } catch (Throwable e1) { RecordLog.info("Sentinel unexpected exception", e1); } return e; }
這個方法能夠分爲如下幾個部分:
其中比較重要的是第二、3兩個步驟,咱們來分解一下這兩個步驟。
首先看一下lookProcessChain的方法實現:
private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // 具體構造chain的方法 chain = Env.slotsChainbuilder.build(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }
該方法使用了一個HashMap作了緩存,key是資源對象。這裏加了鎖,而且作了 double check
。具體構造chain的方法是經過: Env.slotsChainbuilder.build()
這句代碼建立的。那就進入這個方法看看吧。
public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; }
Chain是鏈條的意思,從build的方法可看出,ProcessorSlotChain是一個鏈表,裏面添加了不少個Slot。具體的實現須要到DefaultProcessorSlotChain中去看。
public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; @Override public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; } } @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; } }
DefaultProcessorSlotChain中有兩個AbstractLinkedProcessorSlot類型的變量:first和end,這就是鏈表的頭結點和尾節點。
建立DefaultProcessorSlotChain對象時,首先建立了首節點,而後把首節點賦值給了尾節點,能夠用下圖表示:
將第一個節點添加到鏈表中後,整個鏈表的結構變成了以下圖這樣:
將全部的節點都加入到鏈表中後,整個鏈表的結構變成了以下圖所示:
這樣就將全部的Slot對象添加到了鏈表中去了,每個Slot都是繼承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一種責任鏈的設計,每一個對象中都有一個next屬性,指向的是另外一個AbstractLinkedProcessorSlot對象。其實責任鏈模式在不少框架中都有,好比Netty中是經過pipeline來實現的。
知道了SlotChain是如何建立的了,那接下來就要看下是如何執行Slot的entry方法的了。
lookProcessChain方法得到的ProcessorSlotChain的實例是DefaultProcessorSlotChain,那麼執行chain.entry方法,就會執行DefaultProcessorSlotChain的entry方法,而DefaultProcessorSlotChain的entry方法是這樣的:
@Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) throws Throwable { first.transformEntry(context, resourceWrapper, t, count, args); }
也就是說,DefaultProcessorSlotChain的entry實際是執行的first屬性的transformEntry方法。
而transformEntry方法會執行當前節點的entry方法,在DefaultProcessorSlotChain中first節點重寫了entry方法,具體以下:
@Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, args); }
first節點的entry方法,實際又是執行的super的fireEntry方法,那繼續把目光轉移到fireEntry方法,具體以下:
@Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) throws Throwable { if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, args); } }
從這裏能夠看到,從fireEntry方法中就開始傳遞執行entry了,這裏會執行當前節點的下一個節點transformEntry方法,上面已經分析過了,transformEntry方法會觸發當前節點的entry,也就是說fireEntry方法實際是觸發了下一個節點的entry方法。具體的流程以下圖所示:
從圖中能夠看出,從最初的調用Chain的entry()方法,轉變成了調用SlotChain中Slot的entry()方法。從上面的分析能夠知道,SlotChain中的第一個Slot節點是NodeSelectorSlot。
如今能夠把目光轉移到SlotChain中的第一個節點NodeSelectorSlot的entry方法中去了,具體的代碼以下:
@Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) throws Throwable { DefaultNode node = map.get(context.getName()); if (node == null) { synchronized (this) { node = map.get(context.getName()); if (node == null) { node = Env.nodeBuilder.buildTreeNode(resourceWrapper, null); HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; } // Build invocation tree ((DefaultNode)context.getLastNode()).addChild(node); } } context.setCurNode(node); // 由此觸發下一個節點的entry方法 fireEntry(context, resourceWrapper, node, count, args); }
從代碼中能夠看到,NodeSelectorSlot節點作了一些本身的業務邏輯處理,具體的你們能夠深刻源碼繼續追蹤,這裏大概的介紹下每種Slot的功能職責:
NodeSelectorSlot
負責收集資源的路徑,並將這些資源的調用路徑,以樹狀結構存儲起來,用於根據調用路徑來限流降級;ClusterBuilderSlot
則用於存儲資源的統計信息以及調用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用做爲多維度限流,降級的依據;StatistcSlot
則用於記錄,統計不一樣緯度的 runtime 信息;FlowSlot
則用於根據預設的限流規則,以及前面 slot 統計的狀態,來進行限流;AuthorizationSlot
則根據黑白名單,來作黑白名單控制;DegradeSlot
則經過統計信息,以及預設的規則,來作熔斷降級;SystemSlot
則經過系統的狀態,例如 load1 等,來控制總的入口流量;執行完業務邏輯處理後,調用了fireEntry()方法,由此觸發了下一個節點的entry方法。此時咱們就知道了sentinel的責任鏈就是這樣傳遞的:每一個Slot節點執行完本身的業務後,會調用fireEntry來觸發下一個節點的entry方法。
因此能夠將上面的圖完整了,具體以下:
至此就經過SlotChain完成了對每一個節點的entry()方法的調用,每一個節點會根據建立的規則,進行本身的邏輯處理,當統計的結果達到設置的閾值時,就會觸發限流、降級等事件,具體是拋出BlockException異常。
sentinel主要是基於7種不一樣的Slot造成了一個鏈表,每一個Slot都各司其職,本身作完份內的事以後,會把請求傳遞給下一個Slot,直到在某一個Slot中命中規則後拋出BlockException而終止。
前三個Slot負責作統計,後面的Slot負責根據統計的結果結合配置的規則進行具體的控制,是Block該請求仍是放行。
控制的類型也有不少可選項:根據qps、線程數、冷啓動等等。
而後基於這個核心的方法,衍生出了不少其餘的功能:
sentinel-dashboard是一個單獨的應用,經過spring-boot進行啓動,主要提供一個輕量級的控制檯,它提供機器發現、單機資源實時監控、集羣資源彙總,以及規則管理的功能。
咱們只須要對應用進行簡單的配置,就可使用這些功能。
mvn clean package
使用以下命令啓動編譯後的控制檯:
$ java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -jar target/sentinel-dashboard.jar
上述命令中咱們指定了一個JVM參數,-Dserver.port=8080
用於指定 Spring Boot 啓動端口爲 8080
。
控制檯啓動後,客戶端須要按照如下步驟接入到控制檯。
經過 pom.xml
引入 jar 包:
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-transport-simple-http</artifactId> <version>x.y.z</version> </dependency>
啓動時加入 JVM 參數 -Dcsp.sentinel.dashboard.server=consoleIp:port
指定控制檯地址和端口。若啓動多個應用,則須要經過 -Dcsp.sentinel.api.port=xxxx
指定客戶端監控 API 的端口(默認是 8719)。
除了修改 JVM 參數,也能夠經過配置文件取得一樣的效果。更詳細的信息能夠參考 啓動配置項。
確保客戶端有訪問量,Sentinel 會在客戶端首次調用的時候進行初始化,開始向控制檯發送心跳包。
sentinel-dashboard是一個獨立的web應用,能夠接受客戶端的鏈接,而後與客戶端之間進行通信,他們之間使用http協議進行通信。他們之間的關係以下圖所示:
dashboard啓動後會等待客戶端的鏈接,具體的作法是在 MachineRegistryController
中有一個 receiveHeartBeat
的方法,客戶端發送心跳消息,就是經過http請求這個方法。
dashboard接收到客戶端的心跳消息後,會把客戶端的傳遞過來的ip、port等信息封裝成一個 MachineInfo
對象,而後將該對象經過 MachineDiscovery
接口的 addMachine
方法添加到一個ConcurrentHashMap中保存起來。
這裏會有問題,由於客戶端的信息是保存在dashboard的內存中的,因此當dashboard應用重啓後,以前已經發送過來的客戶端信息都會丟失掉。
client在啓動時,會經過CommandCenterInitFunc選擇一個,而且只選擇一個CommandCenter進行啓動。
啓動以前會經過spi的方式掃描獲取到全部的CommandHandler的實現類,而後將全部的CommandHandler註冊到一個HashMap中去,待後期使用。
PS:考慮一下,爲何CommandHandler不須要作持久化,而是直接保存在內存中。
註冊完CommandHandler以後,緊接着就啓動CommandCenter了,目前CommandCenter有兩個實現類:
CommandCenter啓動後,就等待dashboard發送消息過來了,當接收到消息後,會把消息經過具體的CommandHandler進行處理,而後將處理的結果返回給dashboard。
這裏須要注意的是,dashboard給client發送消息是經過異步的httpClient進行發送的,在HttpHelper類中。
可是詭異的是,既然經過異步發送了,又經過一個CountDownLatch來等待消息的返回,而後獲取結果,那這樣不就失去了異步的意義的嗎?具體的代碼以下:
private String httpGetContent(String url) { final HttpGet httpGet = new HttpGet(url); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<String> reference = new AtomicReference<>(); httpclient.execute(httpGet, new FutureCallback<HttpResponse>() { @Override public void completed(final HttpResponse response) { try { reference.set(getBody(response)); } catch (Exception e) { logger.info("httpGetContent " + url + " error:", e); } finally { latch.countDown(); } } @Override public void failed(final Exception ex) { latch.countDown(); logger.info("httpGetContent " + url + " failed:", ex); } @Override public void cancelled() { latch.countDown(); } }); try { latch.await(5, TimeUnit.SECONDS); } catch (Exception e) { logger.info("wait http client error:", e); } return reference.get(); }
sentinel也對一些主流的框架進行了適配,使得在使用主流框架時,也能夠享受到sentinel的保護。目前已經支持的適配器包括如下這些:
其實作適配就是經過那些主流框架的擴展點,而後在擴展點上加入sentinel限流降級的代碼便可。拿Servlet的適配代碼看一下,具體的代碼是:
public class CommonFilter implements Filter { @Override public void init(FilterConfig filterConfig) { } @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HttpServletRequest sRequest = (HttpServletRequest)request; Entry entry = null; try { // 根據請求生成的資源 String target = FilterUtil.filterTarget(sRequest); target = WebCallbackManager.getUrlCleaner().clean(target); // 「申請」該資源 ContextUtil.enter(target); entry = SphU.entry(target, EntryType.IN); // 若是能成功「申請」到資源,則說明未被限流 // 則將請求放行 chain.doFilter(request, response); } catch (BlockException e) { // 不然若是捕獲了BlockException異常,說明請求被限流了 // 則將請求重定向到一個默認的頁面 HttpServletResponse sResponse = (HttpServletResponse)response; WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse); } catch (IOException e2) { // 省略部分代碼 } finally { if (entry != null) { entry.exit(); } ContextUtil.exit(); } } @Override public void destroy() { } }
經過Servlet的Filter進行擴展,實現一個Filter,而後在doFilter方法中對請求進行限流控制,若是請求被限流則將請求重定向到一個默認頁面,不然將請求放行給下一個Filter。
Sentinel 的理念是開發者只須要關注資源的定義,當資源定義成功,能夠動態增長各類流控降級規則。
Sentinel 提供兩種方式修改規則:
loadRules
)DataSource
適配不一樣數據源修改經過 API 修改比較直觀,能夠經過如下三個 API 修改不一樣的規則:
FlowRuleManager.loadRules(List<FlowRule> rules); // 修改流控規則 DegradeRuleManager.loadRules(List<DegradeRule> rules); // 修改降級規則 SystemRuleManager.loadRules(List<SystemRule> rules); // 修改系統規則
上述 loadRules()
方法只接受內存態的規則對象,但應用重啓後內存中的規則就會丟失,更多的時候規則最好可以存儲在文件、數據庫或者配置中心中。
DataSource
接口給咱們提供了對接任意配置源的能力。相比直接經過 API 修改規則,實現 DataSource
接口是更加可靠的作法。
官方推薦經過控制檯設置規則後將規則推送到統一的規則中心,用戶只須要實現 DataSource 接口,來監聽規則中心的規則變化,以實時獲取變動的規則。
DataSource
拓展常見的實現方式有:
至此,sentinel的基本狀況都已經分析了,更加詳細的內容,能夠繼續閱讀源碼來研究。