Dubbo的服務註冊與發佈的入口來源於dubbo-config模塊中的dubbo-config-spring。
基於 dubbo.jar 內的
META-INF/spring.handlers
配置,Spring 在遇到 dubbo 名稱空間時,會回調DubboNamespaceHandler
。全部 dubbo 的標籤,都統一用
DubboBeanDefinitionParser
進行解析,基於一對一屬性映射,將 XML 標籤解析爲 Bean 對象。
spring.handlers:
http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
DubboNamespaceHandler#init:
// Pay close attention to the following two lines
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));

ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware

public interface InitializingBean {
    // called when the bean is initialized
    void afterPropertiesSet() throws Exception;
}

DisposableBean,
public interface DisposableBean {
    // destruction
    void destroy() throws Exception;
}

ApplicationListener<ContextRefreshedEvent>, // listens for events (those published by the event publisher),
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    void onApplicationEvent(E var1);
}

BeanNameAware,
public interface BeanNameAware extends Aware {
    void setBeanName(String var1);
}

ApplicationEventPublisherAware // event publisher
public interface ApplicationEventPublisherAware extends Aware {
    void setApplicationEventPublisher(ApplicationEventPublisher var1);
}

// Implementation (triggered when the Spring context is refreshed)
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!this.isExported() && !this.isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + this.getInterface());
        }
        this.export(); // publish and export
    }
}

// super
public synchronized void export() {
    this.checkAndUpdateSubConfigs();
    // configurable via @Service(delay = 1000, export = false)
    if (!shouldExport()) { // does the current service need to be published?
        return;
    }
    if (shouldDelay()) { // does publication need to be delayed?
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        doExport(); // eventually reaches doExportUrls()
    }
}

private void doExportUrls() {
    // registry://ip:port/com.xxx.xxxService?.....
    /**
     * registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=springboot-dubbo&dubbo=2.0.2&pid=6672&qos.enable=false&registry=zookeeper&release=2.7.3&timestamp=1571066801904
     * The value of registryURLs looks like the URL above.
     * loadRegistries() is what assembles a URL of this form.
     * // default
     * <dubbo:protocol name="dubbo" valid="true" id="dubbo" prefix="dubbo.protocols." />
     **/
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

// The method has been simplified
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // host address the current service is published on
    host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    // service port number
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // assemble the final URL
    URL url = new URL(name, host, port, (String)this.getContextPath(protocolConfig).map((p) -> {
        return p + "/" + this.path;
    }).orElse(this.path), map);
    // NOTE(review): the brace below closes the method early in this abridged excerpt; the
    // statements that follow presumably still belong to the method body (three closing braces
    // follow exporters.add(exporter)) - confirm against the Dubbo source.
}
    // scope: two kinds, local / remote
    // if all calls to the service happen inside the same JVM, use a local call (no remote call is needed then)
    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
        // export to local if the config is not remote (export to remote only when config is remote)
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { // not remote, so export as a local service
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { // remote
            // For providers, this is used to enable custom proxy to generate invoker
            String proxy = url.getParameter(PROXY_KEY);
            if (StringUtils.isNotEmpty(proxy)) {
                registryURL = registryURL.addParameter(PROXY_KEY, proxy);
            }
            /**
             * @Adaptive({PROXY_KEY})
             * <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
             * ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
             * Adaptive extension
             * In the end a ProxyFactory$Adaptive#getInvoker is always generated // javassist is used by default
             *
             * e.g.:
             * package org.apache.dubbo.rpc;
             * import org.apache.dubbo.common.extension.ExtensionLoader;
             * public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
             *     public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
             *         if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
             *         if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
             *         org.apache.dubbo.common.URL url = arg0.getUrl();
             *         String extName = url.getParameter("proxy", "javassist");
             *         if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
             *         org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
             *         return extension.getProxy(arg0);
             *     }
             *     public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
             *         if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
             *         if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
             *         org.apache.dubbo.common.URL url = arg0.getUrl();
             *         String extName = url.getParameter("proxy", "javassist");
             *         if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
             *         org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
             *         return extension.getProxy(arg0, arg1);
             *     }
             *     public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
             *         if (arg2 == null) throw new IllegalArgumentException("url == null");
             *         org.apache.dubbo.common.URL url = arg2;
             *         String extName = url.getParameter("proxy", "javassist");
             *         if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
             *         org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
             *         return extension.getInvoker(arg0, arg1, arg2);
             *     }
             * }
             *
             */
            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
            /**
             * wrapperInvoker = RegistryProtocol
             * protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
             * protocol is an adaptive extension point, so a Protocol$Adaptive proxy class is generated
             * the export method being called is adapted at method level
             * @Adaptive
             * <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
             * ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry") --> yields RegistryProtocol
             */
            Exporter<?> exporter = protocol.export(wrapperInvoker);
            exporters.add(exporter);
        }
    }
}

