Dubbo的服務註冊與發佈

Dubbo的服務註冊與發佈的入口來源於dubbo-config模塊中的dubbo-config-springjava

基於 dubbo.jar 內的 META-INF/spring.handlers 配置,Spring 在遇到 dubbo 名稱空間時,會回調 DubboNamespaceHandlerspring

全部 dubbo 的標籤,都統一用 DubboBeanDefinitionParser 進行解析,基於一對一屬性映射,將 XML 標籤解析爲 Bean 對象。apache

spring.handlers:json

http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler

DubboNamespaceHandler#init:緩存

 

 // 須要重點關注如下兩行代碼   

 registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));     registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); 
ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware
public interface InitializingBean {
		// bean初始化的時候被調用
    void afterPropertiesSet() throws Exception;
		}
DisposableBean,
	public interface DisposableBean {
// 銷燬
	void destroy() throws Exception;
	}

	
ApplicationListener<ContextRefreshedEvent>, 
	// 監聽 (事件發送器發佈的),
	public interface ApplicationListener<E extends ApplicationEvent> extends 			EventListener {
		void onApplicationEvent(E var1);
	}

BeanNameAware,
	public interface BeanNameAware extends Aware {
		 void setBeanName(String var1);
	}

ApplicationEventPublisherAware    
	// 事件發送器
	public interface ApplicationEventPublisherAware extends Aware {
		  void setApplicationEventPublisher(ApplicationEventPublisher var1);
	}



// 實現(spring上下文刷新的時候觸發)
public void onApplicationEvent(ContextRefreshedEvent event) {
  if (!this.isExported() && !this.isUnexported()) {
      if (logger.isInfoEnabled()) {
          logger.info("The service ready on spring started. service: " + this.getInterface());
      }

      this.export(); // 發佈並導出
  }

}
// super
public synchronized void export() {
  this.checkAndUpdateSubConfigs();
 // 可經過@Service(delay = 1000,export = false)配置
   if (!shouldExport()) { // 當前服務是否須要發佈
            return;
        }

        if (shouldDelay()) { // 是否須要延遲發佈
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport(); // 後續會走到doExportUrls()方法
        }
}
private void doExportUrls() {
		// registry://ip:port/com.xxx.xxxService?.....
 /**
     *  registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?			application=springboot-						dubbo&dubbo=2.0.2&pid=6672&qos.enable=false&registry=zookeeper&release=2.7.3&ti			mestamp=1571066801904
     *  registryURLs的值如上所示
     * loadRegistries()方法中是在組裝這樣一個URL地址

     * // 默認
     * <dubbo:protocol name="dubbo" valid="true" id="dubbo" prefix="dubbo.protocols." />
  **/
  List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }

}
// 方法作了簡化
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

// 當前服務發佈的主機地址
host = this.findConfigedHosts(protocolConfig, registryURLs, map);
 // 服務端口號
  Integer port = this.findConfigedPorts(protocolConfig, name, map);
 // 組成最終的URL
 
  URL url = new URL(name, host, port, (String)this.getContextPath(protocolConfig).map((p) -> {
      return p + "/" + this.path;
  }).orElse(this.path), map);
 
}
 // scope: local/remote兩種
// 若是服務的調用都在同一個jvm中完成走本地調用(此時不須要走遠程調用)
 String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
               // 非遠程則走本地服務
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                
                 // 遠程
                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                       /**
                         *   @Adaptive({PROXY_KEY})
                         *   <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
                         *   ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
                         *   自適應擴展
                         *   最終必定會生成一個ProxyFactory$Adaptive#getInvoker // 默認使用javassist
                         *
                         *   e.g.:
                         *   package org.apache.dubbo.rpc;
                         * import org.apache.dubbo.common.extension.ExtensionLoader;
                         * public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
                         * public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache
                         * .dubbo.rpc.RpcException {
                         * if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker
                         * argument == null");
                         * if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc
                         * .Invoker argument getUrl() == null");
                         * org.apache.dubbo.common.URL url = arg0.getUrl();
                         * String extName = url.getParameter("proxy", "javassist");
                         * if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache
                         * .dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
                         * org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)
                         * ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension
                         * (extName);
                         * return extension.getProxy(arg0);
                         * }
                         * public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws
                         * org.apache.dubbo.rpc.RpcException {
                         * if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker
                         * argument == null");
                         * if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc
                         * .Invoker argument getUrl() == null");
                         * org.apache.dubbo.common.URL url = arg0.getUrl();
                         * String extName = url.getParameter("proxy", "javassist");
                         * if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache
                         * .dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
                         * org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)
                         * ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension
                         * (extName);
                         * return extension.getProxy(arg0, arg1);
                         * }
                         * public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class
                         * arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
                         * if (arg2 == null) throw new IllegalArgumentException("url == null");
                         * org.apache.dubbo.common.URL url = arg2;
                         * String extName = url.getParameter("proxy", "javassist");
                         * if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache
                         * .dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
                         * org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)
                         * ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension
                         * (extName);
                         * return extension.getInvoker(arg0, arg1, arg2);
                         * }
                         * }
                         *
                         */
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        /**
                         * wrapperInvoker = RegistryProtocol
                         * protocol =  ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
                         * protocol是自適應擴展點所以會生成一個Protocol$Adaptive的代理類
                         * 調用的export方法是基於方法級別的適配
                         * @Adaptive
                         * <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
                         * ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry") -- > 會獲得RegistryProtocol
                         */

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                }
        }
