ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml"); context.start(); DemoService demoService = context.getBean("demoService", DemoService.class); String hello = demoService.sayHello("world"); System.out.println("result: " + hello);
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 得到方法名稱 String methodName = method.getName(); // 得到方法參數類型 Class<?>[] parameterTypes = method.getParameterTypes(); // 若是該方法所在的類是Object類型,則直接調用invoke。 if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } // 若是這個方法是toString,則直接調用invoker.toString() if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } // 若是這個方法是hashCode直接調用invoker.hashCode() if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } // 若是這個方法是equals,直接調用invoker.equals(args[0]) if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // 調用invoke return invoker.invoke(new RpcInvocation(method, args)).recreate(); }
能夠看到上面的源碼,首先對Object的方法進行了處理,若是調用的方法不是這些方法,則先會 建立RpcInvocation,而後再調用invoke。segmentfault
public RpcInvocation(Method method, Object[] arguments) { this(method.getName(), method.getParameterTypes(), arguments, null, null); }
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) { // 設置方法名 this.methodName = methodName; // 設置參數類型 this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes; // 設置參數 this.arguments = arguments == null ? new Object[0] : arguments; // 設置附加值 this.attachments = attachments == null ? new HashMap<String, String>() : attachments; // 設置invoker實體 this.invoker = invoker; }
public Result invoke(Invocation invocation) throws RpcException { // 調用攔截器鏈的invoke Result asyncResult = filterInvoker.invoke(invocation); // 把異步返回的結果加入到上下文中 asyncResult.thenApplyWithContext(r -> { // 循環各個過濾器 for (int i = filters.size() - 1; i >= 0; i--) { Filter filter = filters.get(i); // onResponse callback // 若是該過濾器是ListenableFilter類型的 if (filter instanceof ListenableFilter) { // 強制類型轉化 Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { // 若是內部類listener不爲空,則調用回調方法onResponse listener.onResponse(r, filterInvoker, invocation); } } else { // 不然,直接調用filter的onResponse,作兼容。 filter.onResponse(r, filterInvoker, invocation); } } // 返回異步結果 return r; }); // 返回異步結果 return asyncResult; }
public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { // 依次調用各個過濾器,得到最終的返回結果 asyncResult = filter.invoke(next, invocation); } catch (Exception e) { // onError callback // 捕獲異常,若是該過濾器是ListenableFilter類型的 if (filter instanceof ListenableFilter) { // 得到內部類Listener Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { //調用onError,回調錯誤信息 listener.onError(e, invoker, invocation); } } // 拋出異常 throw e; } // 返回結果 return asyncResult; }
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 得到上下文,設置invoker,會話域,本地地址和原創地址 RpcContext.getContext() .setInvoker(invoker) .setInvocation(invocation) .setLocalAddress(NetUtils.getLocalHost(), 0) .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); // 若是會話域是RpcInvocation,則設置invoker if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(invoker); } try { // 移除服務端的上下文 RpcContext.removeServerContext(); // 調用下一個過濾器 return invoker.invoke(invocation); } finally { // 清空上下文 RpcContext.removeContext(); } }
static class ConsumerContextListener implements Listener { @Override public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { // 把結果中的附加值放入到上下文中 RpcContext.getServerContext().setAttachments(appResponse.getAttachments()); } @Override public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { // 不作任何處理 } }
Result result = filter.invoke(next, invocation);
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { // 該方法是真正的調用方法的執行 fireInvokeCallback(invoker, invocation); // need to configure if there's return value before the invocation in order to help invoker to judge if it's // necessary to return future. return invoker.invoke(invocation); }
class FutureListener implements Listener { @Override public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) { if (result.hasException()) { // 處理異常結果 fireThrowCallback(invoker, invocation, result.getException()); } else { // 處理正常結果 fireReturnCallback(invoker, invocation, result.getValue()); } } @Override public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { } }
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 若是開啓監控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 設置監控開始時間 invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis())); // 得到當前的調用數,而且增長 getConcurrent(invoker, invocation).incrementAndGet(); // count up } return invoker.invoke(invocation); // proceed invocation chain }
class MonitorListener implements Listener { @Override public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) { // 若是開啓監控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 執行監控,蒐集數據 collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false); // 減小當前調用數 getConcurrent(invoker, invocation).decrementAndGet(); // count down } } @Override public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { // 若是開啓監控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 執行監控,蒐集數據 collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true); // 減小當前調用數 getConcurrent(invoker, invocation).decrementAndGet(); // count down } }
public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
public Result invoke(Invocation invocation) throws RpcException { Result asyncResult = invoker.invoke(invocation); try { // 若是是同步的調用 if (InvokeMode.SYNC == ((RpcInvocation)invocation).getInvokeMode()) { // 從異步結果中get結果 asyncResult.get(); } } catch (InterruptedException e) { throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof TimeoutException) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else if (t instanceof RemotingException) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } catch (Throwable e) { throw new RpcException(e.getMessage(), e); } // 返回異步結果 return asyncResult; }
public Result invoke(Invocation inv) throws RpcException { // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed // 若是服務引用銷燬,則打印告警日誌,可是經過 if (destroyed.get()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; // 會話域中加入該調用鏈 invocation.setInvoker(this); // 把附加值放入會話域 if (CollectionUtils.isNotEmptyMap(attachment)) { invocation.addAttachmentsIfAbsent(attachment); } // 把上下文的附加值放入會話域 Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { /** * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here, * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information). */ invocation.addAttachments(contextAttachments); } // 從配置中獲得是什麼模式的調用,一共有FUTURE、ASYNC和SYNC invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); // 加入編號 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { // 執行調用鏈 return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception // 得到異常 Throwable te = e.getTargetException(); if (te == null) { // 建立默認的異常異步結果 return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { if (te instanceof RpcException) { // 設置異常碼 ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } // 建立默認的異常異步結果 return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation); } } catch (RpcException e) { if (e.isBiz()) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { throw e; } } catch (Throwable e) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } }
protected Result doInvoke(final Invocation invocation) throws Throwable { // rpc會話域 RpcInvocation inv = (RpcInvocation) invocation; // 得到方法名 final String methodName = RpcUtils.getMethodName(invocation); // 把path放入到附加值中 inv.setAttachment(PATH_KEY, getUrl().getPath()); // 把版本號放入到附加值 inv.setAttachment(VERSION_KEY, version); // 當前的客戶端 ExchangeClient currentClient; // 若是數組內就一個客戶端,則直接取出 if (clients.length == 1) { currentClient = clients[0]; } else { // 取模輪詢 從數組中取,當取到最後一個時,從頭開始 currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 是不是單向發送 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 得到超時時間 int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); // 若是是單向發送 if (isOneway) { // 是否等待消息發送,默認不等待消息發出,將消息放入 IO 隊列,即刻返回。 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 單向發送只負責發送消息,不等待服務端應答,因此沒有返回值 currentClient.send(inv, isSent); // 設置future爲null,由於單向發送沒有返回值 RpcContext.getContext().setFuture(null); // 建立一個默認的AsyncRpcResult return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { // 不然直接建立AsyncRpcResult AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); // 異步調用,返回CompletableFuture類型的future CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); // 當調用結果完成時 responseFuture.whenComplete((obj, t) -> { // 若是有異常 if (t != null) { // 拋出一個異常 asyncRpcResult.completeExceptionally(t); } else { // 完成調用 asyncRpcResult.complete((AppResponse) obj); } }); // 異步返回結果用CompletableFuture包裝,把future放到上下文中, RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult)); // 返回結果 return asyncRpcResult; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { return client.request(request, timeout); }
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { return channel.request(request, timeout); }
// 建立DefaultFuture對象,能夠從future中主動得到請求對應的響應信息 DefaultFuture future = new DefaultFuture(channel, req, timeout);
public void send(Object message, boolean sent) throws RemotingException { // 若是須要重連或者沒有連接,則鏈接 if (needReconnect && !isConnected()) { connect(); } // 得到通道 Channel channel = getChannel(); //TODO Can the value returned by getChannel() be null? need improvement. if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } // 經過通道發送消息 channel.send(message, sent); }