要實現自定義擴展,有三個步驟(在spring中定義了兩個接口,用來實現擴展)java
如下是Dubbo-config模塊下的dubbo-config-springspring
public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } }
爲了在spring啓動的時候,也相應的啓動provider發佈服務註冊服務的過程,而同時爲了讓客戶端在啓動的時候自動訂閱發現服務,加入了兩個beanjson
ServiceBean、ReferenceBean。api
DisposableBean bean被銷燬的時候,spring容器會自動執行destory方法,好比釋放資源緩存
ApplicationContextAware 實現了這個接口的bean,當spring容器初始化的時候,會自動的將ApplicationContext注入進來安全
ApplicationListener ApplicationEvent事件監聽,spring容器啓動後會發一個事件通知app
BeanNameAware 得到自身初始化時,自己的bean的id屬性ide
那麼基本的實現思路能夠整理出來了函數
delay的使用性能
咱們發現,delay的做用就是延遲暴露,而延遲的方式也很直截了當,Thread.sleep(delay)
繼續看doExport(),最終會調用到doExportUrls()中:
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){ if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } //經過proxyFactory來獲取Invoker對象 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //註冊服務 Exporter<?> exporter = protocol.export(invoker); //將exporter添加到list中 exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } }
看到這裏就比較明白dubbo的工做原理了doExportUrlsFor1Protocol方法,先建立兩個URL,分別以下
是否是以爲這個URL很眼熟,沒錯在註冊中心看到的services的providers信息就是這個
在上面這段代碼中能夠看到Dubbo的比較核心的抽象:Invoker, Invoker是一個代理類,從ProxyFactory中生成。
這個地方能夠作一個小結:
private static final Protocol protocol = ExtensionLoader. getExtensionLoader(Protocol.class). getAdaptiveExtension(); //Protocol$Adaptive
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol { public void destroy() { throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } }
上面這段代碼作兩個事情
這個方法的主要做用是用來獲取ExtensionLoader實例表明的擴展的指定實現。已擴展實現的名字做爲參數,結合前面學習getAdaptiveExtension的代碼.
@SuppressWarnings("unchecked") public T getExtension(String name) { if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); if ("true".equals(name)) { return getDefaultExtension(); } //判斷是否已經緩存過該擴展點 Holder<Object> holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder<Object>()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { //createExtension ,建立擴展點 instance = createExtension(name); holder.set(instance); } } } return (T) instance; }
這個方法主要作4個事情
@SuppressWarnings("unchecked") private T createExtension(String name) { Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance);//對獲取的的和實例進行依賴注入 Set<Class<?>> wrapperClasses = cachedWrapperClasses;//cachedWrapperClasses是在loadFile中進行賦值的 if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { // 對實例進行包裝,分別調用帶Protocol參數的構造函數建立實例,而後進行依賴注入。 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }
getExtensionClasses
這個方法以前在講自適應擴展點的時候講過了,其實就是加載擴展點實現類了。而後調用loadExtensionClasses,去對應文件下去加載指定的擴展點:
private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; }
url = registry://192.168.48.133:2181/com.alibaba.dubbo.registry.RegistryService? application=dubbo-server&dubbo=2.6.0 &export=dubbo%3A%2F%2F172.31.225.23%3A20880%2Fcom.gupaoedu.dubbo.IGpHello%3Fanyhost%3Dtrue%26applica tion%3Ddubbo-server%26bind.ip%3D172.31.225.23%26bind.port%3D20880%26default.service.filter%3DtraceFilter%26dubbo% 3D2.6.0%26generic%3Dfalse%26interface%3Dcom.gupaoedu.dubbo.IGpHello%26methods%3DsayHello%26owner%3Dm ic%26pid%3D98424%26side%3Dprovider%26timestamp%3D1543485230853 &owner=mic&pid=98424®istry=zookeeper×tamp=1543485230834
因此,咱們能夠定位到RegistryProtocol好這個類中的export方法
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker , 本地發佈服務(啓動netty) final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 訂閱override數據 // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保證每次export都返回一個新的exporter實例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
doLocalExport
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){ String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper<T>) exporter; }
上面代碼中,protocol代碼是怎麼賦值的呢?咱們看看代碼,熟悉嗎?是一個依賴注入的擴展點。不熟悉的話,咱們再回想一下,在加載擴展點的時候,
private Protocol protocol; public void setProtocol(Protocol protocol) { this.protocol = protocol; }
dubbo://127.0.0.1/xxxx… 所以在Protocol$Adaptive.export方法中,ExtensionLoader.getExtension(Protocol.class).getExtension。應該就是基於DubboProtocol協議去發佈服務了嗎?若是是這樣,那大家太單純了。
(得到的DubboProtocol,在獲取過程當中createExtension 中就已經對其進行了依賴注入和裝飾器一層一層外套)
這裏並非得到一個單純的DubboProtocol擴展點,而是會經過Wrapper對Protocol進行裝飾,裝飾器分別爲: ProtocolFilterWrapper/ ProtocolListenerWrapper; 至於MockProtocol爲何不在裝飾器裏面呢?你們再回想一下咱們在看ExtensionLoader.loadFile這段代碼的時候,有一個判斷,裝飾器必需要具有一個帶有Protocol的構造方法,以下
public ProtocolFilterWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; }
這兩個裝飾器是用來幹嗎的呢?咱們來分析下
這個類很是重要,dubbo機制裏面日誌記錄、超時等等功能都是在這一部分實現的
這個類有3個特色,
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } public void destroy() { protocol.destroy(); } //buildInvokerChain函數:它讀取全部的filter類,利用這些類封裝invoker private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);//自動激活擴展點,根據條件獲取當前擴展可自動激活的實現 if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
咱們看以下文件: /dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter
其實就是對Invoker,經過以下的Filter組裝成一個責任鏈:
echo=com.alibaba.dubbo.rpc.filter.EchoFilter generic=com.alibaba.dubbo.rpc.filter.GenericFilter genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter token=com.alibaba.dubbo.rpc.filter.TokenFilter accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter context=com.alibaba.dubbo.rpc.filter.ContextFilter consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
ProtocolListenerWrapper
在這裏咱們能夠看到export和refer分別對應了不一樣的Wrapper;export是對應的ListenerExporterWrapper。
這塊暫時先不去分析,由於這個地方並無提供實現類。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); } public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); }
DubboProtocol.export
經過上面的代碼分析完之後,最終咱們可以定位到DubboProtocol.export方法。咱們看一下dubboProtocol的export方法:openServer(url)
export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){ if (logger.isWarnEnabled()){ logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //暴露服務 openServer(url); return exporter; }
openServer
private void openServer(URL url) { // find server. String key = url.getAddress();//192.168.11.156:20880 //client 也能夠暴露一個只有server能夠調用的服務。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) {//沒有的話就是建立服務 serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } }
private ExchangeServer createServer(URL url) { //默認開啓server關閉時發送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默認開啓heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); }
getExchanger
public static Exchanger getExchanger(URL url) { //url中得到exchanger, 默認爲header String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger(String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
經過transporter.bind來進行綁定。
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); }
經過NettyTranport建立基於Netty的server服務
public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); }
在調用HeaderExchanger.bind方法的時候,是先new一個HeaderExchangeServer. 這個server是幹嗎呢? 是對當前這個鏈接去創建心跳機制
public class HeaderExchangeServer implements ExchangeServer { private final ScheduledExecutorService scheduled = Executors. newScheduledThreadPool(1,new NamedThreadFactory( "dubbo-remoting-server-heartbeat", true)); // 心跳定時器 private ScheduledFuture<?> heatbeatTimer; // 心跳超時,毫秒。缺省0,不會執行心跳。 private int heartbeat; private int heartbeatTimeout; private final Server server; private volatile boolean closed = false; public HeaderExchangeServer(Server server) { //..屬性賦值 //心跳 startHeatbeatTimer(); } private void startHeatbeatTimer() { //關閉心跳定時 stopHeartbeatTimer(); if (heartbeat > 0) { //每隔heartbeat時間執行一次 heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask( new HeartBeatTask.ChannelProvider() { //獲取channels public Collection<Channel> getChannels() { return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels() ); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat,TimeUnit.MILLISECONDS); } } //關閉心跳定時 private void stopHeartbeatTimer() { try { ScheduledFuture<?> timer = heatbeatTimer; if (timer != null && ! timer.isCancelled()) { timer.cancel(true); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } finally { heatbeatTimer =null; } }
心跳線程HeartBeatTask
直接從官方網站上扒了一個圖過來,,好這個圖顯示的很清楚了。