RegistryProtocol:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    /**
     * e.g.:
     * zookeeper://ip:2181/org.apache.dubbo.registry.RegistryService?....
     */
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    /**
     * dubbo://ip:2181/com.xxx.xxxService?...
     */
    URL providerUrl = getProviderUrl(originInvoker);
    //export invoker
    // start a netty server
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    // url to registry
    // register the dubbo://... URL on zookeeper
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    //
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        /**
         * performs the service registration
         * registryUrl => zookeeper://ip:port/org.apache.dubbo.RegistryService?...
         *
         */
        register(registryUrl, registeredProviderUrl); // registeredProviderUrl => dubbo://ip:port/com.xxx.xxxService?...
        providerInvokerWrapper.setReg(true);
    }
    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

public void register(URL registryUrl, URL registeredProviderUrl) {
    /**
     * @SPI("dubbo")
     * public interface RegistryFactory
     * RegistryFactory is annotated with SPI, so there must be an extension file named after RegistryFactory
     * The method is annotated with @Adaptive({"protocol"}), so a RegistryFactory$Adaptive proxy class is generated here
     * Because zookeeper is used as the registry here, the zk factory is what ends up being used; it has no
     * implementation of this method itself, so the abstract factory
     * AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL) is the one that is found;
     *
     * Registry getRegistry(URL url);
     */
    Registry registry = registryFactory.getRegistry(registryUrl);
    // zk => ZookeeperRegistry extends FailbackRegistry -> FailbackRegistry#register
    registry.register(registeredProviderUrl);
}

public Registry getRegistry(URL url) {
    // NOTE(review): abridged excerpt - neither a declaration of `registry` nor a return statement is shown.
    //create registry by spi/ioc
    // zookeeper
    registry = createRegistry(url);
}

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
    private ZookeeperTransporter zookeeperTransporter;

    /**
     * An object is injected through this setter
     */
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    //
    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
}

@SPI("curator") // extension point
public interface ZookeeperTransporter {
    // again an adaptive extension point, so this file can certainly be found: META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter
    // curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url); // the registration is completed here
}

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        // originInvoker -> DelegateProviderMetaDataInvoker
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        //protocol -> Protocol$Adaptive -> QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol(invoker))))
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}

DubboProtocol:
private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { // 緩存,一個key對應一個ExchangeServer ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { // 建立服務 serverMap.put(key, createServer(url)); } } } } } private ExchangeServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); // 獲取當前應該採用何種方式發佈服務,netty3,netty4,mina,grizzy(Transporters類的實現) String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } ExchangeServer server; try { // 綁定服務 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; } public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { /** * 自適應擴展點,默認= HeaderExchanger * */ return getExchanger(url).bind(url, handler); } public static Exchanger getExchanger(URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } 
public static Exchanger getExchanger(String type) { // 基於擴展點實現,在META-INF/dubbo/internal/能夠找到Exchanger // header=org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); } // org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { /** * 建立一個鏈 new DecodeHandler(new HeaderExchangeHandler(handler)) * 經過Transporters.bind綁定一個服務 * new HeaderExchangeServer */ return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } // org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...) public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } /** * getTransporter() => ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); * @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) * Server bind(URL url, ChannelHandler handler) throws RemotingException; * 經過方法層面的自適應擴展點生成Transporter$Adapter的代理類 * bind => Transporter$Adapter#bind, => 默認採用netty4 */ return getTransporter().bind(url, handler); }