前幾篇已經分析了服務提供者和服務消費者加載以及啓動,本篇將大體分析服務提供者和服務消費者是如何向註冊中心註冊的。html
服務註冊java
對於服務提供方,它須要發佈服務,並且因爲應用系統的複雜性,服務的數量、類型也不斷膨脹;對於服務消費方,它最關心如何獲取到它所須要的服務,而面對複雜的應用系統,須要管理大量的服務調用。並且,對於服務提供方和服務消費方來講,他們還有可能兼具這兩種角色,即既須要提供服務,有須要消費服務。經過將服務統一管理起來,能夠有效地優化內部應用對服務發佈/使用的流程和管理。服務註冊中心能夠經過特定協議來完成服務對外的統一。服務器
dubbo提供的註冊中心,查看源碼有如下幾種類型app
服務首先暴露在服務端,而後調用Registry的register方法在註冊中心註冊服務,而後用戶經過配置文件中配置的service的url去subscribe(訂閱服務),Registry接收到訂閱消息後會往url對應的的List<NotifyListener>中塞入當前NotifyListener,反之從這個list中移除listener就是取消訂閱。registry會調用據consumer的訂閱狀況調用notify方法推送服務列表給Consumer。ide
如下實例咱們以zookeeper註冊中心爲實例來分析,其餘幾種也比較類型,沒有用到過。暫時忽略。函數
根據dubbo擴展機制,前篇文章中提到過,這裏再也不說明源碼分析
文件中內容:優化
zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory
首先會執行ZookeeperRegistryFactory類中的createRegistry(URL url),而後調用ZookeeperRegistry中的構造函數this
代碼以下:url
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); //若是provider的url是「0.0.0.0」或者在參數中帶anyHost=true則拋出異常註冊地址不存在 if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } //服務分組(默認「dubbo」) String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (! group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
服務提供者(provider)初始化時會調用doRegister方法向註冊中心發起註冊
protected void doRegister(URL url) { try { //鏈接註冊中心 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
服務端註冊以後,那麼客戶端又是怎麼調用的呢?
服務消費者在初始化ConsumerConfig時會調用RegistryProtocol的refer方法進一步調用RegistryDirectory的subscribe方法最終調用ZookeeperRegistry的subscribe方法向註冊中心訂閱服務。
RegistryProtocol中refer方法
@SuppressWarnings("unchecked") public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0 ) { if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1 || "*".equals( group ) ) { return doRefer( getMergeableCluster(), registry, type, url ); } } return doRefer(cluster, registry, type, url); }
FailBackRegistry的subscribe方法
@Override public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // 向服務器端發送訂閱請求 doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; List<URL> urls = getCacheUrls(url); if (urls != null && urls.size() > 0) { notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { // 若是開啓了啓動時檢測,則直接拋出異常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if(skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // 將失敗的訂閱請求記錄到失敗列表,定時重試 addFailedSubscribed(url, listener); } }
而後,又調用Zookeeper中的doSubscribe方法
protected void doSubscribe(final URL url, final NotifyListener listener) { try { //若是provider的service接口配置是「*」 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { //獲取服務分組根路徑 String root = toRootPath(); //獲取服務的listener ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { //若是沒有則建立一個 zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); //若是沒有子監聽器則建立 if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { for (String child : currentChilds) { child = URL.decode(child); if (! anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } }); zkListener = listeners.get(listener); } //向服務器訂閱服務,註冊中心會調用NotifyListener的notify函數返回服務列表 zkClient.create(root, false); //獲取服務地址列表 List<String> services = zkClient.addChildListener(root, zkListener); //若是存在服務 if (services != null && services.size() > 0) { for (String service : services) { service = URL.decode(service); anyServices.add(service); //若是serviceInterface是「*」則從分組根路徑遍歷service並訂閱全部服務 subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { //若是serviceInterface不是「*」則建立Zookeeper客戶端索取服務列表,並通知(notify)消費者(consumer)這些服務能夠用了 List<URL> urls = new ArrayList<URL>(); //獲取相似於http://xxx.xxx.xxx.xxx/context/com.service.xxxService/consumer的地址 for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } //獲取ChildListener ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } //建立Zookeeper客戶端 zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } //提醒消費者 notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
至此,Dubbo的源碼分析結束。若有不對的地方,敬請指教!