聊聊Dubbo(五):核心源碼-SPI擴展

0 前言

站在一個框架做者的角度來講,定義一個接口,本身默認給出幾個接口的實現類,同時 容許框架的使用者也可以自定義接口的實現。如今一個簡單的問題就是:如何優雅的根據一個接口來獲取該接口的全部實現類呢?java

JDK SPI 正是爲了優雅解決這個問題而生,SPI 全稱爲 (Service Provider Interface),即服務提供商接口,是JDK內置的一種服務提供發現機制。目前有很多框架用它來作服務的擴展發現,簡單來講,它就是一種動態替換髮現服務實現者的機制數據庫

因此,Dubbo如此被普遍接納的其中的 一個重要緣由就是基於SPI實現的強大靈活的擴展機制,開發者可自定義插件嵌入Dubbo,實現靈活的業務需求。編程

有人會以爲這就是創建在面向接口編程下的一種爲了使組件可擴展或動態變動實現的規範,常見的類SPI的設計有 JDBC、JNDI、JAXP 等,不少開源框架的內部實現也採用了SPI。例如:JDBC的架構是由一套API組成,用於給Java應用提供訪問不一樣數據庫的能力,而數據庫提供商的驅動軟件各不相同,JDBC經過提供一套通用行爲的API接口,底層能夠由提供商自由實現,雖然JDBC的設計沒有指明是SPI,但也和SPI的設計相似。數組

1 JDK SPI擴展

JDK爲SPI的實現提供了工具類,即java.util.ServiceLoader,ServiceLoader中定義的SPI規範沒有什麼特別之處,只須要有一個提供者配置文件(provider-configuration file),該文件須要在resource目錄META-INF/services下,文件名就是服務接口的全限定名緩存

  1. 文件內容是提供者Class的全限定名列表,顯然提供者Class都應該實現服務接口;
  2. 文件必須使用UTF-8編碼

1.1 JDK SPI 示例

代碼示例

/** * SPI服務接口 */
public interface Cmand {
    public void execute();
}
public class ShutdownCommand implements Cmand {
    public void execute() {
        System.out.println("shutdown....");  
    }
}
public class StartCommand implements Cmand {
    public void execute() {
        System.out.println("start....");
    }
}
public class SPIMain {
    public static void main(String[] args) {
        ServiceLoader<Cmand> loader = ServiceLoader.load(Cmand.class);
        System.out.println(loader);
 
        for (Cmand Cmand : loader) {
            Cmand.execute();
        }
    }
}
複製代碼

META-INF/services/com.unei.serviceloader.Cmand文件中配置:bash

com.unei.serviceloader.impl.ShutdownCommand  
com.unei.serviceloader.impl.StartCommand 
複製代碼

運行結果:架構

java.util.ServiceLoader[com.unei.serviceloader.Cmand]
shutdown....
start....
複製代碼

