根據上圖,能夠看出,服務調用過程爲:html
Consumer 端的 Proxy 調用 Cluster 層選擇集羣中的某一個 Invoker(負載均衡)apache
Invoker 最終會調用 Protocol 層進行 RPC 通信(netty,tcp 長鏈接),將服務調用信息和配置信息進行傳遞c#
Provider 端 Protocol 層接收到服務調用信息後,最終會調用真實的服務實現負載均衡
經過前面 Dubbo 服務發現&引用 的學習,咱們知道,Consumer 端的調用過程大致以下:異步
執行 FailoverClusterInvoker#invoke(Invocation invocation) (負載均衡。當有多個 provider 時走這一步,沒有的話,就跳過)tcp
執行 Filter#invoke(Invoker<?> invoker, Invocation invocation) (全部 group="provider" 的 Filter)ide
執行 DubboInvoker#invoke(Invocation inv)學習
因此,最終的通信過程是由 DubboInvoker 來完成的:編碼
protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); // 選擇 ExchangeClient(通常狀況下只有一個) ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { // dubbo 異步調用 ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { // 服務調用分支 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
閱讀源碼能夠發現,獲取 Rpc 調用結果的方法是: ExchangeClient#request(inv, timeout).get()
, 最終會調用 NettyChannel#send(Object message, false) 去寫消息。而消息的編解碼是經過 com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec
來處理的。spa
// 對 Request 消息進行編碼(DubboCodec.class) protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { RpcInvocation inv = (RpcInvocation) data; out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION)); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); out.writeUTF(inv.getMethodName()); // 寫出調用方法的名稱 out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); // 寫出調用方法的參數 Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i++) { out.writeObject(encodeInvocationArgument(channel, inv, i)); // 寫出調用的實參 } out.writeObject(inv.getAttachments()); // 寫出 attachments }
Provider 端接收到 Consumer 端的消息後,會經過 DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
處理,將消息轉化爲 RpcInvocation。最終經過預先設置好的 netty 的 handler 來將 RpcInvocation 轉化爲 Invoker 的調用。
經過 Dubbo 服務發現&引用 的學習,咱們知道,這個 handler 最終會調用 DubboProtocol#requestHandler 來處理,最終將 Invocation 轉化爲 Invoker 的調用。
調用完成後,返回結果經過DubboCodec#encodeResponseData(Channel channel, ObjectOutput out, Object data)
進行編碼,最終寫回到 Consumer 端:
// 對返回結果進行編碼 protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException { Result result = (Result) data; Throwable th = result.getException(); if (th == null) { Object ret = result.getValue(); if (ret == null) { out.writeByte(RESPONSE_NULL_VALUE); } else { out.writeByte(RESPONSE_VALUE); out.writeObject(ret); } } else { out.writeByte(RESPONSE_WITH_EXCEPTION); out.writeObject(th); } }
Conmuser 端接收到調用的返回數據後,會由 DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
將數據轉化成 RpcResult。
ExchangeClient#request(inv, timeout).get() 最終會調用 DefaultFuture#get() 來獲取 Response 中的返回數據。DefaultFuture#get() 使用了同步阻塞的方式來等待 provider 端的返回數據。
因爲 Invoker
是 Dubbo 領域模型中很是重要的一個概念,不少設計思路都是向它靠攏。這就使得 Invoker
滲透在整個實現代碼裏,對於剛開始接觸 Dubbo 的人,確實容易給搞混了。 下面咱們用一個精簡的圖來講明最重要的兩種 Invoker
:服務提供 Invoker
和服務消費 Invoker
:
服務提供 Invoker:DubboInvoker(默認)、HessianRpcInvoker等(視協議而定)
服務消費 Invoker:AbstractProxyInvoker