當dubbo消費方和提供方都發布和引用完成後,第四步就是消費方調用提供方。java
仍是以dubbo的DemoService舉例數組
-- 提供方 <dubbo:application name="demo-provider"/> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <dubbo:protocol name="dubbo" port="20880"/> <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/> <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/> -- 消費方 <dubbo:application name="demo-consumer"/> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
整個服務的調用過程都是以Invoker接口爲核心,經過不一樣的Invoker實現對象層層調用,完成整個RPC的調用。在調用過程當中,消費方構建包含調用信息的RpcInvocation,並封裝在Request中,傳輸到調用方,調用方解析出Request中的RpcInvocation完成調用,並將調用結果RpcResult封裝到Response中返回,而後由消費方接收後再從Response中解析出RpcResult中攜帶的調用結果完成整個調用。網絡
消費方獲取的demoService實例對象實際是代理工廠ProxyFactory.getProxy建立的代理對象app
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
於是調用demoService.sayHello方法時,實際調用的是javassist生成的代理對象。消費方的調用堆棧以下負載均衡
代理對象是由javassist生成,生成的sayHello方法以下異步
public String sayHello(String paramString){ Object[] arrayOfObject = new Object[1]; arrayOfObject[0] = paramString; Object localObject = this.handler.invoke(this, methods[0], arrayOfObject); return ((String)localObject); }
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); }
判斷非Object的方法,將方法和參數組裝成RpcInvocation,做爲Invoker.invoke的參數。ide
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); }
根據url獲取方法參數上的MOCK_KEY的值,判斷是否要執行mock調用:ui
a. 若是mock爲null或false,直接調用FailoverClusterInvoker b. 若是mock以force開頭,強制執行mock調用 c. 以上都不是,則先調用FailoverClusterInvoker,調用失敗再執行mock調用this
// 執行Directory.list 和 router.route List<Invoker<T>> invokers = list(invocation); // 執行LoadBalance.select Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
a. 經過Directory目錄服務的list方法獲取訂閱返回的服務提供者的Invoker對象集合,若是存在路由,路由服務根據策略過濾Invoker對象集合 b. 根據負載均衡策略LoadBalance來選擇一個Invokerurl
Filter的Invoker執行器鏈由Protocol的Wrapper類ProtocolFilterWrapper.refer方法再調用buildInvokerChain構建。
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { // 倒序循環,順序執行 for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
而Listener的Invoker執行器則是Protocol的Wrapper類ProtocolListenerWrapper.refer方法返回的ListenerInvokerWrapper。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); }
服務消費方調用遠程服務時,傳遞的參數是Request對象,封裝了RpcInvocation。
public class RpcInvocation implements Invocation, Serializable { private static final long serialVersionUID = -4355285085441097045L; // 方法名稱 private String methodName; // 參數類型 private Class<?>[] parameterTypes; // 參數值 private Object[] arguments; // 附件 private Map<String, String> attachments; private transient Invoker<?> invoker; }
RpcInvocation包含了服務調用執行的方法名和參數,以及在附件中封裝了服務的path、interface、group和version。
在DubboInvoker.invoke中,先獲取交互層客戶端ExchangeClient,其中包含了和服務提供者的長鏈接,傳入RpcInvocation參數,由通訊層HeaderExchangeChannel將RpcInvocation封裝成Request,而後傳輸出去。
HeaderExchangeChannel public void send(Object message, boolean sent) throws RemotingException { if (message instanceof Request || message instanceof Response || message instanceof String) { channel.send(message, sent); } else { Request request = new Request(); request.setVersion("2.0.0"); request.setTwoWay(false); request.setData(message); channel.send(request, sent); } }
對於DemoService.sayHello方法,消費方傳輸的RpcInvocation的內容以下
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
而後由netty將數據傳輸到服務提供者,遠程調用的類型分爲同步,異步或oneway模式,對應的調用結果是:
1.oneway返回空RpcResult,不接收返回結果
2.異步方式直接返回空RpcResult,而異步獲取ResponseFuture回調
3.同步方式,調用方式仍是異步,經過等待ResponseFuture.get()返回結果
整個服務消費方的活動圖以下
服務提供方在export服務後,就打開端口等待服務消費方的調用。當服務消費方調用發送調用時,服務提供方netty接收到MessageEvent,調用NettyHandler的messageReceived方法,而後向上一直調用到DubboProtocol的requestHandler的reply方法,獲取到對應的Invoker,執行invoke方法調用服務的實際實現。
服務提供者的調用堆棧,調用從下到上,分爲兩個部分,由兩個線程來執行。
在AllChannelHandler中,會把請求放入事件處理線程池中,由ChannelEventRunnable線程執行。這是一個分發機制(Dispatch服務),用於指定事件執行的線程池的模型。
AllChannelHandler cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
在HeaderExchangeHandler中,handleRequest方法從Request中獲取RpcInvocation,而後調用DubboProtocol的requestHandler。
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }
方法參數Invocation,就是服務消費方發送的消息
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, input=201, dubbo=2.0.0, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
從中獲取path、group和version,組裝成serviceKey,從exporterMap中獲取DubboExporter,從而獲得Invoker。
public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; // 根據invocation匹配Invoker Invoker<?> invoker = getInvoker(channel, inv); // 部分省略 RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } } Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(Constants.PATH_KEY); // 組裝serviceKey String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); // 根據serviceKey從exporterMap中獲取DubboExporter,再得到Invoker DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); return exporter.getInvoker(); }
Filter分爲消費方Filter和提供方Filter,是經過Filter實現類上@Activate的group設置。如消費方的ConsumerContextFilter的group=consumer,而提供方的ContextFilter的group=provider
@Activate(group = Constants.CONSUMER, order = -10000) public class ConsumerContextFilter implements Filter {} @Activate(group = Constants.PROVIDER, order = -10000) public class ContextFilter implements Filter {}
JavassistProxyFactory.getInvoker建立實際對象的Wrapper類,並返回AbstractProxyInvoker內部類對象。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
AbstractProxyInvoker.invoke執行時,調用上面內部類的doInvoke方法,並將執行結果封裝成RpcResult。
public Result invoke(Invocation invocation) throws RpcException { try { return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
public class Wrapper0 extends Wrapper { public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { com.alibaba.dubbo.demo.provider.DemoServiceImpl w; try { w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) $1); } catch (Throwable e) { throw new IllegalArgumentException(e); } try { if ("sayHello".equals($2) && ($3.length == 1)) { return ($w) w.sayHello((java.lang.String) $4[0]); } } catch (Throwable e) { throw new java.lang.reflect.InvocationTargetException(e); } throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException( "Not found method \"" + $2 + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl."); } } }
服務提供方執行的活動圖
當提供方執行完真正服務實現的方法後,須要將返回值傳輸給消費方
public ResponseFuture request(Object request, int timeout) throws RemotingException { // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }
private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { return res.getResult(); } if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage()); }
整個Dubbo服務調用的活動圖
參考:https://blog.csdn.net/quhongwei_zhanqiu/article/details/41701979