Dubbo(三):深刻理解Dubbo源碼之如何將服務發佈到註冊中心

1、前言

  前面有說到Dubbo的服務發現機制,也就是SPI,那既然Dubbo內部實現了更增強大的服務發現機制,如今咱們就來一塊兒看看Dubbo在發現服務後須要作什麼才能將服務註冊到註冊中心中。java

2、Dubbo服務註冊簡介

  首先須要明白的是Dubbo是依賴於Spring容器的(至於爲何在上篇博客中有介紹),Dubbo服務註冊過程也是始於Spring容器發佈刷新事件。然後Dubbo在接收到事件後,就會進行服務註冊,整個邏輯大體分爲三個部分:git

  一、檢查參數,組裝URL:服務消費方是經過URL拿到服務提供者的,因此咱們須要爲服務提供者配置好對應的URL。github

  二、導出服務到本地和遠程:這裏的本地指的是JVM,遠程指的是實現invoke,使得服務消費方可以經過invoke調用到服務。spring

  三、向註冊中心註冊服務:可以讓服務消費方知道服務提供方提供了那個服務。apache

3、接收Spring容器刷新事件

  在簡介中咱們提到Dubbo服務註冊是始於Spring容器發佈刷新事件,那麼Dubbo是如何接收該事件的呢?數組

  在咱們日常編寫provider的接口實現類時,都會打上@Service註解,從而這個標註這個類屬於ServiceBean。在ServiceBean中有這樣一個方法onApplicationEvent。該方法會在收到 Spring 上下文刷新事件後執行服務註冊操做緩存

 1 public void onApplicationEvent(ContextRefreshedEvent event) {  2         //是否已導出 && 是否是已被取消導出
 3         if (!this.isExported() && !this.isUnexported()) {  4             if (logger.isInfoEnabled()) {  5                 logger.info("The service ready on spring started. service: " + this.getInterface());  6  }  7 
 8             this.export();  9  } 10 
11     }
View Code

  注意這裏是2.7.3的Dubbo,接收Spring上下文刷新事件已經不須要設置延遲導出,而是在導出的時候檢查配置再決定是否須要延時,因此只有兩個判斷。而在2.6.x版本的Dubbo存在着isDelay的判斷。這個是判斷服務是否延時導出。這裏說個題外話2.6.x的版本是com.alibaba.dubbo的,而2.7.x是org.apache.dubbo的,而2.7.0也開始表明dubbo從Apache裏畢業了。服務器

  在這裏就是Dubbo服務導出到註冊中心過程的起點。須要咱們在服務接口實現類上打上@Service。ServiceBean是Dubbo與Spring 框架進行整合的關鍵,能夠看作是兩個框架之間的橋樑。具備一樣做用的類還有ReferenceBean。app

