Dubbo分析Serialize層
Dubbo分析之Transport層
Dubbo分析之Exchange 層
Dubbo分析之Protocol層
Dubbo分析之Cluster層
Dubbo分析之Registry層javascript
緊接上文Dubbo分析之Protocol層,本文繼續分析dubbo的cluster層,此層封裝多個提供者的路由及負載均衡,並橋接註冊中心,以Invoker爲中心,擴展接口爲Cluster, Directory, Router, LoadBalance;java
整個cluster層可使用以下圖片歸納:
算法
各節點關係:
這裏的Invoker是Provider的一個可調用Service的抽象,Invoker封裝了Provider地址及Service接口信息;
Directory表明多個Invoker,能夠把它當作List ,但與List不一樣的是,它的值多是動態變化的,好比註冊中心推送變動;
Cluster將Directory中的多個Invoker假裝成一個 Invoker,對上層透明,假裝過程包含了容錯邏輯,調用失敗後,重試另外一個;
Router負責從多個Invoker中按路由規則選出子集,好比讀寫分離,應用隔離等;
LoadBalance負責從多個Invoker中選出具體的一個用於本次調用,選的過程包含了負載均衡算法,調用失敗後,須要重選;segmentfault
Cluster通過目錄,路由,負載均衡獲取到一個可用的Invoker,交給上層調用,接口以下:緩存
@SPI(FailoverCluster.NAME) public interface Cluster { /** * Merge the directory invokers to a virtual invoker. * * @param <T> * @param directory * @return cluster invoker * @throws RpcException */ @Adaptive <T> Invoker<T> join(Directory<T> directory) throws RpcException; }
Cluster是一個集羣容錯接口,通過路由,負載均衡以後獲取的Invoker,由容錯機制來處理,dubbo提供了多種容錯機制包括:
Failover Cluster:失敗自動切換,當出現失敗,重試其它服務器 [1]。一般用於讀操做,但重試會帶來更長延遲。可經過 retries=」2″ 來設置重試次數(不含第一次)。
Failfast Cluster:快速失敗,只發起一次調用,失敗當即報錯。一般用於非冪等性的寫操做,好比新增記錄。
Failsafe Cluster:失敗安全,出現異常時,直接忽略。一般用於寫入審計日誌等操做。
Failback Cluster:失敗自動恢復,後臺記錄失敗請求,定時重發。一般用於消息通知操做。
Forking Cluster:並行調用多個服務器,只要一個成功即返回。一般用於實時性要求較高的讀操做,但須要浪費更多服務資源。可經過 forks=」2″ 來設置最大並行數。
Broadcast Cluster:廣播調用全部提供者,逐個調用,任意一臺報錯則報錯 [2]。一般用於通知全部提供者更新緩存或日誌等本地資源信息。安全
默認使用了FailoverCluster,失敗的時候會默認重試其餘服務器,默認爲兩次;固然也能夠擴展其餘的容錯機制;看一下默認的FailoverCluster容錯機制,具體源碼在FailoverClusterInvoker中:ruby
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }
invocation是客戶端傳給服務器的相關參數包括(方法名稱,方法參數,參數值,附件信息),invokers是通過路由以後的服務器列表,loadbalance是指定的負載均衡策略;首先檢查invokers是否爲空,爲空直接拋異常,而後獲取重試的次數默認爲2次,接下來就是循環調用指定次數,若是不是第一次調用(表示第一次調用失敗),會從新加載服務器列表,而後經過負載均衡策略獲取惟一的Invoker,最後就是經過Invoker把invocation發送給服務器,返回結果Result;服務器
具體的doInvoke方法是在抽象類AbstractClusterInvoker中被調用的:負載均衡
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; List<Invoker<T>> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers; }
首先經過Directory獲取Invoker列表,同時在Directory中也會作路由處理,而後獲取負載均衡策略,最後調用具體的容錯策略;下面具體看一下Directory;dom
接口定義以下:
public interface Directory<T> extends Node { /** * get service type. * * @return service type. */ Class<T> getInterface(); /** * list invokers. * * @return invokers */ List<Invoker<T>> list(Invocation invocation) throws RpcException; }
目錄服務做用就是獲取指定接口的服務列表,具體實現有兩個:StaticDirectory和RegistryDirectory,同時都繼承於AbstractDirectory;從名字能夠大體知道StaticDirectory是一個固定的目錄服務,表示裏面的Invoker列表不會動態改變;RegistryDirectory是一個動態的目錄服務,經過註冊中心動態更新服務列表;list實如今抽象類中:
public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } List<Invoker<T>> invokers = doList(invocation); List<Router> localRouters = this.routers; // local reference if (localRouters != null && !localRouters.isEmpty()) { for (Router router : localRouters) { try { if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) { invokers = router.route(invokers, getConsumerUrl(), invocation); } } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } } } return invokers; }
首先檢查目錄是否被銷燬,而後調用doList,具體在實現類中定義,最後調用路由功能,下面重點看一下StaticDirectory和RegistryDirectory中的doList方法
是一個動態的目錄服務,全部能夠看到RegistryDirectory同時也繼承了NotifyListener接口,是一個通知接口,註冊中心有服務列表更新的時候,同時通知RegistryDirectory,通知邏輯以下:
public synchronized void notify(List<URL> urls) { List<URL> invokerUrls = new ArrayList<URL>(); List<URL> routerUrls = new ArrayList<URL>(); List<URL> configuratorUrls = new ArrayList<URL>(); for (URL url : urls) { String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); } } // configurators if (configuratorUrls != null && !configuratorUrls.isEmpty()) { this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && !routerUrls.isEmpty()) { List<Router> routers = toRouters(routerUrls); if (routers != null) { // null - do nothing setRouters(routers); } } List<Configurator> localConfigurators = this.configurators; // local reference // merge override parameters this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && !localConfigurators.isEmpty()) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers refreshInvoker(invokerUrls); }
此通知接口會接受三種類別的url包括:router(路由),configurator(配置),provider(服務提供方);
路由規則:決定一次dubbo服務調用的目標服務器,分爲條件路由規則和腳本路由規則,而且支持可擴展,向註冊中心寫入路由規則的操做一般由監控中心或治理中心的頁面完成;
配置規則:向註冊中心寫入動態配置覆蓋規則 [1]。該功能一般由監控中心或治理中心的頁面完成;
provider:動態提供的服務列表
路由規則和配置規則其實就是對provider服務列表更新和過濾處理,refreshInvoker方法就是根據三種url類別刷新本地的invoker列表,下面看一下RegistryDirectory實現的doList接口:
public List<Invoker<T>> doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist)."); } List<Invoker<T>> invokers = null; Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { String methodName = RpcUtils.getMethodName(invocation); Object[] args = RpcUtils.getArguments(invocation); if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) { invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter } if (invokers == null) { invokers = localMethodInvokerMap.get(methodName); } if (invokers == null) { invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } if (invokers == null) { Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator(); if (iterator.hasNext()) { invokers = iterator.next(); } } } return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; }
refreshInvoker處理以後,服務列表已methodInvokerMap存在,一個方法對應服務列表Map>>;
經過Invocation中指定的方法獲取對應的服務列表,若是具體的方法沒有對應的服務列表,則獲取」*」對應的服務列表;處理完以後就在父類中進行路由處理,路由規則一樣是經過通知接口獲取的,路由規則在下章介紹;
這是一個靜態的目錄服務,裏面的服務列表在初始化的時候就已經存在,而且不會改變;StaticDirectory用得比較少,主要用在服務對多註冊中心的引用;
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException { return invokers; }
由於是靜態的,全部doList方法也很簡單,直接返回內存中的服務列表便可;
路由規則決定一次dubbo服務調用的目標服務器,分爲條件路由規則和腳本路由規則,而且支持可擴展,接口以下:
public interface Router extends Comparable<Router> { /** * get the router url. * * @return url */ URL getUrl(); /** * route. * * @param invokers * @param url refer url * @param invocation * @return routed invokers * @throws RpcException */ <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
接口中提供的route方法經過必定的規則過濾出invokers的一個子集;提供了三個實現類:ScriptRouter,ConditionRouter和MockInvokersSelector
ScriptRouter:腳本路由規則支持 JDK 腳本引擎的全部腳本,好比:javascript, jruby, groovy 等,經過type=javascript參數設置腳本類型,缺省爲javascript;
ConditionRouter:基於條件表達式的路由規則,如:host = 10.20.153.10 => host = 10.20.153.11;=> 以前的爲消費者匹配條件,全部參數和消費者的 URL 進行對比,=> 以後爲提供者地址列表的過濾條件,全部參數和提供者的 URL 進行對比;
MockInvokersSelector:是否被配置爲使用mock,此路由器保證只有具備協議MOCK的調用者出如今最終的調用者列表中,全部其餘調用者將被排除;
下面重點看一下ScriptRouter源碼
public ScriptRouter(URL url) { this.url = url; String type = url.getParameter(Constants.TYPE_KEY); this.priority = url.getParameter(Constants.PRIORITY_KEY, 0); String rule = url.getParameterAndDecoded(Constants.RULE_KEY); if (type == null || type.length() == 0) { type = Constants.DEFAULT_SCRIPT_TYPE_KEY; } if (rule == null || rule.length() == 0) { throw new IllegalStateException(new IllegalStateException("route rule can not be empty. rule:" + rule)); } ScriptEngine engine = engines.get(type); if (engine == null) { engine = new ScriptEngineManager().getEngineByName(type); if (engine == null) { throw new IllegalStateException(new IllegalStateException("Unsupported route rule type: " + type + ", rule: " + rule)); } engines.put(type, engine); } this.engine = engine; this.rule = rule; }
構造器分別初始化腳本引擎(engine)和腳本代碼(rule),默認的腳本引擎是javascript;看一個具體的url:
"script://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=" + URL.encode("(function route(invokers) { ... } (invokers))")
script協議表示一個腳本協議,rule後面是一段javascript腳本,傳入的參數是invokers;
(function route(invokers) { var result = new java.util.ArrayList(invokers.size()); for (i = 0; i < invokers.size(); i ++) { if ("10.20.153.10".equals(invokers.get(i).getUrl().getHost())) { result.add(invokers.get(i)); } } return result; } (invokers)); // 表示當即執行方法
如上這段腳本過濾出host爲10.20.153.10,具體是如何執行這段腳本的,在route方法中:
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { try { List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers); Compilable compilable = (Compilable) engine; Bindings bindings = engine.createBindings(); bindings.put("invokers", invokersCopy); bindings.put("invocation", invocation); bindings.put("context", RpcContext.getContext()); CompiledScript function = compilable.compile(rule); Object obj = function.eval(bindings); if (obj instanceof Invoker[]) { invokersCopy = Arrays.asList((Invoker<T>[]) obj); } else if (obj instanceof Object[]) { invokersCopy = new ArrayList<Invoker<T>>(); for (Object inv : (Object[]) obj) { invokersCopy.add((Invoker<T>) inv); } } else { invokersCopy = (List<Invoker<T>>) obj; } return invokersCopy; } catch (ScriptException e) { //fail then ignore rule .invokers. logger.error("route error , rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e); return invokers; } }
首先經過腳本引擎編譯腳本,而後執行腳本,同時傳入Bindings參數,這樣在腳本中就能夠獲取invokers,而後進行過濾;最後來看一下負載均衡策略
在集羣負載均衡時,Dubbo提供了多種均衡策略,缺省爲random隨機調用,能夠自行擴展負載均衡策略;接口類以下:
@SPI(RandomLoadBalance.NAME) public interface LoadBalance { /** * select one invoker in list. * * @param invokers invokers. * @param url refer url * @param invocation invocation. * @return selected invoker. */ @Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
SPI定義了默認的策略爲RandomLoadBalance,提供了一個select方法,經過策略從服務列表中選擇一個invoker;dubbo默認提供了多種策略:
Random LoadBalance:隨機,按權重設置隨機機率,在一個截面上碰撞的機率高,但調用量越大分佈越均勻,並且按機率使用權重後也比較均勻,有利於動態調整提供者權重;
RoundRobin LoadBalance:輪詢,按公約後的權重設置輪詢比率;存在慢的提供者累積請求的問題,好比:第二臺機器很慢,但沒掛,當請求調到第二臺時就卡在那,
長此以往,全部請求都卡在調到第二臺上;
LeastActive LoadBalance:最少活躍調用數,相同活躍數的隨機,活躍數指調用先後計數差;使慢的提供者收到更少請求,由於越慢的提供者的調用先後計數差會越大;
ConsistentHash LoadBalance:一致性 Hash,相同參數的請求老是發到同一提供者;當某一臺提供者掛時,本來發往該提供者的請求,基於虛擬節點,平攤到其它提供者,不會引發劇烈變更;
下面重點看一下默認的RandomLoadBalance源碼
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers int totalWeight = 0; // The sum of weights boolean sameWeight = true; // Every invoker has the same weight? for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(random.nextInt(length)); }
首先計算總權重,同時檢查是否每個服務都有相同的權重;若是總權重大於0而且服務的權重都不相同,則經過權重來隨機選擇,不然直接經過Random函數來隨機;
本文圍繞Cluster層中的幾個重要的接口從上到下來分別介紹,並重點介紹了其中的某些實現類;結合官方提供的調用圖,仍是很容易理解此層的。