爲何須要集羣流控呢?假設須要將某個API的總qps限制在100,機器數可能爲50,這時很天然的想到使用一個專門的server來統計總的調用量,其餘實例與該server通訊來判斷是否能夠調用,這就是基本的集羣流控方式,sentinel的實現就是這樣的。node
若是服務調用使用輪訓或者隨機路由方式,理論上能夠經過在各個單機上設置流控規則便可(單機qps上限=總qps上限 / 機器數
)。集羣流控能夠解決流量分配不均的問題致使整體流控效果不佳的問題,其能夠精確地控制整個集羣的調用總量,結合單機限流兜底,能夠更好地發揮流量控制的效果,不過因爲會與server進行通訊,因此性能上會有必定損耗。git
集羣流控中共有兩種身份:github
Sentinel 1.4.0 開始引入了集羣流控模塊,主要包含如下幾部分:安全
sentinel-cluster-common-default
: 公共模塊,包含公共接口和實體sentinel-cluster-client-default
: 默認集羣流控 client 模塊,使用 Netty 進行通訊,提供接口方便序列化協議擴展sentinel-cluster-server-default
: 默認集羣流控 server 模塊,使用 Netty 進行通訊,提供接口方便序列化協議擴展;同時提供擴展接口對接規則判斷的具體實現(TokenService),默認實現是複用 sentinel-core 的相關邏輯
大體瞭解集羣流控概念以後,下面一塊兒分析下集羣流控規則、client端和server端各自處理機制~網絡
FlowRule 添加了兩個字段用於集羣限流相關配置,以下所示。clusterMode在方法FlowRuleChecker.canPassCheck
中會用到進行判斷是不是集羣流控,false表示單機流控;true表示集羣流控,會調用方法passClusterCheck與集羣流控server端通訊判斷是否觸發了流控,此時異常降級策略爲本地流控(fallbackToLocalOrPass方法,fallbackToLocalWhenFail屬性爲true時執行本地流控,不然直接返回ture不走流控檢查)。數據結構
1 private boolean clusterMode; // 標識是否爲集羣限流配置 2 private ClusterFlowConfig clusterConfig; // 集羣限流相關配置項 3 4 // ClusterFlowConfig屬性 5 private Long flowId; // 全局惟一的規則 ID,由集羣限流管控端分配. 6 private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL; // 閾值模式,默認(0)爲單機均攤,1 爲全局閾值. 7 private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL; 8 private boolean fallbackToLocalWhenFail = true; // 在 client 鏈接失敗或通訊失敗時,是否退化到本地的限流模式 9 10 public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { 11 String limitApp = rule.getLimitApp(); 12 if (limitApp == null) { 13 return true; 14 } 15 if (rule.isClusterMode()) {// 集羣模式 16 return passClusterCheck(rule, context, node, acquireCount, prioritized); 17 } 18 // 單機模式流控 19 return passLocalCheck(rule, context, node, acquireCount, prioritized); 20 }
FLOW_CLUSTER_STRATEGY_NORMAL
,針對ClusterFlowConfig配置該屬性爲FLOW_CLUSTER_STRATEGY_NORMAL才合法,除此以外,暫無太多業務意義。client端的處理機制和單機是同樣的,只不過clusterMode和clusterConfig屬性配置上了而已,具體的client使用能夠參考官方文檔 集羣流控,這裏再也不贅述。若是是集羣流控,在FlowRuleChecker.canPassCheck
方法中會調用方法passClusterCheck
,以下:app
1 private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, 2 boolean prioritized) { 3 try { 4 TokenService clusterService = pickClusterService(); 5 if (clusterService == null) { 6 // 爲null降級處理 7 return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); 8 } 9 long flowId = rule.getClusterConfig().getFlowId(); 10 TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized); 11 return applyTokenResult(result, rule, context, node, acquireCount, prioritized); 12 } catch (Throwable ex) { 13 RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); 14 } 15 // 降級處理 本地限流 16 return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); 17 }
requestToken負責與token server端通訊,入參包括flowId, acquireCount, prioritized
,這裏是沒有Resource信息的,server端經過flowid來獲取對應規則進行流控判斷。注意,調用writeAndFlush發送請求以後等待響應結果,最大等待時間ClusterClientConfigManager.getRequestTimeout()
;請求發送過程當中,出現任何異常或者返回錯誤(這裏不包括BLOCKED狀況),都會默認走降級本地流控邏輯:fallbackToLocalOrPass
。框架
瞭解了client端處理流程,接下來看下server端處理流程,client和server端都是用netty做爲底層網絡通訊服務,關於netty的原理不是本文討論的重點所以會簡單帶過。若是小夥伴們還不太熟悉netty,請參閱對應資料便可。對於netty,每一個Java開發者都須要瞭解甚至是熟悉的,這樣不只僅幫助咱們理解NIO及Reactor模型,還能再閱讀基於netty的框架源碼(好比dubbo/rocketmq等)時,將重點關注在框架自己實現上,而不是網絡通訊流程及細節上。
Sentinel 集羣限流服務端有兩種啓動方式:分佈式
目前針對token server高可用,sentinel並無對應的解決方案,不過沒有並不意味着沒考慮,由於默承認以降級走本地流控。sentinel做爲一個限流組件,在大部分應用場景中,若是token server掛了降級爲本地流控就能夠知足了。ide
若是必須考慮token server高可用,可考慮token server集羣部署,每一個token server都能訪問(或存儲)全量規則數據,多個client經過特定路由規則分配到不一樣的token server(相同類型服務路由到同一個token server,不一樣類型服務可路由到不一樣token server),token server故障時提供failover機制便可。若是此時考慮到相同類型服務出現網絡分區,也就是一部分服務能夠正常與token server通訊,另外一個部分服務沒法正常與token server通訊,若是沒法正常通訊的這部分服務直接進行failover,會致使集羣限流不許的問題,可經過zookeeper來保存在線的token server,若是zookeeper中token server列表有變化,再進行failover;此狀況下再出現任何形式的網絡分區,再執行降級邏輯,執行本地限流。
server端不論是獨立模式仍是嵌入模式,都是經過NettyTransportServer
來啓動的:
public void start() { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); p.addLast(new NettyRequestDecoder()); p.addLast(new LengthFieldPrepender(2)); p.addLast(new NettyResponseEncoder()); p.addLast(new TokenServerHandler(connectionPool)); } }); b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() { // }); }
以上邏輯主要是netty啓動邏輯,重點關注initChannel方法,這些是往pipeline添加自定義channelHandler,主要是處理粘包、編解碼器和業務處理Handler,這裏最重要的是TokenServerHandler,由於是請求處理邏輯,因此重點關注其channelRead方法:
1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 2 // 全局保存channel 3 globalConnectionPool.refreshLastReadTime(ctx.channel()); 4 if (msg instanceof ClusterRequest) { 5 ClusterRequest request = (ClusterRequest)msg; 6 if (request.getType() == ClusterConstants.MSG_TYPE_PING) { 7 // ping請求處理,會記錄namespace信息 8 handlePingRequest(ctx, request); 9 return; 10 } 11 // 根據request type獲取對應處理器 12 // 針對集羣流控,type爲MSG_TYPE_FLOW 13 RequestProcessor<?, ?> processor = RequestProcessorProvider.getProcessor(request.getType()); 14 ClusterResponse<?> response = processor.processRequest(request); 15 writeResponse(ctx, response); 16 } 17 }
針對集羣流控,type爲MSG_TYPE_FLOW
,對應處理器爲FlowRequestProcessor
。首先會提取請求入參 flowId, acquireCount, prioritized
,主要步驟以下:
根據限流規則檢查以後,會統計相關的PASS/BLOCK/PASS_REQUEST/BLOCK_REQUEST
等信息,該流程和單機流控流程是相似的,具體代碼再也不贅述。處理完成以後,會返回client端處理結果,至此整個集羣流控流程就分析完了。
往期精選
以爲文章不錯,對你有所啓發和幫助,但願能轉發給更多的小夥伴。若是有問題,請關注下面公衆號,發送問題給我,多謝。
歡迎小夥伴關注【TopCoder】閱讀更多精彩好文。