1.2 JDK SPI 原理

  1. 配置文件爲何要放在META-INF/services下面?app

    ServiceLoader類定義以下:負載均衡

    private static final String PREFIX = "META-INF/services/"; (JDK已經寫死了)
    複製代碼

    可是 若是ServiceLoader在load時提供Classloader,則能夠從其餘的目錄讀取。框架

  2. ServiceLoader讀取實現類是何時實例化的?來看下ServiceLoader的幾個重要屬性:

    private static final String PREFIX = "META-INF/services/";
    
    // 要加載的接口
    private Class<S> service;
    
    // The class loader used to locate, load, and instantiate providers
    private ClassLoader loader;
    
    // 用於緩存已經加載的接口實現類,其中key爲實現類的完整類名
    private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
    
    // 用於延遲加載接口的實現類
    private LazyIterator lookupIterator;
    
    public void reload() {
        providers.clear();
        lookupIterator = new LazyIterator(service, loader);
    }
    private ServiceLoader(Class<S> svc, ClassLoader cl) {
        service = Objects.requireNonNull(svc, "Service interface cannot be null");
        loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
        acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
        reload();
    }
    
    private class LazyIterator implements Iterator<S> {
    
        Class<S> service;
        ClassLoader loader;
        Enumeration<URL> configs = null;
        Iterator<String> pending = null;
        String nextName = null;
    
        private LazyIterator(Class<S> service, ClassLoader loader) {
            this.service = service;
            this.loader = loader;
        }
    
        public boolean hasNext() {
            if (nextName != null) {
                return true;
            }
            if (configs == null) {
                try {
                    String fullName = PREFIX + service.getName();
                    if (loader == null)
                        configs = ClassLoader.getSystemResources(fullName);
                    else
                       configs = loader.getResources(fullName);
                } catch (IOException x) {
                    fail(service, "Error locating configuration files", x);
                }
            }
            while ((pending == null) || !pending.hasNext()) {
                if (!configs.hasMoreElements()) {
                    return false;
                }
                pending = parse(service, configs.nextElement());
            }
            nextName = pending.next();
            return true;
        }
    
        public S next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String cn = nextName;
            nextName = null;
            Class<?> c = null;
            try {
                // 遍歷時,查找類對象
                c = Class.forName(cn, false, loader);
            } catch (ClassNotFoundException x) {
                fail(service,  "Provider " + cn + " not found");
            }
            if (!service.isAssignableFrom(c)) {
                fail(service, "Provider " + cn  + " not a subtype");
            }
            try {
                // 遍歷時,纔會初始化類實例對象
                S p = service.cast(c.newInstance());
                providers.put(cn, p);
                return p;
            } catch (Throwable x) {
                fail(service, "Provider " + cn + " could not be instantiated", x);
            }
            throw new Error();
        }
    
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
    複製代碼

1.3 JDK SPI ServiceLoader缺點

  1. 雖然ServiceLoader也算是使用的延遲加載,可是基本只能經過遍歷所有獲取,也就是接口的實現類所有加載並實例化一遍。若是你並不想用某些實現類,它也被加載並實例化了,這就形成了浪費。
  2. 獲取某個實現類的方式不夠靈活,只能經過Iterator形式獲取,不能根據某個參數來獲取對應的實現類。

2 Dubbo SPI擴展

Dubbo對JDK SPI進行了擴展,對服務提供者配置文件中的內容進行了改造,由原來的提供者類的全限定名列表改爲了KV形式的列表,這也致使了Dubbo中沒法直接使用JDK ServiceLoader,因此,與之對應的,在Dubbo中有ExtensionLoader,ExtensionLoader是擴展點載入器,用於載入Dubbo中的各類可配置組件,好比:動態代理方式(ProxyFactory)、負載均衡策略(LoadBalance)、RCP協議(Protocol)、攔截器(Filter)、容器類型(Container)、集羣方式(Cluster)和註冊中心類型(RegistryFactory)等

總之,Dubbo爲了應對各類場景,它的全部內部組件都是經過這種SPI的方式來管理的,這也是爲何Dubbo須要將服務提供者配置文件設計成KV鍵值對形式,這個K就是咱們在Dubbo配置文件或註解中用到的K,Dubbo直接經過服務接口(上面提到的ProxyFactory、LoadBalance、Protocol、Filter等)和配置的K從ExtensionLoader拿到服務提供的實現類

同時,因爲Dubbo使用了URL總線的設計,即不少參數經過URL對象來傳遞,在實際中,具體要用到哪一個值,能夠經過URL中的參數值來指定

2.1 擴展功能介紹

Dubbo對SPI的擴展是 經過ExtensionLoader來實現的,查看ExtensionLoader的源碼,能夠看到Dubbo對JDK SPI 作了三個方面的擴展:

  1. 方便獲取擴展實現:JDK SPI僅僅經過接口類名獲取全部實現,而ExtensionLoader則經過接口類名和key值獲取一個實現

  2. IOC依賴注入功能:Adaptive實現,就是生成一個代理類,這樣就能夠根據實際調用時的一些參數動態決定要調用的類了

    舉例來講:接口A,實現者A一、A2。接口B,實現者B一、B2。

    如今實現者A1含有setB()方法,會自動注入一個接口B的實現者,此時注入B1仍是B2呢?都不是,而是注入一個動態生成的接口B的實現者B$Adpative,該實現者可以根據參數的不一樣,自動引用B1或者B2來完成相應的功能

  3. 採用裝飾器模式進行功能加強,自動包裝實現,這種實現的類通常是自動激活的,經常使用於包裝類,好比:Protocol的兩個實現類:ProtocolFilterWrapper、ProtocolListenerWrapper。

    仍是第2個的例子,接口A的另外一個實現者AWrapper1。大致內容以下:

    private A a;
    AWrapper1(A a){
    	  this.a=a;
    }
    複製代碼

    所以,當在獲取某一個接口A的實現者A1的時候,已經自動被AWrapper1包裝了