4、檢查配置參數以及URL裝配

  一、檢查配置框架

    在這一階段Dubbo須要檢查用戶的配置是否合理,或者爲用戶補充缺省配置。就是從刷新事件開始,進入export()方法,源碼解析以下:

 1 public void export() {  2         super.export();  3         this.publishExportEvent();  4  }  5 
 6 //進入到ServiceConfig.class中的export。
 7 
 8 public synchronized void export() {  9         //檢查而且更新配置
10         this.checkAndUpdateSubConfigs(); 11         //是否須要導出
12         if (this.shouldExport()) { 13             //是否須要延時
14             if (this.shouldDelay()) { 15                 DELAY_EXPORT_EXECUTOR.schedule(this::doExport, (long)this.getDelay(), TimeUnit.MILLISECONDS); 16             } else { 17                 //馬上導出
18                 this.doExport(); 19  } 20 
21  } 22  } 23 
24 //進入checkAndUpdateSubConfigs。
25 
26 public void checkAndUpdateSubConfigs() { 27         //檢查配置項包括provider是否存在,導出端口是否可用,註冊中心是否能夠鏈接等等
28         this.completeCompoundConfigs(); 29         this.startConfigCenter(); 30         this.checkDefault(); 31         this.checkProtocol(); 32         this.checkApplication(); 33         if (!this.isOnlyInJvm()) { 34             this.checkRegistry(); 35  } 36         //檢查接口內部方法是否不爲空
37         this.refresh(); 38         this.checkMetadataReport(); 39         if (StringUtils.isEmpty(this.interfaceName)) { 40             throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!"); 41         } else { 42             if (this.ref instanceof GenericService) { 43                 this.interfaceClass = GenericService.class; 44                 if (StringUtils.isEmpty(this.generic)) { 45                     this.generic = Boolean.TRUE.toString(); 46  } 47             } else { 48                 try { 49                     this.interfaceClass = Class.forName(this.interfaceName, true, Thread.currentThread().getContextClassLoader()); 50                 } catch (ClassNotFoundException var5) { 51                     throw new IllegalStateException(var5.getMessage(), var5); 52  } 53 
54                 this.checkInterfaceAndMethods(this.interfaceClass, this.methods); 55                 this.checkRef(); 56                 this.generic = Boolean.FALSE.toString(); 57  } 58             //是否須要導出服務或者只是在本地運行測試
59  Class stubClass; 60             if (this.local != null) { 61                 if ("true".equals(this.local)) { 62                     this.local = this.interfaceName + "Local"; 63  } 64 
65                 try { 66                     stubClass = ClassUtils.forNameWithThreadContextClassLoader(this.local); 67                 } catch (ClassNotFoundException var4) { 68                     throw new IllegalStateException(var4.getMessage(), var4); 69  } 70 
71                 if (!this.interfaceClass.isAssignableFrom(stubClass)) { 72                     throw new IllegalStateException("The local implementation class " + stubClass.getName() + " not implement interface " + this.interfaceName); 73  } 74  } 75 
76             if (this.stub != null) { 77                 if ("true".equals(this.stub)) { 78                     this.stub = this.interfaceName + "Stub"; 79  } 80 
81                 try { 82                     stubClass = ClassUtils.forNameWithThreadContextClassLoader(this.stub); 83                 } catch (ClassNotFoundException var3) { 84                     throw new IllegalStateException(var3.getMessage(), var3); 85  } 86 
87                 if (!this.interfaceClass.isAssignableFrom(stubClass)) { 88                     throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + this.interfaceName); 89  } 90  } 91 
92             this.checkStubAndLocal(this.interfaceClass); 93             this.checkMock(this.interfaceClass); 94  } 95     }
View Code

    上面的源碼分析可看出。export方法主要檢查的配置項有@Service標籤的類是否屬性合法。服務提供者是否存在,是否有對應的Application啓動,端口是否能鏈接,是否有對應的註冊中心等等一些配置,在檢查完這些配置後Dubbo會識別咱們這次啓動服務是想在本地啓動進行一些調試,仍是將服務暴露給別人。不想暴露出去能夠進行配置

