Dubbo 服務暴露和服務引用都是經過的 com.alibaba.dubbo.rpc.Protocol
來實現的。它是一個 SPI 擴展。html
@SPI("dubbo") public interface Protocol { int getDefaultPort(); @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; void destroy(); }
web
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper mock=com.alibaba.dubbo.rpc.support.MockProtocol dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol com.alibaba.dubbo.rpc.protocol.http.HttpProtocol com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol registry=com.alibaba.dubbo.registry.integration.RegistryProtocol qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper
根據前面 Dubbo SPI 的分析,咱們能夠知道 Dubbo 會默認使用名稱爲 "dubbo" 的 Protocol 協議(能夠經過配置去 override)。 若是咱們經過 Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension();
去獲取 Protocol 的話,則 SPI 擴展文件中的 wrapper 類會包裝在 DubboProtocol 實例上。因此,咱們獲取到的 extension 其實是一個 Wrappered Extention。redis
在 Protocol 的 Extension 中, ProtocolFilterWrapper 爲 Protocol 的 export() 和 refer() 方法添加了一層 Filter 擴展,它會在服務引用和服務消費時,爲 Invoker 實體域上包裝一層層的 Filter 來作代碼加強(AOP)。也就是在 Invoker 被執行以前會通過一堆的 Filter 的處理。緩存
若是說 SPI 是 Dubbo 的靜態擴展能力的話,那麼 Filter 就是 Dubbo 的動態擴展能力。它能經過 URL 中的參數,來動態拼裝 filter。app
ProtocolFilterWrapper 會獲取全部被激活的 Filter(@Activate),而後爲 Invoker 構建一條 filter 鏈。閱讀源碼,咱們會發現,export() 和 refer() 過程當中都會構建 filter 鏈,也就是說,在服務被調用時 consumer 端會先通過一層 filter 的調用,而後經過 netty 調用到 provider 端;而後 provider 端再通過一層filter 以後,再去調用真正的服務接口實現。jvm
public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } // 構建 Invoker Chain private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { // 獲取被激活的 Filter 擴展 Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } @Override public int getDefaultPort() { return protocol.getDefaultPort(); } @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); } @Override public void destroy() { protocol.destroy(); } }
SPI 擴展文件 META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter 內容:tcp
cache=com.alibaba.dubbo.cache.filter.CacheFilter validation=com.alibaba.dubbo.validation.filter.ValidationFilter echo=com.alibaba.dubbo.rpc.filter.EchoFilter generic=com.alibaba.dubbo.rpc.filter.GenericFilter genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter token=com.alibaba.dubbo.rpc.filter.TokenFilter accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter context=com.alibaba.dubbo.rpc.filter.ContextFilter consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
咱們能夠添加自定義的 filter 擴展。ide
Dubbo 服務暴露時,首先會建立一個 DubboExporter,而後再經過 netty 開啓服務端口監聽。memcached
DubboExporter 的做用是緩存 Invoker,方便後續操做獲取 Invoker。ui
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); ...... // 開啓服務監聽 openServer(url); optimizeSerialization(url); return exporter; }
Dubbo 服務引用時,首先建立一條與 provider 的 tcp 鏈接,而後再建立一個 DubboInvoker。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. 同時,建立一條與 provider 的 tcp 鏈接 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }