前面,咱們已經知道,基於spring這個解析入口,到發佈服務的過程,接着基於DubboProtocol去發佈,最終調用Netty的api建立了一個NettyServer。java
那麼繼續沿着RegistryProtocol.export這個方法,來看看註冊服務的代碼:spring
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //發佈本地服務 //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 訂閱override數據 // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保證每次export都返回一個新的exporter實例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
/** * 根據invoker的地址獲取registry實例 * @param originInvoker * @return */ private Registry getRegistry(final Invoker<?> originInvoker){ URL registryUrl = originInvoker.getUrl(); //得到registry://192.168.11.156:2181的協議地址 if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { //獲得zookeeper的協議地址 String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); //registryUrl就會變成了zookeeper://192.168.11.156 registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); } //registryFactory是什麼? return registryFactory.getRegistry(registryUrl); }
private RegistryFactory registryFactory; public void setRegistryFactory(RegistryFactory registryFactory) { this.registryFactory = registryFactory; }
@SPI("dubbo") public interface RegistryFactory { /** * 鏈接註冊中心. * * 鏈接註冊中心需處理契約:<br> * 1. 當設置check=false時表示不檢查鏈接,不然在鏈接不上時拋出異常。<br> * 2. 支持URL上的username:password權限認證。<br> * 3. 支持backup=10.20.153.10備選註冊中心集羣地址。<br> * 4. 支持file=registry.cache本地磁盤文件緩存。<br> * 5. 支持timeout=1000請求超時設置。<br> * 6. 支持session=60000會話超時或過時設置。<br> * * @param url 註冊中心地址,不容許爲空 * @return 註冊中心引用,總不返回空 */ @Adaptive({"protocol"}) Registry getRegistry(URL url); }
咱們拿到這個動態生成的自適應擴展點,看看這段代碼裏面的實現api
public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory { public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) " + "name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class). getExtension(extName); return extension.getRegistry(arg0); } }
這個方法中並無getRegistry方法,而是在父類AbstractRegistryFactory緩存
public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); // 鎖定註冊中心獲取過程,保證註冊中心單一實例 LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // 釋放鎖 LOCK.unlock(); } }
建立一個註冊中心,這個是一個抽象方法,具體的實如今對應的子類實例中實現的,在ZookeeperRegistryFactory中服務器
public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } 經過zkClient,得到一個zookeeper的鏈接實例 public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (! group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; //設置根節點 zkClient = zookeeperTransporter.connect(url);//創建鏈接 zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
@Override public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // 向服務器端發送註冊請求 doRegister(url); } catch (Exception e) { Throwable t = e; // 若是開啓了啓動時檢測,則直接拋出異常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if(skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // 將失敗的註冊請求記錄到失敗列表,定時重試 failedRegistered.add(url); } }
終於找到你了,調用zkclient.create在zookeeper中建立一個節點。網絡
protected void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
在register 方法裏面,調用subscribe 方法,訂閱註冊中心變化session
/** * 訂閱符合條件的已註冊數據,當有註冊數據變動時自動推送. * * 訂閱需處理契約:<br> * 1. 當URL設置了check=false時,訂閱失敗後不報錯,在後臺定時重試。<br> * 2. 當URL設置了category=routers,只通知指定分類的數據,多個分類用逗號分隔,並容許星號通配,表示訂閱全部分類數據。<br> * 3. 容許以interface,group,version,classifier做爲條件查詢,如:interface=com.alibaba.foo.BarService&version=1.0.0<br> * 4. 而且查詢條件容許星號通配,訂閱全部接口的全部分組的全部版本,或:interface=*&group=*&version=*&classifier=*<br> * 5. 當註冊中心重啓,網絡抖動,需自動恢復訂閱請求。<br> * 6. 容許URI相同但參數不一樣的URL並存,不能覆蓋。<br> * 7. 必須阻塞訂閱過程,等第一次通知完後再返回。<br> * * @param url 訂閱條件,不容許爲空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @param listener 變動事件監聽器,不容許爲空 */ void subscribe(URL url, NotifyListener listener);
protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((urls == null || urls.size() == 0) && ! Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<String, List<URL>>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } // 第一次主動調用 notify // 對 /router /providers /configerations 路徑下的變動 進行notify //後續(zookeeper watcher 機制) for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); saveProperties(url); listener.notify(categoryList); } }