假設遠程通訊的實現仍是用netty4,解碼器將數據包解析成 Request 對象後,NettyHandler 的 messageReceived 方法緊接着會收到這個對象,因此第一步就是NettyServerHandler的channelRead。json
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 看是否在緩存中命中,若是沒有命中,則建立NettyChannel而且緩存。 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { // 接受消息 handler.received(channel, msg); } finally { // 若是通道不活躍或者斷掉,則從緩存中清除 NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }
public void received(Channel ch, Object msg) throws RemotingException { // 若是通道已經關閉,則直接返回 if (closed) { return; } handler.received(ch, msg); }
public void received(Channel channel, Object message) throws RemotingException { // 若是消息是MultiMessage類型的,也就是多消息類型 if (message instanceof MultiMessage) { // 強制轉化爲MultiMessage MultiMessage list = (MultiMessage) message; // 把各個消息進行發送 for (Object obj : list) { handler.received(channel, obj); } } else { // 直接發送 handler.received(channel, message); } }
Dispatcher實現類 | 對應的handler | 用途 |
AllDispatcher | AllChannelHandler | 全部消息都派發到線程池,包括請求,響應,鏈接事件,斷開事件等 |
ConnectionOrderedDispatcher | ConnectionOrderedChannelHandler | 在 IO 線程上,將鏈接和斷開事件放入隊列,有序逐個執行,其它消息派發到線程池 |
DirectDispatcher | 無 | 全部消息都不派發到線程池,所有在 IO 線程上直接執行 |
ExecutionDispatcher | ExecutionChannelHandler | 只有請求消息派發到線程池,不含響應。其它消息均在 IO 線程上執行 |
MessageOnlyDispatcher | MessageOnlyChannelHandler | 只有請求和響應消息派發到線程池,其它消息均在 IO 線程上執行 |
public void run() { // 若是是接收的消息 if (state == ChannelState.RECEIVED) { try { // 直接調用下一個received handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { //若是是鏈接事件請求 case CONNECTED: try { // 執行鏈接 handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; // 若是是斷開鏈接事件請求 case DISCONNECTED: try { // 執行斷開鏈接 handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; // 若是是發送消息 case SENT: try { // 執行發送消息 handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; // 若是是異常 case CAUGHT: try { // 執行異常捕獲 handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } }
能夠參考《dubbo源碼解析(九)遠程通訊——Transport層》的(七)DecodeHandler。能夠看到received方法中根據消息的類型進行不一樣的解碼。而DecodeHandler 存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成後,就會分發到HeaderExchangeHandler的received。
public void received(Channel channel, Object message) throws RemotingException { // 設置接收到消息的時間戳 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // 得到通道 final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { // 若是消息是Request類型 if (message instanceof Request) { // handle request. // 強制轉化爲Request Request request = (Request) message; // 若是該請求是事件心跳事件或者只讀事件 if (request.isEvent()) { // 執行事件 handlerEvent(channel, request); } else { // 若是是正常的調用請求,且須要響應 if (request.isTwoWay()) { // 處理請求 handleRequest(exchangeChannel, request); } else { // 若是不須要響應,則繼續下一步 handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { // 處理響應 handleResponse(channel, (Response) message); } else if (message instanceof String) { // 若是是telnet相關的請求 if (isClientSide(channel)) { // 若是是客戶端側,則直接拋出異常,由於客戶端側不支持telnet Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { // 若是是服務端側,則執行telnet命令 String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { // 若是都不是,則繼續下一步 handler.received(exchangeChannel, message); } } finally { // 移除關閉或者不活躍的通道 HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { // 建立一個Response實例 Response res = new Response(req.getId(), req.getVersion()); // 若是請求被破壞了 if (req.isBroken()) { // 得到請求的數據包 Object data = req.getData(); String msg; // 若是數據爲空 if (data == null) { //消息設置爲空 msg = null; // 若是在這以前已經出現異常,也就是數據爲Throwable類型 } else if (data instanceof Throwable) { // 響應消息把異常信息返回 msg = StringUtils.toString((Throwable) data); } else { // 返回請求數據 msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); // 設置錯誤請求的狀態碼 res.setStatus(Response.BAD_REQUEST); // 發送該消息 channel.send(res); return; } // find handler by message class. // 得到請求數據 也就是 RpcInvocation 對象 Object msg = req.getData(); try { // 繼續向下調用 返回一個future CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null) { //設置調用結果狀態爲成功 res.setStatus(Response.OK); // 把結果放入響應 res.setResult(appResult); } else { // 若是服務調用有異常,則設置結果狀態碼爲服務錯誤 res.setStatus(Response.SERVICE_ERROR); // 把報錯信息放到響應中 res.setErrorMessage(StringUtils.toString(t)); } // 發送該響應 channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } finally { // HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }); } catch (Throwable e) { // 若是在執行中拋出異常,則也算服務異常 res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { // 若是請求消息不屬於會話域,則拋出異常 if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } //強制類型轉化 Invocation inv = (Invocation) message; // 得到暴露的服務invoker Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback // 若是是回調服務 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { // 得到 方法定義 String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; // 若是隻有一個方法定義 if (methodsStr == null || !methodsStr.contains(",")) { // 設置會話域中是否有一致的方法定義標誌 hasMethod = inv.getMethodName().equals(methodsStr); } else { // 分割不一樣的方法 String[] methods = methodsStr.split(","); // 若是方法不止一個,則分割後遍歷查詢,找到了則設置爲true for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } // 若是沒有該方法,則打印告警日誌 if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } // 設置遠程地址 RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); // 調用下一個調用鏈 Result result = invoker.invoke(inv); // 返回CompletableFuture<Result> return result.completionFuture().thenApply(Function.identity()); }
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { // 若是調用的方法是回聲測試的方法 則直接返回結果,不然 調用下一個調用鏈 if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) { // 建立一個默認的AsyncRpcResult返回 return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv); } return invoker.invoke(inv); }
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { // 若是是泛化調用 if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC)) && inv.getArguments() != null && inv.getArguments().length == 3 && !GenericService.class.isAssignableFrom(invoker.getInterface())) { // 得到請求名字 String name = ((String) inv.getArguments()[0]).trim(); // 得到請求參數類型 String[] types = (String[]) inv.getArguments()[1]; // 得到請求參數 Object[] args = (Object[]) inv.getArguments()[2]; try { // 得到方法 Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types); // 得到該方法的參數類型 Class<?>[] params = method.getParameterTypes(); if (args == null) { args = new Object[params.length]; } // 得到附加值 String generic = inv.getAttachment(GENERIC_KEY); if (StringUtils.isBlank(generic)) { generic = RpcContext.getContext().getAttachment(GENERIC_KEY); } // 若是附加值爲空,在用上下文攜帶的附加值 if (StringUtils.isEmpty(generic) || ProtocolUtils.isDefaultGenericSerialization(generic)) { // 直接進行類型轉化 args = PojoUtils.realize(args, params, method.getGenericParameterTypes()); } else if (ProtocolUtils.isJavaGenericSerialization(generic)) { for (int i = 0; i < args.length; i++) { if (byte[].class == args[i].getClass()) { try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) { // 使用nativejava方式反序列化 args[i] = ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA) .deserialize(null, is).readObject(); } catch (Exception e) { throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e); } } else { throw new RpcException( "Generic serialization [" + GENERIC_SERIALIZATION_NATIVE_JAVA + "] only support message type " + byte[].class + " and your message type is " + args[i].getClass()); } } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { for (int i = 0; i < args.length; i++) { if (args[i] instanceof JavaBeanDescriptor) { // 用JavaBean方式反序列化 args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]); } else { throw new RpcException( "Generic serialization [" + GENERIC_SERIALIZATION_BEAN + "] only support message type " + JavaBeanDescriptor.class.getName() + " and your message type is " + args[i].getClass().getName()); } } } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) { // as proto3 only accept one protobuf parameter if (args.length == 1 && args[0] instanceof String) { try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(((String) args[0]).getBytes())) { // 用protobuf-json進行反序列化 args[0] = ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension("" + GENERIC_SERIALIZATION_PROTOBUF) .deserialize(null, is).readObject(method.getParameterTypes()[0]); } catch (Exception e) { throw new RpcException("Deserialize argument failed.", e); } } else { throw new RpcException( "Generic serialization [" + GENERIC_SERIALIZATION_PROTOBUF + "] only support one" + String.class.getName() + " argument and your message size is " + args.length + " and type is" + args[0].getClass().getName()); } } return invoker.invoke(new RpcInvocation(method, args, inv.getAttachments())); } catch (NoSuchMethodException e) { throw new RpcException(e.getMessage(), e); } catch (ClassNotFoundException e) { throw new RpcException(e.getMessage(), e); } } return invoker.invoke(inv); } static class GenericListener implements Listener { @Override public void onResponse(Result appResponse, Invoker<?> invoker, Invocation inv) { // 若是是泛化調用 if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC)) && inv.getArguments() != null && inv.getArguments().length == 3 && !GenericService.class.isAssignableFrom(invoker.getInterface())) { // 得到序列化方式 String generic = inv.getAttachment(GENERIC_KEY); // 若是爲空,默認獲取會話域中的配置 if (StringUtils.isBlank(generic)) { generic = RpcContext.getContext().getAttachment(GENERIC_KEY); } // 若是回調有異常,直接設置異常 if (appResponse.hasException() && !(appResponse.getException() instanceof GenericException)) { appResponse.setException(new GenericException(appResponse.getException())); } // 若是是native java形式序列化 if (ProtocolUtils.isJavaGenericSerialization(generic)) { try { UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512); // 使用native java形式序列化 ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA).serialize(null, os).writeObject(appResponse.getValue()); // 加入結果 appResponse.setValue(os.toByteArray()); } catch (IOException e) { throw new RpcException( "Generic serialization [" + GENERIC_SERIALIZATION_NATIVE_JAVA + "] serialize result failed.", e); } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { // 用JavaBean方式序列化 appResponse.setValue(JavaBeanSerializeUtil.serialize(appResponse.getValue(), JavaBeanAccessor.METHOD)); } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) { try { UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512); // 用protobuf-json進行序列化 ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension(GENERIC_SERIALIZATION_PROTOBUF) .serialize(null, os).writeObject(appResponse.getValue()); appResponse.setValue(os.toString()); } catch (IOException e) { throw new RpcException("Generic serialization [" + GENERIC_SERIALIZATION_PROTOBUF + "] serialize result failed.", e); } } else { // 直接進行類型轉化而且設置值 appResponse.setValue(PojoUtils.generalize(appResponse.getValue())); } } } @Override public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { } }
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 若是開啓監控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 設置開始監控時間 invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis())); // 對同時在線數量加1 getConcurrent(invoker, invocation).incrementAndGet(); // count up } return invoker.invoke(invocation); // proceed invocation chain }
class MonitorListener implements Listener { @Override public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) { // 若是開啓監控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 收集監控對數據,而且更新監控數據 collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false); // 同時在線監控數減1 getConcurrent(invoker, invocation).decrementAndGet(); // count down } } @Override public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 收集監控對數據,而且更新監控數據 collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true); // 同時在線監控數減1 getConcurrent(invoker, invocation).decrementAndGet(); // count down } } private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { try { // 得到監控的url URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY); // 經過該url得到Monitor實例 Monitor monitor = monitorFactory.getMonitor(monitorUrl); if (monitor == null) { return; } // 建立一個統計的url URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error); // 把收集的信息更新而且發送信息 monitor.collect(statisticsURL); } catch (Throwable t) { logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t); } } private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { // ---- service statistics ---- // 調用服務消耗的時間 long elapsed = System.currentTimeMillis() - start; // invocation cost // 得到同時監控的數量 int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count String application = invoker.getUrl().getParameter(APPLICATION_KEY); // 得到服務名 String service = invoker.getInterface().getName(); // service name // 得到調用的方法名 String method = RpcUtils.getMethodName(invocation); // method name // 得到組 String group = invoker.getUrl().getParameter(GROUP_KEY); // 得到版本號 String version = invoker.getUrl().getParameter(VERSION_KEY); int localPort; String remoteKey, remoteValue; // 若是是消費者端的監控 if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) { // ---- for service consumer ---- // 本地端口爲0 localPort = 0; // key爲provider remoteKey = MonitorService.PROVIDER; // value爲服務ip remoteValue = invoker.getUrl().getAddress(); } else { // ---- for service provider ---- // 端口爲服務端口 localPort = invoker.getUrl().getPort(); // key爲consumer remoteKey = MonitorService.CONSUMER; // value爲遠程地址 remoteValue = remoteHost; } String input = "", output = ""; if (invocation.getAttachment(INPUT_KEY) != null) { input = invocation.getAttachment(INPUT_KEY); } if (result != null && result.getAttachment(OUTPUT_KEY) != null) { output = result.getAttachment(OUTPUT_KEY); } // 返回一個url return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version); } }
public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
public Result invoke(Invocation invocation) throws RpcException { try { // 執行下一步 Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); // 把返回結果用CompletableFuture包裹 CompletableFuture<Object> future = wrapWithFuture(value, invocation); // 建立AsyncRpcResult實例 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation); future.whenComplete((obj, t) -> { AppResponse result = new AppResponse(); // 若是拋出異常 if (t != null) { // 屬於CompletionException異常 if (t instanceof CompletionException) { // 設置異常信息 result.setException(t.getCause()); } else { // 直接設置異常 result.setException(t); } } else { // 若是沒有異常,則把結果放入異步結果內 result.setValue(obj); } // 完成 asyncRpcResult.complete(result); }); return asyncRpcResult; } catch (InvocationTargetException e) { if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) { logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e); } return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
這裏默認代理實現方式是Javassist。能夠參考《dubbo源碼解析(二十三)遠程調用——Proxy》的(六)JavassistProxyFactory。其中Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。dubbo 會在運行時經過 Javassist 框架爲 Wrapper 生成實現類,並實現 invokeMethod 方法,該方法最終會根據調用信息調用具體的服務。以 DemoServiceImpl 爲例,Javassist 爲其生成的代理類以下。
/** Wrapper0 是在運行時生成的,你們可以使用 Arthas 進行反編譯 */ public class Wrapper0 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; // 省略其餘方法 public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoService demoService; try { // 類型轉換 demoService = (DemoService)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { // 根據方法名調用指定的方法 if ("sayHello".equals(string) && arrclass.length == 1) { return demoService.sayHello((String)arrobject[0]); } } catch (Throwable throwable) { throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString()); } }
能夠看到我上述講到的(八)HeaderExchangeHandler的received和(九)HeaderExchangeHandler的handleRequest有好幾處channel.send方法的調用,也就是當結果返回的返回的時候,會主動發送執行結果給客戶端。固然發送的時候仍是會對結果Response 對象進行編碼,編碼邏輯我就先不在這裏闡述。
當客戶端接收到這個返回的消息時候,進行解碼後,識別爲Response 對象,將該對象派發到線程池中,該過程跟服務端接收到調用請求到邏輯是同樣的,能夠參考上述的解析,區別在於到(八)HeaderExchangeHandler的received方法的時候,執行的是handleResponse方法。
static void handleResponse(Channel channel, Response response) throws RemotingException { // 若是響應不爲空,而且不是心跳事件的響應,則調用received if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
public static void received(Channel channel, Response response) { received(channel, response, false); } public static void received(Channel channel, Response response, boolean timeout) { try { // future集合中移除該請求的future,(響應id和請求id一一對應的) DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { //得到超時 Timeout t = future.timeoutCheckTask; // 若是沒有超時,則取消timeoutCheckTask if (!timeout) { // decrease Time t.cancel(); } // 接收響應結果 future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { // 通道集合移除該請求對應的通道,表明着這一次請求結束 CHANNELS.remove(response.getId()); } }
private void doReceived(Response res) { // 若是結果爲空,則拋出異常 if (res == null) { throw new IllegalStateException("response cannot be null"); } // 若是結果的狀態碼爲ok if (res.getStatus() == Response.OK) { // 則future調用完成 this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // 若是超時,則返回一個超時異常 this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { // 不然返回一個RemotingException this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } }
隨後用戶線程便可從 DefaultFuture 實例中獲取到相應結果。