SphU.entry
,這明顯是很關鍵的方法,下圖的內容就是這裏構建的-Sentinel工做主流程就包含在上面一個方法裏,經過鏈式調用的方式,通過了創建樹狀結構,保存統計簇點,異常日誌記錄,實時數據統計,負載保護,權限認證,流量控制,熔斷降級等Slothtml
Entry e = new CtEntry(resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); }
HttpEventTask
類,它開啓了一個線程,轉麼用來作爲socket鏈接,控制檯經過socket請求通知客戶端,從而更新客戶端規則,更改規則核心代碼以下// Find the matching command handler. CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName); if (commandHandler != null) { CommandResponse<?> response = commandHandler.handle(request); handleResponse(response, printWriter, outputStream); } else { // No matching command handler. badRequest(printWriter, "Unknown command `" + commandName + '`'); }
經過命令模式,commandName爲setRules時,更新規則node
sentinel-transport-netty-http
這個包和sentinel-transport-simple-http
處於同級,官方的例子用的simple-http,但明顯它也準備了netty-http,因而我替換成了netty-http,運行後效果和原先同樣,至於效率上有沒有提高,我就不清楚了^_^FlowRuleChecker
,在core核心包中,核心檢查方法以下private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
DegradeRuleManager
,在core核心包,核心內容以下,再深刻就是它判斷的算法了,感興趣的本身去看以下的passCheck
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException { Set<DegradeRule> rules = degradeRules.get(resource.getName()); if (rules == null) { return; } for (DegradeRule rule : rules) { if (!rule.passCheck(context, node, count)) { throw new DegradeException(rule.getLimitApp(), rule); } } }
DefaultSlotChainBuilder
,構建了以下的slotpublic class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; } }
SlotChainProvider
中的構建方法以下private static void resolveSlotChainBuilder() { List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>(); boolean hasOther = false; for (SlotChainBuilder builder : LOADER) { if (builder.getClass() != DefaultSlotChainBuilder.class) { hasOther = true; list.add(builder); } } if (hasOther) { builder = list.get(0); } else { // No custom builder, using default. builder = new DefaultSlotChainBuilder(); } RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: " + builder.getClass().getCanonicalName()); }
LOADER
中加入了其餘的非默認實現就能夠替代原來的DefaultSlotChainBuilder
,那LOADER
怎麼來的?看代碼,以下的全局變量,也就是須要自定義實現SlotChainBuilder
接口的實現類private static final ServiceLoader<SlotChainBuilder> LOADER = ServiceLoader.load(SlotChainBuilder.class);
ServiceLoader
,也就是SPI
,全稱Service Provider Interface
,加載它須要特定的配合,好比我自定義實現一個Slot
/** * @author laoliangliang * @date 2019/7/25 14:13 */ public class MySlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); //自定義的 chain.addLast(new CarerSlot()); return chain; } }
/** * @author laoliangliang * @date 2019/7/25 14:15 */ @Slf4j public class CarerSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { log.info(JSON.toJSONString(resourceWrapper)); fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } }
CarerSlot
,那是否能被加載到呢?事實上還不夠,須要在META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
建這樣一個文件,內容以下public class Env { public static final Sph sph = new CtSph(); static { // If init fails, the process will exit. InitExecutor.doInit(); } }
SphU.entry
的源碼,就會發現,以下,該方法一進入不就是先獲取Env的sph,再調用的entry嗎,因此初始化的地方也就找到了,第一次調用SphU.entry
的地方,或者你不用這個,使用的註解,裏面一樣有這個方法public static Entry entry(String name) throws BlockException { return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); }
SphU.entry
包裹能夠實現熔斷降級,經過註解的形式包裹代碼方法應該是比較容易的,那麼在哪裏實現和配置的呢@Bean public SentinelResourceAspect sentinelResourceAspect() { pushlish(); return new SentinelResourceAspect(); }
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { }
對@SentinelResource
註解進行了處理git
Warm Up
模式不看算法細節,看它的中文說明應該就能理解是怎麼回事了吧;所謂慢啓動模式,要求系統的QPS請求增速不能超過必定的速率,不然會被壓制超過部分請求失敗,應該是爲了不一啓動就有大流量的請求進入致使系統一會兒就宕機卡主或直接進入了熔斷@Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { long passQps = (long) node.passQps(); long previousQps = (long) node.previousPassQps(); syncToken(previousQps); // 開始計算它的斜率 // 若是進入了警惕線,開始調整他的qps long restToken = storedTokens.get(); if (restToken >= warningToken) { long aboveToken = restToken - warningToken; // 消耗的速度要比warning快,可是要比慢 // current interval = restToken*slope+1/count double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); if (passQps + acquireCount <= warningQps) { return true; } } else { if (passQps + acquireCount <= count) { return true; } } return false; }
今日視頻:微服務架構實戰160講
歡迎關注公衆號,一塊兒學習進步