Sentinel 是怎樣攔截異常流量的?


各位在家裏用電的過程當中,必定也經歷過「跳閘」。這個「」就是在電量超過負荷的時候用來保護咱們用電安全的,也被稱爲「斷路器」,還有個響亮的英文名 -- CircuitBreaker。
java


和用電安全同樣,對於「限流」、「降級」、「熔斷」...,你我應該也都耳熟能詳。咱們開發的各種軟件、系統、互聯網應用等爲了避免被異常流量壓垮,也須要一個斷路器。
node


在 Spring 應用中,使用斷路器很方便,咱們可使用 Spring Cloud CircuitBreaker。
web

Spring Cloud Circuit Breaker 是啥?若是你熟悉 Spring 是什麼人的話,你能猜個八九不離十。和Spring Data JPA 這些相似,Spring 他又搞了個抽象的,標準的API 出來。此次他抽象的是關於降級熔斷的「斷路器」。有了這一層,具體實現是誰能夠方便的更換,咱們使用的代碼裏改動基本爲0。數據庫


咱們先來從官方Demo有個初步印象:tomcat

@RestControllerpublic class DemoController { private CircuitBreakerFactory circuitBreakerFactory;  private HttpBinService httpBin; public DemoController(CircuitBreakerFactory circuitBreakerFactory, HttpBinService httpBinService) { this.circuitBreakerFactory = circuitBreakerFactory; this.httpBin = httpBinService;  } @GetMapping("/delay/{seconds}") public Map delay(@PathVariable int seconds) { return circuitBreakerFactory.create("delay").run(httpBin.delaySuppplier(seconds), t -> { Map<String, String> fallback = new HashMap<>(); fallback.put("hello", "world"); return fallback; }); }}


千言萬語,總結出來這樣一句circuitBreakerFactory.create("delay").run() 安全


由於是抽象,對應的實現就有好多種啦。微信

目前支持的實現有:架構

  • Hystrixapp

  • Resilience4j框架

  • Sentinel

  • Spring Retry


而抽象至關於定了個標準,像JDBC同樣,不管咱們把數據庫換成了MySQL,Oracle 仍是SQLite,接口等非特定類型的代碼都不須要改變。斷路器也同樣。

這裏的斷路器工廠,建立方法都是標準的。具體這裏執行業務邏輯的時候斷路器實現要怎樣進行攔截降級,就能夠交給具體的實現來完成。


此次,咱們以開源的 Sentinel 爲例,來看看他們是怎樣攔住異常流量的。


首先,由於是Spring Cloud,因此還會基於 Spring Boot 的 Autoconfiguration。如下是配置類,咱們看到生成了一個工廠。

public class SentinelCircuitBreakerAutoConfiguration { @Bean @ConditionalOnMissingBean(CircuitBreakerFactory.class) public CircuitBreakerFactory sentinelCircuitBreakerFactory() { return new SentinelCircuitBreakerFactory(); } }

在咱們實際代碼執行邏輯的時候,create 出來的是什麼呢?

是個斷路器 CircuitBreaker,用來執行代碼。

public interface CircuitBreaker {
default <T> T run(Supplier<T> toRun) { return run(toRun, throwable -> { throw new NoFallbackAvailableException("No fallback available.", throwable); });  }; <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback);}


包含兩個執行的方法,須要在的時候能夠指定fallback邏輯。具體到 Sentinel 是這樣的:

 public CircuitBreaker create(String id) { SentinelConfigBuilder.SentinelCircuitBreakerConfiguration conf = getConfigurations() .computeIfAbsent(id, defaultConfiguration); return new SentinelCircuitBreaker(id, conf.getEntryType(), conf.getRules()); }


你會看到建立了一個SentinelCircuitBreaker。咱們的業務邏輯,就會在這個斷路器裏執行,run方法就是各個具體實現的舞臺。


@Override public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) { Entry entry = null; try { entry = SphU.entry(resourceName, entryType); // If the SphU.entry() does not throw `BlockException`, it means that the // request can pass. return toRun.get(); } catch (BlockException ex) { // SphU.entry() may throw BlockException which indicates that // the request was rejected (flow control or circuit breaking triggered). // So it should not be counted as the business exception. return fallback.apply(ex); } catch (Exception ex) { // For other kinds of exceptions, we'll trace the exception count via // Tracer.trace(ex). Tracer.trace(ex); return fallback.apply(ex); } finally { // Guarantee the invocation has been completed. if (entry != null) { entry.exit(); } } }


OK,到此爲止, Spring Cloud CircuitBreaker 已經展示完了。其它的細節都放到了具體實現的「盒子」裏。下面咱們把這個盒子打開。



Sentinel 是個熔斷降級框架,官方這樣自我介紹:

面向分佈式服務架構的高可用流量控制組件,主要以流量爲切入點,從流量控制、熔斷降級、系統自適應保護等多個維度來幫助用戶保障微服務的穩定性。


官網的這張代碼截圖簡潔的說明了他是怎樣工做的


擋在業務代碼的前面,有事兒先衝它來,能經過以後才走業務邏輯,和各種闖關還真相似。


在上面 CircuitBreaker 的 run 方法裏,我們必定都注意到了這句

entry = SphU.entry(resourceName, entryType);

這就是一切攔截的祕密。


不管咱們是經過前面的CircuitBreaker的方式,仍是 @SentinelResource 這種註解形式,仍是經過 Interceptor 的方式,沒什麼本質區別。只是觸發點不同。最後都是經過SphU來搞定。

既然是攔截,那必定要攔下來作這樣或那樣的檢查。


實際檢查的時候,entry 裏核心代碼有這些:

 Entry entryWithPriority(ResourceWrapper resourceWrapper, ...)        throws BlockException {        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); Entry e = new CtEntry(resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper,...); } catch (BlockException e1) { e.exit(count, args); throw e1; }  return e; }