2.2 擴展源碼分析

2.2.1 ExtensionLoader初始化

以獲取DubboProtocol爲例

@SPI("dubbo")
public interface Protocol {
    
    /** * 獲取缺省端口,當用戶沒有配置端口時使用。 * * @return 缺省端口 */
    int getDefaultPort();

    /** * 暴露遠程服務:<br> * 1. 協議在接收請求時,應記錄請求來源方地址信息:RpcContext.getContext().setRemoteAddress();<br> * 2. export()必須是冪等的,也就是暴露同一個URL的Invoker兩次,和暴露一次沒有區別。<br> * 3. export()傳入的Invoker由框架實現並傳入,協議不須要關心。<br> * * @param <T> 服務的類型 * @param invoker 服務的執行體 * @return exporter 暴露服務的引用,用於取消暴露 * @throws RpcException 當暴露服務出錯時拋出,好比端口已佔用 */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /** * 引用遠程服務:<br> * 1. 當用戶調用refer()所返回的Invoker對象的invoke()方法時,協議需相應執行同URL遠端export()傳入的Invoker對象的invoke()方法。<br> * 2. refer()返回的Invoker由協議實現,協議一般須要在此Invoker中發送遠程請求。<br> * 3. 當url中有設置check=false時,鏈接失敗不能拋出異常,並內部自動恢復。<br> * * @param <T> 服務的類型 * @param type 服務的類型 * @param url 遠程服務的URL地址 * @return invoker 服務的本地代理 * @throws RpcException 當鏈接服務提供方失敗時拋出 */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /** * 釋放協議:<br> * 1. 取消該協議全部已經暴露和引用的服務。<br> * 2. 釋放協議所佔用的全部資源,好比鏈接和端口。<br> * 3. 協議在釋放後,依然能暴露和引用新的服務。<br> */
    void destroy();

}

public class DubboProtocol extends AbstractProtocol {

    public static final String NAME = "dubbo";
    ...
    ...
}

// 示例:
ExtensionLoader<Protocol> protocolLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
Protocol dubboProtocol = protocolLoader.getExtension(DubboProtocol.NAME);
複製代碼
  1. ExtensionLoader.getExtensionLoader(Protocol.class):獲取ExtensionLoader實例

    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
    
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
         if (type == null)
             throw new IllegalArgumentException("Extension type == null");
         if(!type.isInterface()) {
             throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
         }
         // 0. 判斷是否爲經過SPI註解定義的可擴展接口
         if(!withExtensionAnnotation(type)) {
             throw new IllegalArgumentException("Extension type(" + type + 
                     ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
         }
         // 1. 先從EXTENSION_LOADERS中,根據傳入可擴展類型type查找
         ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
         if (loader == null) {
             // 2. 不存在,則新建ExtensionLoader實例
             EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
             loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
         }
         return loader;
    }
    
     private ExtensionLoader(Class<?> type) {
         this.type = type;
         objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
     }
    
     private static <T> boolean withExtensionAnnotation(Class<T> type) {
         return type.isAnnotationPresent(SPI.class);
     }
    複製代碼

    ExtensionLoader充當插件工廠角色,提供了一個私有的構造器。其入參type爲擴展接口類型。Dubbo經過SPI註解定義了可擴展的接口,如Filter、Transporter等。每一個類型的擴展對應一個ExtensionLoader。SPI的value參數決定了默認的擴展實現

    當擴展類型是ExtensionFactory時,不指定objectFactory,不然初始化ExtensionFactory的ExtensionLoader並獲取一個擴展適配器

  2. protocolLoader.getExtension(DubboProtocol.NAME):根據Key獲取相應的擴展實現類實例

    /** * 返回指定名字的擴展。若是指定名字的擴展不存在,則拋異常 {@link IllegalStateException}. * * @param name * @return */
     @SuppressWarnings("unchecked")
     public T getExtension(String name) {
         if (name == null || name.length() == 0)
             throw new IllegalArgumentException("Extension name == null");
         if ("true".equals(name)) {
             return getDefaultExtension();
         }
         // 1. 先從緩存中取相應的擴展實現類實例
         Holder<Object> holder = cachedInstances.get(name);
         if (holder == null) {
             cachedInstances.putIfAbsent(name, new Holder<Object>());
             holder = cachedInstances.get(name);
         }
         Object instance = holder.get();
         if (instance == null) {
             synchronized (holder) {
                 instance = holder.get();
                 if (instance == null) {
                     // 2. 建立相應的擴展實現類實例
                     instance = createExtension(name);
                     holder.set(instance);
                 }
             }
         }
         return (T) instance;
    }
    
     @SuppressWarnings("unchecked")
     private T createExtension(String name) {
         // 3. 根據name獲取相應擴展類的類實例
         Class<?> clazz = getExtensionClasses().get(name);
         if (clazz == null) {
             throw findException(name);
         }
         try {
             T instance = (T) EXTENSION_INSTANCES.get(clazz);
             if (instance == null) {
                 EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                 instance = (T) EXTENSION_INSTANCES.get(clazz);
             }
             injectExtension(instance);
             Set<Class<?>> wrapperClasses = cachedWrapperClasses;
             if (wrapperClasses != null && wrapperClasses.size() > 0) {
                 for (Class<?> wrapperClass : wrapperClasses) {
                     instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                 }
             }
             return instance;
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                     type + ") could not be instantiated: " + t.getMessage(), t);
         }
     }   
    複製代碼

    createExtension方法就是建立相應的擴展類實例,具體裏面的建立步驟下文會具體說到,接下來先繼續深刻看getExtensionClasses方法是如何從配置文件中找到相應的擴展類的類配置

2.2.2 配置文件掃描

Dubbo默認依次掃描**META-INF/dubbo/internal/、META-INF/dubbo/、META-INF/services/三個classpath目錄下的配置文件**。配置文件以具體擴展接口全名命名,如:com.alibaba.dubbo.rpc.Filter,內容以下:

# 等號前爲擴展名,其後爲擴展實現類全路徑名
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
複製代碼

從上一小節的,getExtensionClasses()方法源碼看起:

private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
	}

    // 此方法已經getExtensionClasses方法同步過。
    private Map<String, Class<?>> loadExtensionClasses() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if(defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            if(value != null && (value = value.trim()).length() > 0) {
                String[] names = NAME_SEPARATOR.split(value);
                if(names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                            + ": " + Arrays.toString(names));
                }
                if(names.length == 1) cachedDefaultName = names[0];
            }
        }
        
        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
        loadFile(extensionClasses, DUBBO_DIRECTORY);
        loadFile(extensionClasses, SERVICES_DIRECTORY);
        return extensionClasses;
    }

    private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
        String fileName = dir + type.getName();
        try {
            Enumeration<java.net.URL> urls;
            ClassLoader classLoader = findClassLoader();
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
            if (urls != null) {
                // 1. 逐行讀取配置文件,提取出擴展名或擴展類路徑
                while (urls.hasMoreElements()) {
                    java.net.URL url = urls.nextElement();
                    try {
                        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                        try {
                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                final int ci = line.indexOf('#');
                                if (ci >= 0) line = line.substring(0, ci);
                                line = line.trim();
                                if (line.length() > 0) {
                                    try {
                                        String name = null;
                                        int i = line.indexOf('=');
                                        if (i > 0) {
                                            name = line.substring(0, i).trim();
                                            line = line.substring(i + 1).trim();
                                        }
                                        if (line.length() > 0) {
                                            // 2. 利用Class.forName方法進行類加載
                                            Class<?> clazz = Class.forName(line, true, classLoader);
                                            if (! type.isAssignableFrom(clazz)) {
                                                throw new IllegalStateException("Error when load extension class(interface: " +
                                                        type + ", class line: " + clazz.getName() + "), class " 
                                                        + clazz.getName() + "is not subtype of interface.");
                                            }
                                            // 3. 處理Adaptive註解,若存在則將該實現類保存至cachedAdaptiveClass屬性
                                            if (clazz.isAnnotationPresent(Adaptive.class)) {
                                                if(cachedAdaptiveClass == null) {
                                                    cachedAdaptiveClass = clazz;
                                                } else if (! cachedAdaptiveClass.equals(clazz)) {
                                                    throw new IllegalStateException("More than 1 adaptive class found: "
                                                            + cachedAdaptiveClass.getClass().getName()
                                                            + ", " + clazz.getClass().getName());
                                                }
                                            } else {
                                                try {
                                                    // 4. 嘗試獲取參數類型爲當前擴展類型的構造器方法,若成功則代表存在該擴展的封裝類型,將封裝類型存入wrappers集合;不然轉入第五步
                                                    clazz.getConstructor(type);
                                                    Set<Class<?>> wrappers = cachedWrapperClasses;
                                                    if (wrappers == null) {
                                                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                        wrappers = cachedWrapperClasses;
                                                    }
                                                    wrappers.add(clazz);
                                                } catch (NoSuchMethodException e) {
                                                    clazz.getConstructor();
                                                    if (name == null || name.length() == 0) {
                                                        name = findAnnotationName(clazz);
                                                        if (name == null || name.length() == 0) {
                                                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                            } else {
                                                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                            }
                                                        }
                                                    }

                                                    // 5. 處理active註解,將擴展名對應active註解存入cachedActivates
                                                    String[] names = NAME_SEPARATOR.split(name);
                                                    if (names != null && names.length > 0) {
                                                        Activate activate = clazz.getAnnotation(Activate.class);
                                                        if (activate != null) {
                                                            cachedActivates.put(names[0], activate);
                                                        }
                                                        for (String n : names) {
                                                            if (! cachedNames.containsKey(clazz)) {
                                                                cachedNames.put(clazz, n);
                                                            }
                                                            Class<?> c = extensionClasses.get(n);
                                                            if (c == null) {
                                                                extensionClasses.put(n, clazz);
                                                            } else if (c != clazz) {
                                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    } catch (Throwable t) {
                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                        exceptions.put(line, e);
                                    }
                                }
                            } // end of while read lines
                        } finally {
                            reader.close();
                        }
                    } catch (Throwable t) {
                        logger.error("Exception when load extension class(interface: " +
                                            type + ", class file: " + url + ") in " + url, t);
                    }
                } // end of while urls
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }
複製代碼

以上配置文件加載步驟以下:

  1. 逐行讀取配置文件,提取出擴展名或擴展類路徑;

  2. 利用Class.forName方法進行類加載;

    Class<?> clazz = Class.forName(line, true, classLoader);
    複製代碼
  3. 處理Adaptive註解,若存在則將該實現類保存至cachedAdaptiveClass屬性

    if (clazz.isAnnotationPresent(Adaptive.class)) {
       if(cachedAdaptiveClass == null) {
           cachedAdaptiveClass = clazz;
       } else if (! cachedAdaptiveClass.equals(clazz)) {
           throw new IllegalStateException("More than 1 adaptive class found:"    + cachedAdaptiveClass.getClass().getName()
                 + ", " + clazz.getClass().getName());
       }
    }
    複製代碼
  4. 嘗試獲取參數類型爲當前擴展類型的構造器方法,若成功則代表存在該擴展的封裝類型,將封裝類型存入wrappers集合;不然拋出異常轉入第五步;

    try {
        // 擴展類型參數的構造器是封裝器的約定特徵,目前dubbo中默認的只有Filter和Listener的封裝器
        clazz.getConstructor(type); 
        Set<Class<?>> wrappers = cachedWrapperClasses;
        if (wrappers == null) {
            cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
            wrappers = cachedWrapperClasses;
        }
        wrappers.add(clazz);
    } catch (NoSuchMethodException e) {
        // 第五步
    }
    複製代碼
  5. 處理active註解,將擴展名對應active註解存入cachedActivates;

    Activate activate = clazz.getAnnotation(Activate.class);
    if (activate != null) {
        cachedActivates.put(names[0], activate);
    }
    複製代碼

2.2.3 擴展適配器

在dubbo擴展中,適配器模式被普遍使用,其做用在於爲同一擴展類型下的多個擴展實現的調用提供路由功能,如指定優先級等。dubbo提供了兩種方式來生成擴展適配器:

  1. 靜態適配器擴展

    所謂的靜態適配器擴展就是提早經過編碼的形式肯定擴展的具體實現,且該實現類由Adaptive註解標註,如:AdaptiveCompiler。在加載配置文件的loadFile方法中,已經描述過處理該類型擴展的邏輯,具體可參考上一小節loadFile()方法源碼

    @Adaptive
    public class AdaptiveCompiler implements Compiler {
    
        private static volatile String DEFAULT_COMPILER;
    
        public static void setDefaultCompiler(String compiler) {
            DEFAULT_COMPILER = compiler;
        }
    
        public Class<?> compile(String code, ClassLoader classLoader) {
            Compiler compiler;
            ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
            String name = DEFAULT_COMPILER; // copy reference
            if (name != null && name.length() > 0) {
                compiler = loader.getExtension(name);
            } else {
                compiler = loader.getDefaultExtension();
            }
            return compiler.compile(code, classLoader);
        }
    }
    複製代碼
  2. 動態適配器擴展

    動態適配器擴展即經過動態代理生成擴展類的動態代理類,在dubbo中是經過javassist技術生成的。與傳統的jdk動態代理、cglib不一樣,javassist提供封裝後的API對字節碼進行間接操做,簡單易用,不關心具體字節碼,靈活性更高,且處理效率也較高,是dubbo默認的編譯器。

首先,從ExtensionLoader構造器中會調用getAdaptiveExtension()方法觸發爲當前擴展類型生成適配器,源碼以下:

private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

    public T getAdaptiveExtension() {
        // 1. 首先,檢查是否存在當前擴展類靜態適配器
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if(createAdaptiveInstanceError == null) {
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                        try {
                            // 2. 建立當前擴展類動態適配器
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                        }
                    }
                }
            }
            else {
                throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
            }
        }

        return (T) instance;
    }

    private T createAdaptiveExtension() {
        try {
            // IOC屬性注入
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
        }
    }
    
    private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
    
    private Class<?> createAdaptiveExtensionClass() {
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        // 獲得Adaptive類代碼內容,經過Compiler進行編譯和類加載
        com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }
    
    // 建立當前擴展動態適配器
    private String createAdaptiveExtensionClassCode() {
        StringBuilder codeBuidler = new StringBuilder();
        Method[] methods = type.getMethods();
        boolean hasAdaptiveAnnotation = false;
        // 1. 檢查是否至少有一個方法有Adaptive註解,若不存在則拋出異常,即要完成動態代理,必須有方法標註了Adaptive註解
        for(Method m : methods) {
            if(m.isAnnotationPresent(Adaptive.class)) {
                hasAdaptiveAnnotation = true;
                break;
            }
        }
        // 徹底沒有Adaptive方法,則不須要生成Adaptive類
        if(! hasAdaptiveAnnotation)
            throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");
        
        codeBuidler.append("package " + type.getPackage().getName() + ";");
        codeBuidler.append("\nimport " + ExtensionLoader.class.getName() + ";");
        codeBuidler.append("\npublic class " + type.getSimpleName() + "$Adpative" + " implements " + type.getCanonicalName() + " {");
        
        for (Method method : methods) {
            Class<?> rt = method.getReturnType();
            Class<?>[] pts = method.getParameterTypes();
            Class<?>[] ets = method.getExceptionTypes();

            Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
            StringBuilder code = new StringBuilder(512);
            if (adaptiveAnnotation == null) {
                code.append("throw new UnsupportedOperationException(\"method ")
                        .append(method.toString()).append(" of interface ")
                        .append(type.getName()).append(" is not adaptive method!\");");
            } else {
                int urlTypeIndex = -1;
                for (int i = 0; i < pts.length; ++i) {
                    if (pts[i].equals(URL.class)) {
                        urlTypeIndex = i;
                        break;
                    }
                }
                // 對於有Adaptive註解的方法,判斷其入參中是否有URL類型的參數,或者複雜參數中是否有URL類型的屬性,若沒有則拋出異常。
                // 這裏體現出了爲何dubbo要提供動態適配器生成機制。dubbo中的URL總線提供了服務的所有信息,而開發者能夠定義差別化的服務配置,所以生成的URL差別化也較大,若所有靠用戶硬編碼靜態適配器的話效率過低。
                // 有了動態代理,dubbo能夠根據URL參數動態地生成適配器的適配邏輯,肯定擴展實現的獲取優先級。所以,URL做爲參數直接或間接傳入是必須的,不然失去了動態生成的憑據。
                // 有類型爲URL的參數
                if (urlTypeIndex != -1) {
                    // Null Point check
                    String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"url == null\");",
                                    urlTypeIndex);
                    code.append(s);
                    
                    s = String.format("\n%s url = arg%d;", URL.class.getName(), urlTypeIndex); 
                    code.append(s);
                }
                // 參數沒有URL類型
                else {
                    String attribMethod = null;
                    
                    // 找到參數的URL屬性
                    LBL_PTS:
                    for (int i = 0; i < pts.length; ++i) {
                        Method[] ms = pts[i].getMethods();
                        for (Method m : ms) {
                            String name = m.getName();
                            if ((name.startsWith("get") || name.length() > 3)
                                    && Modifier.isPublic(m.getModifiers())
                                    && !Modifier.isStatic(m.getModifiers())
                                    && m.getParameterTypes().length == 0
                                    && m.getReturnType() == URL.class) {
                                urlTypeIndex = i;
                                attribMethod = name;
                                break LBL_PTS;
                            }
                        }
                    }
                    if(attribMethod == null) {
                        throw new IllegalStateException("fail to create adative class for interface " + type.getName()
                        		+ ": not found url parameter or url attribute in parameters of method " + method.getName());
                    }
                    
                    // Null point check
                    String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");",
                                    urlTypeIndex, pts[urlTypeIndex].getName());
                    code.append(s);
                    s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");",
                                    urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod);
                    code.append(s);

                    s = String.format("%s url = arg%d.%s();",URL.class.getName(), urlTypeIndex, attribMethod); 
                    code.append(s);
                }
                
                String[] value = adaptiveAnnotation.value();
                // 沒有設置Key,則使用「擴展點接口名的點分隔 做爲Key
                if(value.length == 0) {
                    char[] charArray = type.getSimpleName().toCharArray();
                    StringBuilder sb = new StringBuilder(128);
                    for (int i = 0; i < charArray.length; i++) {
                        if(Character.isUpperCase(charArray[i])) {
                            if(i != 0) {
                                sb.append(".");
                            }
                            sb.append(Character.toLowerCase(charArray[i]));
                        }
                        else {
                            sb.append(charArray[i]);
                        }
                    }
                    value = new String[] {sb.toString()};
                }
                
                boolean hasInvocation = false;
                for (int i = 0; i < pts.length; ++i) {
                    if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) {
                        // Null Point check
                        String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"invocation == null\");", i);
                        code.append(s);
                        s = String.format("\nString methodName = arg%d.getMethodName();", i); 
                        code.append(s);
                        hasInvocation = true;
                        break;
                    }
                }
                
                String defaultExtName = cachedDefaultName;
                String getNameCode = null;
                for (int i = value.length - 1; i >= 0; --i) {
                    if(i == value.length - 1) {
                        if(null != defaultExtName) {
                            if(!"protocol".equals(value[i]))
                                if (hasInvocation) 
                                    getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
                                else
                                    getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
                            else
                                getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
                        }
                        else {
                            if(!"protocol".equals(value[i]))
                                if (hasInvocation) 
                                    getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
                                else
                                    getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
                            else
                                getNameCode = "url.getProtocol()";
                        }
                    }
                    else {
                        if(!"protocol".equals(value[i]))
                            if (hasInvocation) 
                                getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
                            else
                                getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
                        else
                            getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
                    }
                }
                code.append("\nString extName = ").append(getNameCode).append(";");
                // check extName == null?
                String s = String.format("\nif(extName == null) " +
                		"throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");",
                        type.getName(), Arrays.toString(value));
                code.append(s);
                
                s = String.format("\n%s extension = (%<s)%s.getExtensionLoader(%s.class).getExtension(extName);",
                        type.getName(), ExtensionLoader.class.getSimpleName(), type.getName());
                code.append(s);
                
                // return statement
                if (!rt.equals(void.class)) {
                    code.append("\nreturn ");
                }

                s = String.format("extension.%s(", method.getName());
                code.append(s);
                for (int i = 0; i < pts.length; i++) {
                    if (i != 0)
                        code.append(", ");
                    code.append("arg").append(i);
                }
                code.append(");");
            }
            
            codeBuidler.append("\npublic " + rt.getCanonicalName() + " " + method.getName() + "(");
            for (int i = 0; i < pts.length; i ++) {
                if (i > 0) {
                    codeBuidler.append(", ");
                }
                codeBuidler.append(pts[i].getCanonicalName());
                codeBuidler.append(" ");
                codeBuidler.append("arg" + i);
            }
            codeBuidler.append(")");
            if (ets.length > 0) {
                codeBuidler.append(" throws ");
                for (int i = 0; i < ets.length; i ++) {
                    if (i > 0) {
                        codeBuidler.append(", ");
                    }
                    codeBuidler.append(ets[i].getCanonicalName());
                }
            }
            codeBuidler.append(" {");
            codeBuidler.append(code.toString());
            codeBuidler.append("\n}");
        }
        codeBuidler.append("\n}");
        if (logger.isDebugEnabled()) {
            logger.debug(codeBuidler.toString());
        }
        return codeBuidler.toString();
    }
