Dubbo解析(四)-服務發佈

RPC處理的就是遠程服務的調用,一個消費者經過網絡調用一個提供者。而單消費者和單提供者解決不了項目的高負載,延伸出的RPC框架增長了註冊中心,從而對多個消費者和提供者進行協調。而消費者、提供者和註冊中心的交互也就是服務的發佈和引用。緩存

Dubbo的服務與註冊中心之間的交互圖以下:服務器

Dubbo的消費者和提供者的交互過程:網絡

  1. 提供者向註冊中心註冊,並暴露本地服務
  2. 消費者向註冊中心註冊,並訂閱提供者列表
  3. 消費者獲取提供者列表,
  4. 消費者按照負載均衡選擇一個提供者,直接調用其服務

這一篇主要介紹第一個操做。app

Dubbo的提供者(dubbo:service)轉換成ServiceBean將服務發佈出去,ServiceBean繼承ServiceConfig抽象配置類,同時實現多個Spring相關的容器接口。負載均衡

當Spring啓動後觸發ContextRefreshedEvent事件,會調用onApplicationEvent方法框架

public void onApplicationEvent(ContextRefreshedEvent event) {
    if (isDelay() && !isExported() && !isUnexported()) {
        export();
    }
}

從而執行export方法。export方法定義在ServiceConfig類中,通過一系列的校驗和配置組裝,最終調用doExportUrls,發佈全部的url。jvm

ServiceConfig.doExportUrls執行服務export

對於dubbo的配置,簡單的介紹一下,如同dubbo:service對應於ServiceConfig同樣,dubbo的xml標籤都對應於一個配置類ide

dubbo:application : ApplicationConfig,應用配置類優化

dubbo:registry : RegistryConfig,註冊中心配置類this

dubbo:protocol : ProtocolConfig,服務協議配置類

而doExportUrls方法分紅兩部分

  1. 獲取全部註冊中心的URL
  2. 遍歷全部協議ProtocolConfig,將每一個protocol發佈到全部註冊中心上
private void doExportUrls() {
	// 1.獲取註冊中心URL
    List<URL> registryURLs = loadRegistries(true);
    // 2.遍歷全部協議,export protocol並註冊到全部註冊中心
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

loadRegistries獲取註冊中心URL

首先執行checkRegistry,判斷是否有配置註冊中心,若是沒有,則從默認配置文件dubbo.properties中讀取dubbo.registry.address組裝成RegistryConfig。

AbstractInterfaceConfig.checkRegistry()

if (registries == null || registries.isEmpty()) {
    String address = ConfigUtils.getProperty("dubbo.registry.address");
    if (address != null && address.length() > 0) {
        registries = new ArrayList<RegistryConfig>();
        String[] as = address.split("\\s*[|]+\\s*");
        for (String a : as) {
            RegistryConfig registryConfig = new RegistryConfig();
            registryConfig.setAddress(a);
            registries.add(registryConfig);
        }
    }
}

而後根據RegistryConfig的配置,組裝registryURL,造成的URL格式以下

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=1528&qos.port=22222&registry=zookeeper&timestamp=1530743640901

這個URL表示它是一個registry協議(RegistryProtocol),地址是註冊中心的ip:port,服務接口是RegistryService,registry的類型爲zookeeper。

doExportUrlsFor1Protocol發佈服務和註冊

由於dubbo支持多協議配置,對於每一個ProtocolConfig配置,組裝protocolURL,註冊到每一個註冊中心上。

首先根據ProtocolConfig構建協議的URL

  1. 設置side=provider,dubbo={dubboVersion},timestamp=時間戳,pid=進程id
  2. 從application,module,provider,protocol配置中添加URL的parameter
  3. 遍歷dubbo:service下的dubbo:methoddubbo:argument添加URL的parameter
  4. 判斷是否泛型暴露,設置generic,及methods=*,不然獲取服務接口的全部method
  5. 獲取host及port,host及port都是經過多種方式獲取,保證最終不爲空
// 獲取綁定的ip,1從系統配置獲取2從protocolConfig獲取3從providerConfig獲取
// 4獲取localhost對應得ipv45鏈接註冊中心獲取6直接獲取localhost對應的ip(127.0.0.1)
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
// 獲取綁定的port,1從系統配置2從protocolConfig3從providerConfig4從defaultPort之上隨機取可用的
Integer port = this.findConfigedPorts(protocolConfig, name, map);

最終構建URL對象

// 建立protocol export url
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

構建出的protocolURL格式以下

dubbo://192.168.199.180:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.199.180&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5744&qos.port=22222&side=provider&timestamp=1530746052546

這個URL表示它是一個dubbo協議(DubboProtocol),地址是當前服務器的ip,端口是要暴露的服務的端口號,能夠從dubbo:protocol配置,服務接口爲dubbo:service配置發佈的接口。

而後從url中獲取scope屬性,若是scope!="none"則發佈服務,默認scope爲null。若是scope不爲none,判斷是否爲local或remote,從而發佈Local服務或Remote服務,默認兩個都發布。

這裏的Local服務只是injvm的服務,提供一種消費者和提供者都在一個jvm內的調用方式。

主要來看Remote服務,遍歷全部的registryURL,執行如下操做:

  1. 若是配置了monitor,則組裝monitorURL,並給registryURL設置MONITOR_KEY=monitorURL
  2. 給registryURL設置EXPORT_KEY爲上面構建的protocolURL
  3. 根據實現對象,服務接口Class和registryuRL經過ProxyFactory獲取代理Invoker(繼承於AbstractProxyInvoker),詳見上一篇動態代理和包裝
  4. 將Invoker對象和ServiceConfig組裝成MetaDataInvoker,經過protocol.export(invoker)暴露出去。
for (URL registryURL : registryURLs) {
    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
    // 組裝監控URL
    URL monitorUrl = loadMonitor(registryURL);
    if (monitorUrl != null) {
        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
    }
    // 以registryUrl建立Invoker
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    // 包裝Invoker和ServiceConfig
    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

    // 以RegistryProtocol爲主,註冊和訂閱註冊中心,並暴露本地服務端口
    Exporter<?> exporter = protocol.export(wrapperInvoker);
    exporters.add(exporter);
}

根據SPI機制,這裏的procotol.export執行時,從Invoker獲取URL的protocol爲registry,由RegistryProtocol處理export過程。

RegistryProtocol.export暴露服務

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // export dubbo invoker
	// 發佈本地invoker,暴露本地服務,打開服務器端口
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    // 獲取registryUrl,protocol從registryUrl的registry參數獲取
    // 修改registryUrl的protocol,並移除registryUrl的registry參數
    // 即將registry://改爲相似zookeeper://
    URL registryUrl = getRegistryUrl(originInvoker);
    
    // 根據url從registryFactory中獲取對應的registry
    final Registry registry = getRegistry(originInvoker);
    // 獲取要註冊的providerUrl
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    // 判斷是否要將provider url註冊到註冊中心
    boolean register = registedProviderUrl.getParameter("register", true);

    // 本地提供者和消費者註冊表
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

    if (register) {
    	// 向註冊中心註冊providerUrl
        register(registryUrl, registedProviderUrl);
        // 本地註冊表設置此provider註冊完成
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe the override data
    // 組裝提供者節點的url
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    // 建立Override監聽器
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    // 添加到監聽器列表
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // 向註冊中心訂閱提供者url節點
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    // 返回一個全新的Exporter
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
  1. 從Invoker中獲取providerURL,同傳入的Invoker對象組裝成InvokerDelegete,經過protocol.export根據providerURL(通常爲dubbo協議)暴露服務,打開服務器端口,得到Exporter緩存到本地。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
	// 獲取cache key
    String key = getCacheKey(originInvoker);
    // 是否存在已綁定的exporter
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
            	// 封裝invoker和providerUrl
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // export provider invoker,Protocol的具體實現是由Url中的protocol屬性決定的
                // 封裝建立出的exporter和origin invoker
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
  1. 修改registryUrl,將其協議由registry改成具體的註冊中心協議,即registry://改成zookeeper://。
private URL getRegistryUrl(Invoker<?> originInvoker) {
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    return registryUrl;
}
  1. 根據registryUrl從RegistryFactory根據SPI機制獲取具體的Registry
private Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    return registryFactory.getRegistry(registryUrl);
}
  1. 獲取要註冊的providerUrl,移除不須要在註冊中心看到的providerUrl中部分參數
private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
    URL providerUrl = getProviderUrl(originInvoker);
    //The address you see at the registry
    final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl))
            .removeParameter(Constants.MONITOR_KEY)
            .removeParameter(Constants.BIND_IP_KEY)
            .removeParameter(Constants.BIND_PORT_KEY)
            .removeParameter(QOS_ENABLE)
            .removeParameter(QOS_PORT)
            .removeParameter(ACCEPT_FOREIGN_IP);
    return registedProviderUrl;
}

