Dubbo服務暴露過程

概覽

dubbo暴露服務有兩種狀況,一種是設置了延遲暴露(好比delay=」5000」),另一種是沒有設置延遲暴露或者延遲設置爲-1(delay=」-1」):java

  • 設置了延遲暴露,dubbo在Spring實例化bean(initializeBean)的時候會對實現了InitializingBean的類進行回調,回調方法是afterPropertySet()。ServiceBean實現了若是InitializingBean接口,重寫了afterPropertySet()方法。若是設置了延遲暴露,dubbo在這個方法中進行服務的發佈。
  • 沒有設置延遲或者延遲爲-1,dubbo會在Spring實例化完bean以後,在刷新容器最後一步發佈ContextRefreshEvent事件的時候,通知實現了ApplicationListener的類進行回調onApplicationEvent,dubbo會在這個方法中發佈服務。(ServiceBean實現了ApplicationListener接口)

可是無論延遲與否,都是使用ServiceConfig的export()方法進行服務的暴露。使用export初始化的時候會將Bean對象轉換成URL格式,全部Bean屬性轉換成URL的參數。spring

暴露流程

  • 首先將服務的實現封裝成一個Invoker,Invoker中封裝了服務的實現類。
  • 將Invoker封裝成Exporter,並緩存起來,緩存裏使用Invoker的url做爲key。
  • 服務端Server啓動,監聽端口。(請求來到時,根據請求信息生成key,到緩存查找Exporter,就找到了Invoker,就能夠完成調用。)

Spring容器初始化調用

當Spring容器實例化bean完成,走到最後一步發佈ContextRefreshEvent事件的時候,ServiceBean會執行onApplicationEvent方法,該方法調用ServiceConfig的export方法,從而進行服務的暴露。apache

export的步驟簡介

  1. 首先會檢查各類配置信息,填充各類屬性,總之就是保證我在開始暴露服務以前,全部的東西都準備好了,而且是正確的。
  2. 加載全部的註冊中心,由於咱們暴露服務須要註冊到註冊中心中去。
  3. 根據配置的全部協議和註冊中心url分別進行服務暴露,暴露爲 本地服務 或者 遠程服務

    3.1 不論是本地仍是遠程服務暴露,首先都會獲取Invoker。api

    3.2 獲取完Invoker以後,轉換成對外的Exporter,緩存起來。數組

加載全部的註冊中心

export方法先判斷是否須要延遲暴露,若是是不延遲暴露,會執行doExport方法。緩存

doExport方法先執行一系列的檢查方法,而後調用doExportUrls方法。服務器

doExportUrls方法先調用loadRegistries獲取全部的註冊中心url。session

private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

url以下:app

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=springProviderApplication&check=false&compiler=javassist&dubbo=2.0.2&logger=slf4j&organization=huangyuan&owner=huangyuan&pid=1689&register=true&registry=zookeeper&session=60000&subscribe=true&timestamp=1544326825698

根據配置的協議進行服務暴露

而後遍歷調用doExportUrlsFor1Protocol方法。doExportUrlsFor1Protocol根據不一樣的協議將服務轉換爲URL形式,一些配置參數會附在URL後面。框架

根據scope的值,進行服務暴露。

  • 若是scope配置爲none則不暴露,
  • 若是服務未配置成remote,則本地暴露exportLocal;
  • 若是未配置成local,則暴露遠程服務。

疑惑點:

(1)scope的值默認是null,按照代碼的邏輯,會進行本地暴露,又進行遠程暴露,爲何呢?

(2)關於(1)的解答,我以爲既然用戶不設置爲「不暴露」,也不設置爲「只暴露遠程服務」,也不設置爲「只暴露本地服務」,那麼dubbo就認爲因此都須要進行暴露。

若是是暴露遠程服務,則通過上面的操做以後,url變成:

dubbo://192.168.1.4:23801/com.huang.yuan.api.service.DemoService2?anyhost=true&application=springProviderApplication&bind.ip=192.168.1.4&bind.port=23801&compiler=javassist&default.cluster=failover&default.delay=-1&default.loadbalance=random&default.proxy=javassist&default.retries=2&default.serialization=hessian2&default.timeout=3000&delay=-1&dubbo=2.0.2&generic=false&interface=com.huang.yuan.api.service.DemoService2&logger=slf4j&methods=demoTest&organization=huangyuan&owner=huangyuan&pid=1831&revision=1.0-SNAPSHOT&side=provider&timestamp=1544327969839&version=1.0

暴露爲遠程服務

先獲取Invoker,而後導出成Exporter

// 根據服務具體實現,實現接口,以及registryUrl經過ProxyFactory將XXXServiceImpl封裝成一個本地執行的Invoker
// invoker是對具體實現的一種代理。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
 
 // 使用Protocol將invoker導出成一個Exporter
 // 暴露封裝服務invoker
 // 調用Protocol生成的適配類的export方法
 Exporter<?> exporter = protocol.export(invoker);
