ReferenceBean跟ServiceBean同樣,都實現了InitializingBean接口,自然也會調用afterPropertiesSet方法。這個方法和《dubbo的啓動過程(二)--服務方屬性配置》雷同,這邊就不繼續了。除了InitializingBean這個接口,他還實現了FactoryBean接口,實例化的時候,就會調用getObject方法,例子看《spring學習之FactoryBean》,源碼見doGetObjectFromFactoryBean。
public Object getObject() { return get(); } public synchronized T get() { // 各類配置的更新 checkAndUpdateSubConfigs(); if (destroyed) { throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!"); } if (ref == null) { init(); } return ref; }
// 以上代碼跟服務方雷同,省略 ref = createProxy(map); String serviceKey = URL.buildKey(interfaceName, group, version); ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes)); initialized = true;
在createProxy方法中,會調用RegistryProtocol的refer方法,這裏先略過ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 獲取註冊中心 url = URLBuilder.from(url) .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)) .removeParameter(REGISTRY_KEY) .build(); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } // 關聯引用 return doRefer(cluster, registry, type, url); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 把消費方信息註冊到註冊中心 URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); // 獲取invoker並緩存 Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
cluster.join(directory)這個代碼,會執行包裝類MockClusterWrapper的join方法:
/**
 * Joins the directory through the wrapped cluster and decorates the result
 * with a {@code MockClusterInvoker}.
 *
 * @param directory the directory to join
 * @return a {@code MockClusterInvoker} around the wrapped cluster's invoker
 * @throws RpcException if the wrapped cluster fails to join
 */
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    Invoker<T> delegate = this.cluster.join(directory);
    return new MockClusterInvoker<T>(directory, delegate);
}
因此invoker是MockClusterInvoker。
在createProxy方法中,invoker創建完了,就開始創建代理。代理類中,通過newInstance的構造參數,把InvokerInvocationHandler傳入到成員變量中,後面調用的方法,就是通過這個類的invoke方法進行轉發的。
// 調用下面的JavassistProxyFactory的getProxy方法 return (T) PROXY_FACTORY.getProxy(invoker); // 注意傳入的是InvokerInvocationHandler public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
當執行方法的時候,就會執行InvokerInvocationHandler的invoke方法:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); // Object直接調用返回 if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } // toString、hashCode、equals返回響應的invoke方法 if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // 這個invoke就是MockClusterInvoker,RpcInvocation傳入方法參數信息 return invoker.invoke(new RpcInvocation(method, args)).recreate(); }
MockClusterInvoker#invoke
// Abridged listing: the branching between the mock and non-mock paths has been left out.
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // Read the mock setting for the invoked method; defaults to "false".
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    // ... part of the code omitted. When mock is forced, the local mock is invoked.
    result = doMockInvoke(invocation, null);
    // Without mock, the call goes to FailoverClusterInvoker.
    result = this.invoker.invoke(invocation);
    // ... omitted
    return result;
}
這邊會調用FailoverClusterInvoker的父類AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException { // 檢查是否被Destroyed checkWhetherDestroyed(); // binding attachments into invocation. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 獲取Invoker列表 List<Invoker<T>> invokers = list(invocation); // 負載均衡 LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // FailoverClusterInvoker#doInvoke return doInvoke(invocation, invokers, loadbalance); }
FailoverClusterInvoker#doInvoke
失敗重試、負載均衡在這裏設置
// Abridged listing: the warn-log body and the code after the retry loop have been left out.
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    // Check whether the invoker list is empty.
    checkInvokers(copyInvokers, invocation);
    // Resolve the method name.
    String methodName = RpcUtils.getMethodName(invocation);
    // Number of attempts: the configured retry count plus the initial call.
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            // Check whether this invoker has been destroyed.
            checkWhetherDestroyed();
            // Fetch the invoker list again.
            copyInvokers = list(invocation);
            // check again that the invoker list is not empty
            checkInvokers(copyInvokers, invocation);
        }
        // Pick one invoker via the load balancer, passing the already-invoked ones along.
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // InvokerWrapper#invoke
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                // ... (omitted)
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            // Record the address of every provider that was tried.
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // ... (omitted)
}
InvokerWrapper#invoke:
這個invoker是ProtocolFilterWrapper,在這個invoke中,會調用ProtocolFilterWrapper$CallbackRegistrationInvoker#invoke-->ConsumerContextFilter#invoke-->FutureFilter#invoke-->MonitorFilter#invoke-->ListenerInvokerWrapper#invoke-->AsyncToSyncInvoker#invoke-->DubboInvoker#invoke
/**
 * Passes the invocation straight through to the wrapped invoker.
 *
 * @param invocation the invocation to forward
 * @return the wrapped invoker's result
 * @throws RpcException if the wrapped invoker fails
 */
