本例以一個簡單典型的服務發佈爲例,spring配置如下:
<!-- Dubbo protocol definition; the service below refers to it by id "dubbo1". -->
<dubbo:protocol name="dubbo" port="20880" id="dubbo1"/>

<!-- ZooKeeper registry. -->
<dubbo:registry id="hangzhouRegistry" address="zookeeper://192.168.64.128:2181"/>

<!-- Publishes the "demoService" bean under the DemoService interface using protocol "dubbo1". -->
<dubbo:service interface="demo.dubbo.api.DemoService" ref="demoService" protocol="dubbo1" />
//服務接口 public interface DemoService { public String sayHello(String name); } //實現 public class DemoServiceImpl implements DemoService { public String sayHello(String name) { Random random=new Random(); try { Thread.sleep(800* random.nextInt(6)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress()); return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress(); } }
由博文《dubbo基於spring的構建分析》可以看到,dubbo服務發佈註冊的啓動方法有兩個入口:
第一在ServiceBean類的onApplicationEvent()方法,在容器初始化完成後,執行暴露邏輯
第二在ServiceBean類的afterPropertiesSet()方法,當前servicebean屬性構造完成後,執行暴露邏輯
具體暴露方法在其父類ServiceConfig的export方法裏:
//這是個同步的方法 public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } //延遲暴露 經過delay參數配置的,延遲暴露,放入單獨線程中。 if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { //暴露方法 doExport(); } }
/*** * 也是個同步的方法,暴露過程具體過程 */ protected synchronized void doExport() { //....屬性檢查和賦值,代碼略 //暴露過程 doExportUrls(); } private void doExportUrls() { //獲取註冊中心信息 List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { //多個協議,暴露屢次 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } //暴露過程 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //屬性的解析和賦值..... //..... 代碼略..... //獲取服務暴露範圍,本地或者遠程 String scope = url.getParameter(Constants.SCOPE_KEY); //配置爲none不暴露 //不配默認,服務暴露遠程同時在本地暴露 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的狀況下作本地暴露 (配置爲remote,則表示只暴露遠程服務) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { //本地暴露 (***看這裏***)關鍵1 exportLocal(url); } //若是配置不是local則暴露爲遠程服務.(配置爲local,則表示只暴露本地服務) if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { //有多個註冊中心,暴露到多個註冊中心 for (URL registryURL : registryURLs) { //url 添加dynamic 屬性值 url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } //(***看這裏***)關鍵2 //默認走到JavassistProxyFactory.getInvoker方法 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //這裏的invoker的url的協議是register類型 Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } else { //沒有註冊中心 ,只在本機ip打開服務端口,生成服務代理,並不註冊到註冊中心。 //(***看這裏***)關鍵3 此處的url協議爲dubbo Invoker<?> invoker = 
proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } } this.urls.add(url); }
上面方法中有三個地方涉及到具體的服務暴露,先看關鍵1:
private void exportLocal(URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL)//設置爲injvm 協議 .setHost(NetUtils.LOCALHOST) .setPort(0); //這裏的protocol是Protocol$Adpative的實例(spi機制) //proxyFactory是ProxyFactory$Adpative實例(spi機制) Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); //放到暴露列表 exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } }
ProxyFactory$Adpative的getInvoker方法:
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException { if (arg2 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); //這裏的extension默認是JavassistProxyFactory實例, return extension.getInvoker(arg0, arg1, arg2); }
JavassistProxyFactory的getInvoker方法:
//proxy 是服務實現類,type是服務接口 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { //利用Wrapper類經過服務接口生成對應的代理類。 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); //實現抽象類AbstractProxyInvoker抽象方法doInvoke,並調用(proxy, type, url)構造函數實例化匿名類 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { //這裏調用代理類的invokeMethod方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
AbstractProxyInvoker的構造方法:
/**
 * Validates and stores the target instance, its interface and the invoker url.
 *
 * @throws IllegalArgumentException if proxy or type is null, or proxy is not an instance of type
 */
public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
    if (null == proxy) {
        throw new IllegalArgumentException("proxy == null");
    }
    if (null == type) {
        throw new IllegalArgumentException("interface == null");
    }
    boolean implementsType = type.isInstance(proxy);
    if (!implementsType) {
        throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
    }
    this.proxy = proxy;
    this.type = type;
    // The url is kept on the invoker itself; later steps read it back.
    this.url = url;
}
AbstractProxyInvoker的invoke方法:
public Result invoke(Invocation invocation) throws RpcException { try { //回調用doInvoke方法,會傳入執行實例proxy,方法名和方法參數類型和值 return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
這裏以文章開頭的DemoService接口爲例,看下通過Wrapper.getWrapper返回的類代碼(需要hack代碼才能拿到):
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.DemoService;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;

/**
 * Wrapper class that Wrapper.getWrapper returns for DemoService, as dumped by the article.
 * It dispatches by method name to the wrapped DemoService instance.
 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    // Static metadata tables; nothing in this class assigns them.
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public String[] getPropertyNames() {
        return pns;
    }

    public boolean hasProperty(String paramString) {
        return pts.containsKey(paramString);
    }

    public Class getPropertyType(String paramString) {
        return (Class) pts.get(paramString);
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    /**
     * DemoService exposes no writable property, so this always throws.
     *
     * @throws IllegalArgumentException if paramObject1 is not a DemoService
     */
    public void setPropertyValue(Object paramObject1, String paramString, Object paramObject2) {
        try {
            // The cast only serves as a type check on the target instance.
            DemoService localDemoService = (DemoService) paramObject1;
        } catch (Throwable localThrowable) {
            throw new IllegalArgumentException(localThrowable);
        }
        throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService.");
    }

    /**
     * DemoService exposes no readable property, so this always throws.
     *
     * @throws IllegalArgumentException if paramObject is not a DemoService
     */
    public Object getPropertyValue(Object paramObject, String paramString) {
        try {
            // The cast only serves as a type check on the target instance.
            DemoService localDemoService = (DemoService) paramObject;
        } catch (Throwable localThrowable) {
            throw new IllegalArgumentException(localThrowable);
        }
        throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService.");
    }

    /**
     * Key method: invokes the named method on the wrapped instance.
     *
     * @param paramObject        the target instance (the DemoService implementation)
     * @param paramString        the name of the method to call
     * @param paramArrayOfClass  the parameter types of the call
     * @param paramArrayOfObject the argument values of the call
     * @return the value returned by the target method
     * @throws IllegalArgumentException  if paramObject is not a DemoService
     * @throws InvocationTargetException wrapping anything thrown while dispatching or invoking
     */
    public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject) throws InvocationTargetException {
        DemoService localDemoService;
        try {
            localDemoService = (DemoService) paramObject;
        } catch (Throwable localThrowable1) {
            throw new IllegalArgumentException(localThrowable1);
        }
        try {
            // Both the method name AND the parameter count must match. With "||" every
            // one-argument call on an unknown method name would have executed sayHello.
            if (("sayHello".equals(paramString)) && (paramArrayOfClass.length == 1)) {
                return localDemoService.sayHello((String) paramArrayOfObject[0]);
            }
        } catch (Throwable localThrowable2) {
            throw new InvocationTargetException(localThrowable2);
        }
        // NOTE(review): presumably the bytecode package's own unchecked NoSuchMethodException,
        // since the throws clause does not list it — confirm.
        throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class com.alibaba.dubbo.common.DemoService.");
    }
}
到這裏就比較清楚具體的代理過程了。
第一步,上面的invoker對象生成好後,接下來就要通過Protocol$Adpative的export方法暴露服務:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); //根據上文本地調用這裏的protocal協議別設置爲injvm //因此這裏會走到InjvmProtocol的export方法 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); }
看下InjvmProtocol的export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //返回InjvmExporter對象 return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
InjvmExporter構造函數
/**
 * Creates the exporter and registers it in the given map under {@code key}.
 *
 * @param invoker     the invoker handed to the superclass constructor
 * @param key         the service key used as the map key
 * @param exporterMap the map in which this exporter registers itself
 */
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
    super(invoker);
    this.exporterMap = exporterMap;
    this.key = key;
    // serviceKey -> this exporter: a lookup by service key yields the exporter and,
    // through it, the invoker.
    exporterMap.put(key, this);
}
這裏的exporterMap是由InjvmProtocol實例擁有,而InjvmProtocol又是單例的,因爲InjvmProtocol類有如下靜態成員和構造方法:
//靜態自身成員變量 private static InjvmProtocol INSTANCE; //構造方法,把自身賦值給INSTANCE對象 public InjvmProtocol() { INSTANCE = this; }
因此exporterMap對象也是單例的。同時這裏順便看下InjvmProtocol的refer方法,本地服務的引用查找也是經過自身exporterMap對象。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { //把exporterMap對象賦值給InjvmInvoker return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap); } //具體查找過程 public Result doInvoke(Invocation invocation) throws Throwable { //經過exporterMap獲取exporter Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl()); if (exporter == null) { throw new RpcException("Service [" + key + "] not found."); } RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0); return exporter.getInvoker().invoke(invocation); }
以上是本地服務的發佈和引用過程
接下來看下本例中會用到的遠程服務發佈暴露過程,即暴露服務端口併發布服務信息到註冊中心。也便是關鍵2代碼處
通過上面本地服務暴露過程分析可以知道,遠程服務的動態代理生成過程和本地服務代理生成是一樣的,唯一區別是構造invoker時傳入的url是registryURL
傳入url的不一樣,會形成下面這句的執行過程的不一樣
Exporter<?> exporter = protocol.export(invoker);
這裏protocol經過spi走的是Protocol$Adpative的export方法:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); //因爲傳入的url是registryURL因此會走RegistryProtocol的export方法 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); }
RegistryProtocol的export方法以下
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker 暴露invoker (***看doLocalExport方法**) final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider 獲取對應註冊中心操做對象 final Registry registry = getRegistry(originInvoker); //獲取要註冊到註冊中心的地址 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //註冊服務url到註冊中心(***把服務信息註冊到註冊中心**) registry.register(registedProviderUrl); // 訂閱override數據 // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保證每次export都返回一個新的exporter實例 //實現一個匿名類實現接口Exporter return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } //取消暴露的過程 public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { //取消註冊 registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { //取消訂閱 overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
doLocalExport方法,這裏方法裏涉及過程比較多
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { //經過原始originInvoker構造緩存key String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); //緩存沒有,走具體暴露邏輯 if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { //InvokerDelegete是RegistryProtocol類的靜態內部類,繼承自InvokerWrapper, //經過構造器賦值持有代理originInvoker和服務暴露協議url對象,算是包裝一層 //而url 是經過getProviderUrl(originInvoker)返回的,此時url的協議已經是dubbo,即服務暴露的協議 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //ExporterChangeableWrapper是RegistryProtocol的私有內部類實現了Exporter接口。 //經過調用它的構造方法(Exporter<T> exporter, Invoker<T> originInvoker)構造exporterWrapper實例 //而這裏傳入的exporter是經過(Exporter<T>) protocol.export(invokerDelegete)語句建立 //由上一步知道,這裏的invokerDelegete裏url屬性的protocol協議已是dubbo //下面具體看下protocol.export(invokerDelegete)方法。 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }
還是對Protocol$Adpative類的export邏輯進行分析:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); //因爲這裏invokerDelegete的url屬性的protocol是dubbo,因此這裏走DubboProtocol的export協議 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); }
接着看下DubboProtocol的export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // get serviceKey. String key = serviceKey(url);//key的組成group/service:version:port //構造服務的exporter //如同InjvmProtocol同樣,DubboProtocol也是單例的 因此這裏exporterMap也是單例的 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //經過key放入exporterMap,把持有invoker的exporter 和serviceKey關聯 //這個在後面服務調用時,能夠經過key找到對應的exporter進而找到invoker提供服務 exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //根據url開啓一個服務,好比綁定端口,開始接受請求信息(**繼續看這裏***) openServer(url); return exporter; }
private void openServer(URL url) { //key=host:port 用於定位server String key = url.getAddress(); //client也能夠暴露一個只有server能夠調用的服務。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { //服務實例放到serverMap,key是host:port //這裏的serverMap也單例的 ExchangeServer server = serverMap.get(key); if (server == null) { //經過createServer(url)方法獲取server (***看這裏***) serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } } /*** * 開啓服務 * @param url * @return */ private ExchangeServer createServer(URL url) { //默認開啓server關閉時發送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默認開啓heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); /*** * 經過server key 檢查是不是dubbo目前spi擴展支持的傳輸框架。默認是netty */ if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); //經過codec key 獲取編解碼方案,兼容dubbo1,默認是dubbo1compatible ,不然默認dubbo 編解碼方案 url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { //構造具體服務實例, //Exchangers是門面類,裏面封裝了具體交換層實現,並調用它的bind方法(***看這裏***) server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } //這裏會驗證一下,客戶端傳輸層實現 //若是沒有對應的實現,會拋出異常 str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
跟蹤方法
//Exchangers.bind方法 public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //經過spi會走HeaderExchanger的bind邏輯 return getExchanger(url).bind(url, handler); } //HeaderExchanger的bind方法 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //Transporters也是門面類,封裝了具體的傳輸層實現查找和bind過程 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } //HeaderExchangeServer的構造函數 public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } //開啓心跳 startHeatbeatTimer(); }
繼續跟進方法
//Transporters.bind方法 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //根據spi 這裏具體走 NettyTransporter.bind return getTransporter().bind(url, handler); } //NettyTransporter的bind方法 public Server bind(URL url, ChannelHandler listener) throws RemotingException { //能夠看到這裏是NettyServer實例 return new NettyServer(url, listener); } //NettyServer構造器 public NettyServer(URL url, ChannelHandler handler) throws RemotingException { //調用父類AbstractServer構造器 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }
父類AbstractServer的構造函數
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(getUrl().getHost()) ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort()); this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { //打開端口,啓動服務,在其子類實現,這裏是NettyServer的doOpen()方法(***看這裏**) doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }
NettyServer的doOpen()方法:
protected void doOpen() throws Throwable { //經過netty 開啓服務監聽端口 NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder());//解碼器 pipeline.addLast("encoder", adapter.getEncoder());//編碼器 pipeline.addLast("handler", nettyHandler);//NettyHandler 擴展netty雙向handler基類r 能夠接受進站和出站數據流 return pipeline; } }); // bind 地址 開啓端口 channel = bootstrap.bind(getBindAddress()); }
以上基本梳理了dubbo從service配置解析到生成服務代理,並通過netty開啓服務端口的服務暴露過程。