暴露遠程服務時的獲取Invoker過程

這裏會調用com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker獲取

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    
    // 封裝一個Wrapper類,若是類是以$開頭,就使用接口類型獲取,其餘的使用實現類獲取
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    
    // 返回一個Invoker實例,doInvoke方法中直接返回 wrapper的invokeMethod
    // 關於生成的wrapper,請看下面列出的生成的代碼,其中invokeMethod方法中就有實現類對實際方法的調用
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, 
                                  Class<?>[] parameterTypes, 
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

這裏生成Wrapper對象,利用了javasisst動態代理技術,生成的代碼以下:

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) { 
    // 持有實際對象的引用
    com.huang.yuan.dubbo.service.impl.DemoService2Impl w; 
    
     // 獲取實際調用對象(進行類型轉換)
     w = ((com.huang.yuan.dubbo.service.impl.DemoService2Impl)$1); 
    
    // 執行真正的方法調用 (demoTest是接口中真正須要執行的方法)
    w.demoTest((java.lang.String)$4[0]);                                                                                                                 }

因而可知,Invoker執行方法的時候,會調用doInvoke方法,會調用Wrapper的invokeMethod,這個方法中會有的實現類調用真實方法的代碼。

(代碼能夠從com.alibaba.dubbo.common.bytecode.Wrapper#makeWrapper中得到)

本地暴露

private void exportLocal(URL url) {
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        // 這時候轉成本地暴露的url:injvm://127.0.0.1/dubbo.common.hello.service.HelloService?anyhost=true&......
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(NetUtils.LOCALHOST)
                .setPort(0);
        // 首先仍是先得到Invoker,而後導出成Exporter,並緩存
        // 這裏的proxyFactory實際是JavassistProxyFactory
        // 導出expter使用com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol#export
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
    }
}

到這裏就把url轉成了Invoker對象

導出Invoker爲Exporter

Registry類型的Invoker處理過程

org.apache.dubbo.registry.integration.RegistryProtocol#export方法

@Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 將invoker轉成exporter
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        // 獲取RegistryUrl
        URL registryUrl = getRegistryUrl(originInvoker);

        /*
         * 根據invoker中的url獲取Registry實例
         * 而且鏈接到註冊中心
         * 此時提供者做爲消費者 引用 註冊中心的核心服務 RegistryService
         */
        final Registry registry = getRegistry(originInvoker);

        //註冊到註冊中心的URL,如 dubb://XXXXX
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

        // 判斷是否延遲發佈
        boolean register = registeredProviderUrl.getParameter("register", true);

        // 將originInvoker加入本地緩存
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

        if (register) {
            /*
             *  調用遠端註冊中心的register方法進行服務註冊
             *  如有消費者訂閱此服務,則推送消息讓消費者引用此服務。
             *  註冊中心緩存了全部提供者註冊的服務以供消費者發現。
             */
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        /*
         *  訂閱override數據
         *
         *  提供者訂閱時,會影響 同一JVM即暴露服務,又引用同一服務的的場景,
         *  由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。
         */
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        /*
         * 提供者向註冊中心訂閱全部註冊服務
         * 當註冊中心有此服務的覆蓋配置註冊進來時,推送消息給提供者,從新暴露服務,這由管理頁面完成。
         */
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        /*
         *  保證每次export都返回一個新的exporter實例
         *  返回暴露後的Exporter給上層ServiceConfig進行緩存,便於後期撤銷暴露。
         */
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }
將invoker轉成exporter
## org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport方法
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {

        // 調用代碼中的protocol.export方法,會根據協議名選擇調用具體的實現類
                    // 這裏會調用 DubboProtocol 的export方法
                    // 導出完以後,返回一個新的ExporterChangeableWrapper實例
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    ......
}

DubboProtocol的export方法實現:

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

        // 獲取須要暴露的url
        URL url = invoker.getUrl();

        /*
         * 經過url獲取key
         *
         * key的例子:com.huang.yuan.api.service.DemoService2:1.0:23801
         * 在服務調用的時候,一樣經過這個key獲取exporter
         */
        String key = serviceKey(url);

        // 生成exporter實例
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);

        // 緩存exporter
        exporterMap.put(key, exporter);

        /*
         * todo 沒理解 Constants.STUB_EVENT_KEY
         *  是否支持本地存根
         *  項目使用遠程服務後,客戶端一般只剩下接口,而實現全在服務器端
         *
         *  但提供方有些時候想在客戶端也執行部分邏輯,
         *      好比:作ThreadLocal緩存,提早驗證參數,調用失敗後僞造容錯數據等等,此時就須要在API中帶上Stub
         *
         *  客戶端生成Proxy實例,會把Proxy經過構造函數傳給Stub,而後把Stub暴露給用戶,Stub能夠決定要不要去調Proxy
         */
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);

        // 是否回調服務
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);

        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        // 根據URL綁定IP與端口,創建NIO框架的Server
        openServer(url);

        optimizeSerialization(url);

        return exporter;
    }