複製代碼

根據adaptive註解的value數組值,及SPI註解定義的默認擴展名,肯定適配邏輯,即擴展獲取的優先級,這裏再也不羅列代碼,下面爲一個具體生成的適配器源碼:

package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter {
    public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("client", url.getParameter("transporter", "netty")); // 處理順序
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])");
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.connect(arg0, arg1);
    }
    public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("server", url.getParameter("transporter", "netty")); // 處理順序
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.bind(arg0, arg1);
    }
}
複製代碼

能夠看到,核心邏輯是獲取擴展名extName,以bind方法爲例,其獲取優先級是server,transporter,netty,可參見URL的getParameter方法源碼。其中netty是Transporter接口的SPI註解肯定的默認值,而server和transporter是bind方法的Adaptive註解定義的。

@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
複製代碼

拿到擴展名後,再從ExtensionLoader獲取到擴展實例,調用具體的bind方法

源碼生成後,ExtensionLoader再調用默認的JavassitCompiler進行編譯和類加載,其具體實現原理不在本文討論範圍,有機會的話後續會介紹這部份內容。

綜上可知,ExtensionLoader提供了獲取擴展適配器的方法,優先查看是否有靜態適配器,不然會使用動態適配器

2.2.4 封裝類

dubbo中存在 一種對於擴展的封裝類,其功能是將各擴展實例串聯起來,造成擴展鏈,好比過濾器鏈,監聽鏈。當調用ExtensionLoader的getExtension方法時,會作攔截處理,若是存在封裝器,則返回封裝器實現,而將真實實現經過構造方法注入到封裝器中

@SuppressWarnings("unchecked")
    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            // IOC 注入
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && wrapperClasses.size() > 0) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                    type + ") could not be instantiated: " + t.getMessage(), t);
        }
    }
    
    private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                // 遍歷當前實例全部方法,判斷是否須要進行set屬性注入
                for (Method method : instance.getClass().getMethods()) {
                    if (method.getName().startsWith("set")
                            && method.getParameterTypes().length == 1
                            && Modifier.isPublic(method.getModifiers())) {
                        Class<?> pt = method.getParameterTypes()[0];
                        try {
                            String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                            // 經過ExtensionFactory獲取被注入set屬性實例
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("fail to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }
複製代碼

這裏有個injectExtension方法,其做用是:

若是當前擴展實例存在其餘的擴展屬性,則經過反射調用其set方法設置擴展屬性。若該擴展屬性是適配器類型,也是經過ExtensionLoader獲取的。

因此,ExtensionLoader做爲一個IOC插件容器,爲dubbo的插件體系運做提供了保障,能夠說是dubbo中的核心。掌握了其基本原理,纔有助於咱們更好地分析dubbo源碼。

相關文章
相關標籤/搜索