小弟一直苦思 consumer 與provider 究竟是怎麼通訊的呢,與是從網上找了一篇,以爲寫得很靠譜。本身就算總結,也未必有這個好,因此記錄下來!!java
消費者調用流程涉及到消費者端和生產者端的交互,因此將分爲三個部分來說解,分別是
-消費者發起調用請求
-生產者響應調用請求
-消費者獲取調用結果api
消費者發起調用請求app
以前文章中講過消費者初始化時最後返回的是一個InvokerInvocationHandler
的代理對象,根據動態代理的原理,DUBBO接口的方法調用都會由invoke
方法代理,咱們來看一下其實現負載均衡
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); }
正常狀況下的方法調用會走invoker.invoke(new RpcInvocation(method, args)).recreate()
這個分支,首先來看new RpcInvocation(method, args)
dom
public RpcInvocation(Method method, Object[] arguments) { this(method.getName(), method.getParameterTypes(), arguments, null, null); } public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) { this.methodName = methodName; this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes; this.arguments = arguments == null ? new Object[0] : arguments; this.attachments = attachments == null ? new HashMap<String, String>() : attachments; this.invoker = invoker; }
很是簡單的一個初始化賦值操做,就不作過多講解了,接着回頭看invoker.invoke(new RpcInvocation(method, args))
方法,這裏的invoker
以前也說過了,是一個經過SPI機制生成的對象,以默認設置的參數failover
爲例,這裏的invoker
就是一個MockClusterInvoker
對象中包含了一個FailoverClusterInvoker
對象引用的相似鏈式的對象,那麼咱們來詳細看看MockClusterInvoker
的invoke
方法異步
public Result invoke(Invocation invocation) throws RpcException { Result result = null; //獲取mock屬性的值,沒有配置,默認false String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")){ //no mock result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock result = doMockInvoke(invocation, null); } else { //fail-mock try { result = this.invoker.invoke(invocation); }catch (RpcException e) { if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; }
當沒有配置mock
值時,value
值獲得的是默認值false
,會去執行result = this.invoker.invoke(invocation)
,this.invoker
剛纔提到過了是一個FailoverClusterInvoker
類型的對象,但該對象並無實現invoke
方法,實際上該方法是繼承自父類AbstractClusterInvoker
的,來看一下ide
public Result invoke(final Invocation invocation) throws RpcException { checkWheatherDestoried(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } //異步操做默認添加invocation id RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
這裏的list(invocation)
方法根據invocation
中的參數來獲取全部的invoker
列表,就不深刻講了,接着來看loadbalance
對象的生成,loadbalance
對象根據SPI機制生成,具體實現由loadbalance
參數決定,也就是具體的負載均衡策略,DUBBO提供的實現有random
、roundrobin
、leastactive
、consistenthash
四種,其中沒有根據服務端負載進行調節的策略。其中默認實現爲random
,生成的loadbalance
就是一個RandomLoadBalance
的對象。本次只分析同步的接口調用方式,跳過RpcUtils.attachInvocationIdIfAsync
,接着看doInvoke(invocation, invokers, loadbalance)方法,該方法實如今FailoverClusterInvoker
中oop
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; //檢查invokers是否爲空 checkInvokers(copyinvokers, invocation); //獲取重試次數 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重試時,進行從新選擇,避免重試時invoker列表已發生變化. //注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變 if (i > 0) { checkWheatherDestoried(); //得到InvokerWrapper的List copyinvokers = list(invocation); //從新檢查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn(""); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException /** * 略去部分代碼 */ }
這裏select(loadbalance, invocation, copyinvokers, invoked)
方法根據傳入的loadbalance
對象挑選出一個執行用的invoker
,裏面調用鏈較深,在此不作詳細分析。最終將經過invoker.invoke(invocation)
進行調用並返回一個Result
類型的對象,也就是最終的執行結果,這裏的invoker
對象是InvokerWrapper
的實例,該實例引用了一個ListenerInvokerWrapper
的實例,接着又鏈式引用了AbstractInvoker
的實例,所以最終執行的invoke
方法在AbstractInvoker
中,來看一下ui
public Result invoke(Invocation inv) throws RpcException { if(destroyed) { throw new RpcException("略"); } RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> context = RpcContext.getContext().getAttachments(); if (context != null) { invocation.addAttachmentsIfAbsent(context); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){ invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } //異步操做默認添加invocation id RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception /** * 略去部分代碼 */ } }
這裏的關鍵方法是doInvoke(invocation)
,其實如今具體的Invoker
實現類中,這裏咱們採用的是默認的dubbo協議,因此實現類爲DubboInvoker
,來看看其doInvoke
方法this
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; //消費者初始化時與服務端創建的鏈接 if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } /** * 略去部分代碼 */ }
這裏的isOneway
和isAsync
兩個標誌位分別區分單向調用(不在意調用結果)和異步調用,這裏咱們分析同步調用的流程,這裏的currentClient
是一個ReferenceCountExchangeClient
類型的對象
public ResponseFuture request(Object request) throws RemotingException { return client.request(request); }
這裏的client
是一個HeaderExchangeClient
類型的對象,
public ResponseFuture request(Object request) throws RemotingException { return channel.request(request); }
這裏的channel
是一個HeaderExchangeChannel
類型的對象,繼續跟進去
public ResponseFuture request(Object request) throws RemotingException { return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); } public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); }catch (RemotingException e) { future.cancel(); throw e; } return future; }
這裏的request
方法本身又進行了一次內部調用,能夠看到具體實現時建立了一個DefaultFuture
對象而且經過channel.send(req)
方法發送請求到生產者端,這裏不作具體深刻了。接着咱們跳回DubboInvoker
類doInvoke
方法中的currentClient.request(inv, timeout).get()
,這裏是否是和jdk中future的用法很像,事實上這裏也確實是經過get
方法的調用將線程阻塞在這裏等待結果,從而將異步調用轉化爲同步。爲了證明這個想法,咱們來看看DefaultFuture
的get
方法
public Object get() throws RemotingException { return get(timeout); } public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (! isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (! isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); }
從done.await(timeout, TimeUnit.MILLISECONDS)
能夠看到這裏不只是等待isDone()
這個狀態位,同時還有超時時間的限制。isDone()
判斷的是什麼,來看一下
public boolean isDone() { return response != null; }
判斷response
對象是否爲空,那麼後面的流程其實不難猜,生產者處理完結果會來填充response
。
生產者響應調用請求
生產者開啓了端口監聽,消息的解碼由Netty處理,解碼後交由NettyHandler
的messageReceived
方法進行業務處理,來看一下
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } }
先來看一下NettyChannel.getOrAddChannel
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; } NettyChannel ret = channelMap.get(ch); if (ret == null) { NettyChannel nc = new NettyChannel(ch, url, handler); if (ch.isConnected()) { ret = channelMap.putIfAbsent(ch, nc); } if (ret == null) { ret = nc; } } return ret; }
主要是從channelMap
中獲取對應的NettyChannel
,接着回到NettyHandler
的messageReceived
方法來看handler.received(channel, e.getMessage())
,這裏的handler
是一個NettyServer
的實例,但它自己沒有實現received
方法,該方法要追溯到它的父類的父類的父類(真的就是這麼長的繼承關係。。。)AbstractPeer
中,來看一下
public void received(Channel ch, Object msg) throws RemotingException { if (closed) { return; } handler.received(ch, msg); }
這裏的handler
是MultiMessageHandler
對象的實例,來看一下其received
方法的實現
@Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof MultiMessage) { MultiMessage list = (MultiMessage)message; for(Object obj : list) { handler.received(channel, obj); } } else { handler.received(channel, message); } }
這裏的handler
又是HeartbeatHandler
類的實例
public void received(Channel channel, Object message) throws RemotingException { setReadTimestamp(channel); if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { Response res = new Response(req.getId(), req.getVersion()); res.setEvent(Response.HEARTBEAT_EVENT); channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); if(logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); } } } return; } if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug( new StringBuilder(32) .append("Receive heartbeat response in thread ") .append(Thread.currentThread().getName()) .toString()); } return; } handler.received(channel, message); }
由於不是心跳類的消息,因此執行handler.received(channel, message)
繼續這個調用鏈,這裏的handler
是AllChannelHandler
類型的
public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }
這裏終於結束了調用鏈,轉而啓動了一個線程池來執行任務,那咱們來看看具體的任務線程ChannelEventRunnable
中到底須要執行什麼任務
public void run() { switch (state) { case CONNECTED: try{ handler.connected(channel); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try{ handler.disconnected(channel); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try{ handler.sent(channel,message); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is "+ message,e); } break; case RECEIVED: try{ handler.received(channel, message); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is "+ message,e); } break; case CAUGHT: try{ handler.caught(channel, exception); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is "+ channel + ", message is: " + message + ", exception is " + exception,e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } }
這裏傳入的是RECEIVED
狀態,執行對應分支又是調用handler.received(channel, message)
,好吧繼續。。。
這裏的handler
是DecodeHandler
的實例,繼續跟下去
public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } if (message instanceof Request) { decode(((Request)message).getData()); } if (message instanceof Response) { decode( ((Response)message).getResult()); } handler.received(channel, message); }
調用鏈還在繼續,此次的handler
是HeaderExchangeHandler
類型
public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; //判斷是心跳仍是正常請求 if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }
正常同步請求會開始執行handleRequest(exchangeChannel, request)
處理請求,並經過channel.send(response)
回覆結果,來重點看一下handleRequest
方法
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); //處理異常的請求 if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); return res; } // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }
能夠看出正常請求將由handler.reply(channel, msg)
處理,這裏的handler
是DubboProtocol
中的一個ExchangeHandlerAdapter
實現,其reply
方法以下
public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; //經過方法名獲取Invoker Invoker<?> invoker = getInvoker(channel, inv); //若是是callback 須要處理高版本調用低版本的問題 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){ String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1){ hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods){ if (inv.getMethodName().equals(method)){ hasMethod = true; break; } } } if (!hasMethod){ logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv ); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }
這裏一共作了兩件事,先經過getInvoker(channel, inv)
獲取具體的invoker
,再經過invoker.invoke(inv)
執行獲取結果,先來看一下getInvoker(channel, inv)
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{ boolean isCallBackServiceInvoke = false; boolean isStubServiceInvoke = false; int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(Constants.PATH_KEY); //若是是客戶端的回調服務. isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY)); if (isStubServiceInvoke){ port = channel.getRemoteAddress().getPort(); } //callback isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if(isCallBackServiceInvoke){ path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY); inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); } String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); return exporter.getInvoker(); }
這裏又看到了熟悉的exporterMap
,以前講生產者初始化的時候就說過這個map中放入了封裝過的Invoker
對象exporter
,如今又把它取了出了並經過getInvoker()
方法得到封裝在其中的Invoker
對象。
接着來看invoker.invoke(inv)
方法,其實現首先在InvokerWrapper
類中
public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
而後會調用到AbstractProxyInvoker
中的invoke
方法
public Result invoke(Invocation invocation) throws RpcException { try { return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
這裏doInvoke
方法的實如今JavassistProxyFactory
中getInvoker
方法中
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper類不能正確處理帶$的類名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
這裏根據傳入的 proxy
對象的類信息建立對它的包裝對象Wrapper
並調用其invokeMethod
方法,經過傳入的參數來調用proxy
對象的對應方法,返回調用結果,也就是執行具體的業務。
完成handleRequest(exchangeChannel, request)
方法的解析後,回到HeaderExchangeHandler
類中接着來看一下channel.send(response)
,這裏的channel
傳入的是NettyChannel
類型的對象,send
方法的實如今其父類的父類AbstractPeer
中,來看一下
public void send(Object message) throws RemotingException { send(message, url.getParameter(Constants.SENT_KEY, false)); }
其具體實現又在NettyChannel
中
public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { ChannelFuture future = channel.write(message); if (sent) { timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if(! success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }
能夠看到業務處理結果最後經過ChannelFuture
對象進行了發送,到今生產者端的任務就完成了。
消費者獲取調用結果
這裏消費者端經過NETTY從生產者端獲取數據的流程和以前的一模一樣,調用鏈直到HeaderExchangeHandler
以前都是同樣的,咱們先來回顧一下HeaderExchangeHandler
的received
方法
public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; //判斷是心跳仍是正常請求 if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }
以前走的是Request分支,此次由於是響應消息走的是Response分支,那麼來看一下handleResponse(channel, (Response) message)
的具體實現
static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
繼續跟進去看received
方法
public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }
繼續看doReceived
幹了什麼
private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
看到這裏把執行結果賦值給response
,正好應證了咱們以前的猜測,消費者的同步阻塞也就能夠繼續執行下去了,這也算是很是經典的異步轉同步的實現方案了吧。
本文把消費者端和生產者端交互的大概流程進行了講解,流程主要分爲三個部分,分別是:消費者發起調用請求、生產者響應調用請求和消費者獲取調用結果,歸納一下就是消費者經過生成的代理對象調用invoke
方法經過Netty的通道去請求生產者的exporter
進行執行,而且經過future
的方式將異步的交互轉爲了同步響應。