強烈建議查看官方文檔html
@Autowired @Qualifier("serviceImpl") private TestService testService;
這裏咱們能夠發現,testService是proxy0對象,也就是服務引用那篇裏返回的,java
@Autowired TestService testService:spring會去加載該Bean,調用到ReferenceBean.getObject獲取對象spring
-->InvokerInvocationHandler.invoke -->RpcInvocation //全部請求都會轉爲RpcInvocation -->MockClusterInvoker.invoke //1.進入集羣 -->result = this.invoker.invoke(invocation); -->AbstractClusterInvoker.invoke -->list(invocation) -->directory.list //2.進入目錄查找 從this.methodInvokerMap裏面查找一個Invoker -->AbstractDirectory.list -->doList(invocation) -->RegistryDirectory.doList //從this.methodInvokerMap裏面查找一個Invoker -->router.route //3.進入路由 -->MockInvokersSelector.route -->getNormalInvokers -->ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random") -->doInvoke -->FailoverClusterInvoker.doInvoke -->select //4.進入負載均衡 -->AbstractClusterInvoker.select -->doselect //這裏,若是集羣中只有一個服務,直接返回 -->loadbalance.select -->AbstractLoadBalance.select -->doSelect -->RoundRobinLoadBalance.doSelect -->invokers.get(currentSequence % length)//取模輪循 -->Result result = invoker.invoke(invocation) -------------擴展點---------------- -->InvokerWrapper.invoke -->ListenerInvokerWrapper.invoke -->ConsumerContextFilter.invoke -->ProtocolFilterWrapper.invoke -->MonitorFilter.invoke -->ProtocolFilterWrapper.invoke -->FutureFilter.invoke -->ListenerInvokerWrapper.invoke -->AbstractInvoker.invoke //將附加消息(attachment)添加到invocation, 將帶到服務端去 ---------------擴展點--------------- -->doInvoke(invocation) -------------***------------ -->DubboInvoker.doInvoke //這裏主線程會等待,直到被喚醒,並且有返回值(通常是這種) //爲何DubboInvoker是個protocol? 由於 //registryDirectory.refreshInvoker.toInvokers:protocol.refer -------------------邏輯隔開線-------------------- -->ReferenceCountExchangeClient.request -->HeaderExchangeClient.request -----------------***------------------- -->HeaderExchangeChannel.request //建立request(自帶ID) -->AbstractPeer.send -->AbstractClient.send -->NettyChannel.send //裏面就是netty客戶端向服務端發送消息的邏輯 //channel.writeAndFlush(message) // 這裏會使用netty的worke線程池去調用 // NettyClientHandler#write
中間涉及到對數據的編碼操做apache
ExchangeCodec#encode-->encodeRequest-->DubboCodec #encodeRequestData(序列化請求參數)c#
解碼操做數組
ExchangeCodec#decode-->decodeBody-->DubboCodec -->decodeBody-->DecodeableRpcInvocation.decode併發
------------------------來到服務端---------- NettyClientHandler#write -->NettyServerHandler.channelRead //(這裏dubbo官網說的是NettyHandler#messageReceived,可是個人2.6.6版本並無進入那個方法,而是這裏寫着的write,channelRead) //服務端接收消息 -->AbstractPeer.received -->MultiMessageHandler.received -->HeartbeatHandler.received -->AllChannelHandler.received -->ChannelEventRunnable.run -->DecodeHandler.received //中間插入解碼操做(主要是針對運行時解碼) -->HeaderExchangeHandler.received ---****--- -->handleRequest(這個方法執行目標對象的目標方法) //Object result = handler.reply(channel, msg); //handle是DubboProtocol.requestHandler屬性 -->DubboProtocol.ExchangeHandler.reply -->Invoker.invoke // 執行過濾連 省略過濾鏈步驟 -->InvokerWrapper.invoke -->DelegateProviderMetaDataInvoker.invoke ----****---- -->AbstractProxyInvoker.invoke //new RpcResult(doInvoke(proxy, invocation.getMethodName(), //invocation.getParameterTypes(), invocation.getArguments())); -->wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); //JavassistProxyFactory.getInvoker.AbstractProxyInvoker.doInvoke -->getData (目標方法) -----------------邏輯分割線------------------- -->AbstractPeer.send -->NettyChannel.send //channel.writeAndFlush(message) 返回消息到客戶端 -->NettyServerHandler.write ------------------服務端執行完畢---------------------- -->NettyClientHandler.channelRead(客戶端接收消息) -->AbstractPeer.received -->MultiMessageHandler.received -->HeartbeatHandler.received -->AllChannelHandler.received -->ChannelEventRunnable.run -->DecodeHandler.received -->HeaderExchangeHandler.received //中間插入解碼操做(對返回的信息) -->handleResponse -->DefaultFuture.received -->DefaultFuture.doReceived //response = res 將返回的信息保存在DefaultFuture中 //done.signal() 喚醒DubboInvoker.doInvoke中暫停的主線程
DubboInvoker#doInvokeapp
該方法在請求時經過get方法處於while(true)等待中負載均衡
當被喚醒並且有返回值後(一般使用的這一種)繼續執行主線程dom
return (Result) currentClient.request(inv, timeout).get(); -->currentClient.request //這裏就是執行的以上全部過程(請求與相應) -->DefaultFuture.get(timeout) -->returnFromResponse // retrun response.getResult()
DubboInvoker
public class DubboInvoker<T> extends AbstractInvoker<T> { private final ExchangeClient[] clients; protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); // 設置 path 和 version 到 attachment 中 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { // 從 clients 數組中獲取 ExchangeClient currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 獲取異步配置 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // isOneway 爲 true,表示「單向」通訊 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 異步無返回值 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 發送請求 currentClient.send(inv, isSent); // 設置上下文中的 future 字段爲 null RpcContext.getContext().setFuture(null); // 返回一個空的 RpcResult return new RpcResult(); } // 異步有返回值 else if (isAsync) { // 發送請求,並獲得一個 ResponseFuture 實例 ResponseFuture future = currentClient.request(inv, timeout); // 設置 future 到上下文中 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); // 暫時返回一個空結果 return new RpcResult(); } // 同步調用 else { RpcContext.getContext().setFuture(null); // 發送請求,獲得一個 ResponseFuture 實例,並調用該實例的 get 方法進行等待 return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(..., "Invoke remote method timeout...."); } catch (RemotingException e) { throw new RpcException(..., "Failed to invoke remote method: ..."); } } // 省略其餘方法 }
問題: 通常狀況下,服務消費方會併發調用多個服務,每一個用戶線程發送請求後,會調用不一樣 DefaultFuture 對象的 get 方法進行等待。 一段時間後,服務消費方的線程池會收到多個響應對象。這個時候要考慮一個問題,如何將每一個響應對象傳遞給相應的 DefaultFuture 對象,且不出錯??
看到DubboInvoker.doInvoke中的同步有返回值的一段代碼
return (Result) currentClient.request(inv, timeout).get();
get方法會等待被喚醒同時有返回結果
看到HeaderExchangeChannel#request方法
@Override public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException("is closed!"); } // 建立一個request對象,默認賦值了一個ID Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } //直接返回DefaultFuture return future; }
這裏就直接返回了DefaultFuture ,對應是上面的currentClient.request, get方法就是調用的DefaultFuture .get
channel.send後面的調用會爲每次調用開啓不一樣的線程
在請求時會將請求參數序列化到服務端
服務端接到請求後,會還原Request對象
ExchangeCodec.ExchangeCodec 服務端解碼
// decode request. Request req = new Request(id); req.setVersion(Version.getProtocolVersion()); req.setTwoWay((flag & FLAG_TWOWAY) != 0); if ((flag & FLAG_EVENT) != 0) { req.setEvent(Request.HEARTBEAT_EVENT); } try { ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, in); } else if (req.isEvent()) { data = decodeEventData(channel, in); } else { data = decodeRequestData(channel, in); } req.setData(data); } catch (Throwable t) { // bad request req.setBroken(true); req.setData(t); } return req;
在服務端調用目標方法完畢後會將請求返回的結果和id設置給Reponse對象
HeaderExchangeHandler.handleRequest
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { //給response設置id(該ID是請求時Request中的ID) Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); return res; } // 請求方法的參數 Object msg = req.getData(); try { // result: 調用目標方法返回的結果 Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }
最後來到服務端調用完後回到客戶端調用DefaultFuture.received
public static void received(Channel channel, Response response) { try { // FUTURES保存着每次調用後返回的DefaultFuture對象,key是生成Request生成時的ID // 這裏用response.getId()去獲取,由於Request對應的Response有相同的ID DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { // 喚醒對應線程的DefaultFuture對象 future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }
這裏就喚醒了Response對應的DefaultFuture對象,一個請求的響應就完成了,過程頗爲複雜。
在dubbo官網對服務調用用很是詳細的講解!!