Dubbo Source Code Analysis: Service Export (Part 2)

## Creating the Registry

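Service registration is not strictly required by Dubbo: a consumer can bypass the registry by connecting directly to a provider. We usually do not do this, though, because direct connections make service governance harder and are recommended only when testing a service. For Dubbo, then, the registry is not mandatory, but in practice it is essential.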

Configure the registry in dubbo-config.xml, specifying ZooKeeper as the registry implementation:

```xml
<!-- ZooKeeper registry -->
<dubbo:registry protocol="zookeeper" address="${dubbo.registry.address}" check="false" port="${dubbo.protocol.port}">
    <dubbo:parameter key="qos.enable" value="false"/>
</dubbo:registry>
```

In RegistryProtocol, registration happens in the export method:

```java

public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
    RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker);
    URL registryUrl = this.getRegistryUrl(originInvoker);
    Registry registry = this.getRegistry(originInvoker);
    URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker);
    boolean register = registedProviderUrl.getParameter("register", true);
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
    // Register the service with the registry
    if (register) {
        this.register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl);
    RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
    this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    return new RegistryProtocol.DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

```

The register method is as follows:

```java

public void register(URL registryUrl, URL registedProviderUrl) {
    Registry registry = this.registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
}

```

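RegistryFactory has multiple implementations:

```java
@SPI("dubbo")
public interface RegistryFactory {
    @Adaptive({"protocol"})
    Registry getRegistry(URL var1);
}
```

The RegistryFactory interface is annotated with @SPI("dubbo"), so when no protocol is specified the dubbo registry is used by default.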
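To make the selection rule concrete, here is a minimal sketch of "pick the factory by URL protocol, fall back to the SPI default". It is not Dubbo's extension loader: `RegistryFactorySelector`, `addFactory`, and `select` are hypothetical names, and factories are represented by plain strings purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Dubbo's adaptive extension lookup; not the real mechanism.
final class RegistryFactorySelector {
    // Mirrors @SPI("dubbo"): the extension name used when the URL carries no protocol.
    static final String DEFAULT_NAME = "dubbo";

    private final Map<String, String> factoriesByName = new HashMap<>();

    void addFactory(String name, String factoryClassName) {
        factoriesByName.put(name, factoryClassName);
    }

    // Mirrors @Adaptive({"protocol"}): the URL's protocol picks the implementation.
    String select(String protocol) {
        String name = (protocol == null || protocol.isEmpty()) ? DEFAULT_NAME : protocol;
        String factory = factoriesByName.get(name);
        if (factory == null) {
            throw new IllegalStateException("No RegistryFactory extension named " + name);
        }
        return factory;
    }

    public static void main(String[] args) {
        RegistryFactorySelector selector = new RegistryFactorySelector();
        selector.addFactory("dubbo", "DubboRegistryFactory");
        selector.addFactory("zookeeper", "ZookeeperRegistryFactory");
        System.out.println(selector.select("zookeeper"));
        System.out.println(selector.select(null));
    }
}
```

With `protocol="zookeeper"` in the configuration shown earlier, the lookup resolves to the ZooKeeper factory rather than the default.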

Let's look at the ZooKeeper implementation.

ZookeeperRegistryFactory extends the AbstractRegistryFactory class:

```java

public abstract class AbstractRegistryFactory implements RegistryFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class);
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap();

    public AbstractRegistryFactory() {
    }

    public static Collection<Registry> getRegistries() {
        return Collections.unmodifiableCollection(REGISTRIES.values());
    }
    // Destroy all registries while holding the reentrant lock
    public static void destroyAll() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }

        LOCK.lock();

        try {
            Iterator i$ = getRegistries().iterator();

            while(i$.hasNext()) {
                Registry registry = (Registry)i$.next();

                try {
                    registry.destroy();
                } catch (Throwable var6) {
                    LOGGER.error(var6.getMessage(), var6);
                }
            }

            REGISTRIES.clear();
        } finally {
            LOCK.unlock();
        }
    }
    // Get the registry instance, creating and caching it on first use
    public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName()).addParameter("interface", RegistryService.class.getName()).removeParameters(new String[]{"export", "refer"});
        String key = url.toServiceString();
        LOCK.lock();

        Registry var4;
        try {
            Registry registry = (Registry)REGISTRIES.get(key);
            if (registry == null) {
                registry = this.createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }

                REGISTRIES.put(key, registry);
                var4 = registry;
                return var4;
            }

            var4 = registry;
        } finally {
            LOCK.unlock();
        }

        return var4;
    }

    protected abstract Registry createRegistry(URL var1);
}

```
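The getRegistry method above is a get-or-create cache guarded by a lock. The decompiled control flow is hard to read, so the same idea is sketched below in isolation. `LockedCache` and `getOrCreate` are hypothetical names, and the creator callback stands in for the abstract createRegistry method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Simplified sketch of a keyed instance cache guarded by a lock; names are illustrative.
final class LockedCache<V> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, V> instances = new HashMap<>();

    V getOrCreate(String key, Function<String, V> creator) {
        lock.lock();
        try {
            V existing = instances.get(key);
            if (existing != null) {
                return existing; // reuse the cached instance
            }
            V created = creator.apply(key);
            if (created == null) {
                throw new IllegalStateException("Can not create instance for " + key);
            }
            instances.put(key, created);
            return created;
        } finally {
            lock.unlock(); // always release, even if creation throws
        }
    }

    public static void main(String[] args) {
        LockedCache<Object> cache = new LockedCache<>();
        Object first = cache.getOrCreate("zookeeper://127.0.0.1:2181", k -> new Object());
        Object second = cache.getOrCreate("zookeeper://127.0.0.1:2181", k -> new Object());
        System.out.println(first == second);
    }
}
```

Holding the lock across creation means two threads asking for the same key cannot both create an instance, at the cost of serializing all lookups.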

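The createRegistry method must be overridden by the implementing class. The ZooKeeper implementation creates a ZookeeperRegistry, listed here: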

public class ZookeeperRegistry extends FailbackRegistry {
    private static final Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
    private static final int DEFAULT_ZOOKEEPER_PORT = 2181;
    private static final String DEFAULT_ROOT = "dubbo";
    private final String root;
    private final Set<String> anyServices = new ConcurrentHashSet();
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap();
    private final ZookeeperClient zkClient;

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        } else {
            // Read the group parameter; defaults to "dubbo"
            String group = url.getParameter("group", "dubbo");
            if (!group.startsWith("/")) {
                group = "/" + group;
            }

            this.root = group;
            // Create the ZooKeeper client
            this.zkClient = zookeeperTransporter.connect(url);
            // Add a state listener
            this.zkClient.addStateListener(new StateListener() {
                public void stateChanged(int state) {
                    if (state == 2) {
                        try {
                            ZookeeperRegistry.this.recover();
                        } catch (Exception var3) {
                            ZookeeperRegistry.logger.error(var3.getMessage(), var3);
                        }
                    }

                }
            });
        }
    }

    static String appendDefaultPort(String address) {
        if (address != null && address.length() > 0) {
            int i = address.indexOf(58);
            if (i < 0) {
                return address + ":" + 2181;
            }

            if (Integer.parseInt(address.substring(i + 1)) == 0) {
                return address.substring(0, i + 1) + 2181;
            }
        }

        return address;
    }

    public boolean isAvailable() {
        return this.zkClient.isConnected();
    }

    public void destroy() {
        super.destroy();

        try {
            this.zkClient.close();
        } catch (Exception var2) {
            logger.warn("Failed to close zookeeper client " + this.getUrl() + ", cause: " + var2.getMessage(), var2);
        }

    }

    protected void doRegister(URL url) {
        try {
            this.zkClient.create(this.toUrlPath(url), url.getParameter("dynamic", true));
        } catch (Throwable var3) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3);
        }
    }

    protected void doUnregister(URL url) {
        try {
            this.zkClient.delete(this.toUrlPath(url));
        } catch (Throwable var3) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3);
        }
    }

    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if ("*".equals(url.getServiceInterface())) {
                String root = this.toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url);
                if (listeners == null) {
                    this.zkListeners.putIfAbsent(url, new ConcurrentHashMap());
                    listeners = (ConcurrentMap)this.zkListeners.get(url);
                }

                ChildListener zkListener = (ChildListener)listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            Iterator i$ = currentChilds.iterator();

                            while(i$.hasNext()) {
                                String child = (String)i$.next();
                                child = URL.decode(child);
                                if (!ZookeeperRegistry.this.anyServices.contains(child)) {
                                    ZookeeperRegistry.this.anyServices.add(child);
                                    ZookeeperRegistry.this.subscribe(url.setPath(child).addParameters(new String[]{"interface", child, "check", String.valueOf(false)}), listener);
                                }
                            }

                        }
                    });
                    zkListener = (ChildListener)listeners.get(listener);
                }

                this.zkClient.create(root, false);
                List<String> services = this.zkClient.addChildListener(root, zkListener);
                if (services != null && services.size() > 0) {
                    Iterator i$ = services.iterator();

                    while(i$.hasNext()) {
                        String service = (String)i$.next();
                        service = URL.decode(service);
                        this.anyServices.add(service);
                        this.subscribe(url.setPath(service).addParameters(new String[]{"interface", service, "check", String.valueOf(false)}), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList();
                String[] arr$ = this.toCategoriesPath(url);
                int len$ = arr$.length;

                for(int i$ = 0; i$ < len$; ++i$) {
                    String path = arr$[i$];
                    ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url);
                    if (listeners == null) {
                        this.zkListeners.putIfAbsent(url, new ConcurrentHashMap());
                        listeners = (ConcurrentMap)this.zkListeners.get(url);
                    }

                    ChildListener zkListener = (ChildListener)listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, ZookeeperRegistry.this.toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = (ChildListener)listeners.get(listener);
                    }

                    this.zkClient.create(path, false);
                    List<String> children = this.zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(this.toUrlsWithEmpty(url, path, children));
                    }
                }

                this.notify(url, listener, urls);
            }

        } catch (Throwable var11) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + this.getUrl() + ", cause: " + var11.getMessage(), var11);
        }
    }

    protected void doUnsubscribe(URL url, NotifyListener listener) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url);
        if (listeners != null) {
            ChildListener zkListener = (ChildListener)listeners.get(listener);
            if (zkListener != null) {
                this.zkClient.removeChildListener(this.toUrlPath(url), zkListener);
            }
        }

    }

    public List<URL> lookup(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("lookup url == null");
        } else {
            try {
                List<String> providers = new ArrayList();
                String[] arr$ = this.toCategoriesPath(url);
                int len$ = arr$.length;

                for(int i$ = 0; i$ < len$; ++i$) {
                    String path = arr$[i$];
                    List<String> children = this.zkClient.getChildren(path);
                    if (children != null) {
                        providers.addAll(children);
                    }
                }

                return this.toUrlsWithoutEmpty(url, providers);
            } catch (Throwable var8) {
                throw new RpcException("Failed to lookup " + url + " from zookeeper " + this.getUrl() + ", cause: " + var8.getMessage(), var8);
            }
        }
    }

    private String toRootDir() {
        return this.root.equals("/") ? this.root : this.root + "/";
    }

    private String toRootPath() {
        return this.root;
    }

    private String toServicePath(URL url) {
        String name = url.getServiceInterface();
        return "*".equals(name) ? this.toRootPath() : this.toRootDir() + URL.encode(name);
    }

    private String[] toCategoriesPath(URL url) {
        String[] categroies;
        if ("*".equals(url.getParameter("category"))) {
            categroies = new String[]{"providers", "consumers", "routers", "configurators"};
        } else {
            categroies = url.getParameter("category", new String[]{"providers"});
        }

        String[] paths = new String[categroies.length];

        for(int i = 0; i < categroies.length; ++i) {
            paths[i] = this.toServicePath(url) + "/" + categroies[i];
        }

        return paths;
    }

    private String toCategoryPath(URL url) {
        return this.toServicePath(url) + "/" + url.getParameter("category", "providers");
    }

    private String toUrlPath(URL url) {
        return this.toCategoryPath(url) + "/" + URL.encode(url.toFullString());
    }

    private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
        List<URL> urls = new ArrayList();
        if (providers != null && providers.size() > 0) {
            Iterator i$ = providers.iterator();

            while(i$.hasNext()) {
                String provider = (String)i$.next();
                provider = URL.decode(provider);
                if (provider.contains("://")) {
                    URL url = URL.valueOf(provider);
                    if (UrlUtils.isMatch(consumer, url)) {
                        urls.add(url);
                    }
                }
            }
        }

        return urls;
    }

    private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
        List<URL> urls = this.toUrlsWithoutEmpty(consumer, providers);
        if (urls == null || urls.isEmpty()) {
            int i = path.lastIndexOf(47);
            String category = i < 0 ? path : path.substring(i + 1);
            URL empty = consumer.setProtocol("empty").addParameter("category", category);
            urls.add(empty);
        }

        return urls;
    }
}

Next, let's analyze the CuratorZookeeperClient implementation:

```java
public CuratorZookeeperClient(URL url) {
    super(url);

    try {
        // Create the CuratorFramework builder
        Builder builder = CuratorFrameworkFactory.builder().connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000)).connectionTimeoutMs(5000);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest", authority.getBytes());
        }
        // Build the CuratorFramework instance
        this.client = builder.build();
        this.client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState state) {
                if (state == ConnectionState.LOST) {
                    CuratorZookeeperClient.this.stateChanged(0);
                } else if (state == ConnectionState.CONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(1);
                } else if (state == ConnectionState.RECONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(2);
                }

            }
        });
        this.client.start();
    } catch (Exception var4) {
        throw new IllegalStateException(var4.getMessage(), var4);
    }
}
```
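The two listeners cooperate: the Curator listener translates connection states into the integer codes 0, 1, and 2, and the ZookeeperRegistry listener calls recover() only when it sees code 2. The sketch below isolates that mapping. The enum is a stand-in for Curator's ConnectionState, and the constant names and the `IGNORED` marker are assumptions made for readability; only the numeric values come from the listings.

```java
// Stand-in types for illustration; the real code uses Curator's ConnectionState.
final class ConnectionStateSketch {
    enum ConnState { LOST, CONNECTED, RECONNECTED, SUSPENDED }

    // Numeric values taken from the listings; the constant names are assumptions.
    static final int STATE_DISCONNECTED = 0;
    static final int STATE_CONNECTED = 1;
    static final int STATE_RECONNECTED = 2;
    static final int IGNORED = -1; // hypothetical marker for states that are not forwarded

    private int recoverCalls = 0;

    static int toCode(ConnState state) {
        switch (state) {
            case LOST: return STATE_DISCONNECTED;
            case CONNECTED: return STATE_CONNECTED;
            case RECONNECTED: return STATE_RECONNECTED;
            default: return IGNORED;
        }
    }

    // Mirrors the registry-side listener: only a reconnect triggers recovery.
    void onStateChanged(ConnState state) {
        if (toCode(state) == STATE_RECONNECTED) {
            recoverCalls++; // the listing calls recover() at this point
        }
    }

    int recoverCalls() {
        return recoverCalls;
    }

    public static void main(String[] args) {
        ConnectionStateSketch sketch = new ConnectionStateSketch();
        sketch.onStateChanged(ConnState.CONNECTED);
        sketch.onStateChanged(ConnState.RECONNECTED);
        System.out.println(sketch.recoverCalls());
    }
}
```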

Taking ZooKeeper as an example, service registration essentially means writing the service's configuration data to a node under a particular ZooKeeper path.

## Service Registration

Usage:

```xml
<dubbo:service  retries="0" interface="com.rbsn.tms.operatebill.service.WechatPayService"
                ref="wechatPayService"/>
```

Service registration is implemented in the FailbackRegistry class:

```

public void register(URL url) {
    if (!this.destroyed.get()) {
        super.register(url);
        this.failedRegistered.remove(url);
        this.failedUnregistered.remove(url);

        try {
            // Register the service
            this.doRegister(url);
        } catch (Exception var6) {
            Throwable t = var6;
            // Read the check parameter; if check = true, the exception is thrown directly
            boolean check = this.getUrl().getParameter("check", true) && url.getParameter("check", true) && !"consumer".equals(url.getProtocol());
            boolean skipFailback = var6 instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = var6.getCause();
                }

                throw new IllegalStateException("Failed to register " + url + " to registry " + this.getUrl().getAddress() + ", cause: " + ((Throwable)t).getMessage(), (Throwable)t);
            }

            this.logger.error("Failed to register " + url + ", waiting for retry, cause: " + var6.getMessage(), var6);
            this.failedRegistered.add(url);
        }

    }
}

```
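The failure handling above follows a "fail fast or remember for later" pattern: with check enabled the error propagates, otherwise the URL is recorded for a later retry. Below is a minimal sketch of that pattern under simplifying assumptions. URLs are plain strings, `FailbackSketch` is a hypothetical name, and `retry()` is a hypothetical hook, since the listing only logs "waiting for retry" and does not show how retries are scheduled.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative sketch only; not the FailbackRegistry implementation.
final class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister; // the operation that may fail
    private final boolean check;

    FailbackSketch(Consumer<String> doRegister, boolean check) {
        this.doRegister = doRegister;
        this.check = check;
    }

    void register(String url) {
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // check = true: surface the failure immediately
                throw new IllegalStateException("Failed to register " + url + ", cause: " + e.getMessage(), e);
            }
            failedRegistered.add(url); // check = false: remember the URL for a later retry
        }
    }

    // Hypothetical retry hook: re-attempt every URL that failed earlier.
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // still failing; keep it for the next round
            }
        }
    }

    Set<String> pending() {
        return failedRegistered;
    }

    public static void main(String[] args) {
        boolean[] registryUp = {false};
        FailbackSketch registry = new FailbackSketch(url -> {
            if (!registryUp[0]) throw new IllegalStateException("registry unavailable");
        }, false);
        registry.register("dubbo://127.0.0.1:20880/demo");
        System.out.println(registry.pending());
        registryUp[0] = true;
        registry.retry();
        System.out.println(registry.pending());
    }
}
```

The configuration at the top of the article sets check="false" on the registry, which corresponds to the second branch: a failed registration does not abort startup.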

The doRegister method lives in ZookeeperRegistry. It creates a node through the ZooKeeper client, and the node path is generated by the toUrlPath method.

The path format is `/${group}/${serviceInterface}/providers/${url}`, for example `/dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......`

```

protected void doRegister(URL url) {
    try {
        this.zkClient.create(this.toUrlPath(url), url.getParameter("dynamic", true));
    } catch (Throwable var3) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3);
    }
}

```
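The path layout can be reproduced with a few lines of plain Java. This sketch follows toRootDir, toServicePath, toCategoryPath, and toUrlPath from the ZookeeperRegistry listing, but takes strings instead of a URL object. It assumes that URL.encode behaves like java.net.URLEncoder with UTF-8, and `RegistryPathSketch` and the sample provider URL are made up for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Illustrative sketch of the node path layout; not the ZookeeperRegistry code itself.
final class RegistryPathSketch {
    static String toUrlPath(String group, String serviceInterface, String category, String fullUrl) {
        // The constructor prefixes the group with "/" when it is missing.
        String root = group.startsWith("/") ? group : "/" + group;
        // toRootDir: avoid a double slash when the root is "/".
        String rootDir = root.equals("/") ? root : root + "/";
        return rootDir + encode(serviceInterface) + "/" + category + "/" + encode(fullUrl);
    }

    // Assumption: URL.encode is equivalent to URLEncoder with UTF-8.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath("dubbo", "org.apache.dubbo.DemoService",
                "providers", "dubbo://127.0.0.1:20880/org.apache.dubbo.DemoService"));
    }
}
```

Encoding the full provider URL into a single path segment is what lets one ZooKeeper child node represent one provider instance.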

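The create method is in the AbstractZookeeperClient class. Its ephemeral parameter indicates whether to create an ephemeral node; doRegister passes the URL's dynamic parameter, which defaults to true.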

```

public void create(String path, boolean ephemeral) {
    int i = path.lastIndexOf(47);
    if (i > 0) {
        String parentPath = path.substring(0, i);
        // If the parent path does not exist yet, create it recursively as a persistent node
        if (!this.checkExists(parentPath)) {
            this.create(parentPath, false);
        }
    }

    if (ephemeral) {
        // Create an ephemeral node
        this.createEphemeral(path);
    } else {
        // Create a persistent node
        this.createPersistent(path);
    }

}

```
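To see what the recursion produces, the sketch below replays the same logic against an in-memory map instead of a ZooKeeper connection. `InMemoryNodeTree` is a hypothetical class and the sample path is invented; the map simply records each created path and whether it is ephemeral.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory stand-in for a ZooKeeper client, for illustration only.
final class InMemoryNodeTree {
    // path -> true if the node is ephemeral, false if persistent (insertion-ordered)
    private final Map<String, Boolean> nodes = new LinkedHashMap<>();

    void create(String path, boolean ephemeral) {
        int i = path.lastIndexOf('/');
        if (i > 0) {
            String parentPath = path.substring(0, i);
            if (!nodes.containsKey(parentPath)) {
                create(parentPath, false); // parents are always created as persistent nodes
            }
        }
        // Like the NodeExistsException handling in createEphemeral, an existing node is left as-is.
        nodes.putIfAbsent(path, ephemeral);
    }

    Map<String, Boolean> nodes() {
        return nodes;
    }

    public static void main(String[] args) {
        InMemoryNodeTree tree = new InMemoryNodeTree();
        tree.create("/dubbo/com.example.DemoService/providers/dubbo%3A%2F%2Fhost", true);
        tree.nodes().forEach((path, ephemeral) ->
                System.out.println((ephemeral ? "ephemeral  " : "persistent ") + path));
    }
}
```

Only the leaf (the provider URL node) is ephemeral, so it disappears when the provider's session ends, while the group, service, and category nodes remain.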

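The method first recursively creates the parent path of the current node, and then uses the value of ephemeral to decide whether to create an ephemeral or a persistent node. The createEphemeral method is implemented as follows:

```java
public void createEphemeral(String path) {
    try {
        ((ACLBackgroundPathAndBytesable)this.client.create().withMode(CreateMode.EPHEMERAL)).forPath(path);
    } catch (NodeExistsException var3) {
        // The node already exists; nothing to do
    } catch (Exception var4) {
        throw new IllegalStateException(var4.getMessage(), var4);
    }

}
```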

The whole process can be summarized simply: first create the registry instance, then register the service through that instance.
