Dubbo(六):zookeeper註冊中心的應用

  Dubbo中有一個很是本質和重要的功能,那就是服務的自動註冊與發現,而這個功能是經過註冊中心來實現的。而dubbo中考慮了外部許多的註冊組件的實現,zk,redis,etcd,consul,eureka...java

  各自實現方式各有不一樣,可是對外表現都是一致的:都實現了 Registry 接口!node

  今天咱們就來看看最經常使用和註冊中心 Zookeeper 的接入實現吧!redis

 

1. dubbo在 zookeeper 中的服務目錄劃分

  註冊中心的做用主要分發服務的發佈,與服務訂閱,及服務推送,服務查詢!而zk中,則以服務主單位進行目錄劃分的。apache

  整個zookeeper的路徑概覽:bootstrap

    /dubbo: root 目錄, 持久化目錄 (可經過 group=xxx 自定義root目錄)
    /dubbo/xxx.xx.XxxService: 每一個服務一個路徑,持久化目錄
    /dubbo/xxx.xx.XxxService/configurators: 配置存放路徑,默認爲空
    /dubbo/xxx.xx.XxxService/providers: 服務提供者目錄,全部提供者以其子路徑形式存儲,持久化目錄。各服務提供者以臨時目錄形式存在,路徑樣例如: dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D112756%26release%3D%26side%3Dprovider%26timestamp%3D1588548412331
    /dubbo/xxx.xx.XxxService/consumers: 服務消費者目錄, 持久化路徑。全部消費者以其子路徑形式存儲,路徑樣例以下:consumer%3A%2F%2F192.168.1.4%2Forg.apache.dubbo.rpc.service.GenericService%3Fapplication%3Ddubbo-demo-api-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26generic%3Dtrue%26interface%3Dorg.apache.dubbo.demo.DemoService%26pid%3D139620%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1588596195728
    /dubbo/xxx.xx.XxxService/routers: 路由配置信息,默認爲空api

 