RegistryProtocol:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        /**
         *e.g.:
         * zookeeper:ip:2181/org.apache.dubbo.registry.ReigistryService?....
         */
        URL registryUrl = getRegistryUrl(originInvoker);

        // url to export locally
        /**
         * dubbo://ip:2181/com.xxx.xxxService?...
         */
        URL providerUrl = getProviderUrl(originInvoker);

       

        //export invoker
        // 啓動一個netty服務
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        // 將dubbo://...的URL註冊到zookeeper上
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        //
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            /**
             * 實現服務的註冊
             * registryUrl => zookeeper://ip:port/org.apache.dubbo.RegistryService?...
             * 
             */
            register(registryUrl, registeredProviderUrl); 
            // registeredProviderUrl => dubbo:ip:port/com.xxx.xxxService?...
            providerInvokerWrapper.setReg(true);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

public void register(URL registryUrl, URL registeredProviderUrl) {
        /**
         *  @SPI("dubbo")
         *  public interface RegistryFactory
         *  RegistryFactory被SPI標註因此必定會有一個以RegistryFactory命名的擴展文件
         *  經過@Adaptive({"protocol"})註解標註在方法上,因此這裏必定會生成一個RegistryFactory$Adaptive的代理類
         *  由於咱們這邊使用zookeeper做爲註冊中心最終會使用zk的工廠,不過這裏沒有zk的實現,因此會找到抽象工廠AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL);
         *
         *  Registry getRegistry(URL url);
         */
        Registry registry = registryFactory.getRegistry(registryUrl);
        // zk => ZookeeperRegistry extends FailbackRegistry ->FailbackRegistry#register
        registry.register(registeredProviderUrl);
    }


public Registry getRegistry(URL url) {
       //create registry by spi/ioc
            // zookeeper
            registry = createRegistry(url);
    }

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    /**
     * 經過set方法注入一個對象
     */
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

   // 
    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}
@SPI("curator") // 擴展點

public interface ZookeeperTransporter {
    // 仍是自適應擴展點,那麼確定能找到這個文件META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter
    // curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url); // 在這裏完成註冊

}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
//           originInvoker-> DelegateProviderMetaDataInvoker
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            //protocol-> Protocol$Adaptive -> QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol(invoker))))
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

DubboProtocol:springboot

private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            // 緩存,一個key對應一個ExchangeServer
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        // 建立服務
                        serverMap.put(key, createServer(url));
                    }
                }
            } 
        }
    }

private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        // 獲取當前應該採用何種方式發佈服務,netty3,netty4,mina,grizzy(Transporters類的實現)
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            // 綁定服務
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }

   public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
       

        /**
         * 自適應擴展點,默認= HeaderExchanger
         *
         */
        return getExchanger(url).bind(url, handler);
    }


    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        // 基於擴展點實現,在META-INF/dubbo/internal/能夠找到Exchanger
//        header=org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }


// org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind

 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        /**
         *  建立一個鏈 new DecodeHandler(new HeaderExchangeHandler(handler))
         *  經過Transporters.bind綁定一個服務
         *  new HeaderExchangeServer
         */
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

// org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)

  public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        /**
         * getTransporter() => ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
         *  @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
         *  Server bind(URL url, ChannelHandler handler) throws RemotingException;
         *  經過方法層面的自適應擴展點生成Transporter$Adapter的代理類
         *  bind => Transporter$Adapter#bind, => 默認採用netty4
         */
        return getTransporter().bind(url, handler);
    }
相關文章
相關標籤/搜索