github新增倉庫 "dubbo-read"(點此查看),集合全部《Dubbo原理和源碼解析》系列文章,後續將繼續補充該系列,同時將針對Dubbo所作的功能擴展也進行分享。不按期更新,歡迎Follow。html
在官方《Dubbo 開發指南》框架設計部分提到,Dubbo 服務框架的基本設計原則是:java
對於第一點比較容易理解,全部的參數都封裝成 Dubbo 自定義的 URL 對象進行傳遞。URL 對象主要包括如下屬性:git
本文將重點介紹第二點,對 Microkernel + Plugin 機制的實現原理、源碼進行分析和跟蹤。github
框架或組件一般有兩類客戶,一個是使用者,一個是擴展者。API (Application Programming Interface) 是給使用者用的,而 SPI (Service Provide Interface) 是給擴展者用的。
咱們系統裏抽象的各個模塊,每每有不少不一樣的實現方案,好比日誌模塊的方案、jdbc模塊的方案等。面向的對象的設計裏,咱們通常推薦 模塊之間基於接口編程,模塊之間不對實現類進行硬編碼。一旦代碼裏涉及具體的實現類,就違反了 可拔插的原則,若是須要替換一種實現,就須要修改代碼。
爲了實如今模塊裝配的時候能不在程序裏動態指明,這就須要一種服務發現機制。JAVA SPI 就提供了這樣的一個機制——爲某個接口尋找服務實現的機制。有點相似 IOC 的思想,將裝配的控制權移到程序以外,在 模塊化設計 中這個機制尤爲重要。apache
JAVA SPI 其實是 」基於接口編程+策略模式+配置文件「 組合實現的動態加載機制。具體步驟爲:編程
假設咱們提供了一個「打招呼」的接口,有中文版和英文版兩種實現:緩存
package com.spi.service; public interface HelloService { public String sayHello(); }
分別編寫中文版、英文版的實現:ruby
package com.spi.service.impl; public class ChineseHelloService implements HelloService { @Override public String sayHello() { return "你好"; } }
package com.spi.service.impl; public class EnglishHelloService implements HelloService { @Override public String sayHello() { return "hello"; } }
編寫工廠類,用於封裝實現類的獲取邏輯:app
public class HelloServiceFactory { public HelloServiceFactory(){ } public static HelloService newHelloService(){ HelloService helloService = null; ServiceLoader<HelloService> serviceLoader = ServiceLoader.load(HelloService.class); Iterator<HelloService> services = serviceLoader.iterator(); if(services.hasNext()){ helloService = services.next(); } return helloService; } }
在 /src/main/resource/META-INF/services 下建立 com.spi.service.HelloService 文件,內容爲兩個具體實現類的類名:負載均衡
package com.spi.service.impl.EnglishHelloService
或
package com.spi.service.impl.ChineseHelloService
package com.spi; public class Main { public static void main(String[] args) { HelloService helloService = HelloServiceFactory.newHelloService(); System.out.println(helloService.sayHello()); } }
當文件內容爲 package com.spi.service.impl.EnglishHelloService 時,執行結果爲:
hello
當文件內容爲 package com.spi.service.impl.ChineseHelloService 時,執行結果爲:
你好
以此類推,若是你把全部實現類類名都寫到文件中,由調用者自行選擇實現類,那麼能夠經過如下方式實現(簡陋版,純屬舉例用):
com.spi.service.HelloService文件:
package com.spi.service.impl.EnglishHelloService package com.spi.service.impl.ChineseHelloService
HelloServiceFactory:
public class HelloServiceFactory { private HelloServiceFactory(){ } public static HelloService newHelloService(String name){ HelloService helloService = null; ServiceLoader<HelloService> serviceLoader = ServiceLoader.load(HelloService.class); Iterator<HelloService> services = serviceLoader.iterator(); while(services.hasNext()){ HelloService tmp = services.next(); if(tmp.getClass().toString().contains(name)){ helloService = tmp; break; } } return helloService; } }
Main:
public class Main { public static String name = "com.spi.service.impl.EnglishHelloService"; public static void main(String[] args) { HelloService helloService = HelloServiceFactory.newHelloService(name); System.out.println(helloService.sayHello()); } }
Dubbo 「微內核+插件「機制的總體特性以下:
下面結合源碼進行分析
Dubbo 實現 「微內核+插件「機制的核心是 ExtensionLoader,它取代了 JDK 自帶的 ServiceLoader。 在 Dubbo 官方文檔中提到,ExtensionLoader 改進了 JAVA ServiceLoader 的如下問題:
以 LoadBalance 爲例,文件 com.alibaba.dubbo.rpc.cluster.LoadBalance 中內容爲:
random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
用戶使用時,在 XML 中配置 loadbalance="random",那麼 Dubbo 將加載(且僅加載)RandomLoadBalance。
從源碼角度分析,ExtensionLoader 加載擴展點流程以下:
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { if (type == null) throw new IllegalArgumentException("Extension type == null"); if(!type.isInterface()) { throw new IllegalArgumentException("Extension type(" + type + ") is not interface!"); } if(!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!"); } ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; }
在獲取 ExtensionLoader 時,或判斷傳入的 Class 是否爲 interface 並是否有 @SPI 註解。建立 ExtensionLoader 實例後在內存中緩存,保證每一個擴展點具備惟一的 ExtensionLoader 單例。
//根據名字獲取擴展點實例 public T getExtension(String name) { if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); if ("true".equals(name)) { return getDefaultExtension(); } Holder<Object> holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder<Object>()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { instance = createExtension(name); holder.set(instance); } } } return (T) instance; } //實例化擴展點 private T createExtension(String name) { Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } } private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; } //從配置文件中加載擴展點 private Map<String, Class<?>> loadExtensionClasses() { final SPI defaultAnnotation = type.getAnnotation(SPI.class); if(defaultAnnotation != null) { String value = defaultAnnotation.value(); if(value != null && (value = value.trim()).length() > 0) { String[] names = NAME_SEPARATOR.split(value); if(names.length > 1) { throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ": " + Arrays.toString(names)); } if(names.length == 1) cachedDefaultName = names[0]; } } Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>(); loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); loadFile(extensionClasses, DUBBO_DIRECTORY); loadFile(extensionClasses, SERVICES_DIRECTORY); return extensionClasses; }
獲取擴展點時,從內存緩存中獲取擴展點實例。擴展點實例在進程中也是個單例。Dubbo 從如下三個路徑中讀取擴展點配置文件並加載:
在實例化擴展點的代碼中,咱們能夠看到有如下兩個處理:
//實例化擴展點 private T createExtension(String name) { //...... injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; //...... } //注入擴展點 private T injectExtension(T instance) { try { if (objectFactory != null) { for (Method method : instance.getClass().getMethods()) { if (method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())) { Class<?> pt = method.getParameterTypes()[0]; try { String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : ""; Object object = objectFactory.getExtension(pt, property); if (object != null) { method.invoke(instance, object); } } catch (Exception e) { logger.error("fail to inject via method " + method.getName() + " of interface " + type.getName() + ": " + e.getMessage(), e); } } } } } catch (Exception e) { logger.error(e.getMessage(), e); } return instance; }
setter
擴展點實現類的成員若是爲其它擴展點類型,ExtensionLoader 在會自動注入依賴的擴展點。ExtensionLoader 經過掃描擴展點實現類的全部set方法來斷定其成員。
Wrapper
若是擴展點實現類有拷貝構造函數,則認爲是包裝類。包裝類持有實際的擴展點實現類,經過包裝類能夠把全部擴展點的公共邏輯移到包裝類,相似AOP。
從文件加載擴展點代碼以下:
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) { //...... BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8")); try { String line = null; while ((line = reader.readLine()) != null) { final int ci = line.indexOf('#'); if (ci >= 0) line = line.substring(0, ci); line = line.trim(); if (line.length() > 0) { try { String name = null; int i = line.indexOf('='); if (i > 0) { name = line.substring(0, i).trim(); line = line.substring(i + 1).trim(); } if (line.length() > 0) { Class<?> clazz = Class.forName(line, true, classLoader); if (! type.isAssignableFrom(clazz)) { throw new IllegalStateException("Error when load extension class(interface: " + type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + "is not subtype of interface."); } if (clazz.isAnnotationPresent(Adaptive.class)) { if(cachedAdaptiveClass == null) { cachedAdaptiveClass = clazz; } else if (! cachedAdaptiveClass.equals(clazz)) { throw new IllegalStateException("More than 1 adaptive class found: " + cachedAdaptiveClass.getClass().getName() + ", " + clazz.getClass().getName()); } } else { try { clazz.getConstructor(type); Set<Class<?>> wrappers = cachedWrapperClasses; if (wrappers == null) { cachedWrapperClasses = new ConcurrentHashSet<Class<?>>(); wrappers = cachedWrapperClasses; } wrappers.add(clazz); } catch (NoSuchMethodException e) { clazz.getConstructor(); if (name == null || name.length() == 0) { name = findAnnotationName(clazz); if (name == null || name.length() == 0) { if (clazz.getSimpleName().length() > type.getSimpleName().length() && clazz.getSimpleName().endsWith(type.getSimpleName())) { name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase(); } else { throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url); } } } String[] names = NAME_SEPARATOR.split(name); if (names != null && names.length > 0) { Activate activate = clazz.getAnnotation(Activate.class); if (activate != null) { cachedActivates.put(names[0], activate); } for (String n : names) { if (! cachedNames.containsKey(clazz)) { cachedNames.put(clazz, n); } Class<?> c = extensionClasses.get(n); if (c == null) { extensionClasses.put(n, clazz); } else if (c != clazz) { throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName()); } } } } } } } catch (Throwable t) { IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t); exceptions.put(line, e); } } } // end of while read lines } finally { reader.close(); } //...... }
簡單來講,上面的代碼作了如下幾件事:
@Adaptive
擴展點自適應,直到擴展點方法執行時才決定調用哪個擴展點實現。擴展點的調用會有URL 做爲參數,經過@Adaptive 註解能夠提取約定 key 來決定調用哪一個實現的方法。
@Activate
擴展點自動激活,指定 URL 中激活擴展點的 key,未指定 key 時表示無條件激活。 好比 LoadBalance:
SPI(RandomLoadBalance.NAME) public interface LoadBalance { @Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
表示默認使用 Random 負載均衡策略,同時會根據用戶在 XML 中配置的 loadbalance 參數來最終決定調用哪一個擴展點實現類。
再好比 AsyncFilter:
@Activate(group = Constants.CONSUMER) public class AsyncFilter implements Filter{ }
表示只有在 Consumer 端纔會激活。