1 <dubbo:provider export="false" />

  二、URL裝配

    在Dubbo中的URL通常包括如下字段:protocol,host,port,path,parameters。在檢查配置後會進入到doExport中。

      protocol:就是URL最前面的字段,表示的是協議,通常是:dubbo thrift http zk

      host.port:就是對應的IP地址和端口

      path:接口名稱

      parameters:參數鍵值對

 1 protected synchronized void doExport() {  2         if (this.unexported) {  3             throw new IllegalStateException("The service " + this.interfaceClass.getName() + " has already unexported!");  4         } else if (!this.exported) {  5             this.exported = true;  6             if (StringUtils.isEmpty(this.path)) {  7                 this.path = this.interfaceName;  8  }  9 
10             this.doExportUrls(); 11  } 12  } 13 
14 //進入到doExportUrls
15 private void doExportUrls() { 16         //加載註冊中心連接
17         List<URL> registryURLs = this.loadRegistries(true); 18         //使用遍歷器遍歷protocols,並在每一個協議下導出服務
19         Iterator var2 = this.protocols.iterator(); 20 
21         while(var2.hasNext()) { 22             ProtocolConfig protocolConfig = (ProtocolConfig)var2.next(); 23             String pathKey = URL.buildKey((String)this.getContextPath(protocolConfig).map((p) -> { 24                 return p + "/" + this.path; 25             }).orElse(this.path), this.group, this.version); 26             ProviderModel providerModel = new ProviderModel(pathKey, this.ref, this.interfaceClass); 27  ApplicationModel.initProviderModel(pathKey, providerModel); 28             this.doExportUrlsFor1Protocol(protocolConfig, registryURLs); 29  } 30 
31  } 32 
33 //進入到加載註冊中心連接的方法
34 
35 protected List<URL> loadRegistries(boolean provider) { 36         List<URL> registryList = new ArrayList(); 37         if (CollectionUtils.isNotEmpty(this.registries)) { 38             Iterator var3 = this.registries.iterator(); 39             //循環的從註冊鏈表中拿取地址及配置
40  label47: 41             while(true) { 42  RegistryConfig config; 43  String address; 44                 do { 45                     if (!var3.hasNext()) { 46                         return registryList; 47  } 48 
49                     config = (RegistryConfig)var3.next(); 50                     address = config.getAddress(); 51                     //address爲空就默認爲0.0.0.0
52                     if (StringUtils.isEmpty(address)) { 53                         address = "0.0.0.0"; 54  } 55                 } while("N/A".equalsIgnoreCase(address)); 56 
57                 Map<String, String> map = new HashMap(); 58                 // 添加 ApplicationConfig 中的字段信息到 map 中
59                 appendParameters(map, this.application); 60                 // 添加 RegistryConfig 字段信息到 map 中
61  appendParameters(map, config); 62                 // 添加 path,protocol 等信息到 map 中
63                 map.put("path", RegistryService.class.getName()); 64  appendRuntimeParameters(map); 65                 if (!map.containsKey("protocol")) { 66                     map.put("protocol", "dubbo"); 67  } 68                 // 解析獲得 URL 列表,address 可能包含多個註冊中心 ip, 69                 // 所以解析獲得的是一個 URL 列表
70                 List<URL> urls = UrlUtils.parseURLs(address, map); 71                 Iterator var8 = urls.iterator(); 72 
73                 while(true) { 74  URL url; 75                     do { 76                         if (!var8.hasNext()) { 77                             continue label47; 78  } 79 
80                         url = (URL)var8.next(); 81                         //// 將 URL 協議頭設置爲 registry
82                         url = URLBuilder.from(url).addParameter("registry", url.getProtocol()).setProtocol("registry").build(); 83                     // 經過判斷條件,決定是否添加 url 到 registryList 中,條件以下: 84                     // (服務提供者 && register = true 或 null) || (非服務提供者 && subscribe = true 或 null)
85                     } while((!provider || !url.getParameter("register", true)) && (provider || !url.getParameter("subscribe", true))); 86 
87                     //添加url到registryList中
88  registryList.add(url); 89  } 90  } 91         } else { 92             return registryList; 93  } 94     }
View Code

    loadRegistries方法主要包含以下的邏輯:

      一、構建參數映射集合,也就是 map

      二、構建註冊中心連接列表

      三、遍歷連接列表,並根據條件決定是否將其添加到 registryList 中

    實際上由於Dubbo現現在支持不少註冊中心,因此對於一些註冊中心的URL也要進行遍歷構建。這裏是生成註冊中心的URL。還未生成Dubbo服務的URL。好比說使用的是Zookeeper註冊中心,可能從loadRegistries中拿到的就是:

registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.7.3&pid=1528&qos.port=22222&registry=zookeeper&timestamp=1530743640901

    這種類型的URL,表示這是一個註冊協議,如今能夠根據這個URL定位到註冊中心去了。服務接口是RegistryService,registry的類型爲zookeeper。但是咱們還未生成Dubbo服務提供方的URL因此接着看下面代碼

    而後進行到doExportUrlsFor1Protocol(裝配Dubbo服務的URL而且實行發佈)

 1 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {  2         //首先是將一些信息,好比版本、時間戳、方法名以及各類配置對象的字段信息放入到 map 中  3         //map 中的內容將做爲 URL 的查詢字符串。構建好 map 後,緊接着是獲取上下文路徑、主機名以及端口號等信息。  4        //最後將 map 和主機名等數據傳給 URL 構造方法建立 URL 對象。須要注意的是,這裏出現的 URL 並不是 java.net.URL,而是 com.alibaba.dubbo.common.URL。
 5         String name = protocolConfig.getName();  6         // 若是協議名爲空,或空串,則將協議名變量設置爲 dubbo
 7         if (StringUtils.isEmpty(name)) {  8             name = "dubbo";  9  }  10 
 11         Map<String, String> map = new HashMap();  12         // 添加 side、版本、時間戳以及進程號等信息到 map 中
 13         map.put("side", "provider");  14  appendRuntimeParameters(map);  15         // 經過反射將對象的字段信息添加到 map 中
 16         appendParameters(map, this.metrics);  17         appendParameters(map, this.application);  18         appendParameters(map, this.module);  19         appendParameters(map, this.provider);  20  appendParameters(map, protocolConfig);  21         appendParameters(map, this);  22  String scope;  23  Iterator metadataReportService;  24         // methods 爲 MethodConfig 集合,MethodConfig 中存儲了 <dubbo:method> 標籤的配置信息
 25         if (CollectionUtils.isNotEmpty(this.methods)) {  26             Iterator var5 = this.methods.iterator();  27             //檢測 <dubbo:method> 標籤中的配置信息,並將相關配置添加到 map 中
 28  label166:  29             while(true) {  30  MethodConfig method;  31  List arguments;  32                 do {  33                     if (!var5.hasNext()) {  34                         break label166;  35  }  36 
 37                     method = (MethodConfig)var5.next();  38  appendParameters(map, method, method.getName());  39                     String retryKey = method.getName() + ".retry";  40                     if (map.containsKey(retryKey)) {  41                         scope = (String)map.remove(retryKey);  42                         if ("false".equals(scope)) {  43                             map.put(method.getName() + ".retries", "0");  44  }  45  }  46 
 47                     arguments = method.getArguments();  48                 } while(!CollectionUtils.isNotEmpty(arguments));  49 
 50                 metadataReportService = arguments.iterator();  51 
 52                 while(true) {  53  ArgumentConfig argument;  54  Method[] methods;  55                     do {  56                         do {  57                             while(true) {  58                                 if (!metadataReportService.hasNext()) {  59                                     continue label166;  60  }  61 
 62                                 argument = (ArgumentConfig)metadataReportService.next();  63                                 if (argument.getType() != null && argument.getType().length() > 0) {  64                                     methods = this.interfaceClass.getMethods();  65                                     break;  66  }  67 
 68                                 if (argument.getIndex() == -1) {  69                                     throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");  70  }  71 
 72                                 appendParameters(map, argument, method.getName() + "." + argument.getIndex());  73  }  74                         } while(methods == null);  75                     } while(methods.length <= 0);  76 
 77                     for(int i = 0; i < methods.length; ++i) {  78                         String methodName = methods[i].getName();  79                         if (methodName.equals(method.getName())) {  80                             Class<?>[] argtypes = methods[i].getParameterTypes();  81                             if (argument.getIndex() != -1) {  82                                 if (!argtypes[argument.getIndex()].getName().equals(argument.getType())) {  83                                     throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());  84  }  85 
 86                                 appendParameters(map, argument, method.getName() + "." + argument.getIndex());  87                             } else {  88                                 for(int j = 0; j < argtypes.length; ++j) {  89                                     Class<?> argclazz = argtypes[j];  90                                     if (argclazz.getName().equals(argument.getType())) {  91                                         appendParameters(map, argument, method.getName() + "." + j);  92                                         if (argument.getIndex() != -1 && argument.getIndex() != j) {  93                                             throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());  94  }  95  }  96  }  97  }  98  }  99  } 100  } 101  } 102  } 103 
