該模塊的使用介紹請參考dubbo官方用戶手冊以下章節內容。java
其中註冊中心實際上是對於目錄服務的一種實現方式,本文不會對註冊中心進行詳細講解。算法
各節點關係:express
因爲每種接口都有多種實現類,篇幅和時間有限,咱們選擇其中最爲典型的一種來進行源碼分析。apache
集羣的源碼以下。api
package com.alibaba.dubbo.rpc.cluster; import com.alibaba.dubbo.common.extension.Adaptive; import com.alibaba.dubbo.common.extension.SPI; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.support.FailoverCluster; /** * Cluster. (SPI, Singleton, ThreadSafe) * * <a href="http://en.wikipedia.org/wiki/Computer_cluster">Cluster</a> * <a href="http://en.wikipedia.org/wiki/Fault-tolerant_system">Fault-Tolerant</a> * * @author william.liangf */ @SPI(FailoverCluster.NAME) public interface Cluster { /** * Merge the directory invokers to a virtual invoker. * * @param <T> * @param directory * @return cluster invoker * @throws RpcException */ @Adaptive <T> Invoker<T> join(Directory<T> directory) throws RpcException; }
該接口只有一個方法,就是將directory對象中的多個invoker的集合整合成一個invoker對象。該方法被ReferenceConfig類的createProxy方法調用,調用它的代碼以下。緩存
// 對有註冊中心的Cluster 只用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers));
Cluster內置有9個擴展實現類,都實現了不一樣的集羣容錯策略,咱們只分析默認的自動故障轉移的擴展實現FailoverCluster。服務器
源碼以下,只是構造了一個類型爲FailoverClusterInvoker的invoker對象。app
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
咱們進入看看FailoverClusterInvoker的源碼。
負載均衡
/** * 失敗轉移,當出現失敗,重試其它服務器,一般用於讀操做,但重試會帶來更長延遲。 * * <a href="http://en.wikipedia.org/wiki/Failover">Failover</a> * * @author william.liangf * @author chao.liuc */ public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory<T> directory) { super(directory); } @SuppressWarnings({ "unchecked", "rawtypes" }) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重試時,進行從新選擇,避免重試時invoker列表已發生變化. //注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變 if (i > 0) { checkWheatherDestoried(); copyinvokers = list(invocation); //從新檢查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); } }
該類又繼承自抽象實現類AbstractClusterInvoker,使用該類的一些方法,所以也要結合該類的源碼一塊兒看。less
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.cluster.support; import java.util.ArrayList; import java.util.List; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.Version; import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Result; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Directory; import com.alibaba.dubbo.rpc.cluster.LoadBalance; import com.alibaba.dubbo.rpc.support.RpcUtils; /** * AbstractClusterInvoker * * @author william.liangf * @author chao.liuc */ public abstract class AbstractClusterInvoker<T> implements Invoker<T> { private static final Logger logger = LoggerFactory .getLogger(AbstractClusterInvoker.class); protected final Directory<T> directory; protected final boolean availablecheck; private volatile boolean destroyed = false; private volatile Invoker<T> stickyInvoker = null; public AbstractClusterInvoker(Directory<T> directory) { this(directory, directory.getUrl()); } public AbstractClusterInvoker(Directory<T> directory, URL url) { if (directory == null) throw new IllegalArgumentException("service directory == null"); this.directory = directory ; //sticky 須要檢測 avaliablecheck this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK) ; } public Class<T> getInterface() { return directory.getInterface(); } public URL getUrl() { return directory.getUrl(); } public boolean isAvailable() { Invoker<T> invoker = stickyInvoker; if (invoker != null) { return invoker.isAvailable(); } return directory.isAvailable(); } public void destroy() { directory.destroy(); destroyed = true; } /** * 使用loadbalance選擇invoker.</br> * a)先lb選擇,若是在selected列表中 或者 不可用且作檢驗時,進入下一步(重選),不然直接返回</br> * b)重選驗證規則:selected > available .保證重選出的結果儘可能不在select中,而且是可用的 * * @param availablecheck 若是設置true,在選擇的時候先選invoker.available == true * @param selected 已選過的invoker.注意:輸入保證不重複 * */ protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; String methodName = invocation == null ? "" : invocation.getMethodName(); boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ; { //ignore overloaded method if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){ stickyInvoker = null; } //ignore cucurrent problem if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){ if (availablecheck && stickyInvoker.isAvailable()){ return stickyInvoker; } } } Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected); if (sticky){ stickyInvoker = invoker; } return invoker; } private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); // 若是隻有兩個invoker,退化成輪循 if (invokers.size() == 2 && selected != null && selected.size() > 0) { return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); } Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //若是 selected中包含(優先判斷) 或者 不可用&&availablecheck=true 則重試. if( (selected != null && selected.contains(invoker)) ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){ try{ Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if(rinvoker != null){ invoker = rinvoker; }else{ //看下第一次選的位置,若是不是最後,選+1位置. int index = invokers.indexOf(invoker); try{ //最後在避免碰撞 invoker = index <invokers.size()-1?invokers.get(index+1) :invoker; }catch (Exception e) { logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e); } } }catch (Throwable t){ logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t); } } return invoker; } /** * 重選,先從非selected的列表中選擇,沒有在從selected列表中選擇. * @param loadbalance * @param invocation * @param invokers * @param selected * @return * @throws RpcException */ private Invoker<T> reselect(LoadBalance loadbalance,Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected ,boolean availablecheck) throws RpcException { //預先分配一個,這個列表是必定會用到的. List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size()); //先從非select中選 if( availablecheck ){ //選isAvailable 的非select for(Invoker<T> invoker : invokers){ if(invoker.isAvailable()){ if(selected ==null || !selected.contains(invoker)){ reselectInvokers.add(invoker); } } } if(reselectInvokers.size()>0){ return loadbalance.select(reselectInvokers, getUrl(), invocation); } }else{ //選所有非select for(Invoker<T> invoker : invokers){ if(selected ==null || !selected.contains(invoker)){ reselectInvokers.add(invoker); } } if(reselectInvokers.size()>0){ return loadbalance.select(reselectInvokers, getUrl(), invocation); } } //最後從select中選可用的. { if(selected != null){ for(Invoker<T> invoker : selected){ if((invoker.isAvailable()) //優先選available && !reselectInvokers.contains(invoker)){ reselectInvokers.add(invoker); } } } if(reselectInvokers.size()>0){ return loadbalance.select(reselectInvokers, getUrl(), invocation); } } return null; } public Result invoke(final Invocation invocation) throws RpcException { checkWheatherDestoried(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } protected void checkWheatherDestoried() { if(destroyed){ throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + " is now destroyed! Can not invoke any more."); } } @Override public String toString() { return getInterface() + " -> " + getUrl().toString(); } protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) { if (invokers == null || invokers.size() == 0) { throw new RpcException("Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". No provider available for the service " + directory.getUrl().getServiceKey() + " from registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Please check if the providers have been started and registered."); } } protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException; protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers; } }
源碼實現分析。
負載均衡器
@SPI(RandomLoadBalance.NAME) public interface LoadBalance { /** * select one invoker in list. * * @param invokers invokers. * @param url refer url * @param invocation invocation. * @return selected invoker. */ @Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
上述源碼所示,負載均衡只定義了一個方法,就是在候選的invokers中選擇一個invoker對象出來。默認的擴展實現是random。那我麼就分析RandomLoadBalance的源碼。
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.cluster.loadbalance; import java.util.List; import java.util.Random; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; /** * random load balance. * * @author qianlei * @author william.liangf */ public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; private final Random random = new Random(); protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // 總個數 int totalWeight = 0; // 總權重 boolean sameWeight = true; // 權重是否都同樣 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // 累計總權重 if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; // 計算全部權重是否同樣 } } if (totalWeight > 0 && ! sameWeight) { // 若是權重不相同且權重大於0則按總權重數隨機 int offset = random.nextInt(totalWeight); // 並肯定隨機值落在哪一個片段上 for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // 若是權重相同或權重爲0則均等隨機 return invokers.get(random.nextInt(length)); } }
該類繼承了抽象類AbstractLoadBalance,所以咱們也要結合該類一塊兒分析。
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.cluster.loadbalance; import java.util.List; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.cluster.LoadBalance; /** * AbstractLoadBalance * * @author william.liangf */ public abstract class AbstractLoadBalance implements LoadBalance { public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); return doSelect(invokers, url, invocation); } protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation); protected int getWeight(Invoker<?> invoker, Invocation invocation) { int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if (weight > 0) { long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L); if (timestamp > 0L) { int uptime = (int) (System.currentTimeMillis() - timestamp); int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; } static int calculateWarmupWeight(int uptime, int warmup, int weight) { int ww = (int) ( (float) uptime / ( (float) warmup / (float) weight ) ); return ww < 1 ? 1 : (ww > weight ? weight : ww); } }
源碼分析以下:
public interface Router extends Comparable<Router> { /** * get the router url. * * @return url */ URL getUrl(); /** * route. * * @param invokers * @param url refer url * @param invocation * @return routed invokers * @throws RpcException */ <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
路由器就定義了上述2個方法,核心方法是route,從大的invoker列表結合中根據規則過濾出一個子集合。咱們這裏只分析實現類ConditionRouter的源碼。
/* * Copyright 1999-2012 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.cluster.router.condition; import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.common.utils.StringUtils; import com.alibaba.dubbo.common.utils.UrlUtils; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Router; /** * ConditionRouter * * @author william.liangf */ public class ConditionRouter implements Router, Comparable<Router> { private static final Logger logger = LoggerFactory.getLogger(ConditionRouter.class); private final URL url; private final int priority; private final boolean force; private final Map<String, MatchPair> whenCondition; private final Map<String, MatchPair> thenCondition; public ConditionRouter(URL url) { this.url = url; this.priority = url.getParameter(Constants.PRIORITY_KEY, 0); this.force = url.getParameter(Constants.FORCE_KEY, false); try { String rule = url.getParameterAndDecoded(Constants.RULE_KEY); if (rule == null || rule.trim().length() == 0) { throw new IllegalArgumentException("Illegal route rule!"); } rule = rule.replace("consumer.", "").replace("provider.", ""); int i = rule.indexOf("=>"); String whenRule = i < 0 ? null : rule.substring(0, i).trim(); String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim(); Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule); Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule); // NOTE: When條件是容許爲空的,外部業務來保證相似的約束條件 this.whenCondition = when; this.thenCondition = then; } catch (ParseException e) { throw new IllegalStateException(e.getMessage(), e); } } public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (invokers == null || invokers.size() == 0) { return invokers; } try { if (! matchWhen(url)) { return invokers; } List<Invoker<T>> result = new ArrayList<Invoker<T>>(); if (thenCondition == null) { logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey()); return result; } for (Invoker<T> invoker : invokers) { if (matchThen(invoker.getUrl(), url)) { result.add(invoker); } } if (result.size() > 0) { return result; } else if (force) { logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY)); return result; } } catch (Throwable t) { logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t); } return invokers; } public URL getUrl() { return url; } public int compareTo(Router o) { if (o == null || o.getClass() != ConditionRouter.class) { return 1; } ConditionRouter c = (ConditionRouter) o; return this.priority == c.priority ? url.toFullString().compareTo(c.url.toFullString()) : (this.priority > c.priority ? 1 : -1); } public boolean matchWhen(URL url) { return matchCondition(whenCondition, url, null); } public boolean matchThen(URL url, URL param) { return thenCondition != null && matchCondition(thenCondition, url, param); } private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param) { Map<String, String> sample = url.toMap(); for (Map.Entry<String, String> entry : sample.entrySet()) { String key = entry.getKey(); MatchPair pair = condition.get(key); if (pair != null && ! pair.isMatch(entry.getValue(), param)) { return false; } } return true; } private static Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)"); private static Map<String, MatchPair> parseRule(String rule) throws ParseException { Map<String, MatchPair> condition = new HashMap<String, MatchPair>(); if(StringUtils.isBlank(rule)) { return condition; } // 匹配或不匹配Key-Value對 MatchPair pair = null; // 多個Value值 Set<String> values = null; final Matcher matcher = ROUTE_PATTERN.matcher(rule); while (matcher.find()) { // 逐個匹配 String separator = matcher.group(1); String content = matcher.group(2); // 表達式開始 if (separator == null || separator.length() == 0) { pair = new MatchPair(); condition.put(content, pair); } // KV開始 else if ("&".equals(separator)) { if (condition.get(content) == null) { pair = new MatchPair(); condition.put(content, pair); } else { condition.put(content, pair); } } // KV的Value部分開始 else if ("=".equals(separator)) { if (pair == null) throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); values = pair.matches; values.add(content); } // KV的Value部分開始 else if ("!=".equals(separator)) { if (pair == null) throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); values = pair.mismatches; values.add(content); } // KV的Value部分的多個條目 else if (",".equals(separator)) { // 若是爲逗號表示 if (values == null || values.size() == 0) throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); values.add(content); } else { throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); } } return condition; } private static final class MatchPair { final Set<String> matches = new HashSet<String>(); final Set<String> mismatches = new HashSet<String>(); public boolean isMatch(String value, URL param) { for (String match : matches) { if (! UrlUtils.isMatchGlobPattern(match, value, param)) { return false; } } for (String mismatch : mismatches) { if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) { return false; } } return true; } } }
該源碼實現了以下條件路由器功能。
基於條件表達式的路由規則,如:
|
規則:
表達式:
public interface Directory<T> extends Node { /** * get service type. * * @return service type. */ Class<T> getInterface(); /** * list invokers. * * @return invokers */ List<Invoker<T>> list(Invocation invocation) throws RpcException; }
目錄服務定義了一個核心接口list,就是列舉出某個接口在目錄中的全部服務列表。
提供了一個抽象的目錄實現類,源碼以下。
/** * 增長router的Directory * * @author chao.liuc */ public abstract class AbstractDirectory<T> implements Directory<T> { // 日誌輸出 private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class); private final URL url ; private volatile boolean destroyed = false; private volatile URL consumerUrl ; private volatile List<Router> routers; public AbstractDirectory(URL url) { this(url, null); } public AbstractDirectory(URL url, List<Router> routers) { this(url, url, routers); } public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) { if (url == null) throw new IllegalArgumentException("url == null"); this.url = url; this.consumerUrl = consumerUrl; setRouters(routers); } public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed){ throw new RpcException("Directory already destroyed .url: "+ getUrl()); } List<Invoker<T>> invokers = doList(invocation); List<Router> localRouters = this.routers; // local reference if (localRouters != null && localRouters.size() > 0) { for (Router router: localRouters){ try { if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) { invokers = router.route(invokers, getConsumerUrl(), invocation); } } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } } } return invokers; } public URL getUrl() { return url; } public List<Router> getRouters(){ return routers; } public URL getConsumerUrl() { return consumerUrl; } public void setConsumerUrl(URL consumerUrl) { this.consumerUrl = consumerUrl; } protected void setRouters(List<Router> routers){ // copy list routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers); // append url router String routerkey = url.getParameter(Constants.ROUTER_KEY); if (routerkey != null && routerkey.length() > 0) { RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey); routers.add(routerFactory.getRouter(url)); } // append mock invoker selector routers.add(new MockInvokersSelector()); Collections.sort(routers); this.routers = routers; } public boolean isDestroyed() { return destroyed; } public void destroy(){ destroyed = true; } protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException ; }
list方法的實現邏輯是:先檢查目錄是否銷燬狀態,若已經銷燬則拋出異常;調用抽象方法doList實現真正的從目錄服務中獲取invoker列表,該方法須要子類實現;循環對象中的路由器列表,若路由器url爲null或者參數runtime爲true則調用該路由器的route方法進行路由,將返回的invoker列表替換爲路由後的結果; 返回最終的invoker列表。
setRouters方法是設置路由器列表,除了參數參入的routers以外,還會追加2個默認的路由器,一個是參數router指定的routerFactory得到的router,另一個是MockInvokersSelector對象;
模塊還提供了一個默認目錄實現類StaticDirectory,它是一個靜態的內存緩存目錄服務實現。源碼以下:
public class StaticDirectory<T> extends AbstractDirectory<T> { private final List<Invoker<T>> invokers; public StaticDirectory(List<Invoker<T>> invokers){ this(null, invokers, null); } public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers){ this(null, invokers, routers); } public StaticDirectory(URL url, List<Invoker<T>> invokers) { this(url, invokers, null); } public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) { super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers); if (invokers == null || invokers.size() == 0) throw new IllegalArgumentException("invokers == null"); this.invokers = invokers; } public Class<T> getInterface() { return invokers.get(0).getInterface(); } public boolean isAvailable() { if (isDestroyed()) { return false; } for (Invoker<T> invoker : invokers) { if (invoker.isAvailable()) { return true; } } return false; } public void destroy() { if(isDestroyed()) { return; } super.destroy(); for (Invoker<T> invoker : invokers) { invoker.destroy(); } invokers.clear(); } @Override protected List<Invoker<T>> doList(Invocation invocation) throws RpcException { return invokers; } }
它的doList方法的實現是直接將屬性invokers的值返回,很是簡單。
此外還有一個RegistryDirectory的實現類,該類是整合了註冊中心和目錄服務。
由於考慮到本模塊與dubbo-registry相關性較大,接下來咱們將研究dubbo-registry-api和dubbo-registry-default模塊的源碼。