ReferenceConfig#init方法的結尾處,調用createProxy方法,採集的參數集合做爲入參傳遞到該方法中。
java
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map) { ... // 建立服務代理 return (T) proxyFactory.getProxy(invoker); }
方法主要邏輯就是
一、默認狀況下若是本地有服務暴露,則引用本地服務.
二、用戶寫死了引用的URL,指定的URL多是對點對直連地址,也多是註冊中心URL
三、經過註冊中心配置拼裝URL,List<URL> us = loadRegistries(false); 用戶配置了幾個註冊中心,就會產生幾個URLsegmentfault
無論走哪一種引用類型,都會執行下面的核心代碼服務器
invoker = refprotocol.refer(interfaceClass, url);
refprotocol是一個Protocol接口,getAdaptiveExtension返回的是一個Protocol$Adpative,在https://segmentfault.com/a/11... 該類的源碼及其生產方法。app
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
又看到了Protocol這個接口異步
@SPI("dubbo") public interface Protocol { int getDefaultPort(); @Adaptive <T> Exporter<T> export(Invoker<T> var1) throws RpcException; @Adaptive <T> Invoker<T> refer(Class<T> var1, URL var2) throws RpcException; void destroy(); }
主要任務是,暴露遠程服務、引用遠程服務、釋放協議【釋放暴露於引用服務時佔用的資源】,
dubbo支持多種協議,http,thrift,RMI等,真是經過dubbo的SPI機制,才能夠靈活的在這些協議來回切換。
咱們第二種爲例講一下核心邏輯,async
同服務暴露時的同樣,由於Dubbo的AOP機制,在得到RegistryProtocol時,會通過兩個Wrapper類的包裝ide
這個地方也不例外,可是兩個Wrapper類的ProtocolFilterWrapper,ProtocolListenerWrapper並沒有實際的業務邏輯,咱們直接跳過。ui
執行refprotocol.refer(interfaceClass, url)即執行RegistryProtocol#refer代碼。url
@SuppressWarnings("unchecked") public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); }
Registry registry = registryFactory.getRegistry(url);
在服務發佈的時候,已經講過了,其主要核心做用就是鏈接zookeeper服務器,並返回一個ZookeeperRegistry實例。spa
在RegistryProtocol#doRefer方法中,經過ZookeeperRegistry#register的執行,建立引用服務的consumers節點。建立以下節點:
/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F192.168.43.156%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D15775%26side%3Dconsumer%26timestamp%3D1525073802234
cluster.join(directory)
cluster也是一個帶有Adaptive註解的擴展類,默認實現時FailoverCluster
@SPI(FailoverCluster.NAME) public interface Cluster { /** * Merge the directory invokers to a virtual invoker. * * @param <T> * @param directory * @return cluster invoker * @throws RpcException */ @Adaptive <T> Invoker<T> join(Directory<T> directory) throws RpcException; }
進入FailoverCluster#join,返回FailoverClusterInvoker,一個能夠失敗轉移的Invoker,
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
FailoverClusterInvoker源碼中,它實現了父類中的一個模板子方法doInvoke。
父類AbstractClusterInvoker的invoke方法,
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
方法list(invocation)返回了List<Invoker<T>> invokers,這些Invoker就是實際與服務交互的對象,
protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers; }
咱們在構造FailoverClusterInvoker時,傳入的Directory實現類是RegistryDirectory,即AbstractDirectory#list方法。
DubboInvoker 最終Invoker執行的方法是:
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
調用invoker,白話描述就是:
將經過遠程通訊將Invocation信息傳遞給服務器端,服務器端接收到該Invocation信息後,找到對應的本地Invoker,而後經過反射執行相應的方法,將方法的返回值再經過遠程通訊將結果傳遞給客戶端。
這裏分3種狀況:
返回FailoverClusterInvoker