如今,你可能已經自己通過官網的教程搭建了自己的 dubbo demo 服務。在執行 demo 的時候你會發現,服務消費者只持有服務接口,而實現是通過 @Reference 註解獲取的。你已經知道 spring bean 工廠會自動爲用戶建立代理實例,那麼 dubbo 爲我們的消費者建立的代理實現是什麼呢?只要開啓 idea 的調試模式,你就可以看到我們得到的實現實際上是:com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler。該 Handler 實現了 InvocationHandler,而 InvocationHandler 是 JDK 動態代理的核心接口;如果你不瞭解動態代理,建議先自己去了解一下。
/**
 * {@link InvocationHandler} that forwards calls made on a proxy to a wrapped {@link Invoker}.
 *
 * <p>Dispatch rules, in order:
 * <ol>
 *   <li>methods declared by {@link Object} are invoked reflectively on the invoker itself;</li>
 *   <li>no-arg {@code toString}/{@code hashCode} and one-arg {@code equals} that were not
 *       declared by {@code Object} are answered by the invoker directly;</li>
 *   <li>anything else is wrapped in an {@link RpcInvocation} and passed to
 *       {@link Invoker#invoke}, and the outcome of {@code recreate()} on that result is
 *       returned.</li>
 * </ol>
 */
public class InvokerInvocationHandler implements InvocationHandler {

    /** Delegate that receives every call made through the proxy. */
    private final Invoker<?> invoker;

    /**
     * @param handler the invoker all proxied calls are routed to
     */
    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    /**
     * Routes a single proxied method call according to the rules described on the class.
     *
     * @param proxy  the proxy instance the call was made on (not used here)
     * @param method the interface method being called
     * @param args   the call arguments
     * @return the value produced by the selected dispatch path
     * @throws Throwable whatever the selected dispatch path throws
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        final String name = method.getName();
        final int arity = method.getParameterTypes().length;

        // Object-declared methods never go over the wire.
        if (Object.class == method.getDeclaringClass()) {
            return method.invoke(invoker, args);
        }

        // Same three methods, but reached through a different declaring type.
        if (arity == 0) {
            if ("toString".equals(name)) {
                return invoker.toString();
            }
            if ("hashCode".equals(name)) {
                return invoker.hashCode();
            }
        } else if (arity == 1 && "equals".equals(name)) {
            return invoker.equals(args[0]);
        }

        // Everything else becomes an invocation handed to the invoker.
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}
源碼很簡單,只有一個 invoke 方法,它是代理類和接口之間的橋樑。如果你再細心一點,會發現 InvokerInvocationHandler 中的 Invoker 實現類是 com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker。它是 dubbo invoker 的默認實現,裏面封裝了服務降級等功能。到這裏,你基本已經知道消費者到底是怎麼去調用服務的了;後面你只要繼續跟着源碼調試,就能看到服務是如何和 netty 建立聯繫的。
消費者請求調用鏈: proxy0#sayHello(String) —> InvokerInvocationHandler#invoke(Object, Method, Object[]) —> MockClusterInvoker#invoke(Invocation) —> AbstractClusterInvoker#invoke(Invocation) —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) —> Filter#invoke(Invoker, Invocation) // 包含多個 Filter 調用 —> ListenerInvokerWrapper#invoke(Invocation) —> AbstractInvoker#invoke(Invocation) —> DubboInvoker#doInvoke(Invocation) —> ReferenceCountExchangeClient#request(Object, int) —> HeaderExchangeClient#request(Object, int) —> HeaderExchangeChannel#request(Object, int) —> AbstractPeer#send(Object) —> AbstractClient#send(Object, boolean) —> NettyChannel#send(Object, boolean) —> NioClientSocketChannel#write(Object)
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; //列舉服務列表 List<Invoker<T>> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
/**
 * Invokes {@code invocation} with retries: makes up to {@code retries + 1} attempts (at least
 * one), choosing an invoker through {@code select(...)} for each attempt and returning the
 * first result obtained.
 *
 * <p>NOTE(review): this is an abridged excerpt. The {@code catch} body and whatever follows it
 * were omitted by the author, so nothing shown here assigns {@code le}, fills
 * {@code providers}, or terminates the method after the last attempt. Consult the full source
 * before relying on the failure path.
 *
 * @param invocation  the call being made
 * @param invokers    candidate invokers for the first attempt
 * @param loadbalance strategy passed to {@code select(...)}
 * @return the result of the first attempt that completes
 * @throws RpcException declared by the signature; the throwing paths are not visible here
 */
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    // Validate the candidate list.
    checkInvokers(copyinvokers, invocation);
    // Total attempts = configured retries + 1, clamped to at least one.
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // Reselect before retry to avoid a change of candidate `invokers`.
        // NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            // i > 0 means an earlier attempt did not return, so the candidate list is refreshed.
            checkWhetherDestroyed(); // presumably fails if this invoker has been destroyed
            copyinvokers = list(invocation); // list the candidates again
            // check again
            checkInvokers(copyinvokers, invocation); // re-validate the refreshed list
        }
        // Pick an invoker via the load balancer; the invokers already tried are passed along.
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker); // remember that this invoker has been tried
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // Perform the call.
            Result result = invoker.invoke(invocation);
            // A recorded earlier failure is reported even though this attempt succeeded.
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
        }
        // Exception handling omitted in this excerpt.
    }
}
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); //設置路徑和版本 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); //ExchangeClient ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { //從配置中獲取是否同步執行 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); //是否單向執行 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); //請求超時時間 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { //若是單向執行,發起調用後當即返回一個空RpcResult boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { //若是異步,調用後當即返回一個空的RpcResult ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { //不然,同步等待 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
/**
 * Issues {@code request} by delegating to the wrapped {@code channel}.
 *
 * @param request the payload to send
 * @param timeout timeout value passed through to the channel unchanged
 * @return the future returned by the channel
 * @throws RemotingException if the channel's {@code request} call throws it
 */
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    final ResponseFuture pending = channel.request(request, timeout);
    return pending;
}
public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0");//版本號 req.setTwoWay(true);//雙向通訊 req.setData(request);//請求數據 DefaultFuture future = new DefaultFuture(channel, req, timeout);//Future try { channel.send(req);//Dubbo封裝的通訊Channel } catch (RemotingException e) { future.cancel(); throw e; } return future; }
public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { //數據請求 ChannelFuture future = channel.write(message);//此處的channel纔是org.jboss.netty.channel.Channel if (sent) { //等待請求結果 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }