Dubbo原理和源碼解析之「微內核+插件」機制

github新增倉庫 "dubbo-read"(點此查看,集合全部《Dubbo原理和源碼解析》系列文章,後續將繼續補充該系列,同時將針對Dubbo所作的功能擴展也進行分享。不按期更新,歡迎Follow。html

 

1 框架設計

在官方《Dubbo 開發指南》框架設計部分提到,Dubbo 服務框架的基本設計原則是:java

  • 採用 URL 做爲配置信息的統一格式,全部擴展點都經過傳遞 URL 攜帶配置信息;
  • 採用 Microkernel + Plugin 模式,Microkernel 只負責組裝 Plugin,Dubbo 自身的功能也是經過擴展點實現的,也就是 Dubbo 的全部功能點均可被用戶自定義擴展所替換;

對於第一點比較容易理解,全部的參數都封裝成 Dubbo 自定義的 URL 對象進行傳遞。URL 對象主要包括如下屬性:git

  • String protocol
  • String host
  • int port
  • String path
  • Map<String, String> parameters

本文將重點介紹第二點,對 Microkernel + Plugin 機制的實現原理、源碼進行分析和跟蹤。github

 

2 API 和 SPI

框架或組件一般有兩類客戶,一個是使用者,一個是擴展者。API (Application Programming Interface) 是給使用者用的,而 SPI (Service Provide Interface) 是給擴展者用的。
咱們系統裏抽象的各個模塊,每每有不少不一樣的實現方案,好比日誌模塊的方案、jdbc模塊的方案等。面向的對象的設計裏,咱們通常推薦 模塊之間基於接口編程,模塊之間不對實現類進行硬編碼。一旦代碼裏涉及具體的實現類,就違反了 可拔插的原則,若是須要替換一種實現,就須要修改代碼。
爲了實如今模塊裝配的時候能不在程序裏動態指明,這就須要一種服務發現機制。JAVA SPI 就提供了這樣的一個機制——爲某個接口尋找服務實現的機制。有點相似 IOC 的思想,將裝配的控制權移到程序以外,在 模塊化設計 中這個機制尤爲重要。apache

 

3 JAVA SPI

JAVA SPI 其實是 」基於接口編程+策略模式+配置文件「 組合實現的動態加載機制。具體步驟爲:編程

    1. 定義一個接口;
    2. 編寫接口的一個或多個實現;
    3. src/main/resources/ 下創建 /META-INF/services 目錄, 新增一個以接口命名的文件,內容是實現類類名;
    4. 使用 ServiceLoader 來加載配置文件中指定的實現。

假設咱們提供了一個「打招呼」的接口,有中文版和英文版兩種實現:緩存

3.1 定義接口

package com.spi.service;

public interface HelloService {

    public String sayHello();

}

3.2 編寫實現

分別編寫中文版、英文版的實現:ruby

package com.spi.service.impl;

public class ChineseHelloService implements HelloService {

    @Override
    public String sayHello() {
        return "你好";
    }

}

  

package com.spi.service.impl;

public class EnglishHelloService implements HelloService {

    @Override
    public String sayHello() {
        return "hello";
    }

}

  

編寫工廠類,用於封裝實現類的獲取邏輯:app

public class HelloServiceFactory {

    public HelloServiceFactory(){ }

    public static HelloService newHelloService(){
        HelloService helloService = null;
        ServiceLoader<HelloService> serviceLoader = ServiceLoader.load(HelloService.class);
        Iterator<HelloService> services = serviceLoader.iterator();
        if(services.hasNext()){
            helloService = services.next();
        }
        return helloService;
    }

}

  

3.3 建立文件

/src/main/resource/META-INF/services 下建立 com.spi.service.HelloService 文件,內容爲兩個具體實現類的類名:負載均衡

package com.spi.service.impl.EnglishHelloService

package com.spi.service.impl.ChineseHelloService 

 

3.4 執行測試

package com.spi;

public class Main {

    public static void main(String[] args) {
        HelloService helloService = HelloServiceFactory.newHelloService();
        System.out.println(helloService.sayHello());
    }

}

  

當文件內容爲 package com.spi.service.impl.EnglishHelloService 時,執行結果爲:

hello

  

當文件內容爲 package com.spi.service.impl.ChineseHelloService 時,執行結果爲:

你好

  

以此類推,若是你把全部實現類類名都寫到文件中,由調用者自行選擇實現類,那麼能夠經過如下方式實現(簡陋版,純屬舉例用):

com.spi.service.HelloService文件:

package com.spi.service.impl.EnglishHelloService
package com.spi.service.impl.ChineseHelloService

  

HelloServiceFactory:

public class HelloServiceFactory {

    private HelloServiceFactory(){

    }

    public static HelloService newHelloService(String name){
        HelloService helloService = null;
        ServiceLoader<HelloService> serviceLoader = ServiceLoader.load(HelloService.class);
        Iterator<HelloService> services = serviceLoader.iterator();
        while(services.hasNext()){
            HelloService tmp = services.next();
            if(tmp.getClass().toString().contains(name)){
                helloService = tmp;
                break;
            }
        }
        return helloService;
    }

}

  

Main:

public class Main {

    public static String name = "com.spi.service.impl.EnglishHelloService";

    public static void main(String[] args) {
        HelloService helloService = HelloServiceFactory.newHelloService(name);
        System.out.println(helloService.sayHello());
    }

}

  

4 Dubbo Microkernel + Plugin

Dubbo 「微內核+插件「機制的總體特性以下:

下面結合源碼進行分析

4.1 ExtensionLoader

Dubbo 實現 「微內核+插件「機制的核心是 ExtensionLoader,它取代了 JDK 自帶的 ServiceLoader。 在 Dubbo 官方文檔中提到,ExtensionLoader 改進了 JAVA ServiceLoader 的如下問題:

    1. JDK 標準的 SPI 會一次性實例化擴展點全部實現,沒用上也加載,若是有擴展實現初始化很耗時,會很浪費資源。
    2. 若是擴展點加載失敗,連擴展點的名稱都拿不到了。好比:JDK 標準的 ScriptEngine,經過 getName() 獲取腳本類型的名稱,但若是 RubyScriptEngine 由於所依賴的 jruby.jar 不存在,致使 RubyScriptEngine 類加載失敗,這個失敗緣由被吃掉了,和 ruby 對應不起來,當用戶執行 ruby 腳本時,會報不支持 ruby,而不是真正失敗的緣由。
    3. 增長了對擴展點 IoC 和 AOP 的支持,一個擴展點能夠直接 setter 注入其它擴展點。

以 LoadBalance 爲例,文件 com.alibaba.dubbo.rpc.cluster.LoadBalance 中內容爲:

random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance

用戶使用時,在 XML 中配置 loadbalance="random",那麼 Dubbo 將加載(且僅加載)RandomLoadBalance

從源碼角度分析,ExtensionLoader 加載擴展點流程以下:

4.1.1 獲取ExtensionLoader

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null)
        throw new IllegalArgumentException("Extension type == null");
    if(!type.isInterface()) {
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    }
    if(!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type(" + type + 
                ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    }
    
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

在獲取 ExtensionLoader 時,或判斷傳入的 Class 是否爲 interface 並是否有 @SPI 註解。建立 ExtensionLoader 實例後在內存中緩存,保證每一個擴展點具備惟一的 ExtensionLoader 單例。

4.1.2 獲取擴展點

//根據名字獲取擴展點實例
public T getExtension(String name) {
		if (name == null || name.length() == 0)
		    throw new IllegalArgumentException("Extension name == null");
		if ("true".equals(name)) {
		    return getDefaultExtension();
		}
		Holder<Object> holder = cachedInstances.get(name);
		if (holder == null) {
		    cachedInstances.putIfAbsent(name, new Holder<Object>());
		    holder = cachedInstances.get(name);
		}
		Object instance = holder.get();
		if (instance == null) {
		    synchronized (holder) {
	            instance = holder.get();
	            if (instance == null) {
	                instance = createExtension(name);
	                holder.set(instance);
	            }
	        }
		}
		return (T) instance;
}

//實例化擴展點
private T createExtension(String name) {
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && wrapperClasses.size() > 0) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                type + ")  could not be instantiated: " + t.getMessage(), t);
    }
}


private Map<String, Class<?>> getExtensionClasses() {
      Map<String, Class<?>> classes = cachedClasses.get();
      if (classes == null) {
          synchronized (cachedClasses) {
              classes = cachedClasses.get();
              if (classes == null) {
                  classes = loadExtensionClasses();
                  cachedClasses.set(classes);
              }
          }
      }
      return classes;
}

//從配置文件中加載擴展點
private Map<String, Class<?>> loadExtensionClasses() {
    final SPI defaultAnnotation = type.getAnnotation(SPI.class);
    if(defaultAnnotation != null) {
        String value = defaultAnnotation.value();
        if(value != null && (value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if(names.length > 1) {
                throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                        + ": " + Arrays.toString(names));
            }
            if(names.length == 1) cachedDefaultName = names[0];
        }
    }
    
    Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
    loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
    loadFile(extensionClasses, DUBBO_DIRECTORY);
    loadFile(extensionClasses, SERVICES_DIRECTORY);
    return extensionClasses;
}

獲取擴展點時,從內存緩存中獲取擴展點實例。擴展點實例在進程中也是個單例。Dubbo 從如下三個路徑中讀取擴展點配置文件並加載:

  • META-INF/services/
  • META-INF/dubbo/
  • META-INF/dubbo/internal/

4.2 setter & Wrapper

在實例化擴展點的代碼中,咱們能夠看到有如下兩個處理:

  • setter 注入
  • Wrapper 包裝

 