public Result invoke(Invocation invocation) throws RpcException {
    Result outcome = invoker.invoke(invocation);
    return outcome;
}
ConsumerContextFilter#invoke:
在上下文設置invoker、invocation、LocalAddress、RemoteAddress、RemoteApplicationName、Attachment,再把invoker設置到invocation中,執行完下一個invoke後,再清空上下文信息。
/**
 * Populates the consumer-side RpcContext for the call, forwards the invocation,
 * and removes the context once the call returns or fails.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the invocation being made
 * @return the result of the next invoker
 * @throws RpcException if the downstream invocation fails
 */
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    // Record invoker, invocation, local/remote address, remote application name and attachment.
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(NetUtils.getLocalHost(), 0)
            .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
            .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY))
            .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY));

    // Let the invocation know which invoker is handling it.
    if (invocation instanceof RpcInvocation) {
        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
        rpcInvocation.setInvoker(invoker);
    }

    try {
        RpcContext.removeServerContext();
        Result outcome = invoker.invoke(invocation);
        return outcome;
    } finally {
        // Always clear the context, whether the call succeeded or threw.
        RpcContext.removeContext();
    }
}
FutureFilter#invoke:
主要是設置回調的
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { fireInvokeCallback(invoker, invocation); // need to configure if there's return value before the invocation in order to help invoker to judge if it's // necessary to return future. return invoker.invoke(invocation); }
MonitorFilter#invoke:
接口對應的方法調用的次數。
key爲:接口全限定名.方法名
value是AtomicInteger類型,記錄調用次數
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if (invoker.getUrl().hasParameter(MONITOR_KEY)) { invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis())); getConcurrent(invoker, invocation).incrementAndGet(); // count up } return invoker.invoke(invocation); // proceed invocation chain } // concurrent counter private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) { String key = invoker.getInterface().getName() + "." + invocation.getMethodName(); AtomicInteger concurrent = concurrents.get(key); if (concurrent == null) { concurrents.putIfAbsent(key, new AtomicInteger()); concurrent = concurrents.get(key); } return concurrent; }
ListenerInvokerWrapper#invoke:
直接調用AsyncToSyncInvoker#invoke
/**
 * Forwards the invocation unchanged to the wrapped invoker.
 *
 * @param invocation the invocation to forward
 * @return the wrapped invoker's result
 * @throws RpcException if the wrapped invoker fails
 */
public Result invoke(Invocation invocation) throws RpcException {
    Result outcome = invoker.invoke(invocation);
    return outcome;
}
AsyncToSyncInvoker#invoke:
如果是同步,就阻塞等待返回;如果是異步,就直接返回
public Result invoke(Invocation invocation) throws RpcException { Result asyncResult = invoker.invoke(invocation); try { // 同步就阻塞等待返回 if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof TimeoutException) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else if (t instanceof RemotingException) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } catch (Throwable e) { throw new RpcException(e.getMessage(), e); } return asyncResult; }
AbstractInvoker#invoke
public Result invoke(Invocation inv) throws RpcException { // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed if (destroyed.get()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; // 傳入當前的dubboInvoker invocation.setInvoker(this); if (CollectionUtils.isNotEmptyMap(attachment)) { invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { invocation.addAttachments(contextAttachments); } // 包括FUTURE、ASYNC、SYNC invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); // Async時設置ID RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception Throwable te = e.getTargetException(); if (te == null) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation); } } catch (RpcException e) { if (e.isBiz()) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { throw e; } } catch (Throwable e) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } }
DubboInvoker#doInvoke
遠程調用接口,獲取數據
protected Result doInvoke(final Invocation invocation) throws Throwable { // 給inv設置路徑和版本信息 RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); // 獲取一個ExchangeClient ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 是否爲Oneway boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); asyncRpcResult.subscribeTo(responseFuture); // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter FutureContext.getContext().setCompatibleFuture(responseFuture); return asyncRpcResult; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }