dubbo源碼分析(5)

前幾篇已經分析了服務提供者和服務消費者加載以及啓動,本篇將大體分析服務提供者和服務消費者是如何向註冊中心註冊的。html

服務註冊java

對於服務提供方,它須要發佈服務,並且因爲應用系統的複雜性,服務的數量、類型也不斷膨脹;對於服務消費方,它最關心如何獲取到它所須要的服務,而面對複雜的應用系統,須要管理大量的服務調用。並且,對於服務提供方和服務消費方來講,他們還有可能兼具這兩種角色,即既須要提供服務,有須要消費服務。經過將服務統一管理起來,能夠有效地優化內部應用對服務發佈/使用的流程和管理。服務註冊中心能夠經過特定協議來完成服務對外的統一。服務器

dubbo提供的註冊中心,查看源碼有如下幾種類型app

服務首先暴露在服務端,而後調用Registry的register方法在註冊中心註冊服務,而後用戶經過配置文件中配置的service的url去subscribe(訂閱服務),Registry接收到訂閱消息後會往url對應的的List<NotifyListener>中塞入當前NotifyListener,反之從這個list中移除listener就是取消訂閱。registry會調用據consumer的訂閱狀況調用notify方法推送服務列表給Consumer。ide

如下實例咱們以zookeeper註冊中心爲實例來分析,其餘幾種也比較類型,沒有用到過。暫時忽略。函數

根據dubbo擴展機制,前篇文章中提到過,這裏再也不說明源碼分析

文件中內容:優化

zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory

首先會執行ZookeeperRegistryFactory類中的createRegistry(URL url),而後調用ZookeeperRegistry中的構造函數this

代碼以下:url

  public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        //若是provider的url是「0.0.0.0」或者在參數中帶anyHost=true則拋出異常註冊地址不存在
        if (url.isAnyHost()) {
    		throw new IllegalStateException("registry address == null");
    	}
    	//服務分組(默認「dubbo」) 
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (! group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(new StateListener() {
            public void stateChanged(int state) {
            	if (state == RECONNECTED) {
	            	try {
			     recover();
			} catch (Exception e) {
			      logger.error(e.getMessage(), e);
			}
            	}
            }
        });
    }

服務提供者(provider)初始化時會調用doRegister方法向註冊中心發起註冊

 protected void doRegister(URL url) {
        try {
           //鏈接註冊中心
           zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

服務端註冊以後,那麼客戶端又是怎麼調用的呢?

服務消費者在初始化ConsumerConfig時會調用RegistryProtocol的refer方法進一步調用RegistryDirectory的subscribe方法最終調用ZookeeperRegistry的subscribe方法向註冊中心訂閱服務。

RegistryProtocol中refer方法

 @SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
      url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
        	return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0 ) {
            if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                    || "*".equals( group ) ) {
                return doRefer( getMergeableCluster(), registry, type, url );
            }
        }
        return doRefer(cluster, registry, type, url);
    }

FailBackRegistry的subscribe方法

 @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // 向服務器端發送訂閱請求
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && urls.size() > 0) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // 若是開啓了啓動時檢測,則直接拋出異常
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if(skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // 將失敗的訂閱請求記錄到失敗列表,定時重試
            addFailedSubscribed(url, listener);
        }
    }

而後,又調用Zookeeper中的doSubscribe方法

   protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            //若是provider的service接口配置是「*」
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                //獲取服務分組根路徑
                String root = toRootPath();
                //獲取服務的listener
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    //若是沒有則建立一個
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                //若是沒有子監聽器則建立
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
								child = URL.decode(child);
                                if (! anyServices.contains(child)) {
                                    anyServices.add(child);
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, 
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                //向服務器訂閱服務,註冊中心會調用NotifyListener的notify函數返回服務列表
                zkClient.create(root, false);
                //獲取服務地址列表
                List<String> services = zkClient.addChildListener(root, zkListener);
                //若是存在服務 
                if (services != null && services.size() > 0) {
                    for (String service : services) {
			service = URL.decode(service);
			anyServices.add(service);
			//若是serviceInterface是「*」則從分組根路徑遍歷service並訂閱全部服務  
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, 
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
            //若是serviceInterface不是「*」則建立Zookeeper客戶端索取服務列表,並通知(notify)消費者(consumer)這些服務能夠用了 
                List<URL> urls = new ArrayList<URL>();
                //獲取相似於http://xxx.xxx.xxx.xxx/context/com.service.xxxService/consumer的地址
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    //獲取ChildListener
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            public void childChanged(String parentPath, List<String> currentChilds) {
                            	ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    //建立Zookeeper客戶端
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                    	urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //提醒消費者  
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

 至此,Dubbo的源碼分析結束。若有不對的地方,敬請指教!

相關文章
相關標籤/搜索