104  String host; 105          // 檢測 generic 是否爲 "true",並根據檢測結果向 map 中添加不一樣的信息
106         if (ProtocolUtils.isGeneric(this.generic)) { 107             map.put("generic", this.generic); 108             map.put("methods", "*"); 109         } else { 110             host = Version.getVersion(this.interfaceClass, this.version); 111             if (host != null && host.length() > 0) { 112                 map.put("revision", host); 113  } 114              // 爲接口生成包裹類 Wrapper,Wrapper 中包含了接口的詳細信息,好比接口方法名數組,字段信息等
115             String[] methods = Wrapper.getWrapper(this.interfaceClass).getMethodNames(); 116             if (methods.length == 0) { 117                 logger.warn("No method found in service interface " + this.interfaceClass.getName()); 118                 map.put("methods", "*"); 119             } else { 120                 // 將逗號做爲分隔符鏈接方法名,並將鏈接後的字符串放入 map 中
121                 map.put("methods", StringUtils.join(new HashSet(Arrays.asList(methods)), ",")); 122  } 123  } 124         // 添加 token 到 map 中
125         if (!ConfigUtils.isEmpty(this.token)) { 126             if (ConfigUtils.isDefault(this.token)) { 127                 map.put("token", UUID.randomUUID().toString()); 128             } else { 129                 map.put("token", this.token); 130  } 131  } 132         //獲取host和port
133         host = this.findConfigedHosts(protocolConfig, registryURLs, map); 134         Integer port = this.findConfigedPorts(protocolConfig, name, map); 135         // 獲取上下文路徑而且組裝URL
136         URL url = new URL(name, host, port, (String)this.getContextPath(protocolConfig).map((p) -> { 137             return p + "/" + this.path; 138         }).orElse(this.path), map); 139         if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) { 140             // 加載 ConfiguratorFactory,並生成 Configurator 實例,而後經過實例配置 url,使用了前面提到的SPI機制
141             url = ((ConfiguratorFactory)ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol())).getConfigurator(url).configure(url); 142  } 143         //下面邏輯主要分三步 144         // 若是 scope = none,則什麼都不作 145         // scope != remote,導出到本地 146         // scope != local,導出到遠程
147         scope = url.getParameter("scope"); 148         if (!"none".equalsIgnoreCase(scope)) { 149             if (!"remote".equalsIgnoreCase(scope)) { 150                 this.exportLocal(url); 151  } 152 
153             if (!"local".equalsIgnoreCase(scope)) { 154                 if (!this.isOnlyInJvm() && logger.isInfoEnabled()) { 155                     logger.info("Export dubbo service " + this.interfaceClass.getName() + " to url " + url); 156  } 157 
158                 if (CollectionUtils.isNotEmpty(registryURLs)) { 159                     metadataReportService = registryURLs.iterator(); 160 
161                     while(metadataReportService.hasNext()) { 162                         URL registryURL = (URL)metadataReportService.next(); 163                         if (!"injvm".equalsIgnoreCase(url.getProtocol())) { 164                             url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); 165                             URL monitorUrl = this.loadMonitor(registryURL); 166                             if (monitorUrl != null) { 167                                 url = url.addParameterAndEncoded("monitor", monitorUrl.toFullString()); 168  } 169 
170                             if (logger.isInfoEnabled()) { 171                                 logger.info("Register dubbo service " + this.interfaceClass.getName() + " url " + url + " to registry " + registryURL); 172  } 173 
174                             String proxy = url.getParameter("proxy"); 175                             if (StringUtils.isNotEmpty(proxy)) { 176                                 registryURL = registryURL.addParameter("proxy", proxy); 177  } 178                             // 爲服務提供類(ref)生成 Invoker
179                             Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString())); 180                             DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 181                             // 導出服務,並生成 Exporter
182                             Exporter<?> exporter = protocol.export(wrapperInvoker); 183                             this.exporters.add(exporter); 184  } 185  } 186                 // 不存在註冊中心,僅導出服務
187                 } else { 188                     Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, url); 189                     DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 190                     Exporter<?> exporter = protocol.export(wrapperInvoker); 191                     this.exporters.add(exporter); 192  } 193 
194                 metadataReportService = null; 195  MetadataReportService metadataReportService; 196                 if ((metadataReportService = this.getMetadataReportService()) != null) { 197  metadataReportService.publishProvider(url); 198  } 199  } 200  } 201 
202         this.urls.add(url); 203     }
View Code

    上面的源碼前半段是進行URL裝配,這個URL就是Dubbo服務的URL,大體以下:

dubbo://192.168.1.6:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.1.6&bind.port=20880&dubbo=2.7.3&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5744&qos.port=22222&side=provider&timestamp=1530746052546

    這個URL表示它是一個dubbo協議(DubboProtocol),地址是當前服務器的ip,端口是要暴露的服務的端口號,能夠從dubbo:protocol配置,服務接口爲dubbo:service配置發佈的接口。

    後半段主要是判斷scope變量來決定是否將服務導出遠程或者本地,導出到本地實際上很簡單隻須要生成Invoker。當導出到遠程就須要添加監視器還要生成invoker。監視器能讓Dubbo定時查看註冊中心掛了沒。會拋出指定異常,而invoker使得服務消費方可以遠程調用到服務。而且還會進行註冊到註冊中心下面咱們接着來看看服務的發佈。由於Invoker比較重要在消費者和提供者中都有,因此這個後面會單獨拿出來進行探討。

5、服務發佈本地與遠程

  一、服務發佈到本地

1 private void exportLocal(URL url) { 2         //進行本地URL的構建
3         URL local = URLBuilder.from(url).setProtocol("injvm").setHost("127.0.0.1").setPort(0).build(); 4         //根據本地的URL來實現對應的Invoker
5         Exporter<?> exporter = protocol.export(PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, local)); 6         this.exporters.add(exporter); 7         logger.info("Export dubbo service " + this.interfaceClass.getName() + " to local registry url : " + local); 8     }
View Code

    可見發佈到本地是從新構建了protocol,injvm就是表明在本地的JVM裏,host與port都統一默認127.0.0.1:0。

  二、服務發佈到遠程

 1 public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {  2         //獲取註冊中心的URL,好比:zookeeper://127.0.0.1:2181/......
 3         URL registryUrl = this.getRegistryUrl(originInvoker);  4         //獲取全部服務提供者的URL,好比:dubbo://192.168.1.6:20880/.......
 5         URL providerUrl = this.getProviderUrl(originInvoker);  6         //獲取訂閱URL,好比:provider://192.168.1.6:20880/......
 7         URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(providerUrl);  8         //建立監聽器
 9         RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker); 10         //向訂閱中心推送監聽器
