RPC處理的就是遠程服務的調用,一個消費者經過網絡調用一個提供者。而單消費者和單提供者解決不了項目的高負載,延伸出的RPC框架增長了註冊中心,從而對多個消費者和提供者進行協調。而消費者、提供者和註冊中心的交互也就是服務的發佈和引用。緩存
Dubbo的服務與註冊中心之間的交互圖以下:服務器
Dubbo的消費者和提供者的交互過程:網絡
這一篇主要介紹第一個操做。app
Dubbo的提供者(dubbo:service)轉換成ServiceBean將服務發佈出去,ServiceBean繼承ServiceConfig抽象配置類,同時實現多個Spring相關的容器接口。負載均衡
當Spring啓動後觸發ContextRefreshedEvent事件,會調用onApplicationEvent方法框架
public void onApplicationEvent(ContextRefreshedEvent event) { if (isDelay() && !isExported() && !isUnexported()) { export(); } }
從而執行export方法。export方法定義在ServiceConfig類中,通過一系列的校驗和配置組裝,最終調用doExportUrls,發佈全部的url。jvm
對於dubbo的配置,簡單的介紹一下,如同dubbo:service對應於ServiceConfig同樣,dubbo的xml標籤都對應於一個配置類ide
dubbo:application : ApplicationConfig,應用配置類優化
dubbo:registry : RegistryConfig,註冊中心配置類this
dubbo:protocol : ProtocolConfig,服務協議配置類
而doExportUrls方法分紅兩部分
private void doExportUrls() { // 1.獲取註冊中心URL List<URL> registryURLs = loadRegistries(true); // 2.遍歷全部協議,export protocol並註冊到全部註冊中心 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
首先執行checkRegistry,判斷是否有配置註冊中心,若是沒有,則從默認配置文件dubbo.properties中讀取dubbo.registry.address組裝成RegistryConfig。
AbstractInterfaceConfig.checkRegistry() if (registries == null || registries.isEmpty()) { String address = ConfigUtils.getProperty("dubbo.registry.address"); if (address != null && address.length() > 0) { registries = new ArrayList<RegistryConfig>(); String[] as = address.split("\\s*[|]+\\s*"); for (String a : as) { RegistryConfig registryConfig = new RegistryConfig(); registryConfig.setAddress(a); registries.add(registryConfig); } } }
而後根據RegistryConfig的配置,組裝registryURL,造成的URL格式以下
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=1528&qos.port=22222®istry=zookeeper×tamp=1530743640901
這個URL表示它是一個registry協議(RegistryProtocol),地址是註冊中心的ip:port,服務接口是RegistryService,registry的類型爲zookeeper。
由於dubbo支持多協議配置,對於每一個ProtocolConfig配置,組裝protocolURL,註冊到每一個註冊中心上。
首先根據ProtocolConfig構建協議的URL
// 獲取綁定的ip,1從系統配置獲取2從protocolConfig獲取3從providerConfig獲取 // 4獲取localhost對應得ipv45鏈接註冊中心獲取6直接獲取localhost對應的ip(127.0.0.1) String host = this.findConfigedHosts(protocolConfig, registryURLs, map); // 獲取綁定的port,1從系統配置2從protocolConfig3從providerConfig4從defaultPort之上隨機取可用的 Integer port = this.findConfigedPorts(protocolConfig, name, map);
最終構建URL對象
// 建立protocol export url URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
構建出的protocolURL格式以下
dubbo://192.168.199.180:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.199.180&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5744&qos.port=22222&side=provider×tamp=1530746052546
這個URL表示它是一個dubbo協議(DubboProtocol),地址是當前服務器的ip,端口是要暴露的服務的端口號,能夠從dubbo:protocol配置,服務接口爲dubbo:service配置發佈的接口。
而後從url中獲取scope屬性,若是scope!="none"則發佈服務,默認scope爲null。若是scope不爲none,判斷是否爲local或remote,從而發佈Local服務或Remote服務,默認兩個都發布。
這裏的Local服務只是injvm的服務,提供一種消費者和提供者都在一個jvm內的調用方式。
主要來看Remote服務,遍歷全部的registryURL,執行如下操做:
for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); // 組裝監控URL URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } // 以registryUrl建立Invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // 包裝Invoker和ServiceConfig DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 以RegistryProtocol爲主,註冊和訂閱註冊中心,並暴露本地服務端口 Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); }
根據SPI機制,這裏的procotol.export執行時,從Invoker獲取URL的protocol爲registry,由RegistryProtocol處理export過程。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // export dubbo invoker // 發佈本地invoker,暴露本地服務,打開服務器端口 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // 獲取registryUrl,protocol從registryUrl的registry參數獲取 // 修改registryUrl的protocol,並移除registryUrl的registry參數 // 即將registry://改爲相似zookeeper:// URL registryUrl = getRegistryUrl(originInvoker); // 根據url從registryFactory中獲取對應的registry final Registry registry = getRegistry(originInvoker); // 獲取要註冊的providerUrl final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //to judge to delay publish whether or not // 判斷是否要將provider url註冊到註冊中心 boolean register = registedProviderUrl.getParameter("register", true); // 本地提供者和消費者註冊表 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); if (register) { // 向註冊中心註冊providerUrl register(registryUrl, registedProviderUrl); // 本地註冊表設置此provider註冊完成 ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // 組裝提供者節點的url final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); // 建立Override監聽器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); // 添加到監聽器列表 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 向註冊中心訂閱提供者url節點 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export // 返回一個全新的Exporter return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { // 獲取cache key String key = getCacheKey(originInvoker); // 是否存在已綁定的exporter ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { // 封裝invoker和providerUrl final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // export provider invoker,Protocol的具體實現是由Url中的protocol屬性決定的 // 封裝建立出的exporter和origin invoker exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }
private URL getRegistryUrl(Invoker<?> originInvoker) { URL registryUrl = originInvoker.getUrl(); if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); } return registryUrl; }
private Registry getRegistry(final Invoker<?> originInvoker) { URL registryUrl = getRegistryUrl(originInvoker); return registryFactory.getRegistry(registryUrl); }
private URL getRegistedProviderUrl(final Invoker<?> originInvoker) { URL providerUrl = getProviderUrl(originInvoker); //The address you see at the registry final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl)) .removeParameter(Constants.MONITOR_KEY) .removeParameter(Constants.BIND_IP_KEY) .removeParameter(Constants.BIND_PORT_KEY) .removeParameter(QOS_ENABLE) .removeParameter(QOS_PORT) .removeParameter(ACCEPT_FOREIGN_IP); return registedProviderUrl; }
獲得的providerUrl格式以下
dubbo://192.168.199.180:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3012&side=provider×tamp=1530749502888
public void register(URL registryUrl, URL registedProviderUrl) { // registryFactory爲動態生成類RegistryFactory$Adapative,根據url.getProtocol獲取實際的RegistryFactory // 如zookeeper對應的是ZookeeperRegistryFactory,獲取的Registry對象爲ZookeeperRegistry Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registedProviderUrl); }
從registryUrl組裝overrideSubscribeUrl,並構建OverrideListener,向註冊中心訂閱overrideSubscribeUrl,用於當配置數據變化時,觸發overrideListener的notify方法通知提供者從新暴露服務。
將本地暴露的exporter,傳入的參數originInvoker以及overrideSubscribeUrl和registedProviderUrl封裝成新的DestroyableExporter返回,供消費者調用時獲取。
對於第1步中的providerUrl構建的Invoker,經過protocol.export暴露本地服務,通常都是dubbo協議,從而使用DubboProtocol暴露服務。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 獲取提供者url URL url = invoker.getUrl(); // export service. // 從url獲取service key > group/service:version:port String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event // 對本地存根stub及回調的初始處理 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 打開服務器端口,暴露本地服務 openServer(url); // 優化序列化操做 optimizeSerialization(url); return exporter; }
private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { // server supports reset, use together with override server.reset(url); } } }
private ExchangeServer createServer(URL url) { // send readonly event when server closes, it's enabled by default url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // enable heartbeat by default // 設置心跳時間 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 獲取server方式,netty or mina String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // 設置codec=dubbo url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // 綁定url和ExchangeHandler,返回ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
參考:https://blog.csdn.net/quhongwei_zhanqiu/article/details/41651205