首先看來自官方文檔java
這裏的Invoker是Provider的一個可調用Service的抽象,Invoker封裝了Provider地址及Service接口信息。算法
Directory表明多個Invoker,能夠把它當作List<Invoker>,但與List不一樣的是,它的值多是動態變化的,好比註冊中心推送變動。api
Cluster將Directory中的多個Invoker假裝成一個Invoker,對上層透明,假裝過程包含了容錯邏輯,調用失敗後,重試另外一個。安全
Router負責從多個Invoker中按路由規則選出子集,好比讀寫分離,應用隔離等。服務器
LoadBalance負責從多個Invoker中選出具體的一個用於本次調用,選的過程包含了負載均衡算法,調用失敗後,須要重選。
負載均衡
之前本身看到上面的文字,理解不太深。隨着對分佈式系統的深刻接觸,以及DUBBO源碼的研究,纔有了更精確的理解。總結一句話:閱讀技術類文檔時,對一些語言的理解,受到閱讀者自身狀況的限制,每每理解有深有淺。閒話少說。本文,就是針對圖中的組件,一個個進行剖析。異步
1.Invoker
async
Invoker 是Provider的一個可調用Service的抽象,Invoker封裝了Provider地址及Service接口信息。分佈式
1.1 API
ide
public interface Invoker<T> extends Node { /** * get service interface. * @return service interface. */ Class<T> getInterface(); /** * invoke. * @param invocation * @return result * @throws RpcException */ Result invoke(Invocation invocation) throws RpcException; }
1.2 層次樹結構
分析可知:主要有2個分支
1.AbstractInvoker:主要具體遠程實現,和RPC 協議有關,屬於dubbo-rpc-api範疇。
2.AbstractClusterInvoker:主要邏輯包括Invoker的選擇,和高可用有關,屬於dubbo-cluster範疇。
1.3 AbstractInvoker VS AbstractClusterInvoker
1.3.1 類圖比較
經過下圖能夠很容易發現:AbstractClusterInvoker比較AbstractInvoker,多了些select,doselect,reselect方法。這些方法的做用也能夠猜到。
1.3.2 以兩個實現類對比
選擇AbstractInvoker的子類DubboInvoker,
和AbstractClusterInvoker的子類FailoverClusterInvoker爲表明。
其中FailoverClusterInvoker是dubbo集羣容錯默認方案,Dubbo協議爲默認協議。
先看下AbstractInvoker分支
public abstract class AbstractInvoker<T> implements Invoker<T> { public Result invoke(Invocation inv) throws RpcException { RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); Map<String, String> context = RpcContext.getContext().getAttachments(); try { return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception } catch (RpcException e) { } catch (Throwable e) { } } protected abstract Result doInvoke(Invocation invocation) throws Throwable; .. } public class DubboInvoker<T> extends AbstractInvoker<T> { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); logger.debug("doInvoke start -----------------------------"); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); }finally { logger.debug("doInvoke end-----------------------------"); } } ... }
這裏能夠很清晰地看到,遠程調用分三種狀況:
1. 不須要返回值的調用(所謂oneWay)
2. 異步(async)
3. 同步
對於第一種狀況,客戶端只管發請求就完了,不考慮返回結果。
對於第二種狀況,客戶端除了發請求,還須要將結果塞到一個ThreadLocal變量中,以便於客戶端get返回值
對於第三種狀況,客戶端除了發請求,還會同步等待返回結果
再看下AbstractClusterInvoker分支
public abstract class AbstractClusterInvoker<T> implements Invoker<T> { public Result invoke(final Invocation invocation) throws RpcException { checkWheatherDestoried(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers; } protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException; .. } public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重試時,進行從新選擇,避免重試時invoker列表已發生變化. //注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變 if (i > 0) { checkWheatherDestoried(); copyinvokers = list(invocation); //從新檢查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } } ... }
總結下AbstractClusterInvoker和AbstractClusterInvoker都須要實現Invoker接口,二者都聲明瞭doInvoke抽象方法。但方法定義有區別。
protected abstract Result doInvoke(Invocation invocation) throws Throwable; protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException;
AbstractClusterInvoker 須要一個Invoker列表,他們來自Directory,LoadBalance 能夠能夠理解爲負載均衡策略。
二者的聯繫:AbstractClusterInvoker 比AbstractInvoker多了一些選擇和負載均衡部分,到最後仍是會調用AbstractInvoker分支,負責具體RPC調用工做。
1.4 常見容錯方案對比
Feature |
優勢 |
缺點 |
實現類 |
Failover Cluster |
失敗自動切換,當出現失敗,重試其它服務器,一般用於讀操做(推薦使用) |
重試會帶來更長延遲 |
FailoverClusterInvoker |
Failfast Cluster |
快速失敗,只發起一次調用,失敗當即報錯,一般用於非冪等性的寫操做 |
若是有機器正在重啓,可能會出現調用失敗 |
FailfastClusterInvoker |
Failsafe Cluster |
失敗安全,出現異常時,直接忽略,一般用於寫入審計日誌等操做 |
調用信息丟失 |
FailsafeClusterInvoker |
Failback Cluster |
失敗自動恢復,後臺記錄失敗請求,定時重發,一般用於消息通知操做 |
不可靠,重啓丟失 |
FailbackClusterInvoker |
Forking Cluster |
並行調用多個服務器,只要一個成功即返回,一般用於實時性要求較高的讀操做 |
須要浪費更多服務資源 |
ForkingClusterInvoker |
Broadcast Cluster |
廣播調用全部提供者,逐個調用,任意一臺報錯則報錯,一般用於更新提供方本地狀態 |
速度慢,任意一臺報錯則報錯 |
BroadcastClusterInvoker |
2. 總結
*ClusterInvoker 分別使用Router,和Directory獲取Invoker列表。
*ClusterInvoker而後再Invoker列表中,藉助LoadBalance提供的負載均衡策略,返回一個可用的Invoker.
*Directory表明多個Invoker,能夠理解爲Invoker的邏輯集合,負責Invoker的下線,上線。
Router和Directory二者都對我提供Invoker列表。經過提供的API,很容易找出區別來。
//Directory List<Invoker<T>> list(Invocation invocation) throws RpcException; //Route <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
Directory:返回的Invoker列表,表明當前正常對外提供服務的Invoker列表。重點在維護可用節點列表。
Route:返回的Invoker列表,是在現有可用的Invoker列表裏,按照規則進行再篩選,重點在規則匹配,過濾等。