//實例化擴展點
private T createExtension(String name) {
    //......
    
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && wrapperClasses.size() > 0) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
        
    //......
}

//注入擴展點
private T injectExtension(T instance) {
    try {
        if (objectFactory != null) {
            for (Method method : instance.getClass().getMethods()) {
                if (method.getName().startsWith("set")
                        && method.getParameterTypes().length == 1
                        && Modifier.isPublic(method.getModifiers())) {
                    Class<?> pt = method.getParameterTypes()[0];
                    try {
                        String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("fail to inject via method " + method.getName()
                                + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

setter

擴展點實現類的成員若是爲其它擴展點類型,ExtensionLoader 在會自動注入依賴的擴展點。ExtensionLoader 經過掃描擴展點實現類的全部set方法來斷定其成員。

Wrapper 

若是擴展點實現類有拷貝構造函數,則認爲是包裝類。包裝類持有實際的擴展點實現類,經過包裝類能夠把全部擴展點的公共邏輯移到包裝類,相似AOP。

4.3 Adaptive & Activate

從文件加載擴展點代碼以下:

private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
    //......
    BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
    try {
        String line = null;
        while ((line = reader.readLine()) != null) {
            final int ci = line.indexOf('#');
            if (ci >= 0) line = line.substring(0, ci);
            line = line.trim();
            if (line.length() > 0) {
                try {
                    String name = null;
                    int i = line.indexOf('=');
                    if (i > 0) {
                        name = line.substring(0, i).trim();
                        line = line.substring(i + 1).trim();
                    }
                    if (line.length() > 0) {
                        Class<?> clazz = Class.forName(line, true, classLoader);
                        if (! type.isAssignableFrom(clazz)) {
                            throw new IllegalStateException("Error when load extension class(interface: " +
                                    type + ", class line: " + clazz.getName() + "), class " 
                                    + clazz.getName() + "is not subtype of interface.");
                        }
                        if (clazz.isAnnotationPresent(Adaptive.class)) {
                            if(cachedAdaptiveClass == null) {
                                cachedAdaptiveClass = clazz;
                            } else if (! cachedAdaptiveClass.equals(clazz)) {
                                throw new IllegalStateException("More than 1 adaptive class found: "
                                        + cachedAdaptiveClass.getClass().getName()
                                        + ", " + clazz.getClass().getName());
                            }
                        } else {
                            try {
                                clazz.getConstructor(type);
                                Set<Class<?>> wrappers = cachedWrapperClasses;
                                if (wrappers == null) {
                                    cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                    wrappers = cachedWrapperClasses;
                                }
                                wrappers.add(clazz);
                            } catch (NoSuchMethodException e) {
                                clazz.getConstructor();
                                if (name == null || name.length() == 0) {
                                    name = findAnnotationName(clazz);
                                    if (name == null || name.length() == 0) {
                                        if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                            name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                        } else {
                                            throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                        }
                                    }
                                }
                                String[] names = NAME_SEPARATOR.split(name);
                                if (names != null && names.length > 0) {
                                    Activate activate = clazz.getAnnotation(Activate.class);
                                    if (activate != null) {
                                        cachedActivates.put(names[0], activate);
                                    }
                                    for (String n : names) {
                                        if (! cachedNames.containsKey(clazz)) {
                                            cachedNames.put(clazz, n);
                                        }
                                        Class<?> c = extensionClasses.get(n);
                                        if (c == null) {
                                            extensionClasses.put(n, clazz);
                                        } else if (c != clazz) {
                                            throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                        }
                                    }
                                }
                            }
                        }
                    }
                } catch (Throwable t) {
                    IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                    exceptions.put(line, e);
                }
            }
        } // end of while read lines
    } finally {
        reader.close();
    }
    //......
}

簡單來講,上面的代碼作了如下幾件事:

    1. 忽略已註釋的行
    2. 解析出名稱和擴展點實現類名
    3. 判斷是否有@Adaptive註解
    4. 匹配構造函數,判斷是否爲Wrapper類
    5. 判斷是否有@Activate註解

@Adaptive 

擴展點自適應,直到擴展點方法執行時才決定調用哪個擴展點實現。擴展點的調用會有URL 做爲參數,經過@Adaptive 註解能夠提取約定 key 來決定調用哪一個實現的方法。

@Activate

擴展點自動激活,指定 URL 中激活擴展點的 key,未指定 key 時表示無條件激活。 好比 LoadBalance

SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

    @Adaptive("loadbalance")
	<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

表示默認使用 Random 負載均衡策略,同時會根據用戶在 XML 中配置的 loadbalance 參數來最終決定調用哪一個擴展點實現類。

再好比 AsyncFilter

@Activate(group = Constants.CONSUMER)
public class AsyncFilter implements Filter{

}

表示只有在 Consumer 端纔會激活。

相關文章
相關標籤/搜索