獲取Register實例
/*
 * 根據invoker中的url獲取Registry實例
 * 而且鏈接到註冊中心
 * 此時提供者做爲消費者 引用 註冊中心的核心服務 RegistryService
 */
final Registry registry = getRegistry(originInvoker);

具體的操做在中org.apache.dubbo.registry.integration.RegistryProtocol

/**
     * 根據invoker的地址獲取registry實例
     *
     * @param originInvoker
     * @return
     */
    private Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        /*
         * 根據SPI機制獲取具體的Registry實例,
         *   會調用到com.alibaba.dubbo.registry.support.AbstractRegistryFactory#getRegistry
         * 這裏獲取到的是ZookeeperRegistry
         */
        return registryFactory.getRegistry(registryUrl);
    }

這裏會調用到org.apache.dubbo.registry.support.AbstractRegistry#AbstractRegistry的構造方法

public AbstractRegistry(URL url) {
        // 設置註冊中心url
        setUrl(url);

        // 是否開啓"文件保存定時器"
        syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);

        /*
         * 保存的文件名稱爲:
         * /Users/huangyuan/.dubbo/dubbo-registry-springProviderApplication-127.0.0.1:2181.cache
         *
         * dubbo中zookeeper作註冊中心,若是註冊中心集羣都掛掉,那發佈者和訂閱者還能通訊嗎?
         *
         * 能,dubbo會將zookeeper的信息緩存到本地,
         * 做爲一個緩存文件,而且轉換成properties對象方便使用
         * 
         * 該文件在註冊中心通知的時候,進行存儲更新  notify(url.getBackupUrls());
         */
        String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");

        File file = null;

        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                // 不存在則建立
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }

        this.file = file;

        /*
         * 加載文件中的屬性,key-value形式,示例以下:
         * "group1/com.huang.yuan.api.service.DemoService:1.0" -> "empty://192.168.1.2:23801/com.huang.yuan.api.service.DemoService?anyhost=true&application=springProviderApplication..."
         */
        loadProperties();

        // 通知訂閱
        // url.getBackupUrls() 獲取的是註冊中心的url
        notify(url.getBackupUrls());
    }
註冊服務到註冊中心

RegistryProtocol註冊服務到註冊中心(這裏註冊中心是Zookeeper,所以最終的註冊操做是在org.apache.dubbo.registry.zookeeper.ZookeeperRegistry中執行,代碼以下)

@Override
    protected void doRegister(URL url) {
        try {
            /*
             * 這裏zkClient就是咱們上面調用構造的時候生成的 ZkClientZookeeperClient
             * ZkClientZookeeperClient 保存着鏈接到Zookeeper的zkClient實例
             * 開始註冊,也就是在Zookeeper中建立節點s
             * 這裏toUrlPath獲取到的path爲:(相似)
             *                  /dubbo/com.huang.yuan.api.service.DemoService2/provider.....
             *                  這就是在Zookeeper上面建立的文件夾路徑及節點
             * 默認建立的節點是臨時節點
             */
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
訂閱服務

com.alibaba.dubbo.registry.support.FailbackRegistry向註冊中心訂閱服務

public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);

        removeFailedSubscribed(url, listener);

        try {
            /*
             * 想註冊中心發送訂閱請求
             * 這裏有一個地方比較有意思,就是本身的服務、依賴外部的服務,都會進行訂閱。
             * 這一步以後就會在/dubbo/dubbo.common.hello.service/XXXService節點下多一個configurators節點
             */
            doSubscribe(url, listener);
        } catch (Exception e) {
            ......
        }
    }

在org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe完成訂閱特定服務的操做。

List<URL> urls = new ArrayList<URL>();

                /*
                 * toCategoriesPath是獲取分類路徑
                 *      好比說獲取的path數組,含有下面3個值
                 *      /dubbo/com.huang.yuan.api.service.DemoService2/providers
                 *      /dubbo/com.huang.yuan.api.service.DemoService2/configurators
                 *      /dubbo/com.huang.yuan.api.service.DemoService2/routers
                 */
                for (String path : toCategoriesPath(url)) {

                    // 獲取訂閱者
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }

                    // 監聽器
                    ChildListener zkListener = listeners.get(listener);

                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // 這裏設置了監聽回調的地址,即回調給FailbackRegistry中的notify
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }

                    // 根據路徑path建立節點
                    zkClient.create(path, false);
                    // 添加子節點監聽器
                    List<String> children = zkClient.addChildListener(path, zkListener);

                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }

                // 通知消費者
                // 在org.apache.dubbo.registry.support.FailbackRegistry.notify中發佈操做
                notify(url, listener, urls);
            }

會建立監聽器,當訂閱的服務有變動的時候,註冊中心就會調用com.alibaba.dubbo.registry.NotifyListener#notify方法,告訴消費者

返回expoter
相關文章
相關標籤/搜索