dubbo PRC服務調用過程很複雜,這裏準備經過分析一個典型rpc方法調用的調用棧來講明調用過程。說它典型,是由於本次分析的調用場景很典型簡單
先定義一個接口java
public interface DemoService { public String sayHello(String name); }
而後一個服務實現類git
public class DemoServiceImpl implements DemoService { public String sayHello(String name) { Random random=new Random(); try { Thread.sleep(800* random.nextInt(6)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress()); return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress(); } }
客戶端經過註冊中心引用這個服務,註冊中心用zookeepr協議實現。spring
<dubbo:registry id="hangzhouRegistry" address="zookeeper://192.168.64.128:2181" /> //這裏配置的過濾器 和負載均衡策略 <dubbo:reference id="demoService" interface="demo.dubbo.api.DemoService" loadbalance="random" timeout="6000" filter="monitor"/>
服務端經過註冊中心發佈服務,默認是dubbo協議發佈bootstrap
<dubbo:registry address="zookeeper://192.168.64.128:2181"/> <dubbo:service interface="demo.dubbo.api.DemoService" ref="demoService" />
啓動發佈好服務時候後,經過mian方法調用服務方法sayHello,並打印,代碼以下:設計模式
public class DemoApplicationCustomer { public static void main(String[] args) { ApplicationContext context= new ClassPathXmlApplicationContext(new String[]{"/spring/dubbo-demo-consumer.xml"}); final demo.dubbo.api.DemoService demoService= (DemoService) context.getBean("demoService"); System.err.println(demoService.sayHello("mydubbodebug")); } }
這個mian方法會,啓動一個spring容器,而後觸發一個簡單的rpc方法調用。
接下來就是在dubbo源碼中,某個地方打個斷點在rpc把請求消息經過網絡發送出去以前。hold住請求你,這樣就好查看dubbo在客戶端的調用方法棧了,
關於斷點打在哪裏的問題,以前寫過一篇博客http://www.javashuo.com/article/p-cuduvfsh-cu.html,介紹過dubbo通訊消息的解析過程。能夠知道dubbo默認底層的傳輸框架是netty。api
看下com.alibaba.dubbo.remoting.transport.netty.NettyClient類doOpen方法數組
/*** * 打開到遠端服務機器的鏈接 * @throws Throwable */ @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ClientBootstrap(channelFactory); // config // @see org.jboss.netty.channel.socket.SocketChannelConfig bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("connectTimeoutMillis", getTimeout()); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); //設置消息流的處理handler,發出去的消息先通過handler再通過encoder, //這裏斷點能夠設置在nettyHandler類裏。 pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); }
NettyHandler類繼承了netty的SimpleChannelHandler類,並實現了writeRequested方法緩存
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { super.writeRequested(ctx, e);//斷點處 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.sent(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } }
咱們能夠在super.writeRequested(ctx, e);這句前打個斷點。網絡
啓動運行DemoApplicationCustomer後,咱們能夠獲得以下線程棧信息:app
java.lang.Thread.State: RUNNABLE at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:99) at org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:266) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) at org.jboss.netty.channel.Channels.write(Channels.java:611) at org.jboss.netty.channel.Channels.write(Channels.java:578) at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251) at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:98) at com.alibaba.dubbo.remoting.transport.AbstractClient.send(AbstractClient.java:258) at com.alibaba.dubbo.remoting.transport.AbstractPeer.send(AbstractPeer.java:54) at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(HeaderExchangeChannel.java:112) at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient.request(HeaderExchangeClient.java:88) at com.alibaba.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient.request(ReferenceCountExchangeClient.java:78) at com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:97) at com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:144) at com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper.invoke(ListenerInvokerWrapper.java:74) at com.alibaba.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:65) at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69) at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54) at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69) at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:48) at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69) at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53) at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77) at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:229) at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72) at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52) at com.alibaba.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1) at com.example.DemoApplicationCustomer.main(DemoApplicationCustomer.java:6)
這裏用到29個類,除了6個jboss內部的6個類,其餘23個就是須要咱們研究的。從下往上看,能夠直觀的看到pc客戶端方法調要用通過的類和方法。
接下來對每一個一個類的建立過程和調用時機作出解讀。
第一行棧信息
1,at com.example.DemoApplicationCustomer.main(DemoApplicationCustomer.java:6)
DemoApplicationCustomer類是啓動類,能夠忽略。
第二行棧信息
2,at com.alibaba.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1)
com.alibaba.dubbo.common.bytecode.proxy0類,它是一個代理類。它代理了全部RPC服務接口的方法調用。
這個類實例何時建立的,類代碼是什麼樣的?
以前寫過一篇博文,dubbo基於spring的構建分析,能夠看到代理的建立是由ReferenceBean類裏的
public Object getObject() throws Exception {
return get();
}
方法裏觸發,具體實如今ReferenceConfig類createProxy方法裏
/*** * 建立客戶端rpc調用代理 * @param map * @return */ @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map){ //....用於生成invoker的邏輯,關於inoker生成邏輯這裏先忽略,後面會說到 //建立服務代理 return (T) proxyFactory.getProxy(invoker); }
而proxyFactory變量賦值爲
ProxyFactoryproxyFactory=ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
博文 dubbo SPI解析 裏能夠獲得到ProxyFactory接口的Adaptive類的getProxy方法源碼以下:
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory { public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); //這裏默認用了ProxyFactory javassist擴展的getProxy方法建立代理 com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0); } }
ProxyFactory接口的javassist擴展類JavassistProxyFactory的getProxy方法實現
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { //代理類實現化以new InvokerInvocationHandler(invoker)問參數 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
再到生成動態代理的Proxy類
/** * Get proxy. * * @param ics interface class array. * @return Proxy instance. */ public static Proxy getProxy(Class<?>... ics) { return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); } /** * Get proxy. * * @param cl class loader. * @param ics interface class array. 能夠實現多個接口 * @return Proxy instance. */ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) { if (ics.length > 65535) throw new IllegalArgumentException("interface limit exceeded"); StringBuilder sb = new StringBuilder(); for (int i = 0; i < ics.length; i++) { String itf = ics[i].getName(); if (!ics[i].isInterface()) throw new RuntimeException(itf + " is not a interface."); Class<?> tmp = null; try { tmp = Class.forName(itf, false, cl); } catch (ClassNotFoundException e) { } if (tmp != ics[i]) throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); sb.append(itf).append(';'); } // use interface class name list as key. // 用接口類名作key,多個接口以分號分開。 String key = sb.toString(); // get cache by class loader. // 緩存 Map<String, Object> cache; synchronized (ProxyCacheMap) { cache = ProxyCacheMap.get(cl); if (cache == null) { cache = new HashMap<String, Object>(); ProxyCacheMap.put(cl, cache); } } Proxy proxy = null; synchronized (cache) { do { Object value = cache.get(key); if (value instanceof Reference<?>) { //若是有存在引用對象,返回緩存對象。 proxy = (Proxy) ((Reference<?>) value).get(); if (proxy != null) return proxy; } //對象正在生成,線程掛起,等待 if (value == PendingGenerationMarker) { try { cache.wait(); } catch (InterruptedException e) { } } else {//放入正在生成標識 cache.put(key, PendingGenerationMarker); break; } } while (true); } //類名稱後自動加序列號 0,1,2,3... long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null; //ClassGenerator dubbo用javassist實現的工具類 ClassGenerator ccp = null, ccm = null; try { ccp = ClassGenerator.newInstance(cl); Set<String> worked = new HashSet<String>(); List<Method> methods = new ArrayList<Method>(); for (int i = 0; i < ics.length; i++) { //檢查包名稱及不一樣包的修飾符 if (!Modifier.isPublic(ics[i].getModifiers())) { String npkg = ics[i].getPackage().getName(); if (pkg == null) { pkg = npkg; } else { if (!pkg.equals(npkg)) throw new IllegalArgumentException("non-public interfaces from different packages"); } } //代理類添加要實現的接口Class對象 ccp.addInterface(ics[i]); for (Method method : ics[i].getMethods()) { //獲取方法描述符,不一樣接口,一樣的方法,只能被實現一次。 String desc = ReflectUtils.getDesc(method); if (worked.contains(desc)) continue; worked.add(desc); int ix = methods.size(); //方法返回類型 Class<?> rt = method.getReturnType(); //方法參數類型列表 Class<?>[] pts = method.getParameterTypes(); //生成接口的實現代碼,每一個方法都同樣 StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];"); for (int j = 0; j < pts.length; j++) code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);"); if (!Void.TYPE.equals(rt)) code.append(" return ").append(asArgument(rt, "ret")).append(";"); methods.add(method); ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); } } if (pkg == null) pkg = PACKAGE_NAME; // create ProxyInstance class. // 具體代理類名稱,這裏是類全名 String pcn = pkg + ".proxy" + id; ccp.setClassName(pcn); ccp.addField("public static java.lang.reflect.Method[] methods;"); ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); //建立構造函數 ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;"); ccp.addDefaultConstructor(); Class<?> clazz = ccp.toClass(); //經過反射,把method數組放入,靜態變量methods中, clazz.getField("methods").set(null, methods.toArray(new Method[0])); // create Proxy class. String fcn = Proxy.class.getName() + id; ccm = ClassGenerator.newInstance(cl); ccm.setClassName(fcn); ccm.addDefaultConstructor(); //設置父類爲抽象類,Proxy類子類, ccm.setSuperClass(Proxy.class); //生成實現它的抽象方法newInstance代碼 //new 的實例對象,是上面生成的代理類 pcn ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); Class<?> pc = ccm.toClass(); proxy = (Proxy) pc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } finally { // release ClassGenerator if (ccp != null) ccp.release(); if (ccm != null) ccm.release(); synchronized (cache) { if (proxy == null) cache.remove(key); else //放入緩存,key:實現的接口名,value 代理對象,這個用弱引用, //當jvm gc時,會打斷對實例對象的引用,對象接下來就等待被回收。 cache.put(key, new WeakReference<Proxy>(proxy)); cache.notifyAll(); } } return proxy; }
以上簡單分析下生成過程。這裏貼出經過代碼hack生成的代理類源碼,這裏動態生成了2個類
package com.alibaba.dubbo.common.bytecode; import com.alibaba.dubbo.common.bytecode.ClassGenerator.DC; import java.lang.reflect.InvocationHandler; public class Proxy0 extends Proxy implements DC { public Object newInstance(InvocationHandler var1) { return new proxy01(var1); } public Proxy0_my() { } }
這個類繼承了抽象類Proxy,實現了它的抽象方法newInstance,接口DC是dubbo內部做爲動態類標識的接口。
還有一個類proxy01,就是在開始方法棧裏看到的代理類,源碼以下
package com.alibaba.dubbo.common.bytecode; import com.alibaba.dubbo.rpc.service.EchoService; import demo.dubbo.api.DemoService; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; public class proxy01 implements ClassGenerator.DC, EchoService, DemoService { public static Method[] methods; private InvocationHandler handler; //實現了接口方法 public String sayHello(String var1) { Object[] var2 = new Object[]{var1}; Object var3 = null; try { var3 = this.handler.invoke(this, methods[1], var2); } catch (Throwable throwable) { throwable.printStackTrace(); } return (String)var3; } public Object $echo(Object var1) { Object[] var2 = new Object[]{var1}; Object var3 = null; try { var3 = this.handler.invoke(this, methods[3], var2); } catch (Throwable throwable) { throwable.printStackTrace(); } return (Object)var3; } public proxy01() { } //public 構造函數,這裏handler是 //由Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))語句傳入的InvokerInvocationHandler對象 public proxy01(InvocationHandler var1) { this.handler = var1; } }
能夠看到代理類實現了3個接口。
ClassGeneratr.DC是dubbo動態類標識接口
DemoService是實際業務接口。這樣代理就能夠調用服務方法了。
EchoService是回顯測試接口,只有一個方法,
public interface EchoService { /** * echo test. * * @param message message. * @return message. */ Object $echo(Object message); }
它能爲全部dubbo rpc服務加上的一個回顯測試方法。
EchoService echoService = (EchoService) memberService; // 經過強制轉型爲EchoService,能夠測試。
到這裏咱們大概梳理了代理類生成過程。能夠看到sayHello方法的調用實際上是
this.handler.invoke(this, methods[3], var2);調用。這也能夠解釋了方法棧裏第3行信息
3,com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
再看下InvokerInvocationHandler類
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; //經過構造函數傳入invoker public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); //若是是Object類方法 if (method.getDeclaringClass() == Object.class) { //反射調用 return method.invoke(invoker, args); } //對3個特殊方法的調用,作了處理 if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } //其餘業務方法經過invoker.invoke方法調用(***看這裏***) return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }
這裏的invoker對象,經過InvokerInvocationHandler構造方法傳入,而InvokerInvocationHandler對象是由JavassistProxyFactory類
getProxy(Invoker<T> invoker, Class<?>[] interfaces)方法建立。
這還要回到調用proxyFactory.getProxy(invoker);方法的地方,即ReferenceConfig類的createProxy(Map<String, String> map)方法
如下部分邏輯是生成invoker的過程:
if (urls.size() == 1) {//只有一個直連地址或一個註冊中心配置地址 //這裏的urls.get(0)協議,多是直連地址(默認dubbo協議),也多是regiter註冊地址(zookeeper協議) //咱們這裏走的是註冊中心,因此 invoker = refprotocol.refer(interfaceClass, urls.get(0));//本例經過配置一個註冊中心的形式(***看這裏***) } else {//多個直連地址或者多個註冊中心地址,甚至是二者的組合。 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { //建立invoker放入invokers invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 多個註冊中心,用最後一個registry url } } if (registryURL != null) { //有註冊中心協議的URL, //對多個url,其中存在有註冊中心的,寫死用AvailableCluster集羣策略 //這其中包括直連和註冊中心混合或者都是註冊中心兩種狀況 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // 多個直連的url invoker = cluster.join(new StaticDirectory(invokers)); } }
能夠看到invoker是經過
refprotocol.refer(interfaceClass, urls.get(0));
或者
cluster.join(new StaticDirectory(u, invokers));
cluster.join(new StaticDirectory(invokers));
三種構建語句依照條件選一種調用生成。
這裏分析第一種生成invokder的狀況,
根據spi機制這裏refprotocol對象是Protocol$Adpative實例,具體refer實現是
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); }
經過代碼能夠得知,Protocol具體實現要根據url的Protocol值再經過spi獲得
若是是直連地址,這裏就是dubbo協議,最後走DubboProtocol類的refer方法
具體實現是:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
若是是註冊中心,這裏protocol是register,會走RegistryProtocol類的refer方法
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { //經過register 能夠獲取具體註冊中心協議,這裏是zookeeper,並設置爲url的協議值。 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); //獲取zookeeper Registry 實現,即ZookeeperRegistryFactory ,並調用getRegistry方法實現 //獲取zookeeper類型的registry對象 Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } //這裏cluster是Cluster$Adpative類對象 return doRefer(cluster, registry, type, url); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { //這裏的RegistryDirectory和StaticDirectory向對應的,前者是動態從註冊中心獲取url目錄對象,後者是靜態指定url目錄。 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } //訂閱註冊中心,能夠獲取服務提供方地址等信息 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); //經過調用Cluster$Adpative類的join方法返回Invoker對象(***看這裏***) return cluster.join(directory); }
這裏看下Cluster$Adpative類join方法實現
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); //經過cluster獲取集羣策略,默認是failover //本例是使用failover機制 String extName = url.getParameter("cluster", "failover"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])"); com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName); //經過spi這裏獲得FailoverCluster對象 return extension.join(arg0); }
再看下FailoverCluster的join方法:
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { //返回FailoverClusterInvoker對象 return new FailoverClusterInvoker<T>(directory); }
因爲Cluster spi實現中有個MockClusterWrapper是包裝類,這裏牽涉到是dubbo的aop機制,這裏先調用它的join方法
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); }
又因爲FailoverClusterInvoker是AbstractClusterInvoker的子類,它的invoke方法實如今其父類中的,因此以下方法棧信息:
at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77) at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:229) at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
這些類都是dubbo的集羣容錯,以前寫過一篇博客http://www.javashuo.com/article/p-dwqnopaq-hc.html 是關於集羣容錯的介紹的。
再往下看AbstractClusterInvoker的invoke方法實現:
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; //會調用directory的list方法 返回要調用invokers集合。 //實際上是AbstractDirectory的list方法,這個方法裏就是利用路由規則(若是有),從全部 //提供者中,遴選出符合規則的提供者 //接下里纔是,集羣容錯和負載均衡。 List<Invoker<T>> invokers = list(invocation);//生成invoker方法(****看這裏***) if (invokers != null && invokers.size() > 0) { //從url經過key "loadbalance" 取不到值,就取默認random隨機策略 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { //取默認random隨機策略 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
list方法:
protected List<Invoker<T>> list(Invocation invocation) throws RpcException { //directory.list(invocation)獲取invokers,這裏directory是RegistryDirectory List<Invoker<T>> invokers = directory.list(invocation); return invokers; }
跟到RegistryDirectory類的list方法,實如今其父類AbstractDirectory中
/*** * 落地路由規則 * @param invocation * @return * @throws RpcException */ public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } //獲取全部的提供者 List<Invoker<T>> invokers = doList(invocation);//(***看這裏***) //本地路由規則,這個其實已近設置好setRouters方法。何時設置的,稍後看看 List<Router> localRouters = this.routers; // local reference if (localRouters != null && localRouters.size() > 0) { for (Router router : localRouters) { try { if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) { //Router接口,實現類的rout的方法。路由獲取服務提供者 invokers = router.route(invokers, getConsumerUrl(), invocation); } } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } } } return invokers; } //這裏doList是個抽象方法,由RegistryDirectory實現具體: public List<Invoker<T>> doList(Invocation invocation) { if (forbidden) { // 1. 沒有服務提供者 2. 服務提供者被禁用 throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", may be providers disabled or not registered ?"); } List<Invoker<T>> invokers = null; // local reference 從這裏搜索methodInvokerMap賦值,在refreshInvoker方法裏。(***看這裏***) Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { String methodName = RpcUtils.getMethodName(invocation); Object[] args = RpcUtils.getArguments(invocation); if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) { invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根據第一個參數枚舉路由 } if (invokers == null) { invokers = localMethodInvokerMap.get(methodName); } if (invokers == null) { invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } if (invokers == null) { Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator(); if (iterator.hasNext()) { invokers = iterator.next(); } } } return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; }
下面是refreshInvoker(List<URL> invokerUrls)方法
private void refreshInvoker(List<URL> invokerUrls) { if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // 禁止訪問 this.methodInvokerMap = null; // 置空列表 destroyAllInvokers(); // 關閉全部Invoker } else { this.forbidden = false; // 容許訪問 Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<URL>(); this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls列表,便於交叉對比 } if (invokerUrls.size() == 0) { return; } //生成Invoker方法 toInvokers(***看這裏****) Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 將URL列表轉成Invoker列表,invoker在這裏建立 Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker列表 // state change //若是計算錯誤,則不進行處理. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); return; } this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 關閉未使用的Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }
順便說下,refreshInvoker()方法會在 RegistryDirectory類的notify(List<URL> urls)方法裏調用,這個方法也是訂閱註冊中心回調方法。下面跟到toInvokers方法
/** * 將urls轉成invokers,若是url已經被refer過,再也不從新引用。 * * @param urls * @param overrides * @param query * @return invokers */ private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>(); if (urls == null || urls.size() == 0) { return newUrlInvokerMap; } Set<String> keys = new HashSet<String>(); String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); for (URL providerUrl : urls) { //若是reference端配置了protocol,則只選擇匹配的protocol if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } } if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); continue; } URL url = mergeUrl(providerUrl); String key = url.toFullString(); // URL參數是排序的 if (keys.contains(key)) { // 重複URL continue; } keys.add(key); // 緩存key爲沒有合併消費端參數的URL,無論消費端如何合併參數,若是服務端URL發生變化,則從新refer Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // 緩存中沒有,從新refer try { boolean enabled = true; if (url.hasParameter(Constants.DISABLED_KEY)) { enabled = !url.getParameter(Constants.DISABLED_KEY, false); } else { enabled = url.getParameter(Constants.ENABLED_KEY, true); } if (enabled) { //這裏是invoker的建立的地方(***看這裏***) invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); } if (invoker != null) { // 將新的引用放入緩存 newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }
終於找到invoker的建立的地方,先看InvokerDelegete,它是RegistryDirectory的內部類
/** * 代理類,主要用於存儲註冊中心下發的url地址, * 用於從新從新refer時可以根據providerURL queryMap overrideMap從新組裝 * * @param <T> * @author chao.liuc */ private static class InvokerDelegete<T> extends InvokerWrapper<T> { private URL providerUrl; public InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl) { //調用父類構造方法 super(invoker, url); this.providerUrl = providerUrl; } public URL getProviderUrl() { return providerUrl; } }
invoke方法在其父類InvokerWrapper裏實現的
public Result invoke(Invocation invocation) throws RpcException { //這裏的invoker是從它的構造方法裏傳入的 return invoker.invoke(invocation); }
因此在方法棧裏看到下面一行棧信息
at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53)
InvokerDelegete構造方法調用的父類InvokerWrapper的構造方法並傳入invoker,回頭看
new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);這句。
可知上面的invoker是由protocol.refer(serviceType, url)建立的。
經過debug,可知這裏的protocol是Protocol$Adpative類型,
這裏的url的Protocol是dubbo,經過spi能夠獲得這裏最後走DubboProtocol類refer方法
可是因爲Protocal接口實現中,有兩個包裝類
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
因此這裏先執行ProtocolFilterWrapper的refer方法,再執行ProtocolListenerWrapper的refer方法,
最後才執行DubboProtocol類refer方法。
ProtocolFilterWrapper的refer方法以下:
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; //先獲取激活的過濾器,咱們這裏手動配置了monitor MonitorFilter顧慮器, // 另外兩個自動激活的過濾器是FutureFilter,ConsumerContextFilter //這裏須要看spi機制的getActivateExtension方法相關代碼 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } //實現invoker的 invoke方法 public Result invoke(Invocation invocation) throws RpcException { //嵌套進過濾器鏈 return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
因此有如下調用棧信息
at com.alibaba.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:65) at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69) at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54) at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69) at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:48) at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69)
接着ProtocolListenerWrapper的refer方法
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } // return new ListenerInvokerWrapper<T>(protocol.refer(type, url), //獲取激活的監聽器,目前dubbo沒有 提供合適的監聽器 只有一個DeprecatedInvokerListener實現類,仍是個Deprecated的 //因此這裏爲空 Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); }
這個能夠解釋下面這句堆棧信息:
at com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper.invoke(ListenerInvokerWrapper.java:74)
最後看下DubboProtocol類refer方法,這裏建立了DubboInvoker對象
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
DubboInvoker的父類AbstractInvoker實現了invoke方法
public Result invoke(Invocation inv) throws RpcException { if (destroyed.get()) { throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + " is DESTROYED, can not be invoked any more!"); } RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> context = RpcContext.getContext().getAttachments(); if (context != null) { invocation.addAttachmentsIfAbsent(context); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { //doInvoke 具體實如今子類中 return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception Throwable te = e.getTargetException(); if (te == null) { return new RpcResult(e); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return new RpcResult(te); } } catch (RpcException e) { if (e.isBiz()) { return new RpcResult(e); } else { throw e; } } catch (Throwable e) { return new RpcResult(e); } }
DubboInvoker實現的doInvoke方法
DubboInvoker實現的doInvoke方法 @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); //實際的請求語句 ,這裏的currentClient是 自身對象屬性clients[0]值(***看這裏****) return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
因此會有這兩句調用者棧輸出
at com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:97) at com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:144)
接下來應該看用於發起請求的currentClient對象的的實現了,它的實現能夠追蹤到DubboProtocol類refer方法裏
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. //getClients(url) 建立 DubboInvoker 屬性clients對象, DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
getClients方法:
private ExchangeClient[] getClients(URL url) { //是否共享鏈接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //若是connections不配置,則共享鏈接,不然每服務每鏈接 if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { //獲取共享鏈接 clients[i] = getSharedClient(url); } else { //初始化client ,本例子不是共享鏈接,走這個邏輯(****看這裏**) clients[i] = initClient(url); } } return clients; } /** * 建立新鏈接. */ private ExchangeClient initClient(URL url) { // client type setting. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); String version = url.getParameter(Constants.DUBBO_VERSION_KEY); boolean compatible = (version != null && version.startsWith("1.0.")); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); //默認開啓heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO存在嚴重性能問題,暫時不容許使用 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client; try { //設置鏈接應該是lazy的 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); } else { //(****看這裏**) //經過 Exchangers.connect(url, requestHandler); 構建client ,接下來跟蹤Exchangers.connect方法 //這裏會傳入一個requestHandler,這個是客戶端解救服務端方法返回回調的 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
這裏用到了facade設計模式,Exchangers是個門面類,封裝了具體查找合適的Exchanger實現,並調用connect方法返回ExchangeClient的過程,相關方法代碼以下:
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //把codec key 設置爲exchange return getExchanger(url).connect(url, handler); } public static Exchanger getExchanger(URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); //經過exchanger key 獲取 Exchanger的spi實現,默認是header,這裏是HeaderExchanger類 return getExchanger(type); } public static Exchanger getExchanger(String type) { //這裏返回Exchanger接口的header擴展類HeaderExchanger return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
那麼HeaderExchanger類connect方法
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; /** * 客戶端的鏈接操做 * @param url * @param handler * @return * @throws RemotingException */ public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { //這裏返回了HeaderExchangeClient對象 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } }
因此有棧信息:
at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient.request(HeaderExchangeClient.java:88)
再看HeaderExchangeClient.request方法
public ResponseFuture request(Object request) throws RemotingException { //這裏channel對象是從類構造函數中賦值,this.channel = new HeaderExchangeChannel(client); return channel.request(request); }
因此有棧信息
at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(HeaderExchangeChannel.java:112)
繼續追查HeaderExchangeChannel類request方法
public ResponseFuture request(Object request) throws RemotingException { return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); } //重載後方法: public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { //經過具體channel 發送請求 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
這裏有牽涉channel對象,這裏的channel對象也是經過HeaderExchangeChannel類的構造函數,從上層方法傳進來的,
而HeaderExchangeChannel是由HeaderExchangeClient構造的,
HeaderExchangeClient對象是由HeaderExchanger的connect方法裏建立的
這裏回到HeaderExchanger的connect方法:
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
能夠看到上文中HeaderExchangeChannel類中發送消息的channel對象是
Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
這句建立的。這裏的Transporters也是個門面類,是facade設計模式的實現,Transporters具體connect方法實現以下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //這裏具體走 NettyTransporter.connect // public Client connect(URL url, ChannelHandler listener) throws RemotingException { // return new NettyClient(url, listener); // } //因此這裏默認返回的NettyClient return getTransporter().connect(url, handler); } //這個方法根據spi返回NettyTransporter擴展類 public static Transporter getTransporter() { //這裏經過生成的Transporter$Adaptive 的實現以下: return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
因此最後是經過NettyClient類實例的send方法發送的具體請求,
NettyClient類send方法實如今其祖先類AbstractPeer中
public void send(Object message) throws RemotingException { send(message, url.getParameter(Constants.SENT_KEY, false)); }
這個實現又調用NettyClient父類AbstractClient的send方法實現
public void send(Object message, boolean sent) throws RemotingException { if (send_reconnect && !isConnected()) { connect(); } //獲取具體channel實例 Channel channel = getChannel(); //TODO getChannel返回的狀態是否包含null須要改進 if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } channel.send(message, sent); }
這裏的getChannel()方法由NettyClient自身實現,以下:
protected com.alibaba.dubbo.remoting.Channel getChannel() { Channel c = channel; if (c == null || !c.isConnected()) return null; return NettyChannel.getOrAddChannel(c, getUrl(), this); } //再到NettyChannel.getOrAddChannel方法 static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; } //返回NettyChannel類 NettyChannel ret = channelMap.get(ch); if (ret == null) { NettyChannel nc = new NettyChannel(ch, url, handler); if (ch.isConnected()) { ret = channelMap.putIfAbsent(ch, nc); } if (ret == null) { ret = nc; } } return ret; }
因此有如下棧信息:
at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:98) at com.alibaba.dubbo.remoting.transport.AbstractClient.send(AbstractClient.java:258) at com.alibaba.dubbo.remoting.transport.AbstractPeer.send(AbstractPeer.java:54)
後面就是jboss內部的調用和消息轉換:
at org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:266) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) at org.jboss.netty.channel.Channels.write(Channels.java:611) at org.jboss.netty.channel.Channels.write(Channels.java:578) at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)
最後就走到咱們開始打斷點的NettyHandler類writeRequested方法:
at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:99)