注意這裏的ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);在請求過來處理的時候,若是未初始化處理鏈,則進行初始化,將各類first,next設置好,後面的請求都會按這個來處理。全部須要攔截的Slot,都會加到這個 chain 裏面,再逐個執行 chain 裏的 slot。和Servlet Filter 相似。


chain裏都加了些啥呢?

public class HotParamSlotChainBuilder implements SlotChainBuilder { public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new ParamFlowSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; }


初始的時候,first 指向一個匿名內部類,這些加進來的slot,會在每次addLast的時候,作爲鏈的next,    


AbstractLinkedProcessorSlot<?> end = first;
@Override public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; }    } @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; }


而每一個 slot,有本身的特定用處,處理完本身的邏輯以後,會經過 fireEntry 來觸發下一個 slot的執行。

給你一張長長的線程調用棧就會過度的明顯了:

 java.lang.Thread.State: RUNNABLE at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.checkFlow(FlowSlot.java:168) at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:161) at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:139) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:39) at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:33) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:36) at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:30) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot.entry(ParamFlowSlot.java:39) at com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot.entry(ParamFlowSlot.java:33) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:57) at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:50) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:35) at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:29) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:101) at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:47) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot.entry(NodeSelectorSlot.java:171) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain$1.entry(DefaultProcessorSlotChain.java:31) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain.entry(DefaultProcessorSlotChain.java:75) at com.alibaba.csp.sentinel.CtSph.entryWithPriority(CtSph.java:148) at com.alibaba.csp.sentinel.CtSph.entryWithType(CtSph.java:347) at com.alibaba.csp.sentinel.CtSph.entryWithType(CtSph.java:340) at com.alibaba.csp.sentinel.SphU.entry(SphU.java:285)



降級有三種類型


每種類型,都會根據對應的配置項數據比對,不符合就中斷,中斷以後也不能一直斷着,啥時候再恢復呢?就根據配置的時間窗口,會啓動一個恢復線程,到時間就會調度,把中斷標識恢復。

public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { if (cut.get()) { return false;        } ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource()); if (clusterNode == null) { return true;        } if (grade == RuleConstant.DEGRADE_GRADE_RT) { double rt = clusterNode.avgRt(); if (rt < this.count) { passCount.set(0); return true;            } // Sentinel will degrade the service only if count exceeds. if (passCount.incrementAndGet() < rtSlowRequestAmount) { return true; } } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { double exception = clusterNode.exceptionQps(); double success = clusterNode.successQps(); double total = clusterNode.totalQps(); // If total amount is less than minRequestAmount, the request will pass. if (total < minRequestAmount) { return true;            } // In the same aligned statistic time window, // "success" (aka. completed count) = exception count + non-exception count (realSuccess) double realSuccess = success - exception; if (realSuccess <= 0 && exception < minRequestAmount) { return true;            } if (exception / success < count) { return true; } } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) { double exception = clusterNode.totalException(); if (exception < count) { return true; }        } if (cut.compareAndSet(false, true)) { ResetTask resetTask = new ResetTask(this); pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);        } return false; }


恢復作了兩件事:1、把passCount設置成0,2、中斷標識還原


上面介紹了對請求的攔截處理,這其中最核心的,也就是咱們最主要配置的,一個是「流控」,一個是「降級」。這兩個對應的Slot,會在處理請求的時候,根據配置好的 「規則」rule 來判斷。好比咱們上面看到的時間窗口、熔斷時間等,以及流控的線程數,QPS數這些。


這些規則默認的配置在內存裏,也能夠經過不一樣的數據源加載進來。同時啓用了Sentinel 控制檯的話,在控制檯 也能夠配置規則。這些規則,會經過 HTTP 發送給對應使用了 sentinel 的應用實例節點。

收到更新內容後,實例裏的「規則管理器」會從新加載一次 rule,下次請求處理就直接生效了。


相關閱讀

MySQL: 喂,別走,聽我解釋一下好嗎?

多表查詢用什麼聯接?別信感受,用數聽說話

一個數據庫SQL查詢的數次輪迴

數據庫是咋工做的?(一)

憑什麼讓日誌先寫?

Java七武器系列長生劍 -- Java虛擬機的顯微鏡 Serviceability Agent

Java七武器系列霸王槍 -- 線程狀態分析 jstack

Java七武器系列孔雀翎-- 問題診斷神器BTrace

嵌套事務、掛起事務,Spring 是怎樣給事務又實現傳播特性的?

怎樣閱讀源代碼?





源碼|實戰|成長|職場


這裏是「Tomcat那些事兒

請留下你的足跡

咱們一塊兒「終身成長」

本文分享自微信公衆號 - Tomcat那些事兒(tomcat0000)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索