2. zookeeper 組件的接入

  Zookeeper 是在本地服務經過socket端口暴露以後,再調用 RegistryFactoryWrapper 進行獲取的。緩存

  註冊時的URL以下: registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613&pid=135556&registry=zookeeper&timestamp=1588518545808session

  zk的調用分兩種狀況,一是做爲服務提供者時會在 ServiceConfig#doExportUrlsFor1Protocol 中,進行遠程服務暴露時會拉起。二是在消費者在進行遠程調用時會 ReferenceConfig#createProxy 時拉取以便獲取提供者列表。咱們以ServiceConfig爲例看看其調用zk的過程:app

    // ServiceConfig#doExportUrlsFor1Protocol 
    // 此處 PROTOCOL 是一個SPI類,默認實例是DubboProtocol,但其在處理具體協議時會根據協議類型作出相應選擇
    // 此處 協議爲 registry, 因此會選擇 RegistryProtocol 進行export() 處理
    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);

    // 而在RegistryProtocol中,又有幾個重要屬性,是dubbo進行依賴注入完成的
    // org.apache.dubbo.registry.integration.RegistryProtocol    
    public void setCluster(Cluster cluster) {
        this.cluster = cluster;
    }
    public void setProtocol(Protocol protocol) {
        this.protocol = protocol;
    }
    public void setRegistryFactory(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }


    // org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 檢查是不是要進行服務註冊,即檢查協議字段是不是 registry, 即前綴是 registry://
        // 註冊完成後即返回
        if (UrlUtils.isRegistry(invoker.getUrl())) {
            // ProtocolFilterWrapper.export()
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
    }
    // org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 一樣判斷是不是註冊請求
        if (UrlUtils.isRegistry(invoker.getUrl())) {
            // RegistryProtocol.export()
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }
    // org.apache.dubbo.registry.integration.RegistryProtocol#export
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        // 獲取registry實例,如 zk client
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // 註冊服務地址到註冊中心
            register(registryUrl, registeredProviderUrl);
        }

        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        // 訂閱目錄
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
    /**
     * Get an instance of registry based on the address of invoker
     *
     * @param originInvoker
     * @return
     */
    protected Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        // RegistryFactory 又是經過 SPI 機制生成的    
        // 會根據具體的註冊中心的類型建立調用具體實例,如此處爲: zookeeper, 因此會調用 ZookeeperRegistryFactory.getRegistry()
        return registryFactory.getRegistry(registryUrl);
    }
    // 全部 RegistryFactory 都會被包裝成 RegistryFactoryWrapper, 以便修飾
    // org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry
    @Override
    public Registry getRegistry(URL url) {
        // 對於zk, 會調用 ZookeeperRegistryFactory
        return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
                        .getActivateExtension(url, "registry.listeners")));
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL)
    @Override
    public Registry getRegistry(URL url) {
        if (destroyed.get()) {
            LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                    "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
            return DEFAULT_NOP_REGISTRY;
        }

        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = createRegistryCacheKey(url);
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            // 調用子類方法建立 registry 實例,此處爲 ZookeeperRegistry.createRegistry
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
    @Override
    public Registry createRegistry(URL url) {
        // 最終將Zk組件接入到應用中了,後續就可使用zk提供的相應功能了
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

  至此,zookeeper被接入了。咱們先來看下 zookeeper 註冊中心構造方法實現:socket

    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        // 抽象父類處理
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // group=dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener((state) -> {
            if (state == StateListener.RECONNECTED) {
                logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                        " Since ephemeral ZNode will not get deleted for a connection lose, " +
                        "there's no need to re-register url of this instance.");
                ZookeeperRegistry.this.fetchLatestAddresses();
            } else if (state == StateListener.NEW_SESSION_CREATED) {
                logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                try {
                    ZookeeperRegistry.this.recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            } else if (state == StateListener.SESSION_LOST) {
                logger.warn("Url of this instance will be deleted from registry soon. " +
                        "Dubbo client will try to re-register once a new session is created.");
            } else if (state == StateListener.SUSPENDED) {

            } else if (state == StateListener.CONNECTED) {

            }
        });
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#FailbackRegistry
    public FailbackRegistry(URL url) {
        super(url);
        // retry.period=5000
        this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);

        // since the retry task will not be very much. 128 ticks is enough.
        // 當鏈接失敗時,使用後臺定時重試
        retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
    }
    // org.apache.dubbo.registry.support.AbstractRegistry#AbstractRegistry
    public AbstractRegistry(URL url) {
        setUrl(url);
        // 默認會開啓file.cache 選項,因此通常會要求存在該文件
        if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
            // Start file save timer
            syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
            // 默認路徑 $USER_HOME/.dubbo/dubbo-registry-dubbo-demo-api-provider-127.0.0.1-2181.cache
            String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
            String filename = url.getParameter(FILE_KEY, defaultFilename);
            File file = null;
            if (ConfigUtils.isNotEmpty(filename)) {
                file = new File(filename);
                if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                    if (!file.getParentFile().mkdirs()) {
                        throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                    }
                }
            }
            this.file = file;
            // When starting the subscription center,
            // we need to read the local cache file for future Registry fault tolerance processing.
            // 若是文件存在先從其中加載原有配置
            loadProperties();
            notify(url.getBackupUrls());
        }
    }
    // 若是存在註冊中心緩存文件,則從其中加載各屬性值
    // 其值爲 xxx.xxxService=xxx 格式
    private void loadProperties() {
        if (file != null && file.exists()) {
            InputStream in = null;
            try {
                in = new FileInputStream(file);
                properties.load(in);
                if (logger.isInfoEnabled()) {
                    logger.info("Load registry cache file " + file + ", data: " + properties);
                }
            } catch (Throwable e) {
                logger.warn("Failed to load registry cache file " + file, e);
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    }
    // org.apache.dubbo.registry.support.AbstractRegistry#notify(java.util.List<org.apache.dubbo.common.URL>)
    protected void notify(List<URL> urls) {
        if (CollectionUtils.isEmpty(urls)) {
            return;
        }

        for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
            URL url = entry.getKey();

            if (!UrlUtils.isMatch(url, urls.get(0))) {
                continue;
            }

            Set<NotifyListener> listeners = entry.getValue();
            if (listeners != null) {
                for (NotifyListener listener : listeners) {
                    try {
                        notify(url, listener, filterEmpty(url, urls));
                    } catch (Throwable t) {
                        logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }

  該構造函數的主要做用是創建與zkServer的鏈接實例,添加狀態監聽,以及本地緩存文件的處理。

 

3. Zookeeper 服務提供者註冊

  上一節咱們看到在初始化 zookeeper 時,是在export()過程當中經過 getRegistry() 實現的。一樣,在export()過程當中,在獲取了註冊中心實例後,還須要將服務地址註冊上去,纔算功成。服務註冊的時序圖以下:

    // org.apache.dubbo.registry.integration.RegistryProtocol#export
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        // 1. 獲取註冊中心實例
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        // 2. 將 registeredProviderUrl 註冊上去
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
        }

        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
    private void register(URL registryUrl, URL registeredProviderUrl) {
        // 此處獲取 registry 實際是registry的一個 wrapper, registryUrl 以 zookeeper:// 開頭
        Registry registry = registryFactory.getRegistry(registryUrl);
        // 此處的 registeredProviderUrl 如: dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=135556&release=&side=provider&timestamp=1588518573613
        registry.register(registeredProviderUrl);
    }
    // org.apache.dubbo.registry.ListenerRegistryWrapper#register
    @Override
    public void register(URL url) {
        try {
            // 調用實際的 registry
            registry.register(url);
        } finally {
            if (CollectionUtils.isNotEmpty(listeners)) {
                RuntimeException exception = null;
                for (RegistryServiceListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.onRegister(url);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#register
    @Override
    public void register(URL url) {
        if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
            return;
        }
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a registration request to the server side
            // 調用zk進行url註冊
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedRegistered(url);
        }
    }
    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
    @Override
    public void doRegister(URL url) {
        try {
            // 建立zk node, dynamic=true, 默認建立的節點爲臨時節點
            // url地址如:  dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider&timestamp=1588639020690
            // 此地址經過 toUrlPath 轉換爲 zookeeper 的目錄地址,好比 providers 目錄,consumers 目錄...
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    // 將提供者信息轉移爲zk目錄
    private String toUrlPath(URL url) {
        // 找到分類目錄,最下級爲當前提供者的全部url信息encode後的值
        // 即 /dubbo/interfaceName/providers/xxxx
        return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
    }
    private String toCategoryPath(URL url) {
        // category='', 默認爲 providers
        return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
    }
    private String toServicePath(URL url) {
        String name = url.getServiceInterface();
        if (ANY_VALUE.equals(name)) {
            return toRootPath();
        }
        // /dubbo/interfaceName
        return toRootDir() + URL.encode(name);
    }

  服務註冊主要就分兩步:1. 獲取registry實例(經過SPI機制); 2. 將服務的url轉換爲對應的路徑,以臨時節點的形式create到zkServer; 以服務名做爲父路徑,以自身服務url做爲葉子路徑,能夠很方便在相應路徑上找到全部的提供者或消費者。

 

4. zookeeper 服務下線處理

  當應用要關閉,或者註冊失敗時,須要進行服務下線。固然,若是應用沒有及時作下線處理,zk會經過其自身的臨時節點過時機制,也會將該服務作下線處理。從而避免消費者或管理臺看到無效的服務存在。

  應用服務的主動下線操做是由 ShutdownHookCallbacks 來實現的,其時序圖以下:

    // org.apache.dubbo.config.bootstrap.DubboBootstrap#DubboBootstrap
    private DubboBootstrap() {
        configManager = ApplicationModel.getConfigManager();
        environment = ApplicationModel.getEnvironment();
        // 在 DubboBootstrap 建立時就建立幾個關閉鉤子
        DubboShutdownHook.getDubboShutdownHook().register();
        // 將 DubboBootstrap 的銷燬動做添加到 DubboShutdownHook 的執行隊列中,以便在關閉時一塊兒調用
        ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
            @Override
            public void callback() throws Throwable {
                DubboBootstrap.this.destroy();
            }
        });
    }
    /**
     * Register the ShutdownHook
     */
    public void register() {
        if (registered.compareAndSet(false, true)) {
            DubboShutdownHook dubboShutdownHook = getDubboShutdownHook();
            Runtime.getRuntime().addShutdownHook(dubboShutdownHook);
            dispatch(new DubboShutdownHookRegisteredEvent(dubboShutdownHook));
        }
    }
    // org.apache.dubbo.config.bootstrap.DubboBootstrap#destroy
    public void destroy() {
        if (destroyLock.tryLock()) {
            try {
                // DubboShutdownHook 實現的destroy方法
                DubboShutdownHook.destroyAll();

                if (started.compareAndSet(true, false)
                        && destroyed.compareAndSet(false, true)) {

                    unregisterServiceInstance();
                    unexportMetadataService();
                    unexportServices();
                    unreferServices();

                    destroyRegistries();
                    DubboShutdownHook.destroyProtocols();
                    destroyServiceDiscoveries();

                    clear();
                    shutdown();
                    release();
                }
            } finally {
                destroyLock.unlock();
            }
        }
    }
    // org.apache.dubbo.config.DubboShutdownHook#destroyAll
    public static void destroyAll() {
        if (destroyed.compareAndSet(false, true)) {
            AbstractRegistryFactory.destroyAll();
            destroyProtocols();
        }
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll
    /**
     * Close all created registries
     */
    public static void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        // Lock up the registry shutdown process
        LOCK.lock();
        try {
            // 獲取全部註冊的地址,一一進行destroy()操做
            for (Registry registry : getRegistries()) {
                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            REGISTRIES.clear();
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#destroy
    @Override
    public void destroy() {
        super.destroy();
        try {
            // (解綁)銷燬動做只需執行一次,一次會將全部須要解綁的地址所有操做完成
            zkClient.close();
        } catch (Exception e) {
            logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    // org.apache.dubbo.registry.support.FailbackRegistry#destroy
    @Override
    public void destroy() {
        super.destroy();
        retryTimer.stop();
    }
    // org.apache.dubbo.registry.support.AbstractRegistry#destroy
    @Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        Set<URL> destroyRegistered = new HashSet<>(getRegistered());
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<>(getRegistered())) {
                if (url.getParameter(DYNAMIC_KEY, true)) {
                    try {
                        // 此處會將在該 register 中註冊的全部地址進行解綁,因此,當前實例只需調用一次destroy()便可
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        AbstractRegistryFactory.removeDestroyedRegistry(this);
    }
    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister
    @Override
    public void doUnregister(URL url) {
        try {
            // 註冊地址如: /dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

  其實就是一個關閉鉤子,進行路徑的主動刪除操做。

 

5. zookeeper 消費者服務訂閱

  前面兩個操做,服務的註冊與解綁,能夠針對提供者,也能夠針對消費者。可是對服務訂閱則更可能是針對消費者!因其要時刻關注提供者的變化,接收註冊中心的消息推送。其以 ReferenceConfig.get() 入口。因此,咱們以消費者的視圖進行解析zk的訂閱過程:

    // org.apache.dubbo.registry.integration.RegistryProtocol#refer
    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = getRegistryUrl(url);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

    // org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        // 註冊消費者監聽到 zk 中
        // 添加路徑 providers, configurators, routers, 以便在同時監聽幾個目錄
        directory.subscribe(toSubscribeUrl(subscribeUrl));

        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }
    private static URL toSubscribeUrl(URL url) {
        return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
    }

    // org.apache.dubbo.registry.ListenerRegistryWrapper#subscribe
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        try {
            registry.subscribe(url, listener);
        } finally {
            if (CollectionUtils.isNotEmpty(listeners)) {
                RuntimeException exception = null;
                for (RegistryServiceListener registryListener : listeners) {
                    if (registryListener != null) {
                        try {
                            registryListener.onSubscribe(url);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#subscribe
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (CollectionUtils.isNotEmpty(urls)) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }

    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), k);
                        }
                    }
                });
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                // configurators
                List<URL> urls = new ArrayList<>();
                // /dubbo/org.apache.dubbo.demo.DemoService/configurators 
                // url 樣例: provider://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.56.1&bind.port=20880&category=configurators&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider&timestamp=1588639020690
                // 它會通過 toCategoriesPath() 轉換爲多個控制目錄,依次註冊
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    // listener 是 ConcurrentHashMap 中的 ConcurrentHashMap, 因此迭代也是兩層的
                    // 當目錄發生變動時,運行 notify() 方法
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    // 與提供者的服務註冊不同,消費者的訂閱路徑是 持久化的
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 自身註冊完成後,主動觸發一次 notify() 以刷新信息
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    private String[] toCategoriesPath(URL url) {
        String[] categories;
        if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
            categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
        } else {
            categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
        }
        String[] paths = new String[categories.length];
        for (int i = 0; i < categories.length; i++) {
            // servicePath: /dubbo/interfaceName
            paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
        }
        return paths;
    }

  事實上,其步驟與提供者是相似的:1. 獲取registry實例; 2. 註冊自身消費標識到consumers目錄; 3. 訂閱providers,configurators,routers 子目錄變動;

 

6. zookeeper 服務信息通知notify

  當註冊中心發現服務提供者發生了變化時,將會將該信息推送給訂閱了相應路徑的客戶端。這個客戶端首先會被前面設置的 zkListener 處理。

    // ZookeeperRegistry.doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        ...
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    // 服務回調將由 ZookeeperRegistry.this.notify() 處理
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
        ...
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#notify            
    @Override
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // Record a failed registration request to a failed list, retry regularly
            addFailedNotified(url, listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#doNotify
    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        super.notify(url, listener, urls);
    }
    // org.apache.dubbo.registry.support.AbstractRegistry#notify(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.util.List<org.apache.dubbo.common.URL>)
    /**
     * Notify changes from the Provider side.
     *
     * @param url      consumer side url
     * @param listener listener
     * @param urls     provider latest urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((CollectionUtils.isEmpty(urls))
                && !ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        // keep every provider's category.
        Map<String, List<URL>> result = new HashMap<>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            // 再交由最原始傳入的 listener 進行處理, 即 RegistryDirectory.notify()
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            saveProperties(url);
        }
    }
    // org.apache.dubbo.registry.integration.RegistryDirectory#notify
    @Override
    public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        // 添加到全部提供者列表中,以便在後續請求能夠將最新的地址信息給consumer使用
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        /**
         * 3.x added for extend URL address
         */
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            for (AddressListener addressListener : supportedListeners) {
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
            }
        }
        refreshOverrideAndInvoker(providerURLs);
    }

  

7. zookeeper 服務解除事件訂閱

  除了服務下線外,還須要解除訂閱關係。以便zookeeper不用再維護其監聽推送。

    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnsubscribe
    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners != null) {
            ChildListener zkListener = listeners.get(listener);
            if (zkListener != null) {
                if (ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    zkClient.removeChildListener(root, zkListener);
                } else {
                    // 將路徑列舉出來,一個個解除監聽
                    for (String path : toCategoriesPath(url)) {
                        zkClient.removeChildListener(path, zkListener);
                    }
                }
            }
        }
    }
    // 解除監聽操做一樣是由關閉鉤子 觸發的
    // org.apache.dubbo.registry.support.AbstractRegistry#destroy
    @Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        Set<URL> destroyRegistered = new HashSet<>(getRegistered());
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<>(getRegistered())) {
                if (url.getParameter(DYNAMIC_KEY, true)) {
                    try {
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        // 解除監聽操做
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        AbstractRegistryFactory.removeDestroyedRegistry(this);
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#unsubscribe
    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        super.unsubscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a canceling subscription request to the server side
            // 調用 ZookeeperRegistry.doUnregister(), 刪除路徑監聽
            doUnsubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedUnsubscribed(url, listener);
        }
    }

  和前面的下線操做意義相同,只是一個是 delete 路徑,一個是解除變動監聽。

 

  dubbo 中以服務爲單位進行註冊通知管理,避免了一個因應用提供者提供許多服務,而致使消費者必須處理大量數據的狀況。各消費者只關注與本身相關的服務路徑,從而最小化影響。經過臨時節點和心跳機制,保證了各服務的準確體現。這是zk比較擅長的。雖然該點功能小,卻意義重大。

相關文章
相關標籤/搜索