獲得的providerUrl格式以下

dubbo://192.168.199.180:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3012&side=provider&timestamp=1530749502888
  1. 將providerUrl註冊到註冊中心,若是註冊中心爲zookeeper,則使用ZookeeperRegistry註冊器進行註冊
public void register(URL registryUrl, URL registedProviderUrl) {
	// registryFactory爲動態生成類RegistryFactory$Adapative,根據url.getProtocol獲取實際的RegistryFactory
	// 如zookeeper對應的是ZookeeperRegistryFactory,獲取的Registry對象爲ZookeeperRegistry
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
}
  1. 從registryUrl組裝overrideSubscribeUrl,並構建OverrideListener,向註冊中心訂閱overrideSubscribeUrl,用於當配置數據變化時,觸發overrideListener的notify方法通知提供者從新暴露服務。

  2. 將本地暴露的exporter,傳入的參數originInvoker以及overrideSubscribeUrl和registedProviderUrl封裝成新的DestroyableExporter返回,供消費者調用時獲取。

對於第1步中的providerUrl構建的Invoker,經過protocol.export暴露本地服務,通常都是dubbo協議,從而使用DubboProtocol暴露服務。

DubboProtocol暴露本地服務

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
	// 獲取提供者url
    URL url = invoker.getUrl();

    // export service.
    // 從url獲取service key > group/service:version:port
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    // 對本地存根stub及回調的初始處理
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    // 打開服務器端口,暴露本地服務
    openServer(url);
    // 優化序列化操做
    optimizeSerialization(url);
    return exporter;
}
  1. 從Invoker獲取providerUrl,構建serviceKey(group/service:version:port),構建DubboExporter並以serviceKey爲key放入本地map緩存
  2. 處理url攜帶的本地存根和callback回調
  3. 根據url打開服務器端口,暴露本地服務。先以url.getAddress爲key查詢本地緩存serverMap獲取ExchangeServer,若是不存在,則經過createServer建立。
private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}
  1. createServer方法,設置心跳時間,判斷url中的傳輸方式(key=server,對應Transporter服務)是否支持,設置codec=dubbo,最後根據url和ExchangeHandler對象綁定server返回,這裏的ExchangeHandler很是重要,它就是消費方調用時,底層通訊層回調的Handler,從而獲取包含實際Service實現的Invoker執行器,它是定義在DubboProtocol類中的ExchangeHandlerAdapter內部類。
private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    // 設置心跳時間
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // 獲取server方式,netty or mina
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    // 設置codec=dubbo
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // 綁定url和ExchangeHandler,返回ExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}
  1. 返回DubboExporter對象

服務發佈序列圖及活動圖

參考:https://blog.csdn.net/quhongwei_zhanqiu/article/details/41651205

相關文章
相關標籤/搜索