

  引用官方文檔的原話:如果將Dubbo託管在Spring-IOC容器下,Dubbo服務引用的時機有兩個,第一個是在Spring容器調用ReferenceBean的afterPropertiesSet方法時引用服務,第二個是在ReferenceBean對應的服務被注入到其他類中時引用。這兩個引用服務的時機區別在於,第一個是餓漢式的,第二個是懶漢式的。默認情況下,Dubbo使用懶漢式引用服務。如果需要使用餓漢式,可通過配置 <dubbo:reference> 的init屬性開啓。下面我們按照Dubbo默認配置進行分析,整個分析過程從ReferenceBean的getObject方法開始。當我們的服務被注入到其他類中時,Spring會第一時間調用getObject方法,並由該方法執行服務引用邏輯。按照慣例,在進行具體工作之前,需先進行配置檢查與收集工作。接着根據收集到的信息決定服務引用的方式,有三種:第一種是引用本地(JVM)服務,第二種是通過直連方式引用遠程服務,第三種是通過註冊中心引用遠程服務。不管是哪種引用方式,最後都會得到一個Invoker實例。如果有多個註冊中心、多個服務提供者,這個時候會得到一組Invoker實例,此時需要通過集羣管理類Cluster將多個Invoker合併成一個實例。合併後的Invoker實例已經具備調用本地或遠程服務的能力了,但並不能將此實例暴露給用戶使用,這會對用戶業務代碼造成侵入。此時框架還需要通過代理工廠類(ProxyFactory)爲服務接口生成代理類,並讓代理類去調用Invoker邏輯。這樣既避免了Dubbo框架代碼對業務代碼的侵入,同時也讓框架更容易使用。


