Dubbo的插件化實現很是相似於原生的JAVA的SPI:它只是提供一種協議,並無提供相關插件化實施的接口。用過的同窗都知道,它有一種java原生的支持類:ServiceLoader,經過聲明接口的實現類,在META-INF/services中註冊一個實現類,而後經過ServiceLoader去生成一個接口實例,當更換插件的時候只須要把本身實現的插件替換到META-INF/services中便可。java
在Dubbo中,SPI是一個很是核心的機制,貫穿在幾乎全部的流程中。弄明白這一塊可以幫咱們明白dubo源碼mysql
Dubbo是基於Java原生SPI機制思想的一個改進,因此,先從JAVA SPI機制開始瞭解什麼時SPI之後再去學習Dubbo的SPI就比較簡單spring
SPI全稱(service provider interface),是JDK內置的一種服務提供發現機制,目前市面上有不少框架都是用它來作服務的擴展發現,你們耳熟能詳的如JDBC、日誌框架都有用到;sql
簡單來講,它是一種動態替換髮現的機制。舉個簡單的例子,若是咱們定義了一個規範,須要第三方廠商去實現,那麼對於咱們應用方來講,只須要集成對應廠商的插件,既能夠完成對應規範的實現機制。 造成一種插拔式的擴展手段。數據庫
META-INF/services
在該目錄下建立一個properties文件,該文件須要知足如下幾個條件json
resouces
的META-INF/services
中建立對應的文件,而且經過properties
規則配置實現類的全路徑 以及對應調用方引入api接口,和對應產商的jarapi
而且在對應的resouces中引入接口,若是引入了多個產商的jar,那麼會取到多個產商的東西緩存
SPI在不少地方有應用,你們能夠看看最經常使用的java.sql.Driver驅動。JDK官方提供了java.sql.Driver這個驅動擴展點,可是大家並無看到JDK中有對應的Driver實現。 那在哪裏實現呢?安全
以鏈接Mysql爲例,咱們須要添加mysql-connector-java依賴。而後,大家能夠在這個jar包中找到SPI的配置信息。以下圖,因此java.sql.Driver由各個數據庫廠商自行實現。這就是SPI的實際應用。固然除了這個意外,你們在spring的包中也能夠看到相應的痕跡服務器
使用原生spi,若是路徑下有多個實現都會加載進來,若是有一個加載失敗,會比較麻煩
Dubbo的SPI並不是原生的SPI,Dubbo的規則是在
/META-INF/dubbo
/META-INF/internal
/META-INF/service
而且基於SPI接口去建立一個文件下面以須要實現的接口去建立一個文件,而且在文件中以properties規則同樣配置實現類的全面以及分配實現的一個名稱。
文件名稱和接口名稱保持一致,文件內容和SPI有差別,內容是key對應value
咱們看一下dubbo-cluster模塊的META-INF.dubbo.internal:
META-INF/dubbo/com.alibaba.dubbo.rpc.Protocol
文件,文件內容爲
defineProtocol=com.gupaoedu.dubbo.protocol.DefineProtocol
import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Exporter; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Protocol; import com.alibaba.dubbo.rpc.RpcException; public class DefineProtocol implements Protocol { @Override public int getDefaultPort() { return 8888; } @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { return null; } @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { return null; } @Override public void destroy() { } }
public class App { public static void main(String[] args) throws IOException, InterruptedException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext ("dubbo-client.xml"); Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class). getExtension("defineProtocol"); System.out.println(protocol.getDefaultPort()); System.in.read(); } }
切入點
Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class). getExtension("defineProtocol");
dubbo的擴展點框架主要位於這個包下:
com.alibaba.dubbo.common.extension
大概結構以下:
com.alibaba.dubbo.common.extension
|--factory|--AdaptiveExtensionFactory 自適應擴展點工廠 |--SpiExtensionFactory SPI擴展點工廠|--support
|--ActivateComparator|--Activate 自動激活加載擴展的註解
|--Adaptive 自適應擴展點的註解 |--ExtensionFactory 擴展點對象生成工廠接口|--ExtensionLoader 擴展點加載器,擴展點的查找,校驗,加載等核心邏輯的實現類
|--SPI @SPI告訴當前應用其實一個擴展點例如Protocol 必定能夠在對應的meta-inf/dubbo.internal 中看到dubbo-config-spring
|- extension
|--SpringExtensionFactory
其中最核心的類就是ExtensionLoader,幾乎全部特性都在這個類中實現。
ExtensionLoader沒有提供public的構造方法,可是提供了一個public static的getExtensionLoader,這個方法就是獲取ExtensionLoader實例的工廠方法。其public成員方法中有三個比較重要的方法:
getActivateExtension :根據條件獲取當前擴展可自動激活的實現
getExtension : 根據名稱獲取當前擴展的指定實現
getAdaptiveExtension : 獲取當前擴展的自適應實現
@SPI("dubbo") public interface Protocol { /** * 獲取缺省端口,當用戶沒有配置端口時使用。 * * @return 缺省端口 */ int getDefaultPort(); /** * 暴露遠程服務:<br> * 1. 協議在接收請求時,應記錄請求來源方地址信息:RpcContext.getContext().setRemoteAddress();<br> * 2. export()必須是冪等的,也就是暴露同一個URL的Invoker兩次,和暴露一次沒有區別。<br> * 3. export()傳入的Invoker由框架實現並傳入,協議不須要關心。<br> * * @param <T> 服務的類型 * @param invoker 服務的執行體 * @return exporter 暴露服務的引用,用於取消暴露 * @throws RpcException 當暴露服務出錯時拋出,好比端口已佔用 */ @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; /** * 引用遠程服務:<br> * 1. 當用戶調用refer()所返回的Invoker對象的invoke()方法時,協議需相應執行同URL遠端export()傳入的Invoker對象的invoke()方法。<br> * 2. refer()返回的Invoker由協議實現,協議一般須要在此Invoker中發送遠程請求。<br> * 3. 當url中有設置check=false時,鏈接失敗不能拋出異常,並內部自動恢復。<br> * * @param <T> 服務的類型 * @param type 服務的類型 * @param url 遠程服務的URL地址 * @return invoker 服務的本地代理 * @throws RpcException 當鏈接服務提供方失敗時拋出 */ @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; /** * 釋放協議:<br> * 1. 取消該協議全部已經暴露和引用的服務。<br> * 2. 釋放協議所佔用的全部資源,好比鏈接和端口。<br> * 3. 協議在釋放後,依然能暴露和引用新的服務。<br> */ void destroy(); }
從上述Protocol的源碼,能夠看到兩個註解@SPI("duo")
和 @Adaptive
@SPI :表示當前這個接口是一個擴展點,能夠實現本身的擴展實現
@Adaptive 表示一個自適應擴展點,在方法級別上,會動態生成一個適配器類
Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class). getExtension("defineProtocol");
@SuppressWarnings("unchecked") public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { if (type == null) throw new IllegalArgumentException("Extension type == null"); if (!type.isInterface()) { throw new IllegalArgumentException("Extension type(" + type + ") is not interface!"); } if (!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!"); } ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; }
根據一個類型class得到一個ExtensionLoader,須要一個class類型的參數,這個參數必須是接口,並且此接口必需要@SPI
註解註釋,不然拒絕處理。檢查經過以後首先會檢查ExtensionLoader
緩存中是否已經存在該擴展對應的ExtensionLoader
,若是有則返回,不然建立一個新的ExtensionLoader負責加載此擴展實現,同時緩存起來。因此每個擴展,dubbo中只會有一個對應的ExtensionLoader
實例
接下來看下ExtensionLoader的私有構造函數:
private ExtensionLoader(Class<?> type) { this.type = type; objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()); }
這裏保存了對應的擴展類型,而且設置了一個額外的objectFactory屬性,他是一個ExtensionFactory類型,ExtensionFactory主要用於加載擴展的實現:
ExtensionFactory主要用於加載擴展的實現:
@SPI public interface ExtensionFactory { /** * Get extension. * * @param type object type. * @param name object name. * @return object instance. */ <T> T getExtension(Class<T> type, String name); }
ExtensionFactory有@SPI註解,說明當前這個接口是一個擴展點。從extension包的結構圖能夠看到。Dubbo內部提供了兩個實現類:SpiExtensionFactory
和AdaptiveExtensionFactory
。不一樣的實現能夠以不一樣的方式來完成擴展點實現的加載。
對應上述ExtensionLoader的getAdaptiveExtension()
咱們查看對應的getAdaptivesion()
方法得到一個自適應的擴展點
若是是配置在類級別上,表示自定義適配器,若是是配置在方法上,表示須要動態生成適配器類
表示當前是自定義擴展點
默認的ExtensionFactory
實現中,AdaptiveExtensionFactotry
被@Adaptive
註解註釋,也就是它就是ExtensionFactory對應的自適應擴展實現(每一個擴展點最多隻能有一個自適應實現,若是全部實現中沒有被@Adaptive註釋的,那麼dubbo會動態生成一個自適應實現類),也就是說,全部對ExtensionFactory調用的地方,實際上調用的都是AdpativeExtensionFactory,那麼咱們看下他的實現代碼:
@Adaptive public class AdaptiveExtensionFactory implements ExtensionFactory { private final List<ExtensionFactory> factories; public AdaptiveExtensionFactory() { ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class); List<ExtensionFactory> list = new ArrayList<ExtensionFactory>(); for (String name : loader.getSupportedExtensions()) { list.add(loader.getExtension(name)); } factories = Collections.unmodifiableList(list); } public <T> T getExtension(Class<T> type, String name) { for (ExtensionFactory factory : factories) { T extension = factory.getExtension(type, name); if (extension != null) { return extension; } } return null; }
這段代碼,其實就至關於一個代理入口,它會遍歷當前系統中全部的ExtensionFactory實現來獲取指定的擴展實現,獲取到擴展實現,遍歷完全部ExtensionFactory實現,調用ExtensionLoader的getSupportedExtensions方法來獲取ExtensionFactory的全部實現
從前面ExtensionLoader的私有構造函數中能夠看出,在選擇ExtensionFactory的時候,並非調用getExtension(name)來獲取某個具體的實現類,而是調用getAdaptiveExtension來獲取一個自適應的實現。那麼首先咱們就來分析一下getAdaptiveExtension這個方法的實現吧:
@SuppressWarnings("unchecked") public T getAdaptiveExtension() { Object instance = cachedAdaptiveInstance.get(); if (instance == null) { if(createAdaptiveInstanceError == null) { synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { try { instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t); } } } } else { throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } } return (T) instance; }
首先檢查緩存的adaptiveInstance是否存在,若是存在則直接使用,不然的話調用createAdaptiveExtension方法來建立新的adaptiveInstance而且緩存起來。也就是說對於某個擴展點,每次調用ExtensionLoader.getAdaptiveExtension獲取到的都是同一個實例。
private T createAdaptiveExtension() { try { return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e); } }
在`createAdaptiveExtension`方法中,首先經過`getAdaptiveExtensionClass`方法獲取到最終的自適應實現類型,而後實例化一個自適應擴展實現的實例,最後進行擴展點注入操做
private Class<?> getAdaptiveExtensionClass() { getExtensionClasses(); if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } return cachedAdaptiveClass = createAdaptiveExtensionClass(); }
上述代碼中主要作了兩件事情
getExtensionClasses()
加載全部路徑下的擴展點createAdaptiveExtensionClass()
動態建立一個擴展點cachedAdaptiveClass這裏有個判斷,用來判斷當前Protocol這個擴展點是否存在一個自定義的適配器,若是有,則直接返回自定義適配器,不然,就動態建立,這個值是在getExtensionClasses中賦值的,這塊代碼咱們稍後再看
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String,Class<?>>>(); private Map<String, Class<?>> getExtensionClasses() { //com.alibaba.dubbo.rpc.Protocal=>[xx,xx] Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; }
上述代碼主要作了幾件事
loadExtensionClasses()
,去加載擴展點的實現// 此方法已經getExtensionClasses方法同步過。 private Map<String, Class<?>> loadExtensionClasses() { //type->Protocol.class final SPI defaultAnnotation = type.getAnnotation(SPI.class); if(defaultAnnotation != null) { String value = defaultAnnotation.value(); if(value != null && (value = value.trim()).length() > 0) { String[] names = NAME_SEPARATOR.split(value); if(names.length > 1) { throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ": " + Arrays.toString(names)); } if(names.length == 1) cachedDefaultName = names[0]; } } Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>(); loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); loadFile(extensionClasses, DUBBO_DIRECTORY); loadFile(extensionClasses, SERVICES_DIRECTORY); return extensionClasses; }
從代碼裏面能夠看到,在loadExtensionClasses中首先會檢測擴展點在@SPI註解中配置的默認擴展實現的名稱,並將其賦值給cachedDefaultName屬性進行緩存,後面想要獲取該擴展點的默認實現名稱就能夠直接經過訪問cachedDefaultName字段來完成,好比getDefaultExtensionName方法就是這麼實現的。從這裏的代碼中又能夠看到,具體的擴展實現類型,是經過調用loadFile方法來加載,分別從一下三個地方加載:
META-INF/dubbo/internal
META-INF/dubbo
META-INF/services/
主要邏輯:
Protocol.class
這個類的註解@SPI
@SPI
中的value值@SPI("dubbo")
,則吧此dubbo的值賦值給cachedDefaultName
,這就是爲何咱們可以經過ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension()
可以獲取到DubboProtocol
這個擴展點的緣由private void loadFile(Map<String, Class<?>> extensionClasses, String dir) { String fileName = dir + type.getName(); try { Enumeration<java.net.URL> urls; ClassLoader classLoader = findClassLoader(); if (classLoader != null) { urls = classLoader.getResources(fileName); } else { urls = ClassLoader.getSystemResources(fileName); } if (urls != null) { while (urls.hasMoreElements()) { java.net.URL url = urls.nextElement(); try { BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8")); try { String line = null; while ((line = reader.readLine()) != null) { final int ci = line.indexOf('#'); if (ci >= 0) line = line.substring(0, ci); line = line.trim(); if (line.length() > 0) { try { String name = null; int i = line.indexOf('='); if (i > 0) { name = line.substring(0, i).trim(); line = line.substring(i + 1).trim(); } if (line.length() > 0) { Class<?> clazz = Class.forName(line, true, classLoader); if (! type.isAssignableFrom(clazz)) { throw new IllegalStateException("Error when load extension class(interface: " + type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + "is not subtype of interface."); } //若是在類級別上,表示自定義適配器 //若是是在方法上,表示須要動態生成適配器類 if (clazz.isAnnotationPresent(Adaptive.class)) { if(cachedAdaptiveClass == null) { cachedAdaptiveClass = clazz; } else if (! cachedAdaptiveClass.equals(clazz)) { throw new IllegalStateException("More than 1 adaptive class found: " + cachedAdaptiveClass.getClass().getName() + ", " + clazz.getClass().getName()); } } else { try { clazz.getConstructor(type); Set<Class<?>> wrappers = cachedWrapperClasses; if (wrappers == null) { cachedWrapperClasses = new ConcurrentHashSet<Class<?>>(); wrappers = cachedWrapperClasses; } wrappers.add(clazz); } catch (NoSuchMethodException e) { clazz.getConstructor(); if (name == null || name.length() == 0) { name = findAnnotationName(clazz); if (name == null || name.length() == 0) { if (clazz.getSimpleName().length() > type.getSimpleName().length() && clazz.getSimpleName().endsWith(type.getSimpleName())) { name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase(); } else { throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url); } } } String[] names = NAME_SEPARATOR.split(name); if (names != null && names.length > 0) { Activate activate = clazz.getAnnotation(Activate.class); if (activate != null) { cachedActivates.put(names[0], activate); } for (String n : names) { if (! cachedNames.containsKey(clazz)) { cachedNames.put(clazz, n); } Class<?> c = extensionClasses.get(n); if (c == null) { extensionClasses.put(n, clazz); } else if (c != clazz) { throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName()); } } } } } } } catch (Throwable t) { IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t); exceptions.put(line, e); } } } // end of while read lines } finally { reader.close(); } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", class file: " + url + ") in " + url, t); } } // end of while urls } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName + ").", t); } }
解析指定路徑下的文件,獲取對應的擴展點,經過反射的方式進行實例化以後,put到extensionClasses
這個Map集合中
調用loadFile方法,代碼比較長,主要作了幾個事情,有幾個變量會賦值
cachedAdaptiveClass : 當前Extension類型對應的AdaptiveExtension類型(只能一個)
cachedWrapperClasses : 當前Extension類型對應的全部Wrapper實現類型(無順序)
cachedActivates : 當前Extension實現自動激活實現緩存(map,無序)
cachedNames : 擴展點實現類對應的名稱(如配置多個名稱則值爲第一個)
當loadExtensionClasses方法執行完成以後,還有如下變量被賦值:
cachedDefaultName : 當前擴展點的默認實現名稱
當getExtensionClasses方法執行完成以後,除了上述變量被賦值以外,還有如下變量被賦值:
cachedClasses : 擴展點實現名稱對應的實現類(一個實現類可能有多個名稱)
其實也就是說,在調用了getExtensionClasses方法以後,當前擴展點對應的實現類的一些信息就已經加載進來了而且被緩存了。後面的許多操做均可以直接經過這些緩存數據來進行處理了。
private Class<?> createAdaptiveExtensionClass() { //生成字節碼代碼 String code = createAdaptiveExtensionClassCode(); //得到類加載器 ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); //動態編譯字節碼 return compiler.compile(code, classLoader); }
動態生成適配器代碼,以及動態編譯
compilier.compile
進行編譯(默認狀況下使用的是javassist)上生成的code和類是
import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol { public void destroy() { throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } }
ExtensionLoader.getExtensionLoader(擴展接口類).getExtension(擴展接口實現類名稱)
,而後調用類的方法refer是一個引用,獲得一個url的參數,經過參數判斷是走哪一個協議發佈服務
回到createAdaptiveExtension方法,他調用了getExtesionClasses方法加載擴展點實現信息完成以後,就能夠直接經過判斷cachedAdaptiveClass緩存字段是否被賦值盤肯定當前擴展點是否有默認的AdaptiveExtension實現。若是沒有,那麼就調用createAdaptiveExtensionClass方法來動態生成一個。在dubbo的擴展點框架中大量的使用了緩存技術。
建立自適應擴展點實現類型和實例化就已經完成了,下面就來看下擴展點自動注入的實現
@Adaptive public class AdaptiveExtensionFactory implements ExtensionFactory { private final List<ExtensionFactory> factories; public AdaptiveExtensionFactory() { ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class); List<ExtensionFactory> list = new ArrayList<ExtensionFactory>(); for (String name : loader.getSupportedExtensions()) { list.add(loader.getExtension(name)); } factories = Collections.unmodifiableList(list); } public <T> T getExtension(Class<T> type, String name) { for (ExtensionFactory factory : factories) { T extension = factory.getExtension(type, name); if (extension != null) { return extension; } } return null; } }
這裏能夠看到,擴展點自動注入的一句就是根據setter方法對應的參數類型和property名稱從ExtensionFactory中查詢,若是有返回擴展點實例,那麼就進行注入操做。到這裏getAdaptiveExtension方法就分析完畢了。
須要明白一點dubbo的內部傳參基本上都是基於Url來實現的,也就是說Dubbo是基於URL驅動的技術
因此,適配器類的目的是在運行期獲取擴展的真正實現來調用,解耦接口和實現,這樣的話要不咱們本身實現適配器類,要不dubbo幫咱們生成,而這些都是經過Adpative來實現。
到目前爲止,咱們的AdaptiveExtension的主線走完了,能夠簡單整理一下他們的調用關係以下
在整個過程當中,最重要的兩個方法getExtensionClasses和createAdaptiveExtensionClass
getExtensionClasses
這個方法主要是讀取META-INF/services 、META-INF/dubbo、META-INF/internal目錄下的文件內容
分析每一行,若是發現其中有哪一個類的annotation是@Adaptive,就找到對應的AdaptiveClass。若是沒有的話,就動態建立一個
createAdaptiveExtensionClass
該方法是在getExtensionClasses方法找不到AdaptiveClass的狀況下被調用,該方法主要是經過字節碼的方式在內存中新生成一個類,它具備AdaptiveClass的功能,Protocol就是經過這種方式得到AdaptiveClass類的。
NamespaceHandler: 註冊BeanDefinitionParser, 利用它來解析
BeanDefinitionParser: 解析配置文件的元素
spring會默認加載jar包下/META-INF/spring.handlers
找到對應的NamespaceHandler
當spring容器初始化完之後,會調用afterPropertiesSet方法
bean被銷燬的時候調用destory方法
容器初始化完成以後會主動注入applicationContext
事件監聽
對象初始化完以後會獲取bean的自己屬性
delay可以控制延遲發佈
首先咱們要關注的是服務的發佈和服務的消費這兩個主要的流程,那麼就能夠基於這個點去找到源碼分析的突破口。那麼天然而然咱們就能夠想到spring的配置
Dubbo中spring擴展就是使用spring的自定義類型,因此一樣也有NamespaceHandler、BeanDefinitionParser。而NamespaceHandler是DubboNamespaceHandler
public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } }
BeanDefinitionParser所有都使用了DubboBeanDefinitionParser,若是咱們向看<dubbo:service>的配置,就直接看DubboBeanDefinitionParser中
這個裏面主要作了一件事,把不一樣的配置分別轉化成spring容器中的bean對象
application
對應ApplicationConfig
registry
對應RegistryConfig
monitor
對應MonitorConfig
provider
對應ProviderConfig
consumer
對應ConsumerConfig
爲了在spring啓動的時候,也相應的啓動provider發佈服務註冊服務的過程,而同時爲了讓客戶端在啓動的時候自動訂閱發現服務,加入了兩個bean
ServiceBean
、ReferenceBean
。
分別繼承了ServiceConfig
和ReferenceConfig
同時還分別實現了InitializingBean
、DisposableBean
, ApplicationContextAware
, ApplicationListener
, BeanNameAware
InitializingBean
接口爲bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是繼承該接口的類,在初始化bean的時候會執行該方法。
DisposableBean
bean被銷燬的時候,spring容器會自動執行destory方法,好比釋放資源
ApplicationContextAware
實現了這個接口的bean,當spring容器初始化的時候,會自動的將ApplicationContext注入進來
ApplicationListener
ApplicationEvent事件監聽,spring容器啓動後會發一個事件通知
BeanNameAware
得到自身初始化時,自己的bean的id屬性
那麼基本的實現思路能夠整理出來了
serviceBean
是服務發佈的切入點,經過afterPropertiesSet
方法,調用export()
方法進行發佈。
export
爲父類ServiceConfig
中的方法,因此跳轉到SeviceConfig
類中的export
方法
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && ! export.booleanValue()) { return; } if (delay != null && delay > 0) { Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep(delay); } catch (Throwable e) { } doExport(); } }); thread.setDaemon(true); thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport(); } }
咱們發現,delay的做用就是延遲暴露,而延遲的方式也很直截了當,Thread.sleep(delay)
doExport()
方法。一樣是一堆初始化代碼繼續看doExport(),最終會調用到doExportUrls()中:
private void doExportUrls() { List<URL> registryURLs = loadRegistries(true);//是否是得到註冊中心的配置 for (ProtocolConfig protocolConfig : protocols) { //是否是支持多協議發佈 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
對應上述protocols
長成這樣<dubbo:protocol name="dubbo" port="20880" id ="dubbo"/>
protocols也是根據配置裝配出來的,接下來進入對應的duExportUrlsFor1Protocol
方法查看對應s具體實現
最終實現邏輯
//配置爲none不暴露 if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的狀況下作本地暴露 (配置爲remote,則表示只暴露遠程服務) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } //若是配置不是local則暴露爲遠程服務.(配置爲local,則表示只暴露本地服務) if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){ if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } }
從上述代碼doExportUrlsFor1Protocol
方法,先建立兩個URL,分別以下
dubbo://192.168.xx.xx:20880/com.zzjson.IHello
registry://192.168.xx
其對應的url就是services
的providers
的信息
在上面這段代碼中能夠看到Dubbo的比較核心的抽象:Invoker, Invoker是一個代理類,從ProxyFactory中生成。這個地方能夠作一個小結
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol { public void destroy() { throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } }
上述代碼作兩件事
protocol
的協議地址,若是protocol
爲空,表示經過dubbo
協議發佈服務,不然根據配置的謝意類型來發布服務ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName)
public T getExtension(String name) { if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); if ("true".equals(name)) { return getDefaultExtension(); } Holder<Object> holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder<Object>()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { instance = createExtension(name); holder.set(instance); } } } return (T) instance; }
@SuppressWarnings("unchecked") private T createExtension(String name) { Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } //對獲取的實例進行依賴注入 injectExtension(instance); //在loadFile中進行賦值的 Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { //對實例進行包裝,分別調用帶Protocol參數的構造函數建立實例,而後進行依賴注入 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }
主要作以下四個事情
Protocol
參數的構造函數建立實例,而後進行依賴注入dubbo-rpc-api
的resources
路徑下,找到com.alibaba.dubbo.rpc.Protocol
文件中存在filter/listener
cachedWrapperClass
對DubboProtocol
進行包裝,會經過ProtocolFilterWrapper
,ProtocolListenerWrapper
包裝private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; }
總結
在ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
這段代碼中,當extName
爲registry
的時候,咱們不須要再次去閱讀這塊代碼了,直接能夠在擴展點中找到相應的實現擴展類,/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
配置以下
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
因此咱們定位到RegistryProtocolRegistryProtocol
這個類中的export
方法
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 訂閱override數據 // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保證每次export都返回一個新的exporter實例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
上述doLoaclExport源碼爲:
private Protocol protocol; public void setProtocol(Protocol protocol) { this.protocol = protocol; } @SuppressWarnings("unchecked") private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){ String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper<T>) exporter; }
上述代碼Protocol代碼是怎麼複製的呢,是在injectExtension
方法中對已經加載的擴展點的屬性進行依賴注入了
所以咱們知道protocol是一個自適應擴展點,Protocol$Adaptive
,而後調用這個自適應擴展點中的export
方法,這個時候傳入的協議地址應該是
dubbo://127.0.0.1/xxx
所以在Protocol$Adaptive.export
方法中,ExtensionLoader.getExtension(Protocol.class).getExtension
就是基於DubboProtocol協議發佈服務了麼?固然不是,此處獲取的不是一個單純的DubboProtocol
擴展點,而是會經過Wrapper
對Protocl
進行裝飾,裝飾器分別爲ProtocolFilterWrapper
或者是ProtoclListenerWrapper
,至於爲何MockProtocol
爲何不在裝飾器裏面呢?咱們能夠想到,在ExtensionLoader.loadFile
這段代碼的時候,裝飾器必需要有一個Protocol
的構造方法,以下
public ProtocolFilterWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; }
至此,咱們能夠知道Protocol$Adaptive
中的export
方法會調用ProtocolFilterWrapper
以及ProtocolListenerWrapper
類的方法
public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } //此方法讀取全部的filter類,利用這些類封裝invoker private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; //自動激活擴展點,根據條件獲取當前擴展可自動激活的實現 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } @Override public int getDefaultPort() { return protocol.getDefaultPort(); } @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); } @Override public void destroy() { protocol.destroy(); } }
ProtocolFilterWrapper
這個類很是重要
Protocol protocol
構造參數Protocol
接口export
和refer
函數進行了封裝咱們查看以下文件dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter
echo=com.alibaba.dubbo.rpc.filter.EchoFilter generic=com.alibaba.dubbo.rpc.filter.GenericFilter genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter token=com.alibaba.dubbo.rpc.filter.TokenFilter accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter context=com.alibaba.dubbo.rpc.filter.ContextFilter consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
能夠看到,invoker經過以下的Filter組裝成一個責任鏈
其中涉及到不少功能,包括權限驗證,異常,超時等等,固然能夠預計計算調用時間等等應該也是在這其中的某個類實現的,這裏咱們能夠看到export
和refer
過程都會被filter
過濾
在這裏咱們能夠看到export
和refer
分別對應了不一樣的Wrapper
;export
對應的ListenerExporterWrapper
這塊暫不去分析,由於此地方並無提供實現類
public class ProtocolListenerWrapper implements Protocol { private final Protocol protocol; public ProtocolListenerWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } public int getDefaultPort() { return protocol.getDefaultPort(); } public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); } public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); } public void destroy() { protocol.destroy(); } }
咱們看一下dubboProtocol的export方法:openServer(url)
經過上述代碼分析完之後,咱們可以定位到DubboProtocol.export()
方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){ if (logger.isWarnEnabled()){ logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //暴露服務 openServer(url); return exporter; }
接着調用openServer
private void openServer(URL url) { // find server. String key = url.getAddress();//116.62.221.6:20880 //client 也能夠暴露一個只有server能夠調用的服務。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) {//沒有的話就建立服務 serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } }
繼續看其中的createServer方法:
private ExchangeServer createServer(URL url) { //默認開啓server關閉時發送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默認開啓heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
建立服務,而後開啓心跳監測,默認使用netty
。組裝url
發現ExchangeServer是經過Exchangers建立的,直接看Exchanger.bind方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); }
getExchanger方法實際上調用的是ExtensionLoader的相關方法,這裏的ExtensionLoader是dubbo插件化的核心,咱們會在後面的插件化講解中詳細講解,這裏咱們只須要知道Exchanger的默認實現只有一個:HeaderExchanger。上面一段代碼最終調用的是:
public static Exchanger getExchanger(URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger(String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
能夠看到Server與Client實例均是在這裏建立的,HeaderExchangeServer須要一個Server類型的參數,來自Transporters.bind()
:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); }
getTransporter()獲取的實例來源於配置,默認返回一個NettyTransporter:
經過NettyTranport建立基於Netty
的server
服務
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } }
在調用HeaderExchanger.bind()
方法的時候,是先new一個HeaderExchangeServer
這個server是幹嗎呢,是對當前這個連接去創建心跳機制
public class HeaderExchangeServer implements ExchangeServer { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,new NamedThreadFactory("dubbo-remoting-server-heartbeat", true); // 心跳定時器 private ScheduledFuture<?> heatbeatTimer; // 心跳超時,毫秒。缺省0,不會執行心跳。 private int heartbeat; private int heartbeatTimeout; private final Server server; private volatile boolean closed = false; public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } //心跳 startHeatbeatTimer(); } private void startHeatbeatTimer() { //關閉心跳定時 stopHeartbeatTimer(); if (heartbeat > 0) { //每隔heartbeat時間執行一次 heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask( new HeartBeatTask.ChannelProvider() { //獲取channels public Collection<Channel> getChannels() { return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels() ); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat,TimeUnit.MILLISECONDS); } } //關閉心跳定時 private void stopHeartbeatTimer() { try { ScheduledFuture<?> timer = heatbeatTimer; if (timer != null && ! timer.isCancelled()) { timer.cancel(true); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } finally { heatbeatTimer =null; } } }
心跳線程HeartBeatTask
在超時時間以內發送數據,在超時時間以外,是客戶端的話,重連;是服務端,那麼關閉服務端發佈。
前面咱們知道,基於Spring的這個解析入口,到發佈服務的過程,接着基於DubboProtocol
去發佈,最終調用Netty
的api建立了一個NettyServer
。
那麼繼續沿着RegistryProtocol.export
這個方法,來看看註冊服務的代碼
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 訂閱override數據 // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保證每次export都返回一個新的exporter實例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
private RegistryFactory registryFactory; public void setRegistryFactory(RegistryFactory registryFactory) { this.registryFactory = registryFactory; } private Registry getRegistry(final Invoker<?> originInvoker){ URL registryUrl = originInvoker.getUrl();//得到registry://192.168.xx.xx:2181的協議地址 if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { //獲得zk的謝意地址 String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); //registryUrl就會變成了zookeeper://192.168.xx.xx registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); } //registryFactory是什麼 return registryFactory.getRegistry(registryUrl); }
上述代碼很明顯了,經過分析,其實就是把registry
的協議頭改爲服務提供者配置的協議地址,就是咱們配置
的
<dubbo:registry address="zookeeper://192.168.xx.xx.2181"/>
而後registryFactory.getRegistry
的目的,就是經過協議地址匹配到對應的註冊中心。那registryFactory
是一個什麼樣的對象呢,從上述咱們能夠才,其是一個擴展點,而且咱們可以注意到這裏面的一個方法上有一個@Adaptive
的註解,說明了其實一個自適應擴展點,按照咱們以前看過的代碼,自適應擴展點加在方法層面上,表示會動態生成一個自適應的適配器,因此這個自適應適配器應該是RegistryFactory$Adaptive
@SPI("dubbo") public interface RegistryFactory { /** * 鏈接註冊中心. * * 鏈接註冊中心需處理契約:<br> * 1. 當設置check=false時表示不檢查鏈接,不然在鏈接不上時拋出異常。<br> * 2. 支持URL上的username:password權限認證。<br> * 3. 支持backup=10.20.153.10備選註冊中心集羣地址。<br> * 4. 支持file=registry.cache本地磁盤文件緩存。<br> * 5. 支持timeout=1000請求超時設置。<br> * 6. 支持session=60000會話超時或過時設置。<br> * * @param url 註冊中心地址,不容許爲空 * @return 註冊中心引用,總不返回空 */ @Adaptive({"protocol"}) Registry getRegistry(URL url); }
public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory { public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) " +"name from url(" + url.toString() + ") usekeys([protocol])")"; com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.Reg istryFactory.class).getExtension(extName); return extension.getRegistry(arg0); } }
咱們拿到這個動態生成的自適應擴展點,看看這段代碼中的實現
zookeeper://
ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper")
去得到一個指定的擴展點,而這個擴展點的配置在/dubbo-registry/dubbo-registry-zookeeper/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.registry.RegistryFactory
內容爲
zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory
public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } }
此方法中並無getRegistry
方法,而是在父類AbstractRegistryFactory
public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); // 鎖定註冊中心獲取過程,保證註冊中心單一實例 LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // 釋放鎖 LOCK.unlock(); } }
上述方法
REGISTRIES
中,根據key得到對應的Registry
public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); }
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (! group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
代碼分析到這,咱們對於getRegistry
得出結論根據當前註冊中心的配置信息,得到一個匹配的註冊中心,也就是ZookeeperRegistry registry.register(registedProviderUrl)
繼續往下看會調用對應的registry.register
去把dubbo://
的謝意地址註冊到zookeeper
上,這個方法會調用FailbackRegistry
類中的register
,由於其父類FailbackRegistry
中存在register
方法,而這個類重寫了此方法,因此咱們能夠直接定位到FailbackRegistry
這個類中的register
方法中
@Override public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // 向服務器端發送註冊請求 doRegister(url); } catch (Exception e) { Throwable t = e; // 若是開啓了啓動時檢測,則直接拋出異常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if(skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // 將失敗的註冊請求記錄到失敗列表,定時重試 failedRegistered.add(url); } }
從名字上來看,是一個失敗重試機制,調用父類的register
方法把當前url添加到緩存集合中,調用子類的doRegister
方法
protected void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
能夠看到,調用了zkclient.create
在zookeeper中建立節點
最後RegistryProtocol.export這個方法以後的代碼再也不分析了,就是去服務提供端註冊一個zookeeper監聽,當監聽發生變化的時候,服務端作相應的處理。