## 建立註冊中心apache
服務註冊操做對於 Dubbo 來講不是必需的,經過服務直連的方式就能夠繞過註冊中心。但一般咱們不會這麼作,直連方式不利於服務治理,僅推薦在測試服務時使用。對於 Dubbo 來講,註冊中心雖不是必需,但倒是必要的。app
在dubbo-config.xml配置註冊中心,指定zookeeper做爲註冊中心實現方式ide
```測試
<!-- zookeeper註冊中心--> <dubbo:registry protocol="zookeeper" address="${dubbo.registry.address}" check="false" port="${dubbo.protocol.port}"> <dubbo:parameter key="qos.enable" value="false"/> </dubbo:registry>
```ui
在RegistryProtocal中註冊過程在export方法this
```url
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException { RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker); URL registryUrl = this.getRegistryUrl(originInvoker); Registry registry = this.getRegistry(originInvoker); URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker); boolean register = registedProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); //服務註冊到註冊中心 if (register) { this.register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl); RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker); this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new RegistryProtocol.DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }
```spa
register方法以下code
```router
public void register(URL registryUrl, URL registedProviderUrl) { Registry registry = this.registryFactory.getRegistry(registryUrl); registry.register(registedProviderUrl); }
```
ResistryFactory的有多種實現
@SPI("dubbo") public interface RegistryFactory { @Adaptive({"protocol"}) Registry getRegistry(URL var1); }
RegistryFactory接口是@SPI註解,若是未指定,默認使用dubbo做爲註冊中心
下面看下Zookeeper的實現
ZookeeperRegistry繼承AbstractRegistry類
```
public abstract class AbstractRegistryFactory implements RegistryFactory { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class); private static final ReentrantLock LOCK = new ReentrantLock(); private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap(); public AbstractRegistryFactory() { } public static Collection<Registry> getRegistries() { return Collections.unmodifiableCollection(REGISTRIES.values()); } //使用重入鎖銷燬註冊中心 public static void destroyAll() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Close all registries " + getRegistries()); } LOCK.lock(); try { Iterator i$ = getRegistries().iterator(); while(i$.hasNext()) { Registry registry = (Registry)i$.next(); try { registry.destroy(); } catch (Throwable var6) { LOGGER.error(var6.getMessage(), var6); } } REGISTRIES.clear(); } finally { LOCK.unlock(); } } //獲取註冊中心實例 public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()).addParameter("interface", RegistryService.class.getName()).removeParameters(new String[]{"export", "refer"}); String key = url.toServiceString(); LOCK.lock(); Registry var4; try { Registry registry = (Registry)REGISTRIES.get(key); if (registry == null) { registry = this.createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); var4 = registry; return var4; } var4 = registry; } finally { LOCK.unlock(); } return var4; } protected abstract Registry createRegistry(URL var1); }
```
createRegistry方法須要實現類重寫
public class ZookeeperRegistry extends FailbackRegistry { private static final Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class); private static final int DEFAULT_ZOOKEEPER_PORT = 2181; private static final String DEFAULT_ROOT = "dubbo"; private final String root; private final Set<String> anyServices = new ConcurrentHashSet(); private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap(); private final ZookeeperClient zkClient; public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } else { //設置group 默認dubbo String group = url.getParameter("group", "dubbo"); if (!group.startsWith("/")) { group = "/" + group; } this.root = group; // 建立 Zookeeper 客戶端 this.zkClient = zookeeperTransporter.connect(url); // 添加狀態監聽器 this.zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == 2) { try { ZookeeperRegistry.this.recover(); } catch (Exception var3) { ZookeeperRegistry.logger.error(var3.getMessage(), var3); } } } }); } } static String appendDefaultPort(String address) { if (address != null && address.length() > 0) { int i = address.indexOf(58); if (i < 0) { return address + ":" + 2181; } if (Integer.parseInt(address.substring(i + 1)) == 0) { return address.substring(0, i + 1) + 2181; } } return address; } public boolean isAvailable() { return this.zkClient.isConnected(); } public void destroy() { super.destroy(); try { this.zkClient.close(); } catch (Exception var2) { logger.warn("Failed to close zookeeper client " + this.getUrl() + ", cause: " + var2.getMessage(), var2); } } protected void doRegister(URL url) { try { this.zkClient.create(this.toUrlPath(url), url.getParameter("dynamic", true)); } catch (Throwable var3) { throw new RpcException("Failed to register " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3); } } protected void doUnregister(URL url) { try { this.zkClient.delete(this.toUrlPath(url)); } catch (Throwable var3) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3); } } protected void doSubscribe(final URL url, final NotifyListener listener) { try { if ("*".equals(url.getServiceInterface())) { String root = this.toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url); if (listeners == null) { this.zkListeners.putIfAbsent(url, new ConcurrentHashMap()); listeners = (ConcurrentMap)this.zkListeners.get(url); } ChildListener zkListener = (ChildListener)listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { Iterator i$ = currentChilds.iterator(); while(i$.hasNext()) { String child = (String)i$.next(); child = URL.decode(child); if (!ZookeeperRegistry.this.anyServices.contains(child)) { ZookeeperRegistry.this.anyServices.add(child); ZookeeperRegistry.this.subscribe(url.setPath(child).addParameters(new String[]{"interface", child, "check", String.valueOf(false)}), listener); } } } }); zkListener = (ChildListener)listeners.get(listener); } this.zkClient.create(root, false); List<String> services = this.zkClient.addChildListener(root, zkListener); if (services != null && services.size() > 0) { Iterator i$ = services.iterator(); while(i$.hasNext()) { String service = (String)i$.next(); service = URL.decode(service); this.anyServices.add(service); this.subscribe(url.setPath(service).addParameters(new String[]{"interface", service, "check", String.valueOf(false)}), listener); } } } else { List<URL> urls = new ArrayList(); String[] arr$ = this.toCategoriesPath(url); int len$ = arr$.length; for(int i$ = 0; i$ < len$; ++i$) { String path = arr$[i$]; ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url); if (listeners == null) { this.zkListeners.putIfAbsent(url, new ConcurrentHashMap()); listeners = (ConcurrentMap)this.zkListeners.get(url); } ChildListener zkListener = (ChildListener)listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, ZookeeperRegistry.this.toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = (ChildListener)listeners.get(listener); } this.zkClient.create(path, false); List<String> children = this.zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(this.toUrlsWithEmpty(url, path, children)); } } this.notify(url, listener, urls); } } catch (Throwable var11) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + this.getUrl() + ", cause: " + var11.getMessage(), var11); } } protected void doUnsubscribe(URL url, NotifyListener listener) { ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url); if (listeners != null) { ChildListener zkListener = (ChildListener)listeners.get(listener); if (zkListener != null) { this.zkClient.removeChildListener(this.toUrlPath(url), zkListener); } } } public List<URL> lookup(URL url) { if (url == null) { throw new IllegalArgumentException("lookup url == null"); } else { try { List<String> providers = new ArrayList(); String[] arr$ = this.toCategoriesPath(url); int len$ = arr$.length; for(int i$ = 0; i$ < len$; ++i$) { String path = arr$[i$]; List<String> children = this.zkClient.getChildren(path); if (children != null) { providers.addAll(children); } } return this.toUrlsWithoutEmpty(url, providers); } catch (Throwable var8) { throw new RpcException("Failed to lookup " + url + " from zookeeper " + this.getUrl() + ", cause: " + var8.getMessage(), var8); } } } private String toRootDir() { return this.root.equals("/") ? this.root : this.root + "/"; } private String toRootPath() { return this.root; } private String toServicePath(URL url) { String name = url.getServiceInterface(); return "*".equals(name) ? this.toRootPath() : this.toRootDir() + URL.encode(name); } private String[] toCategoriesPath(URL url) { String[] categroies; if ("*".equals(url.getParameter("category"))) { categroies = new String[]{"providers", "consumers", "routers", "configurators"}; } else { categroies = url.getParameter("category", new String[]{"providers"}); } String[] paths = new String[categroies.length]; for(int i = 0; i < categroies.length; ++i) { paths[i] = this.toServicePath(url) + "/" + categroies[i]; } return paths; } private String toCategoryPath(URL url) { return this.toServicePath(url) + "/" + url.getParameter("category", "providers"); } private String toUrlPath(URL url) { return this.toCategoryPath(url) + "/" + URL.encode(url.toFullString()); } private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) { List<URL> urls = new ArrayList(); if (providers != null && providers.size() > 0) { Iterator i$ = providers.iterator(); while(i$.hasNext()) { String provider = (String)i$.next(); provider = URL.decode(provider); if (provider.contains("://")) { URL url = URL.valueOf(provider); if (UrlUtils.isMatch(consumer, url)) { urls.add(url); } } } } return urls; } private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) { List<URL> urls = this.toUrlsWithoutEmpty(consumer, providers); if (urls == null || urls.isEmpty()) { int i = path.lastIndexOf(47); String category = i < 0 ? path : path.substring(i + 1); URL empty = consumer.setProtocol("empty").addParameter("category", category); urls.add(empty); } return urls; } }
下面分析CuratorZookeeperClient實現
public CuratorZookeeperClient(URL url) { super(url); try { // 建立 CuratorFramework 構造器 Builder builder = CuratorFrameworkFactory.builder().connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000)).connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } // 構建 CuratorFramework 實例 this.client = builder.build(); this.client.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(0); } else if (state == ConnectionState.CONNECTED) { CuratorZookeeperClient.this.stateChanged(1); } else if (state == ConnectionState.RECONNECTED) { CuratorZookeeperClient.this.stateChanged(2); } } }); this.client.start(); } catch (Exception var4) { throw new IllegalStateException(var4.getMessage(), var4); } }
以 Zookeeper 爲例,所謂的服務註冊,本質上是將服務配置數據寫入到 Zookeeper 的某個路徑的節點下
## 服務註冊
用法
<dubbo:service retries="0" interface="com.rbsn.tms.operatebill.service.WechatPayService" ref="wechatPayService"/>
服務註冊在FailbackRegistry類中
```
public void register(URL url) { if (!this.destroyed.get()) { super.register(url); this.failedRegistered.remove(url); this.failedUnregistered.remove(url); try { //服務註冊 this.doRegister(url); } catch (Exception var6) { Throwable t = var6; // 獲取 check 參數,若 check = true 將會直接拋出異常 boolean check = this.getUrl().getParameter("check", true) && url.getParameter("check", true) && !"consumer".equals(url.getProtocol()); boolean skipFailback = var6 instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = var6.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + this.getUrl().getAddress() + ", cause: " + ((Throwable)t).getMessage(), (Throwable)t); } this.logger.error("Failed to register " + url + ", waiting for retry, cause: " + var6.getMessage(), var6); this.failedRegistered.add(url); } } }
```
doRegistry方法在ZookeeperResistry中, 經過 Zookeeper 客戶端建立節點,節點路徑由 toUrlPath 方法生成,
路徑格式以下: // /${group}/${serviceInterface}/providers/${url} // 好比 // /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
```
protected void doRegister(URL url) { try { this.zkClient.create(this.toUrlPath(url), url.getParameter("dynamic", true)); } catch (Throwable var3) { throw new RpcException("Failed to register " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3); } }
```
create方法在AbstractZookeeperClient類中,ephemeral參數表示是否建立臨時節點 默認true
```
public void create(String path, boolean ephemeral) { int i = path.lastIndexOf(47); if (i > 0) { String parentPath = path.substring(0, i); // 若是要建立的節點類型非臨時節點,那麼這裏要檢測節點是否存在 if (!this.checkExists(parentPath)) { this.create(parentPath, false); } } if (ephemeral) { //建立臨時節點 this.createEphemeral(path); } else { //建立持久化節點 this.createPersistent(path); } }
```
經過遞歸建立當前節點的上一級路徑,而後再根據 ephemeral 的值決定建立臨時仍是持久節點,createEphemeral方法實現以下
public void createEphemeral(String path) { try { ((ACLBackgroundPathAndBytesable)this.client.create().withMode(CreateMode.EPHEMERAL)).forPath(path); } catch (NodeExistsException var3) { ; } catch (Exception var4) { throw new IllegalStateException(var4.getMessage(), var4); } }
整個過程可簡單總結爲:先建立註冊中心實例,以後再經過註冊中心實例註冊服務