上一篇咱們簡單描述了dubbo服務暴露-服務引用的流程 這一篇咱們從dubbo協議來具體分析一下Protocol層異步
###export()過程ide
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //將exporter存到map裏 exporterMap.put(key, exporter); ... openServer(url); return exporter; } /** * 開啓服務 * * @param url */ private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也能夠暴露一個只有server能夠調用的服務。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { //map中不存在建立server serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } } /** * 建立服務 * * @param url * @return */ private ExchangeServer createServer(URL url) { ... ExchangeServer server; try {//啓動服務監聽 傳入了requestHandler //當收到客戶端調用時會調用requestHandler.received()方法 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } ... return server; } private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } } public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; //獲取到對應的invoker Invoker<?> invoker = getInvoker(channel, inv); ... //根據 Invocation 調用信息,調用真正服務實現 return invoker.invoke(inv); } ... } ... }; /** * 根據請求參數獲取到對應的服務端invoker * * @param channel * @param inv * @return * @throws RemotingException */ Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(Constants.PATH_KEY); ... //生成serviceKey String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv .getAttachments().get(Constants.GROUP_KEY)); //從map中找到exporter DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); ... return exporter.getInvoker(); }
歸納一下: export()就是根據服務url生成Exporter並存在map中,而後暴露服務,設置回調. 當客戶端調用請求時進入回調,根據請求url找到存在map中的Exporter, 最後用Exporter中的Invoker調用真正的服務url
注意:服務端客戶端都存在Invoker對象,但二者有所區別. 客戶端Invoker用於溝通服務端實現遠程調用 如:DubboInvoker 服務端Invoker用於調用真正服務實現 通常都繼承AbstractProxyInvoker 見下述代碼段code
com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory#getInvoker public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { Method method = proxy.getClass().getMethod(methodName, parameterTypes); return method.invoke(proxy, arguments); } }; }
###refer()過程server
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
refer()就是根據url生成Invoker對象
來看DubboInvoker.doInvoke()方法繼承
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; ... try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//是否有返回值 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) {//無返回值 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) {//異步 ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else {//同步 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (Exception e) { ... } } /** * Invocation. (API, Prototype, NonThreadSafe) * 封裝遠程調用信息(方法名 參數) */ public interface Invocation { /** *方法名 */ String getMethodName(); /** *方法參數類型 */ Class<?>[] getParameterTypes(); /** *方法參數 */ Object[] getArguments(); /** *冗餘參數 */ Map<String, String> getAttachments(); String getAttachment(String key); String getAttachment(String key, String defaultValue); Invoker<?> getInvoker(); }
能夠看到doInvoker方法會經過client對象執行遠程調用rpc
到此,你們對Protocol層三大對象應該有了一個簡單的瞭解.get