// Returns the service reference held in `ref`.
// Throws IllegalStateException("Already destroyed!") when `destroyed` is set.
// NOTE(review): this excerpt omits the body of the `ref == null` branch and all closing braces;
// presumably the branch triggers init() — confirm against the full source.
public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    if (ref == null) {
    return ref;


// Initializes this reference: validates the interface name, resolves the interface class,
// looks up an optional direct-connect URL, inherits missing application/module/registries/monitor
// settings, gathers all parameters into `map`, and finally creates the proxy via createProxy(map).
// NOTE(review): closing braces and several statements are elided in this excerpt.
private void init() {
    // Guard on the `initialized` flag (the branch body is not shown here).
    if (initialized) {
    initialized = true;
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("<dubbo:reference interface=\"\"... not allow null!");
    // (2) Call appendProperties(AbstractConfig config) to complete the ConsumerConfig configuration;
    // call appendProperties(AbstractConfig config) to complete the ReferenceConfig configuration; its logic is as follows:
    if (getGeneric() == null && getConsumer() != null) {
    // Generic references use GenericService; otherwise the interface class is loaded by name
    // through the current thread's context class loader.
    if (ProtocolUtils.isGeneric(getGeneric())) {
        interfaceClass = GenericService.class;
    } else {
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        checkInterfaceAndMethods(interfaceClass, methods);
    /****************************** begin ******************************/  
    // The resolved value is assigned to the String member `url`, which is used for
    // point-to-point calls from the service consumer to the service provider.
    // Lookup order: system property named after the interface, then the file named by
    // "dubbo.resolve.file", then ~/dubbo-resolve.properties if it exists.
    String resolve = System.getProperty(interfaceName);
    String resolveFile = null;
    if (resolve == null || resolve.length() == 0) {
        resolveFile = System.getProperty("dubbo.resolve.file");
        if (resolveFile == null || resolveFile.length() == 0) {
            File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
            if (userResolveFile.exists()) {
                resolveFile = userResolveFile.getAbsolutePath();
        if (resolveFile != null && resolveFile.length() > 0) {
            Properties properties = new Properties();
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(new File(resolveFile));
                // NOTE(review): the statement that loads `properties` from `fis` is not shown in this excerpt.
            } catch (IOException e) {
                throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
            } finally {
                // Close failures are only logged, not rethrown.
                try {
                    if (null != fis) fis.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
            resolve = properties.getProperty(interfaceName);
    if (resolve != null && resolve.length() > 0) {
        url = resolve;
    /****************************** end ******************************/  
    /****************************** begin ******************************/  
    // Fill in missing application/module/registries/monitor, consulting consumer,
    // then module, then application; only null fields are overwritten.
    if (consumer != null) {
        if (application == null) {
            application = consumer.getApplication();
        if (module == null) {
            module = consumer.getModule();
        if (registries == null) {
            registries = consumer.getRegistries();
        if (monitor == null) {
            monitor = consumer.getMonitor();
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        if (monitor == null) {
            monitor = module.getMonitor();
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        if (monitor == null) {
            monitor = application.getMonitor();
    /****************************** end ******************************/  
    /****************************** start ******************************/  
    // Collect the reference parameters that are later passed to createProxy.
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); //side=consumer
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion()); //dubbo=2.6.2
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); //timestamp=current time in millis
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); //pid=process id
    if (!isGeneric()) {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        // Method names come from the generated Wrapper; an empty list is recorded as ANY_VALUE.
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        } else {
            // The HashSet removes duplicate (overloaded) method names before joining.
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
    map.put(Constants.INTERFACE_KEY, interfaceName); //interface=interfaceName
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    /****************************** end ******************************/  

    /****************************** start ******************************/  
    // The block below processes MethodConfig instances, which carry event-notification
    // settings such as onreturn, onthrow and oninvoke.
    String prefix = StringUtils.getServiceKey(map);
    if (methods != null && !methods.isEmpty()) {
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            // "<method>.retry" = "false" is rewritten as "<method>.retries" = "0".
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
            appendAttributes(attributes, method, prefix + "." + method.getName());
            checkAndConvertImplicitConfig(method, map, attributes);
    /****************************** end ******************************/  

    /****************************** start ******************************/  
    // Determine the IP to register: the DUBBO_IP_TO_REGISTRY system property if set
    // (it must be a valid local host), otherwise the local host address.
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException("Specified invalid registry ip from property ... value: ...");
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry); //register.ip=actual IP address
    /****************************** end ******************************/  

    // Build the proxy and publish a ConsumerModel under the unique service name.
    ref = createProxy(map);
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);



// Creates the service proxy from the collected parameters in `map`.
// Chooses between an in-JVM reference, a direct (point-to-point) reference, and a
// registry-based reference, builds one Invoker (joining several through Cluster when
// there are multiple URLs), optionally checks availability, and wraps it with proxyFactory.
// NOTE(review): closing braces and some statements are elided in this excerpt.
private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer; 
    // When injvm is not set explicitly: an explicit `url` rules out an in-JVM reference;
    // otherwise InjvmProtocol decides from the temporary URL.
    if (isInjvm() == null) {       
        if (url != null && url.length() > 0) { 
            isJvmRefer = false;
        } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
            isJvmRefer = true;
        } else {
            isJvmRefer = false;
    } else {
        isJvmRefer = isInjvm().booleanValue();
    /******************************* in-JVM call *******************************/
    if (isJvmRefer) {
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);        
        invoker = refprotocol.refer(interfaceClass, url);        
    } else {
        /******************************* point-to-point direct call *******************************/   
        if (url != null && url.length() > 0) { 
            // `url` may hold several addresses separated per SEMICOLON_SPLIT_PATTERN.
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    // Default the path to the interface name when the address has none.
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        url = url.setPath(interfaceName);
                    // NOTE(review): the registry-protocol branch body is not shown in this excerpt.
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    } else {
                        urls.add(ClusterUtils.mergeUrl(url, map));
        } else {  
            /******************************* call through the registry *******************************/
            List<URL> us = loadRegistries(false);
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    // (3) Store the registry URL in ReferenceConfig's member List<URL> urls;
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
            if (urls == null || urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference ...");
        // If the member List<URL> urls has size 1, build the Invoker directly through the Protocol adaptive extension.
        if (urls.size() == 1) {
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            // Several URLs: refer each one and merge the resulting invokers through Cluster.
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));

    // Availability check: own `check` setting, then the consumer's, defaulting to true.
    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    if (c == null) {
        c = true; // default true
    if (c && !invoker.isAvailable()) {
        throw new IllegalStateException("Failed to check the status of the service ...");
    return (T) proxyFactory.getProxy(invoker);



 * 獲取服務引用的Invoker
// Refers a service through the registry described by `url` and returns its Invoker.
// NOTE(review): closing braces are elided in this excerpt.
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // Replace the URL protocol with the value of the "registry" parameter
    // (DEFAULT_REGISTRY when absent) and drop that parameter.
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // When the RegistryProtocol extension instance is obtained, the Dubbo SPI mechanism automatically injects its RegistryFactory field,
    Registry registry = registryFactory.getRegistry(url);
    // A reference to RegistryService itself is served by the registry object directly.
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);

    // Multiple groups (comma-separated) or "*" use the mergeable cluster.
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
    return doRefer(cluster, registry, type, url);

 * 建立Invoker對象
 // Builds a RegistryDirectory for the service, derives the consumer subscribe URL, joins the
 // directory into a single Invoker through `cluster`, and records it in ProviderConsumerRegTable.
 // NOTE(review): the register/subscribe statements are truncated in this excerpt (only their
 // trailing argument lines remain) and closing braces are elided.
 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
     RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
     directory.setRegistry(registry); //set the registry
     directory.setProtocol(protocol); //set the adaptive Protocol extension
     Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());      
     // The REGISTER_IP_KEY entry is removed from the parameters and used as the subscribe URL's host.
     URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
     if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
                 Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));
     // Add the key/value "category" -> "providers,configurators,routers" to subscribeUrl's parameters
                Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY
                + "," + Constants.ROUTERS_CATEGORY));

     Invoker invoker = cluster.join(directory);
     // Then, keyed by serviceUniqueName = consumerUrl.getServiceKey(), store it in the
     // static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> of ProviderConsumerRegTable
     ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
     return invoker;



// Cache of created registries, keyed by the service string of the normalized registry URL.
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();

// Transporter handed to each new ZookeeperRegistry to open its ZooKeeper client.
private ZookeeperTransporter zookeeperTransporter;

 * 獲取一個註冊中心封裝對象
// Returns the Registry for `url`, creating and caching it on first use.
// Throws IllegalStateException when createRegistry returns null.
// NOTE(review): the try/finally presumably guards a lock whose acquire/release lines are not
// shown in this excerpt — confirm; closing braces are elided as well.
public Registry getRegistry(URL url) {
    // Normalize the URL: path and interface are set to RegistryService, and the
    // export/refer parameters are removed before the cache key is derived.
    url = url.setPath(RegistryService.class.getName())
        .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
        .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceString();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        REGISTRIES.put(key, registry);
        return registry;
    } finally {

 * 建立緩存,直接new一個ZookeeperRegistry對象
// Creates a new ZookeeperRegistry for `url` using the configured zookeeperTransporter.
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);




 * ZookeeperRegistry的構造方法
// Builds a ZooKeeper-backed registry: rejects any-host URLs, derives the root path from the
// "group" parameter, connects through the transporter and installs a connection-state listener.
// NOTE(review): the body run on RECONNECTED is elided in this excerpt, as are closing braces.
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    // The root path always starts with the path separator.
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                } catch (Exception e) {
                    // Failures while handling the reconnect are logged, not propagated.
                    logger.error(e.getMessage(), e);


// Single-threaded scheduler, with threads named "DubboRegistryFailedRetryTimer", that runs the periodic retry task.
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

// Handle of the scheduled retry task (assigned in the constructor).
private final ScheduledFuture<?> retryFuture;

// URLs whose registration failed and is pending retry.
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

// URLs whose unregistration failed and is pending retry.
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();

// Subscriptions (URL -> listeners) that failed and are pending retry.
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

// Unsubscriptions (URL -> listeners) that failed and are pending retry.
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

// Notifications (URL -> listener -> URLs to deliver) that failed and are pending retry.
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();

 * FailbackRegistry構造函數
// Schedules the periodic retry task. The period is read from the registry URL
// (REGISTRY_RETRY_PERIOD_KEY, in milliseconds) and is used as both initial delay and delay.
// NOTE(review): the statement inside the task's try block is elided in this excerpt,
// as are closing braces.
public FailbackRegistry(URL url) {
    int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            try {
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);


// URLs registered through this registry.
private final Set<URL> registered = new ConcurrentHashSet<URL>();

// Subscribed URLs mapped to their listeners.
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

// NOTE(review): presumably the last notified URL lists per subscribed URL, grouped by a
// String key — its usage is not visible in this excerpt; confirm.
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();

 * AbstractRegistry構造函數
// Reads the sync-save flag and resolves the local cache file for this registry, creating
// its parent directory when needed.
// NOTE(review): closing braces and the trailing cache-loading statements are elided here.
public AbstractRegistry(URL url) {
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    // Default cache location: ~/.dubbo/dubbo-registry-<application>-<address>.cache
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        file = new File(filename);
        // Create the parent directory only when neither the file nor the directory exists.
        if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
            if (!file.getParentFile().mkdirs()) {
                throw new IllegalArgumentException("Invalid registry store file ...");
    this.file = file;
    // Read the cached file and store the result in the Properties member `properties`



 * 繼承自父類FailbackRegistry的register方法,用於註冊服務引用(消費者)的URL
// Registers `url`, falling back to a later retry when the attempt fails and a check is not required.
// NOTE(review): the statement inside the try block and all closing braces are elided in this excerpt.
public void register(URL url) {
    failedRegistered.remove(url); //remove the url from the failed-registration set
    failedUnregistered.remove(url); //remove the url from the failed-unregistration set
    try {
    } catch (Exception e) {
        Throwable t = e;
        // Compute the boolean `check` flag: both the registry URL and the registered URL must
        // have check enabled, and the registered URL must not use the consumer protocol.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
            && url.getParameter(Constants.CHECK_KEY, true)
            && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            // Unwrap the marker exception so the real cause is reported.
            if (skipFailback) {
                t = t.getCause();
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);

        // A consumer URL whose registration failed is added to the Set<URL> failedRegistered

 * 由ZookeeperRegistry實現的doRegister方法
// Creates the ZooKeeper node for `url`; any failure is rethrown as RpcException.
// NOTE(review): closing braces are elided in this excerpt.
protected void doRegister(URL url) {
    try {
        // (2) url.getParameter(Constants.DYNAMIC_KEY, true) decides whether an ephemeral node is created; true = ephemeral.
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);



 * RegistryDirectory的subscribe方法
// Records the consumer URL and subscribes this directory to the registry as the listener.
public void subscribe(URL url) {
    setConsumerUrl(url); //simply sets RegistryDirectory's consumerUrl member
    registry.subscribe(url, this); 


 * 繼承自FailbackRegistry的subscribe方法
// Subscribes `listener` to `url`. On failure it notifies from cached URLs when any exist;
// otherwise it either throws (check enabled or skip-failback) or logs and waits for retry.
// NOTE(review): closing braces are elided in this excerpt.
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;
        // Prefer the cached provider list so the consumer can keep working.
        List<URL> urls = getCacheUrls(url);
        if (urls != null && !urls.isEmpty()) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe ... Using cached list: ...");
        } else {
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                throw new IllegalStateException("Failed to subscribe ... cause: ");
            } else {
                logger.error("Failed to subscribe url ... waiting for retry ...");
        // Add the failed subscription to the member ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed
        addFailedSubscribed(url, listener);

 * 由ZookeeperRegistry實現的doSubscribe方法
// Subscribes `listener` to `url` on ZooKeeper. Any failure is rethrown as RpcException.
// NOTE(review): closing braces are elided in this excerpt.
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // Wildcard interface: watch the root node and subscribe to each child service.
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath();
            // Get or create the listener map for this URL.
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            // Children not yet in anyServices are subscribed with check=false.
                            if (!anyServices.contains(child)) {
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                                                           Constants.CHECK_KEY, String.valueOf(false)), listener);
                zkListener = listeners.get(listener);
            // The second argument `false` is the same flag doRegister passes as "ephemeral".
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (services != null && !services.isEmpty()) {
                for (String service : services) {
                    service = URL.decode(service);
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                                                 Constants.CHECK_KEY, String.valueOf(false)), listener);
        } else {
            // Concrete interface: watch every category path and notify with the collected URLs.
            List<URL> urls = new ArrayList<URL>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            // Forward child changes to the NotifyListener.
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                    zkListener = listeners.get(listener);
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
            // Initial notification with the current children of all category paths.
            notify(url, listener, urls);
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

   類似於register方法,FailbackRegistry首先調用父類AbstractRegistry的subscribe,在經過一系列的校驗之後,向成員變量ConcurrentMap<URL, Set<NotifyListener>> subscribed添加服務引用(即消費者)的URL和監聽器。

 * AbstractRegistry的subscribe方法
// Validates the arguments, logs the subscription, and makes sure `subscribed` holds a
// listener set for `url`.
// Throws IllegalArgumentException when `url` or `listener` is null.
// NOTE(review): the statement that adds `listener` to the set and the closing braces are
// not shown in this excerpt.
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    // Get or create the listener set; putIfAbsent keeps a set added concurrently by another thread.
    Set<NotifyListener> listeners = subscribed.get(url);
    if (listeners == null) {
        subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
        listeners = subscribed.get(url);



 * 用於根據Curator客戶端推送的鏈接狀態,RECONNECTED進行恢復
// Re-queues the currently registered URLs and subscriptions so that they are retried.
// NOTE(review): the loop body for registered URLs, the tail of the recoverSubscribed
// initializer and all closing braces are elided in this excerpt.
protected void recover() throws Exception {
    // Work on a snapshot of the registered URLs.
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        // Add the URLs from getRegistered() to the member Set<URL> failedRegistered
        for (URL url : recoverRegistered) {
    // Get the subscriptions to recover as Map<URL, Set<NotifyListener>>; getSubscribed() returns the member `subscribed`
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                addFailedSubscribed(url, listener);

 * 對於成員變量failedRegistered、failedUnregistered、failedSubscribed、
 * failedUnsubscribed、failedNotified進行重試。可以看到,如果重試成功則將
 * 其移出相應的重試列表;如果重試失敗,則忽略異常等待下次重試
// Retries every pending failed operation: register, unregister, subscribe, unsubscribe, notify.
// Each section works on a snapshot copy of its failed collection; exceptions are logged as
// warnings and otherwise ignored so that the next scheduled run can try again.
// NOTE(review): in this excerpt several statements are elided (the retried calls in the
// register/unregister/notify sections, the removal of empty entries, the tails of some
// HashMap initializers) along with all closing braces.
protected void retry() {
    // ---- failed registrations ----
    if (!failedRegistered.isEmpty()) {
        Set<URL> failed = new HashSet<URL>(failedRegistered);
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry register " + failed);
            try {
                for (URL url : failed) {
                    try {
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry register ... waiting for again, cause: ...");
            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                logger.warn("Failed to retry register ... waiting for again, cause: ...");
    // ---- failed unregistrations ----
    if (!failedUnregistered.isEmpty()) {
        Set<URL> failed = new HashSet<URL>(failedUnregistered);
        if (!failed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry unregister " + failed);
            try {
                for (URL url : failed) {
                    try {
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry unregister ... waiting for again, cause: ...");
            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                logger.warn("Failed to retry unregister ... waiting for again, cause: ");
    // ---- failed subscriptions ----
    if (!failedSubscribed.isEmpty()) {
        Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
        // Iterate over a second copy so that `failed` can be modified while scanning for empty entries.
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>
                                                         (failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().size() == 0) {
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry subscribe " + failed);
            try {
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        try {
                            doSubscribe(url, listener);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry subscribe ... waiting for again, cause: ...");
            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                logger.warn("Failed to retry subscribe ... waiting for again, cause: ");
    // ---- failed unsubscriptions ----
    if (!failedUnsubscribed.isEmpty()) {
        Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>
                                                         (failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().isEmpty()) {
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry unsubscribe " + failed);
            try {
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        try {
                            doUnsubscribe(url, listener);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry unsubscribe ... waiting for again, cause: ..");
            } catch (Throwable t) { 
                logger.warn("Failed to retry unsubscribe ... waiting for again, cause: ...");
    // ---- failed notifications ----
    if (!failedNotified.isEmpty()) {
        Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, 
        for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, 
                                                     List<URL>>>(failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().size() == 0) {
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry notify " + failed);
            try {
                for (Map<NotifyListener, List<URL>> values : failed.values()) {
                    for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
                        try {
                            NotifyListener listener = entry.getKey();
                            List<URL> urls = entry.getValue();
                        } catch (Throwable t) { 
                            logger.warn("Failed to retry notify ... waiting for again, cause: ...");
            } catch (Throwable t) { 
                logger.warn("Failed to retry notify ... waiting for again, cause: ...");



// Opens a ZooKeeper client for `url`, backed by Curator.
public ZookeeperClient connect(URL url) {
    return new CuratorZookeeperClient(url);


 // Builds the Curator client for `url` and registers a connection-state listener.
 // Any exception during setup is rethrown as IllegalStateException.
 // NOTE(review): the rest of the builder chain, the per-state handler bodies and all closing
 // braces are elided in this excerpt.
 public CuratorZookeeperClient(URL url) {
     try {
         CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
             .retryPolicy(new RetryNTimes(1, 1000))
         // Digest authorization is added only when the URL carries an authority part.
         String authority = url.getAuthority();
         if (authority != null && authority.length() > 0) {
             // NOTE(review): getBytes() without a charset uses the platform default encoding.
             builder = builder.authorization("digest", authority.getBytes());
         client = builder.build();
         client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
             public void stateChanged(CuratorFramework client, ConnectionState state) {
                 if (state == ConnectionState.LOST) {
                 } else if (state == ConnectionState.CONNECTED) {
                 } else if (state == ConnectionState.RECONNECTED) {
     } catch (Exception e) {
         throw new IllegalStateException(e.getMessage(), e);



 * StubProxyFactoryWrapper的getProxy方法
// Creates the proxy through the wrapped proxyFactory and, when a stub ("stub" or "local"
// parameter) is configured for a non-generic service, wraps the proxy in the stub class.
// NOTE(review): closing braces are elided in this excerpt.
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    T proxy = proxyFactory.getProxy(invoker);
    if (GenericService.class != invoker.getInterface()) {
        // STUB_KEY takes precedence; LOCAL_KEY is the fallback.
        String stub = invoker.getUrl().getParameter(
            Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
        if (ConfigUtils.isNotEmpty(stub)) {
            Class<?> serviceType = invoker.getInterface();
            // A "default" value resolves to <Interface>Stub or <Interface>Local by convention.
            if (ConfigUtils.isDefault(stub)) {
                if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
                    stub = serviceType.getName() + "Stub";
                } else {
                    stub = serviceType.getName() + "Local";
            try {
                Class<?> stubClass = ReflectUtils.forName(stub);
                // The stub must implement the service interface.
                if (!serviceType.isAssignableFrom(stubClass)) {
                    throw new IllegalStateException("The stub implementation class ...");
                try {
                    // The stub needs a constructor that accepts the service interface; the proxy is passed in.
                    Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                    proxy = (T) constructor.newInstance(new Object[]{proxy});
                    //export stub service
                    URL url = invoker.getUrl();
                    if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
                        url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(
                            Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
                        url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
                        try {
                            export(proxy, (Class) invoker.getInterface(), url);
                        } catch (Exception e) {
                            // Export failures are logged only; the stub-wrapped proxy is still used.
                            LOGGER.error("export a stub service error.", e);
                } catch (NoSuchMethodException e) {
                    throw new IllegalStateException("No such constructor \"public ...");
            } catch (Throwable t) {
                // Any stub-creation failure is logged; the method still returns `proxy`.
                LOGGER.error("Failed to create stub implementation class ...");
    return proxy;

 * JavassistProxyFactory的getProxy方法,繼承自AbstractProxyFactory
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    Class<?>[] interfaces = null;
    String config = invoker.getUrl().getParameter("interfaces");
    if (config != null && config.length() > 0) {
        String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
        if (types != null && types.length > 0) {
            interfaces = new Class<?>[types.length + 2];
            interfaces[0] = invoker.getInterface();
            interfaces[1] = EchoService.class;
            for (int i = 0; i < types.length; i++) {
                interfaces[i + 1] = ReflectUtils.forName(types[i]);
    if (interfaces == null) {
        interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
    return getProxy(invoker, interfaces);

// JavassistProxyFactory實現的getProxy方法,非繼承,獲取服務接口代理類對象
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    //return invoker.invoke(new RpcInvocation(method, args)).recreate();
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));



/**
 * Proxy cache, partitioned by class loader.
 *
 * <p>The outer map is a {@link WeakHashMap} keyed by {@link ClassLoader}. Each inner map is
 * keyed by the string that {@code getProxy(ClassLoader, Class...)} builds for an interface set;
 * that method stores either the pending-generation marker or a {@code Reference} to a
 * generated {@code Proxy} as the value. {@code getProxy} accesses this map while holding
 * its monitor.
 */
private static final Map<ClassLoader, Map<String, Object>> ProxyCacheMap = 
    new WeakHashMap<ClassLoader, Map<String, Object>>();

// 生成Proxy子類,該子類封裝了生成服務接口代理類的邏輯
public static Proxy getProxy(Class<?>... ics) {
    return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);

// 生成Proxy子類,該子類封裝了生成服務接口代理類的邏輯
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    if (ics.length > 65535)
        throw new IllegalArgumentException("interface limit exceeded");

    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        if (!ics[i].isInterface())
            throw new RuntimeException(itf + " is not a interface.");
        Class<?> tmp = null;
        try {
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        if (tmp != ics[i])
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

    Map<String, Object> cache;
    synchronized (ProxyCacheMap) {
        cache = ProxyCacheMap.get(cl);
        if (cache == null) {
            cache = new HashMap<String, Object>();
            ProxyCacheMap.put(cl, cache);
    String key = sb.toString();
    Proxy proxy = null;
    synchronized (cache) {
        do {
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null)
                    return proxy;
            //   就表示Proxy實例還沒有建立完成,在此進行等待;直到實例建立完成並進行notify。
            if (value == PendingGenerationMarker) {
                try {
                } catch (InterruptedException e) {
            } else {
                cache.put(key, PendingGenerationMarker);
        while (true);

    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    ClassGenerator ccp = null, ccm = null;
    try {
        /************************* 開始建立服務接口代理類 *************************/
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<String>();
        List<Method> methods = new ArrayList<Method>();
        for (int i = 0; i < ics.length; i++) {
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    if (!pkg.equals(npkg))
                        throw new IllegalArgumentException("non-public interfaces from diff pack...");
            for (Method method : ics[i].getMethods()) {
                String desc = ReflectUtils.getDesc(method);
                if (worked.contains(desc))

                //服務接口代理類方法大小 TODO 
                int ix = methods.size();
                Class<?> rt = method.getReturnType();
                Class<?>[] pts = method.getParameterTypes();

                //拼接代碼字符串"Object[] args = new Object[N];",N是當前遍歷的method參數個數
                StringBuilder code = new StringBuilder("Object[] args = new Object[").
                for (int j = 0; j < pts.length; j++)
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                //拼接handler的調用語句,"Object ret = handler.invoke(this, methods[ix], args);"
                code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                //若方法返回類型不爲void,則拼接返回類型並進行強制類型轉換"return (類型)ret;"
                if (!Void.TYPE.equals(rt))
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts,
                              method.getExceptionTypes(), code.toString());
        if (pkg == null)
            pkg = PACKAGE_NAME;
        //設置服務接口代理類類名爲"pkg + ".proxy" + id",好比org.apache.dubbo.proxy0,注意是小寫
        String pcn = pkg + ".proxy" + id;
        //添加成員屬性Method[] methods
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
        //public proxy0(java.lang.reflect.InvocationHandler $1) { handler=$1; }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, 
                           new Class<?>[0], "handler=$1;");
        Class<?> clazz = ccp.toClass();
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        /************************* 開始建立Proxy類的子類 *************************/          
        String fcn = Proxy.class.getName() + id; //類名=Proxy全限定名+id,如Proxy一、Proxy2等      
        ccm = ClassGenerator.newInstance(cl); //建立生成Proxy子類的ClassGenerator       
        ccm.setClassName(fcn); //設置類名       
        ccm.addDefaultConstructor(); //添加默認構造器     
        ccm.setSuperClass(Proxy.class); //設置父類
        //public Object newInstance(java.lang.reflect.InvocationHandler $1) { 
        //    return new org.apache.dubbo.proxy0($1);
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + 
                      " h){ return new " + pcn + "($1); }");
        Class<?> pc = ccm.toClass(); //生成Proxy子類Class
        proxy = (Proxy) pc.newInstance(); //生成Proxy子類的實例
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        if (ccp != null)
        if (ccm != null)
        synchronized (cache) {
            if (proxy == null)
                cache.put(key, new WeakReference<Proxy>(proxy));
    return proxy;