【Dubbo 源碼解析】03_Dubbo Protocol&Filter

Protocol & Filter

Dubbo 服務暴露和服務引用都是經過的 com.alibaba.dubbo.rpc.Protocol 來實現的。它是一個 SPI 擴展。html

@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();
    
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
    void destroy();
}

 

SPI 擴展文件 META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 內容:web

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper

 

根據前面 Dubbo SPI 的分析,咱們能夠知道 Dubbo 會默認使用名稱爲 "dubbo" 的 Protocol 協議(能夠經過配置去 override)。 若是咱們經過 Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension();去獲取 Protocol 的話,則 SPI 擴展文件中的 wrapper 類會包裝在 DubboProtocol 實例上。因此,咱們獲取到的 extension 其實是一個 Wrappered Extention。redis

 

ProtocolFilterWrapper

在 Protocol 的 Extension 中, ProtocolFilterWrapper 爲 Protocol 的 export() 和 refer() 方法添加了一層 Filter 擴展,它會在服務引用和服務消費時,爲 Invoker 實體域上包裝一層層的 Filter 來作代碼加強(AOP)。也就是在 Invoker 被執行以前會通過一堆的 Filter 的處理。緩存

若是說 SPI 是 Dubbo 的靜態擴展能力的話,那麼 Filter 就是 Dubbo 的動態擴展能力。它能經過 URL 中的參數,來動態拼裝 filter。app

ProtocolFilterWrapper 會獲取全部被激活的 Filter(@Activate),而後爲 Invoker 構建一條 filter 鏈。閱讀源碼,咱們會發現,export() 和 refer() 過程當中都會構建 filter 鏈,也就是說,在服務被調用時 consumer 端會先通過一層 filter 的調用,而後經過 netty 調用到 provider 端;而後 provider 端再通過一層filter 以後,再去調用真正的服務接口實現。jvm

public class ProtocolFilterWrapper implements Protocol {
    private final Protocol protocol;
    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
    // 構建 Invoker Chain
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        // 獲取被激活的 Filter 擴展
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }
                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
​
    @Override
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }
​
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
​
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
​
    @Override
    public void destroy() {
        protocol.destroy();
    }
​
}

 

 

SPI 擴展文件 META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter 內容:tcp

cache=com.alibaba.dubbo.cache.filter.CacheFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter

 

咱們能夠添加自定義的 filter 擴展。ide

 

DubboProtocol#export(Invoker<T> invoker)

Dubbo 服務暴露時,首先會建立一個 DubboExporter,而後再經過 netty 開啓服務端口監聽。memcached

DubboExporter 的做用是緩存 Invoker,方便後續操做獲取 Invoker。ui

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    ......
    // 開啓服務監聽
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

 

 

 

DubboProtocol#refer(Class<T> type,URL url)

Dubbo 服務引用時,首先建立一條與 provider 的 tcp 鏈接,而後再建立一個 DubboInvoker。

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // create rpc invoker. 同時,建立一條與 provider 的 tcp 鏈接
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

 

建立鏈接:ExchangeClient[] getClients(@NotNull URL url)

相關文章
相關標籤/搜索