11         this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 12         providerUrl = this.overrideUrlWithConfig(providerUrl, overrideSubscribeListener); 13         //導出服務
14         RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker, providerUrl); 15         Registry registry = this.getRegistry(originInvoker); 16         //獲取已註冊的服務提供者的URL,好比dubbo://192.168.1.6:20880/.......
17         URL registeredProviderUrl = this.getRegisteredProviderUrl(providerUrl, registryUrl); 18         // 向服務提供者與消費者註冊表中註冊服務提供者
19         ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); 20         // 獲取 register 參數
21         boolean register = registeredProviderUrl.getParameter("register", true); 22         // 根據 register 的值決定是否註冊服務
23         if (register) { 24             this.register(registryUrl, registeredProviderUrl); 25             providerInvokerWrapper.setReg(true); 26  } 27         // 向註冊中心進行訂閱 override 數據
28  registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 29  exporter.setRegisterUrl(registeredProviderUrl); 30  exporter.setSubscribeUrl(overrideSubscribeUrl); 31         // 建立並返回 DestroyableExporter
32         return new RegistryProtocol.DestroyableExporter(exporter); 33     }
View Code

    上面的源碼主要是根據前面生成的URL進行服務的發佈和註冊(註冊在下一節展開源碼)。當執行到doLocalExport也就是發佈本地服務到遠程時候會調用 DubboProtocol 的 export 方法大體會經歷下面一些步驟來導出服務

    • 從Invoker獲取providerUrl,構建serviceKey(group/service:version:port),構建DubboExporter並以serviceKey爲key放入本地map緩存
    • 處理url攜帶的本地存根和callback回調
    • 根據url打開服務器端口,暴露本地服務。先以url.getAddress爲key查詢本地緩存serverMap獲取ExchangeServer,若是不存在,則經過createServer建立。
    • createServer方法,設置心跳時間,判斷url中的傳輸方式(key=server,對應Transporter服務)是否支持,設置codec=dubbo,最後根據url和ExchangeHandler對象綁定server返回,這裏的ExchangeHandler很是重要,它就是消費方調用時,底層通訊層回調的Handler,從而獲取包含實際Service實現的Invoker執行器,它是定義在DubboProtocol類中的ExchangeHandlerAdapter內部類
    • 返回DubboExporter對象

     到這裏大體的服務發佈圖以下:

 

 6、服務註冊

  服務註冊操做對於 Dubbo 來講不是必需的,經過服務直連的方式就能夠繞過註冊中心。但一般咱們不會這麼作,直連方式不利於服務治理,僅推薦在測試服務時使用。對於 Dubbo 來講,註冊中心雖不是必需,但倒是必要的。源碼以下:

 1 public void register(URL url) {  2     super.register(url);  3  failedRegistered.remove(url);  4  failedUnregistered.remove(url);  5     try {  6         // 模板方法,由子類實現
 7  doRegister(url);  8     } catch (Exception e) {  9         Throwable t = e; 10 
11         // 獲取 check 參數,若 check = true 將會直接拋出異常
12         boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) 13                 && url.getParameter(Constants.CHECK_KEY, true) 14                 && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); 15         boolean skipFailback = t instanceof SkipFailbackWrapperException; 16         if (check || skipFailback) { 17             if (skipFailback) { 18                 t = t.getCause(); 19  } 20             throw new IllegalStateException("Failed to register"); 21         } else { 22             logger.error("Failed to register"); 23  } 24 
25         // 記錄註冊失敗的連接
26  failedRegistered.add(url); 27  } 28 } 29 
30 //進入doRegister方法
31 
32 protected void doRegister(URL url) { 33     try { 34         // 經過 Zookeeper 客戶端建立節點,節點路徑由 toUrlPath 方法生成,路徑格式以下: 35         // /${group}/${serviceInterface}/providers/${url} 36         // 好比 37         // /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
38         zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 39     } catch (Throwable e) { 40         throw new RpcException("Failed to register..."); 41  } 42 } 43 
44 //進入create方法
45 
46 public void create(String path, boolean ephemeral) { 47     if (!ephemeral) { 48         // 若是要建立的節點類型非臨時節點,那麼這裏要檢測節點是否存在
49         if (checkExists(path)) { 50             return; 51  } 52  } 53     int i = path.lastIndexOf('/'); 54     if (i > 0) { 55         // 遞歸建立上一級路徑
56         create(path.substring(0, i), false); 57  } 58     
59     // 根據 ephemeral 的值建立臨時或持久節點
60     if (ephemeral) { 61  createEphemeral(path); 62     } else { 63  createPersistent(path); 64  } 65 } 66 
67 //進入createEphemeral
68 
69 public void createEphemeral(String path) { 70     try { 71         // 經過 Curator 框架建立節點
72  client.create().withMode(CreateMode.EPHEMERAL).forPath(path); 73     } catch (NodeExistsException e) { 74     } catch (Exception e) { 75         throw new IllegalStateException(e.getMessage(), e); 76  } 77 }
View Code

  根據上面的方法,能夠將當前服務對應的配置信息(存儲在URL中的)註冊到註冊中心/dubbo/org.apache.dubbo.demo.DemoService/providers/ 。裏面直接使用了Curator進行建立節點(Curator是Netflix公司開源的一套zookeeper客戶端框架)

7、總結

  到這裏Dubbo的服務註冊流程終因而解釋完。核心在於Dubbo使用規定好的URL+SPI進行尋找和發現服務,經過URL定位註冊中心,再經過將服務的URL發佈到註冊中心從而使得消費者能夠知道服務的有哪些,裏面能夠看見對於URL這種複雜的對象而且須要常常更改的,一般採用建造者模式。而2.7.3版本的Dubbo源碼也使用了Java8之後的新特性Lambda表達式來構建隱式函數。而一整套流程下來能夠在ZooInspector這個zk可視化客戶端看見咱們建立的節點,前提是註冊中心爲zk。

相關文章
相關標籤/搜索