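One of Dubbo's most fundamental and important features is automatic service registration and discovery, which is implemented through a registry center. Dubbo supports many external registry components: ZooKeeper, Redis, etcd, Consul, Eureka, and others.
Their implementations differ, but they all look the same from the outside: each one implements the Registry interface.
Today we look at how the most commonly used registry, ZooKeeper, is integrated.
A registry is mainly responsible for service publication, service subscription, service push, and service lookup. In ZooKeeper, the directory tree is organized with the service as the primary unit.
Overview of the ZooKeeper path layout: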
/dubbo: root directory, persistent (the root can be customized with group=xxx)
/dubbo/xxx.xx.XxxService: one path per service, persistent
/dubbo/xxx.xx.XxxService/configurators: where configuration is stored, empty by default
/dubbo/xxx.xx.XxxService/providers: service provider directory, persistent. Every provider is stored as a child path, and each provider entry is an ephemeral node. Sample path: dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D112756%26release%3D%26side%3Dprovider%26timestamp%3D1588548412331
/dubbo/xxx.xx.XxxService/consumers: service consumer directory, persistent. Every consumer is stored as a child path. Sample path: consumer%3A%2F%2F192.168.1.4%2Forg.apache.dubbo.rpc.service.GenericService%3Fapplication%3Ddubbo-demo-api-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26generic%3Dtrue%26interface%3Dorg.apache.dubbo.demo.DemoService%26pid%3D139620%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1588596195728
/dubbo/xxx.xx.XxxService/routers: routing configuration, empty by default
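The layout above can be reproduced with a few lines of plain Java. This is only a minimal sketch, not Dubbo's own code: the class `ZkPathSketch` and its method names are hypothetical, it uses the JDK's `URLEncoder` where Dubbo uses its own `URL.encode`, and it assumes the interface name needs no escaping.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper that mirrors the path layout described above.
final class ZkPathSketch {

    // /<group>/<interface>, e.g. /dubbo/org.apache.dubbo.demo.DemoService
    static String servicePath(String group, String serviceInterface) {
        String root = group.startsWith("/") ? group : "/" + group;
        return root + "/" + serviceInterface;
    }

    // /<group>/<interface>/<category>, category being providers, consumers, routers or configurators
    static String categoryPath(String group, String serviceInterface, String category) {
        return servicePath(group, serviceInterface) + "/" + category;
    }

    // The leaf node is the full URL, URL-encoded so that it becomes a single path segment.
    static String urlPath(String group, String serviceInterface, String category, String fullUrl) {
        try {
            return categoryPath(group, serviceInterface, category)
                    + "/" + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

Encoding the whole URL is what lets a provider or consumer address, slashes and query string included, live under its category directory as one child node.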
The ZooKeeper registry is obtained by calling RegistryFactoryWrapper after the local service has been exposed on its socket port.
The URL used at registration time looks like this: registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613&pid=135556&registry=zookeeper&timestamp=1588518545808
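ZooKeeper is invoked in two situations. On the provider side, it is brought up in ServiceConfig#doExportUrlsFor1Protocol when the service is exported remotely. On the consumer side, it is fetched in ReferenceConfig#createProxy so that the consumer can obtain the provider list before making remote calls. Taking ServiceConfig as the example, here is how ZooKeeper gets called: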
// ServiceConfig#doExportUrlsFor1Protocol
// PROTOCOL is an SPI extension. Its default implementation is DubboProtocol, but the implementation that handles a call is chosen by the protocol of the URL
// The protocol here is registry, so RegistryProtocol handles export()
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);

// RegistryProtocol has several important properties, which Dubbo fills in through dependency injection
// org.apache.dubbo.registry.integration.RegistryProtocol
public void setCluster(Cluster cluster) {
    this.cluster = cluster;
}
public void setProtocol(Protocol protocol) {
    this.protocol = protocol;
}
public void setRegistryFactory(RegistryFactory registryFactory) {
    this.registryFactory = registryFactory;
}

// org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // Check whether this is a service registration, i.e. whether the protocol is registry (URL prefix registry://)
    // If so, delegate and return once registration is done
    if (UrlUtils.isRegistry(invoker.getUrl())) {
        // ProtocolFilterWrapper.export()
        return protocol.export(invoker);
    }
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}

// org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // Same check: is this a registration request?
    if (UrlUtils.isRegistry(invoker.getUrl())) {
        // RegistryProtocol.export()
        return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}

// org.apache.dubbo.registry.integration.RegistryProtocol#export
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    // Obtain the registry instance, e.g. the one backed by the zk client
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    // decide if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        // Register the service address with the registry
        register(registryUrl, registeredProviderUrl);
    }

    // register stated url on provider model
    registerStatedUrl(registryUrl, registeredProviderUrl, register);

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    // Subscribe to the directory
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);

    notifyExport(exporter);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

/**
 * Get an instance of registry based on the address of invoker
 *
 * @param originInvoker
 * @return
 */
protected Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    // RegistryFactory is itself produced by the SPI mechanism
    // The concrete instance depends on the registry type; here it is zookeeper, so ZookeeperRegistryFactory.getRegistry() is called
    return registryFactory.getRegistry(registryUrl);
}

// Every RegistryFactory is wrapped in a RegistryFactoryWrapper so that it can be decorated
// org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry
@Override
public Registry getRegistry(URL url) {
    // For zk this calls ZookeeperRegistryFactory
    return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
                    .getActivateExtension(url, "registry.listeners")));
}

// org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL)
@Override
public Registry getRegistry(URL url) {
    if (destroyed.get()) {
        LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
        return DEFAULT_NOP_REGISTRY;
    }

    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = createRegistryCacheKey(url);
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        // Call the subclass method to create the registry instance, here ZookeeperRegistryFactory.createRegistry
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

// org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
@Override
public Registry createRegistry(URL url) {
    // This is where the ZooKeeper component is finally wired into the application; from here on its features can be used
    return new ZookeeperRegistry(url, zookeeperTransporter);
}
At this point ZooKeeper has been wired in. Let's first look at the constructor of the ZooKeeper registry:
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    // Handled by the abstract parent classes
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // group=dubbo
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener((state) -> {
        if (state == StateListener.RECONNECTED) {
            logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                    " Since ephemeral ZNode will not get deleted for a connection lose, " +
                    "there's no need to re-register url of this instance.");
            ZookeeperRegistry.this.fetchLatestAddresses();
        } else if (state == StateListener.NEW_SESSION_CREATED) {
            logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
            try {
                ZookeeperRegistry.this.recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else if (state == StateListener.SESSION_LOST) {
            logger.warn("Url of this instance will be deleted from registry soon. " +
                    "Dubbo client will try to re-register once a new session is created.");
        } else if (state == StateListener.SUSPENDED) {

        } else if (state == StateListener.CONNECTED) {

        }
    });
}

// org.apache.dubbo.registry.support.FailbackRegistry#FailbackRegistry
public FailbackRegistry(URL url) {
    super(url);
    // retry.period=5000
    this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);

    // since the retry task will not be very much. 128 ticks is enough.
    // When the connection fails, retry periodically in the background
    retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}

// org.apache.dubbo.registry.support.AbstractRegistry#AbstractRegistry
public AbstractRegistry(URL url) {
    setUrl(url);
    // The file.cache option is enabled by default, so a cache file is normally expected
    if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
        // Start file save timer
        syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
        // Default path: $USER_HOME/.dubbo/dubbo-registry-dubbo-demo-api-provider-127.0.0.1-2181.cache
        String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
        String filename = url.getParameter(FILE_KEY, defaultFilename);
        File file = null;
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        this.file = file;
        // When starting the subscription center,
        // we need to read the local cache file for future Registry fault tolerance processing.
        // If the file exists, load the previous configuration from it first
        loadProperties();
        notify(url.getBackupUrls());
    }
}

// If a registry cache file exists, load the property values from it
// Entries have the form xxx.xxxService=xxx
private void loadProperties() {
    if (file != null && file.exists()) {
        InputStream in = null;
        try {
            in = new FileInputStream(file);
            properties.load(in);
            if (logger.isInfoEnabled()) {
                logger.info("Load registry cache file " + file + ", data: " + properties);
            }
        } catch (Throwable e) {
            logger.warn("Failed to load registry cache file " + file, e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}

// org.apache.dubbo.registry.support.AbstractRegistry#notify(java.util.List<org.apache.dubbo.common.URL>)
protected void notify(List<URL> urls) {
    if (CollectionUtils.isEmpty(urls)) {
        return;
    }

    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL url = entry.getKey();

        if (!UrlUtils.isMatch(url, urls.get(0))) {
            continue;
        }

        Set<NotifyListener> listeners = entry.getValue();
        if (listeners != null) {
            for (NotifyListener listener : listeners) {
                try {
                    notify(url, listener, filterEmpty(url, urls));
                } catch (Throwable t) {
                    logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                }
            }
        }
    }
}
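The constructor does three main things: it creates the connection to the ZooKeeper server, adds a state listener, and sets up the local cache file.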
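The state listener is the subtle part: a plain reconnect keeps the session, so ephemeral nodes survive and only the address list needs refreshing, while a new session means the ephemeral nodes are gone and everything must be registered and subscribed again. The decision can be sketched as follows. The enums and the class `SessionStateSketch` are hypothetical stand-ins for illustration; Dubbo's `StateListener` uses its own constants.

```java
// Hypothetical model of the state-to-action decision made in the constructor's listener.
final class SessionStateSketch {

    enum State { CONNECTED, SUSPENDED, RECONNECTED, SESSION_LOST, NEW_SESSION_CREATED }

    enum Action { NONE, FETCH_LATEST_ADDRESSES, RECOVER }

    static Action actionFor(State state) {
        switch (state) {
            case RECONNECTED:
                // Same session: ephemeral nodes are still there, only refresh what may have changed.
                return Action.FETCH_LATEST_ADDRESSES;
            case NEW_SESSION_CREATED:
                // New session: ephemeral nodes were removed, so re-register and re-subscribe.
                return Action.RECOVER;
            default:
                // SESSION_LOST only logs; SUSPENDED and CONNECTED do nothing.
                return Action.NONE;
        }
    }
}
```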
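In the previous section we saw that ZooKeeper is initialized through getRegistry() during export(). Also during export(), once the registry instance has been obtained, the service address still has to be registered with it before the job is done. The call sequence for service registration is as follows: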
// org.apache.dubbo.registry.integration.RegistryProtocol#export
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    // 1. Obtain the registry instance
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    // decide if we need to delay publish
    // 2. Register registeredProviderUrl
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        register(registryUrl, registeredProviderUrl);
    }

    // register stated url on provider model
    registerStatedUrl(registryUrl, registeredProviderUrl, register);

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);

    notifyExport(exporter);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

private void register(URL registryUrl, URL registeredProviderUrl) {
    // The registry obtained here is actually a wrapper around the real registry; registryUrl starts with zookeeper://
    Registry registry = registryFactory.getRegistry(registryUrl);
    // registeredProviderUrl looks like: dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=135556&release=&side=provider&timestamp=1588518573613
    registry.register(registeredProviderUrl);
}

// org.apache.dubbo.registry.ListenerRegistryWrapper#register
@Override
public void register(URL url) {
    try {
        // Call the actual registry
        registry.register(url);
    } finally {
        if (CollectionUtils.isNotEmpty(listeners)) {
            RuntimeException exception = null;
            for (RegistryServiceListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.onRegister(url);
                    } catch (RuntimeException t) {
                        logger.error(t.getMessage(), t);
                        exception = t;
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }
}

// org.apache.dubbo.registry.support.FailbackRegistry#register
@Override
public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        // Ask zk to register the url
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedRegistered(url);
    }
}

// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
@Override
public void doRegister(URL url) {
    try {
        // Create the zk node. dynamic=true by default, so the node is created as an ephemeral node
        // Sample url: dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider&timestamp=1588639020690
        // toUrlPath converts this address into a zookeeper path, e.g. under the providers directory, the consumers directory...
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

// Convert the provider information into a zk path
private String toUrlPath(URL url) {
    // Locate the category directory; the lowest level is the encoded full url of the current provider
    // i.e. /dubbo/interfaceName/providers/xxxx
    return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
}

private String toCategoryPath(URL url) {
    // category='', defaults to providers
    return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
}

private String toServicePath(URL url) {
    String name = url.getServiceInterface();
    if (ANY_VALUE.equals(name)) {
        return toRootPath();
    }
    // /dubbo/interfaceName
    return toRootDir() + URL.encode(name);
}
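Service registration therefore comes down to two steps: 1. obtain the registry instance (through the SPI mechanism); 2. convert the service URL into the corresponding path and create it on the ZooKeeper server as an ephemeral node. With the service name as the parent path and the service's own URL as the leaf, all providers or consumers of a service are easy to find under the relevant path.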
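The failure handling in FailbackRegistry#register is worth isolating: with check enabled a failed registration is thrown to the caller, otherwise the URL is parked in a failed set and retried later. Below is a minimal sketch of that idea, assuming a pluggable `doRegister` callback in place of the ZooKeeper client and an explicit `retry()` call in place of Dubbo's `HashedWheelTimer`. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified model of "register now, retry later on failure".
final class FailbackRegisterSketch {

    private final Set<String> failedRegistered = new LinkedHashSet<>();
    private final Consumer<String> doRegister; // stands in for the real call to the registry

    FailbackRegisterSketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url, boolean check) {
        failedRegistered.remove(url); // a fresh attempt supersedes an older failure
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // Startup check enabled: fail fast instead of retrying silently.
                throw new IllegalStateException("Failed to register " + url, e);
            }
            failedRegistered.add(url); // remember it for the next retry round
        }
    }

    // In Dubbo a timer triggers the retries; here the caller invokes it explicitly.
    void retry() {
        for (String url : new ArrayList<>(failedRegistered)) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // Still failing: keep the url for the following round.
            }
        }
    }

    Set<String> failed() {
        return Collections.unmodifiableSet(failedRegistered);
    }
}
```

Keeping the failed URLs in a set rather than a queue means repeated failures for the same URL collapse into a single pending retry.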
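When the application is shutting down, or when registration fails, the service has to be taken offline. If the application does not take itself offline in time, ZooKeeper will still do so through its own ephemeral-node expiry, so that consumers and admin consoles do not see services that no longer exist.
The application's active offline operation is carried out by ShutdownHookCallbacks; the call sequence is as follows: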
// org.apache.dubbo.config.bootstrap.DubboBootstrap#DubboBootstrap
private DubboBootstrap() {
    configManager = ApplicationModel.getConfigManager();
    environment = ApplicationModel.getEnvironment();

    // Shutdown hooks are set up as soon as DubboBootstrap is created
    DubboShutdownHook.getDubboShutdownHook().register();
    // Add DubboBootstrap's destroy action to the DubboShutdownHook callback queue so that it runs at shutdown as well
    ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
        @Override
        public void callback() throws Throwable {
            DubboBootstrap.this.destroy();
        }
    });
}

// org.apache.dubbo.config.DubboShutdownHook#register
/**
 * Register the ShutdownHook
 */
public void register() {
    if (registered.compareAndSet(false, true)) {
        DubboShutdownHook dubboShutdownHook = getDubboShutdownHook();
        Runtime.getRuntime().addShutdownHook(dubboShutdownHook);
        dispatch(new DubboShutdownHookRegisteredEvent(dubboShutdownHook));
    }
}

// org.apache.dubbo.config.bootstrap.DubboBootstrap#destroy
public void destroy() {
    if (destroyLock.tryLock()) {
        try {
            // destroy logic implemented by DubboShutdownHook
            DubboShutdownHook.destroyAll();

            if (started.compareAndSet(true, false)
                    && destroyed.compareAndSet(false, true)) {

                unregisterServiceInstance();
                unexportMetadataService();
                unexportServices();
                unreferServices();

                destroyRegistries();
                DubboShutdownHook.destroyProtocols();
                destroyServiceDiscoveries();

                clear();
                shutdown();
                release();
            }
        } finally {
            destroyLock.unlock();
        }
    }
}

// org.apache.dubbo.config.DubboShutdownHook#destroyAll
public static void destroyAll() {
    if (destroyed.compareAndSet(false, true)) {
        AbstractRegistryFactory.destroyAll();
        destroyProtocols();
    }
}

// org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll
/**
 * Close all created registries
 */
public static void destroyAll() {
    if (!destroyed.compareAndSet(false, true)) {
        return;
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Close all registries " + getRegistries());
    }
    // Lock up the registry shutdown process
    LOCK.lock();
    try {
        // Get every registry that was created and call destroy() on each
        for (Registry registry : getRegistries()) {
            try {
                registry.destroy();
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        REGISTRIES.clear();
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#destroy
@Override
public void destroy() {
    super.destroy();
    try {
        // The (unbind) destroy action only needs to run once; a single run handles every address that has to be unbound
        zkClient.close();
    } catch (Exception e) {
        logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

// org.apache.dubbo.registry.support.FailbackRegistry#destroy
@Override
public void destroy() {
    super.destroy();
    retryTimer.stop();
}

// org.apache.dubbo.registry.support.AbstractRegistry#destroy
@Override
public void destroy() {
    if (logger.isInfoEnabled()) {
        logger.info("Destroy registry:" + getUrl());
    }
    Set<URL> destroyRegistered = new HashSet<>(getRegistered());
    if (!destroyRegistered.isEmpty()) {
        for (URL url : new HashSet<>(getRegistered())) {
            if (url.getParameter(DYNAMIC_KEY, true)) {
                try {
                    // Every address registered through this registry is unregistered here, so one destroy() call per instance is enough
                    unregister(url);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unregister url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
    Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
    if (!destroySubscribed.isEmpty()) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                try {
                    unsubscribe(url, listener);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unsubscribe url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
    AbstractRegistryFactory.removeDestroyedRegistry(this);
}

// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister
@Override
public void doUnregister(URL url) {
    try {
        // Sample registered path: /dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613
        zkClient.delete(toUrlPath(url));
    } catch (Throwable e) {
        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
In short, it is a shutdown hook that proactively deletes the registered paths.
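The pattern behind this is small enough to sketch on its own: callbacks are collected up front, a single JVM shutdown hook runs them, and an atomic flag guarantees the cleanup executes only once even if it is triggered both manually and by the hook. The class `ShutdownCallbacksSketch` below is a hypothetical illustration using only the JDK, not Dubbo's `ShutdownHookCallbacks`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical run-once shutdown callback registry.
final class ShutdownCallbacksSketch {

    private final List<Runnable> callbacks = new CopyOnWriteArrayList<>();
    private final AtomicBoolean destroyed = new AtomicBoolean(false);

    void addCallback(Runnable callback) {
        callbacks.add(callback);
    }

    // Safe to call many times; only the first call does any work.
    void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }
        for (Runnable callback : callbacks) {
            try {
                callback.run(); // e.g. delete this instance's nodes, close the client
            } catch (RuntimeException e) {
                // One failing callback must not prevent the others from running.
                System.err.println("Shutdown callback failed: " + e.getMessage());
            }
        }
    }

    // Hooks the cleanup into JVM shutdown.
    void registerWithJvm() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::destroyAll, "sketch-shutdown-hook"));
    }
}
```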
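The two operations above, registering and unregistering a service, apply to providers as well as consumers. Service subscription, however, is mostly a consumer concern, because consumers must keep track of provider changes and receive pushes from the registry. Its entry point is ReferenceConfig.get(), so we walk through the ZooKeeper subscription process from the consumer's point of view: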
// org.apache.dubbo.registry.integration.RegistryProtocol#refer
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = getRegistryUrl(url);
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}

// org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(subscribeUrl);
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    // Register the consumer's listener with zk
    // The providers, configurators and routers paths are added so that all of these directories are watched at once
    directory.subscribe(toSubscribeUrl(subscribeUrl));

    Invoker<T> invoker = cluster.join(directory);
    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (CollectionUtils.isEmpty(listeners)) {
        return invoker;
    }

    RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
    for (RegistryProtocolListener listener : listeners) {
        listener.onRefer(this, registryInvokerWrapper);
    }
    return registryInvokerWrapper;
}

private static URL toSubscribeUrl(URL url) {
    return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
}

// org.apache.dubbo.registry.ListenerRegistryWrapper#subscribe
@Override
public void subscribe(URL url, NotifyListener listener) {
    try {
        registry.subscribe(url, listener);
    } finally {
        if (CollectionUtils.isNotEmpty(listeners)) {
            RuntimeException exception = null;
            for (RegistryServiceListener registryListener : listeners) {
                if (registryListener != null) {
                    try {
                        registryListener.onSubscribe(url);
                    } catch (RuntimeException t) {
                        logger.error(t.getMessage(), t);
                        exception = t;
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }
}

// org.apache.dubbo.registry.support.FailbackRegistry#subscribe
@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Sending a subscription request to the server side
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if (CollectionUtils.isNotEmpty(urls)) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedSubscribed(url, listener);
    }
}

// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                for (String child : currentChilds) {
                    child = URL.decode(child);
                    if (!anyServices.contains(child)) {
                        anyServices.add(child);
                        subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                Constants.CHECK_KEY, String.valueOf(false)), k);
                    }
                }
            });
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            // configurators
            List<URL> urls = new ArrayList<>();
            // /dubbo/org.apache.dubbo.demo.DemoService/configurators
            // Sample url: provider://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.56.1&bind.port=20880&category=configurators&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider&timestamp=1588639020690
            // toCategoriesPath() turns it into several category directories, which are registered one by one
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                // The listeners live in a ConcurrentHashMap nested inside a ConcurrentHashMap, so lookup happens on two levels
                // When the directory changes, notify() is run
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                // Unlike the provider's service registration, the paths a consumer subscribes to are persistent
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // Once the listeners are in place, trigger notify() once proactively to refresh the information
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

private String[] toCategoriesPath(URL url) {
    String[] categories;
    if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
        categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
    } else {
        categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
    }
    String[] paths = new String[categories.length];
    for (int i = 0; i < categories.length; i++) {
        // servicePath: /dubbo/interfaceName
        paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
    }
    return paths;
}
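In practice the steps are similar to the provider's: 1. obtain the registry instance; 2. register the consumer's own identity under the consumers directory; 3. subscribe to changes in the providers, configurators, and routers subdirectories.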
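Step 3 depends on expanding the comma-separated category parameter into one ZooKeeper path per category, which is what toCategoriesPath does. A standalone sketch of that expansion follows. It is an approximation under stated assumptions: the class `CategoriesPathSketch` is hypothetical, plain strings replace Dubbo's `URL` type, and the split pattern is a guess at how the parameter is tokenized.

```java
// Hypothetical, string-based version of the category-to-path expansion.
final class CategoriesPathSketch {

    static String[] toCategoriesPaths(String root, String serviceInterface, String categoryParam) {
        String[] categories;
        if ("*".equals(categoryParam)) {
            // Wildcard: watch every category directory of the service.
            categories = new String[]{"providers", "consumers", "routers", "configurators"};
        } else if (categoryParam == null || categoryParam.isEmpty()) {
            // No category given: fall back to providers.
            categories = new String[]{"providers"};
        } else {
            categories = categoryParam.trim().split("\\s*,\\s*");
        }
        String[] paths = new String[categories.length];
        for (int i = 0; i < categories.length; i++) {
            // /<root>/<interface>/<category>
            paths[i] = root + "/" + serviceInterface + "/" + categories[i];
        }
        return paths;
    }
}
```

For a consumer whose category parameter is `providers,configurators,routers`, this yields three paths, and a child listener is attached to each of them.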
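When the registry detects that the service providers have changed, it pushes that information to the clients subscribed to the corresponding path. On the client, the push is first handled by the zkListener installed earlier.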
// ZookeeperRegistry.doSubscribe
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    ...
    List<URL> urls = new ArrayList<>();
    for (String path : toCategoriesPath(url)) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
        // Child-change callbacks are handled by ZookeeperRegistry.this.notify()
        ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
        zkClient.create(path, false);
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
            urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    notify(url, listener, urls);
    ...
}

// org.apache.dubbo.registry.support.FailbackRegistry#notify
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    try {
        doNotify(url, listener, urls);
    } catch (Exception t) {
        // Record a failed registration request to a failed list, retry regularly
        addFailedNotified(url, listener, urls);
        logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
}

// org.apache.dubbo.registry.support.FailbackRegistry#doNotify
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
    super.notify(url, listener, urls);
}

// org.apache.dubbo.registry.support.AbstractRegistry#notify(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.util.List<org.apache.dubbo.common.URL>)
/**
 * Notify changes from the Provider side.
 *
 * @param url      consumer side url
 * @param listener listener
 * @param urls     provider latest urls
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // keep every provider's category.
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        // Hand off to the listener that was originally passed in, i.e. RegistryDirectory.notify()
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
        saveProperties(url);
    }
}

// org.apache.dubbo.registry.integration.RegistryDirectory#notify
@Override
public synchronized void notify(List<URL> urls) {
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(this::judgeCategory));

    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    // Apply the router rules; the provider list is refreshed further down in refreshOverrideAndInvoker(),
    // so that later requests from the consumer use the latest address information
    toRouters(routerURLs).ifPresent(this::addRouters);

    // providers
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    /**
     * 3.x added for extend URL address
     */
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);
        }
    }
    refreshOverrideAndInvoker(providerURLs);
}
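The core of AbstractRegistry#notify is "group the incoming URLs by category, then call the listener once per group". Here is a minimal sketch of just that step, under some loud assumptions: URLs are plain strings instead of Dubbo's URL class, and the class CategoryNotifySketch with its methods categoryOf and notifyByCategory is a hypothetical name made up for illustration, not Dubbo API. The only thing taken from the code above is the idea that a URL without a category parameter falls back to the default category, which is assumed here to be "providers".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch, not Dubbo code: URLs are plain strings here.
class CategoryNotifySketch {

    // Assumed default category for URLs that carry no "category" parameter.
    static final String DEFAULT_CATEGORY = "providers";

    // Return the value of the "category" query parameter, or the default.
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return DEFAULT_CATEGORY;
    }

    // Group the URLs by category, then hand each group to the listener once.
    static Map<String, List<String>> notifyByCategory(List<String> urls,
                                                      Consumer<List<String>> listener) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String u : urls) {
            result.computeIfAbsent(categoryOf(u), k -> new ArrayList<>()).add(u);
        }
        for (List<String> group : result.values()) {
            listener.accept(group);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?side=provider",
                "override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators");
        notifyByCategory(urls, group -> System.out.println(group.size() + " url(s): " + group));
    }
}
```

Because the listener is called once per category, a receiver such as RegistryDirectory can treat configurators, routers and providers separately, which is exactly what its notify method does.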
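Besides taking the service offline, the subscription also has to be removed, so that ZooKeeper no longer needs to maintain the watches and push change notifications for it.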
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnsubscribe
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners != null) {
        ChildListener zkListener = listeners.get(listener);
        if (zkListener != null) {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                zkClient.removeChildListener(root, zkListener);
            } else {
                // Enumerate the category paths and remove the listener from each one
                for (String path : toCategoriesPath(url)) {
                    zkClient.removeChildListener(path, zkListener);
                }
            }
        }
    }
}

// Unsubscribing is likewise triggered by the shutdown hook
// org.apache.dubbo.registry.support.AbstractRegistry#destroy
@Override
public void destroy() {
    if (logger.isInfoEnabled()) {
        logger.info("Destroy registry:" + getUrl());
    }
    Set<URL> destroyRegistered = new HashSet<>(getRegistered());
    if (!destroyRegistered.isEmpty()) {
        for (URL url : new HashSet<>(getRegistered())) {
            if (url.getParameter(DYNAMIC_KEY, true)) {
                try {
                    unregister(url);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unregister url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
    Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
    if (!destroySubscribed.isEmpty()) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                try {
                    // Remove the listener (unsubscribe)
                    unsubscribe(url, listener);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unsubscribe url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
    AbstractRegistryFactory.removeDestroyedRegistry(this);
}

// org.apache.dubbo.registry.support.FailbackRegistry#unsubscribe
@Override
public void unsubscribe(URL url, NotifyListener listener) {
    super.unsubscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Sending a canceling subscription request to the server side
        // Calls ZookeeperRegistry.doUnsubscribe() to remove the path listeners
        doUnsubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;
        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true);
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
        // Record a failed registration request to a failed list, retry regularly
        addFailedUnsubscribed(url, listener);
    }
}
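This serves the same purpose as the offline (unregister) operation covered earlier. The only difference is that one deletes a path, while the other removes a change listener.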
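To make the "delete a path" versus "remove a listener" distinction concrete, here is a small in-memory sketch. Everything in it is an assumption for illustration: OfflineSketch and its methods are hypothetical names, and a plain Set and Map stand in for ZooKeeper nodes and watches. No real ZooKeeper or Dubbo API is involved.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for a registry; not Dubbo or ZooKeeper API.
class OfflineSketch {
    // Registered node paths: this is what unregister deletes.
    final Set<String> nodes = new HashSet<>();
    // Watched path -> listener names: this is what unsubscribe removes.
    final Map<String, Set<String>> watchers = new HashMap<>();

    void register(String nodePath) {
        nodes.add(nodePath);
    }

    // Offline: delete the node path itself.
    void unregister(String nodePath) {
        nodes.remove(nodePath);
    }

    void subscribe(String path, String listener) {
        watchers.computeIfAbsent(path, k -> new HashSet<>()).add(listener);
    }

    // Unsubscribe: the path stays, only the listener on it is dropped.
    void unsubscribe(String path, String listener) {
        Set<String> listeners = watchers.get(path);
        if (listeners == null) {
            return;
        }
        listeners.remove(listener);
        if (listeners.isEmpty()) {
            watchers.remove(path);
        }
    }

    public static void main(String[] args) {
        OfflineSketch registry = new OfflineSketch();
        registry.register("/dubbo/demo.DemoService/consumers/consumer-1");
        registry.subscribe("/dubbo/demo.DemoService/providers", "directory-listener");

        registry.unregister("/dubbo/demo.DemoService/consumers/consumer-1");
        System.out.println("nodes after unregister: " + registry.nodes);
        System.out.println("watchers after unregister: " + registry.watchers);

        registry.unsubscribe("/dubbo/demo.DemoService/providers", "directory-listener");
        System.out.println("watchers after unsubscribe: " + registry.watchers);
    }
}
```

The two operations touch different state, which is why destroy() above has to loop over both the registered URLs and the subscribed URLs.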
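Dubbo manages registration and notification per service. This avoids the situation where an application that provides many services forces its consumers to process a large amount of data. Each consumer watches only the service paths that concern it, which keeps the impact as small as possible. Ephemeral nodes plus the heartbeat mechanism keep the state of each service accurate, and this is something zk is good at. The feature looks small, but it matters a great deal.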
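As a recap of the per-service layout, the sketch below builds the paths a consumer would watch for a single service, plus the child node name of one provider. It is only an illustration under assumptions: ZkPathSketch, categoryPaths and providerNode are hypothetical helpers, not Dubbo's toCategoriesPath, and the list of categories is passed in by the caller. The only thing it relies on is the /root/service/category layout and the URL-encoded child names shown in the path overview at the start of this article.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers that mirror the path layout described in this article.
class ZkPathSketch {

    // One path per category under /<root>/<service>.
    static List<String> categoryPaths(String root, String service, String... categories) {
        List<String> paths = new ArrayList<>();
        for (String category : categories) {
            paths.add("/" + root + "/" + service + "/" + category);
        }
        return paths;
    }

    // A provider is stored as a child whose name is its URL-encoded address.
    static String providerNode(String root, String service, String providerUrl) {
        try {
            return "/" + root + "/" + service + "/providers/"
                    + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String service = "org.apache.dubbo.demo.DemoService";
        for (String path : categoryPaths("dubbo", service, "providers", "configurators", "routers")) {
            System.out.println(path);
        }
        System.out.println(providerNode("dubbo", service,
                "dubbo://192.168.56.1:20880/" + service + "?anyhost=true"));
    }
}
```

A consumer of DemoService only ever touches paths under /dubbo/org.apache.dubbo.demo.DemoService, so changes to other services never reach it.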