Dubbo源碼分析(二)Dubbo服務發佈Export

服務發佈

打印的日誌

[INFO ]  com.alibaba.dubbo.config.AbstractConfig {ServiceBean.java:107} -  [DUBBO] The service ready on spring started. service: com.alibaba.dubbo.demo.DemoService, dubbo version: 2.0.0, current host: 127.0.0.1
[INFO ]  com.alibaba.dubbo.config.AbstractConfig {ServiceConfig.java:575} -  [DUBBO] Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 127.0.0.1
[INFO ]  com.alibaba.dubbo.config.AbstractConfig {ServiceConfig.java:535} -  [DUBBO] Export dubbo service com.alibaba.dubbo.demo.DemoService to url dubbo://192.168.31.132:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=948&side=provider&timestamp=1538476342717, dubbo version: 2.0.0, current host: 127.0.0.1
[INFO ]  com.alibaba.dubbo.config.AbstractConfig {ServiceConfig.java:546} -  [DUBBO] Register dubbo service com.alibaba.dubbo.demo.DemoService url dubbo://192.168.31.132:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1173&side=provider&timestamp=1538490950321 to registry registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=1173&registry=zookeeper&timestamp=1538490950209, dubbo version: 2.0.0, current host: 127.0.0.1
[INFO ]  com.alibaba.dubbo.remoting.transport.AbstractServer {AbstractServer.java:64} -  [DUBBO] Start NettyServer bind /0.0.0.0:20880, export /192.168.31.132:20880, dubbo version: 2.0.0, current host: 127.0.0.
[INFO ]  com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry {AbstractRegistry.java:223} -  [DUBBO] Load registry store file /Users/isz_pm/.dubbo/dubbo-registry-localhost.cache, data: {
[INFO ]  org.apache.zookeeper.ZooKeeper {ZooKeeper.java:438} - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@26fc13bc

複製代碼

咱們經過啓動provider,看下控制檯的日誌輸出,基本上能夠看出Dubbo的服務發佈的幾個步驟php

Dubbo怎麼和Spring融合

Spring 提供了可擴展的Schema,Dubbo是怎麼擴展的呢,先看下Dubbo的配置文件:java

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 提供方應用信息,用於計算依賴關係 -->
    <dubbo:application name="demo-provider"/>
    <!-- 使用multicast廣播註冊中心暴露服務地址 -->
    <dubbo:registry address="zookeeper://localhost:2181"/>
    <!-- 用dubbo協議在20880端口暴露服務 -->
    <dubbo:protocol name="dubbo" port="20880"/>
    <!-- 聲明須要暴露的服務接口 -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
    <!-- 和本地bean同樣實現服務 -->
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
</beans>
複製代碼

我看能夠看到http://code.alibabatech.com/schema/dubbo/dubbo.xsd,從源碼中咱們找到:spring

要想擴展Schema,須要這五個文件,具體的做用就不說了,直接看DubboNamespaceHandler類

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    }
}
複製代碼

這裏面初始化了Dubbo配置文件,剩下的就很少說了,直接看ServiceBean中Dubbo是怎麼發佈服務的apache

暴露本地服務

咱們先看ServiceBean,這個是Dubbo服務發佈的入口代碼bootstrap

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
複製代碼

這個類實現了不少接口,其中的ApplicationListener接口,實現了onApplicationEvent()事件方法,緩存

public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            //判斷是否須要延遲發佈
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                //調用發佈方法
                export();
            }
        }
    }
複製代碼

進入export()方法bash

public synchronized void export() {
        ···
        if (delay != null && delay > 0) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        //1.若是須要延遲發佈,直接sleep
                        Thread.sleep(delay);
                    } catch (Throwable e) {
                    }
                    doExport();
                }
            });
            thread.setDaemon(true);
            thread.setName("DelayExportServiceThread");
            thread.start();
        } else {
            //調用發佈方法
            doExport();
        }
    }
複製代碼

進入doExport()方法,繼續各類check,進入doExportUrls()方法session

private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
複製代碼

這裏registryURL是啥,看debug結果: app

loadRegistries()方法中返回了一個URL的List,咱們看下這個URL是以registry開頭的,localhost:2181是咱們配置在xml中<dubbo:registry address="zookeeper://localhost:2181"/>的這段代碼中獲取的,URL後面都是須要的數據,有沒有理解上一篇中說的一句話, Dubbo是基於URL驅動的
那什麼會有多個URL呢,由於咱們在xml中能夠配置多個dubbo:registry,能夠把服務發佈到多個註冊中心
調用doExportUrlsFor1Protocol()方法,此時protocolConfig是<dubbo:protocol name="dubbo" port="20880"/>

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        ···
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
        
        String scope = url.getParameter(Constants.SCOPE_KEY);
        //配置爲none不暴露
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的狀況下作本地暴露 (配置爲remote,則表示只暴露遠程服務)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //若是配置不是local則暴露爲遠程服務.(配置爲local,則表示只暴露本地服務)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }
複製代碼

先看服務本地發佈exportLocal()方法
此時的url=dubbo://192.168.31.132:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=843&side=provider&timestamp=1538464205631框架

private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(NetUtils.LOCALHOST)
                    .setPort(0);
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }
複製代碼

繼續組裝url,local=injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=843&side=provider&timestamp=1538464205631
執行proxyFactory.getInvoker(ref, (Class) interfaceClass, local),這個時候的proxyFactory是什麼?

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
複製代碼

這個時候proxyFactory是一個動態適配器代理類ProxyFactory$Adpative,咱們把這個代理類拿出來

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }
}
複製代碼

這個時候執行proxyFactory.getInvoker(ref, (Class) interfaceClass, local),這個時候的local是injvm開頭,因此在getInvoke方法中,extName=javassist,同理執行getExtension("javassist")方法,這個時候應該返回什麼呢?是JavassistProxyFactory嗎?不是,由於

咱們看到,這個時候有一個包裝類,因此這個時候返回的是StubProxyFactoryWrapper,因此這個時候執行的是StubProxyFactoryWrapper.getInvoker方法

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        return proxyFactory.getInvoker(proxy, type, url);
}
複製代碼

先理一下這個方法的三個參數,proxy是接口實現類的引用,type是接口,url是injvm開頭的
這個時候的proxyFactory是什麼?咱們知道Dubbo的IOC幫咱們注入了包裝類須要的實例參數,因此這個時候proxyFactory是JavassistProxyFactory,

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper類不能正確處理帶$的類名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
複製代碼

先看下Wrapper wrapper = Wrapper.getWrapper()方法,傳入的參數是proxy.getClass()就是咱們的實現類DemoServiveImpl的Class,

public static Wrapper getWrapper(Class<?> c) {
        ···
            ret = makeWrapper(c);
            WRAPPER_MAP.put(c, ret);
        }
        return ret;
    }
複製代碼

這個裏面的makeWrapper方法就是建立一個動態代理類,咱們看下這個方法的部份內容:

private static Wrapper makeWrapper(Class<?> c) {
    ···
        StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
        StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
        StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
    ···
    return (Wrapper) wc.newInstance();
    ···
複製代碼

這裏生成了invokeMethod方法,最終把這個對象實例化,咱們猜測一下,Dubbo的消費端來調用服務端的接口的時候,是否是經過這個代理對象中的invokerMethod方法最終去執行接口實現類的代碼呢? 咱們看到 new AbstractProxyInvoker(proxy, type, url)中重寫了一個doInvoke方法,這個doInvoke方法中調用了wrapper.invokeMethod(),這個wrapper就是剛剛生成的wrapper
接着回到ServiceConfig.exportLocal()方法中
執行Exporter<?> exporter = protocol.export(invoker);這個時候protocol是什麼?貼代碼

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
複製代碼

這個時候protocol=Protocol$Adpative,執行Protocol$Adpative.export方法
Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("injvm"), 這個時候獲得的extension什麼?是ProtocolListenrWrapper(ProtocolFiterWrapper(InjvmProtocol)),這樣的一個wrapper對象,執行export方法,這裏的Fiter會生成8個Fiter鏈,看下InjvmProtocol中的export方法,目的是exporterMap.put(key, this) 此時,key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
       return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
   }
   InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
       super(invoker);
       this.key = key;
       this.exporterMap = exporterMap;
       exporterMap.put(key, this);
   }
複製代碼

最終獲得一個ListenerExportWrapper,放到exporters中,到此,本地服務發佈完成

[INFO ]  com.alibaba.dubbo.config.AbstractConfig {ServiceConfig.java:575} -  [DUBBO] Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 127.0.0.1
複製代碼

暴露遠程服務

從這段代碼開始讀

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
複製代碼

registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()),這個返回一個URL=registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.31.132%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D948%26side%3Dprovider%26timestamp%3D1538476342717&pid=948&registry=zookeeper&timestamp=1538476342610
這個時候proxyFactory一樣是Protocol$Adpative,進入getInvoker方法: ProxyFactory extension = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");
這裏extension=StubProxyFactoryWrapper包裝類,繼續執行getInvoker()方法,最終在JavassistProxyFactory的getInvoker中 new AbstractProxyInvoker(proxy, type, url)並返回,這個就是咱們須要的invoker
繼續執行Exporter<?> exporter = protocol.export(invoker);
此時protocol=Protocol$Adaptive ,執行Protocol$Adaptive.export方法

Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry");
return extension.export(arg0);
複製代碼

同理extension=ProtocolListenrWrapper(ProtocolFiterWrapper(RegistryProtocol)),直接進入RegistryProtocol.export方法中:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
       //export invoker 先啓動本地監聽服務
       final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
       //registry provider
       final Registry registry = getRegistry(originInvoker);
       final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
       registry.register(registedProviderUrl);
       // 訂閱override數據
       // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。
       final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
       final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
       overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
       registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
       //保證每次export都返回一個新的exporter實例
       return new Exporter<T>() {
           public Invoker<T> getInvoker() {
               return exporter.getInvoker();
           }

           public void unexport() {
               try {
                   exporter.unexport();
               } catch (Throwable t) {
                   logger.warn(t.getMessage(), t);
               }
               try {
                   registry.unregister(registedProviderUrl);
               } catch (Throwable t) {
                   logger.warn(t.getMessage(), t);
               }
               try {
                   overrideListeners.remove(overrideSubscribeUrl);
                   registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
               } catch (Throwable t) {
                   logger.warn(t.getMessage(), t);
               }
           }
       };
   }

複製代碼

進入doLocalExport:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
       String key = getCacheKey(originInvoker);
       ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
       if (exporter == null) {
           synchronized (bounds) {
               exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
               if (exporter == null) {
                   final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                   exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                   bounds.put(key, exporter);
               }
           }
       }
       return (ExporterChangeableWrapper<T>) exporter;
   }
複製代碼

此時,咱們看到(Exporter) protocol.export(invokerDelegete)這段代碼,這個時候protocol是什麼呢,是Protocol$Adaptive,何時時候賦值的呢,是加載擴展點的時候,有個injectExtension方法,依賴注入了protocol
繼續執行Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo"),這個extension是ProtocolListenrWrapper(ProtocolFiterWrapper(DubboProtocol))
執行DubboProtocol.export方法:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);

        return exporter;
    }
複製代碼

serviceKey(url), 這裏咱們獲得一個key=com.alibaba.dubbo.demo.DemoService:20880,目的是要exporterMap.put(key, this)// key=com.alibaba.dubbo.demo.DemoService:20880, this=DubboExporter ,這裏其中最重要的一點就是將invoker轉化爲exporter,專題最後咱們會分析一下全部的invoker

終於咱們看到了一段代碼,openServer(url),千幸萬苦,咱們要開始啓動Netty服務了,先休息一下,寫的好累啊

啓動Netty服務

private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client 也能夠暴露一個只有server能夠調用的服務。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                //server支持reset,配合override功能使用
                server.reset(url);
            }
        }
    }
複製代碼

首先獲得一個key=ip:端口,再調用createServer(),建立服務,開啓心跳檢測,默認使用 netty。組裝 url

private ExchangeServer createServer(URL url) {
        //默認開啓server關閉時發送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默認開啓heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
複製代碼

進入 Exchangers.bind(url, requestHandler)方法

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
       ···
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }
複製代碼

進入getExchanger(url)方法:

public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
//此時type=header
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
複製代碼

這裏返回HeaderExchanger,進入HeaderExchanger.bind方法

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
複製代碼

這裏初始化一個new DecodeHandler(new HeaderExchangeHandler(handler)),進入Transporters.bind()方法,這裏進入transporter層

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        ···
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }
複製代碼

進入getTransporter()方法

public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
複製代碼

老規矩,先看下Transporter接口

@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;
    
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}
複製代碼

@SPI註解,默認netty,兩個方法,一個bind方法,一個connect方法

獲取自適應適配器擴展點Transporter Adpative,進入TransporterAdpative.bind()方法,執行

ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension("netty");
複製代碼

執行NettyTransport.bind方法

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
}
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
       ···
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
複製代碼

AbstractServer的構造方法中,進入doOpen();這個是一個抽象方法,具體實現再NettyTransport中,這裏能夠看到具體的實現都是在各自的擴展類中去實現,這些不一樣的擴展類會有一些公共的方法,就能夠提取出來一個抽象類去實現

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
複製代碼

在這裏,咱們看到了熟悉的netty代碼,設置 NioServerSocketChannelFactory boss worker的線程池 線程個數爲3,再設置設置編解碼和hander處理類, 回到new HeaderExchangeServer()方法中

public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        //這是一個心跳定時器,採用了線程池,若是斷開就心跳重連。
        startHeatbeatTimer();
}
複製代碼
private void startHeatbeatTimer() {
       stopHeartbeatTimer();
       if (heartbeat > 0) {
       //每隔 heartbeat 時間執行一次,此時默認爲60000ms
           heatbeatTimer = scheduled.scheduleWithFixedDelay(
                   new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                       public Collection<Channel> getChannels() {
                       //獲取channels
                           return Collections.unmodifiableCollection(
                                   HeaderExchangeServer.this.getChannels());
                       }
                   }, heartbeat, heartbeatTimeout),
                   heartbeat, heartbeat, TimeUnit.MILLISECONDS);
       }
   }
複製代碼

到此,Netty服務已經啓動

鏈接註冊中心

咱們回到Registry.export()方法中,

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider 如今咱們要從這裏開始鏈接註冊中心
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 訂閱override數據
        // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保證每次export都返回一個新的exporter實例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }

            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }
複製代碼

進入getRegistry()方法,根據invoker的地址獲取registry實例

private Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = originInvoker.getUrl();
        if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
        }
        return registryFactory.getRegistry(registryUrl);
}
複製代碼

此時,registryUrl=zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.31.132%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D1122%26side%3Dprovider%26timestamp%3D1538487268569&pid=1122&timestamp=1538487268461
此時registryFactory=RegistryFactory$Adaptive ,執行getRegistry方法
ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper");這裏獲得ZookeeperRegistryFactory,執行ZookeeperRegistryFactory.getRegistry()

public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceString();
        // 鎖定註冊中心獲取過程,保證註冊中心單一實例
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // 釋放鎖
            LOCK.unlock();
        }
    }

複製代碼

進入createRegistry()方法:

public Registry createRegistry(URL url) {
       return new ZookeeperRegistry(url, zookeeperTransporter);
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
       super(url);
       if (url.isAnyHost()) {
           throw new IllegalStateException("registry address == null");
       }
       String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
       if (!group.startsWith(Constants.PATH_SEPARATOR)) {
           group = Constants.PATH_SEPARATOR + group;
       }
       //設置根節點
       this.root = group;
       zkClient = zookeeperTransporter.connect(url);
       zkClient.addStateListener(new StateListener() {
           public void stateChanged(int state) {
               if (state == RECONNECTED) {
                   try {
                       recover();
                   } catch (Exception e) {
                       logger.error(e.getMessage(), e);
                   }
               }
           }
       });
   }
複製代碼

首先AbstractRegistry抽象類中,調用loadProperties()目的把註冊的服務緩存到本地

而後建立鏈接,zkClient = zookeeperTransporter.connect(url);進入ZkclientZookeeperTransporter.connect方法

public ZookeeperClient connect(URL url) {
       return new ZkclientZookeeperClient(url);
}
public ZkclientZookeeperClient(URL url) {
       super(url);
       //建立zk鏈接
       client = new ZkClient(url.getBackupAddress());
       //訂閱的目標:鏈接斷開,重連
       client.subscribeStateChanges(new IZkStateListener() {
           public void handleStateChanged(KeeperState state) throws Exception {
               ZkclientZookeeperClient.this.state = state;
               if (state == KeeperState.Disconnected) {
                   stateChanged(StateListener.DISCONNECTED);
               } else if (state == KeeperState.SyncConnected) {
                   stateChanged(StateListener.CONNECTED);
               }
           }

           public void handleNewSession() throws Exception {
               stateChanged(StateListener.RECONNECTED);
           }
       });
}
複製代碼

建立zk鏈接後,返回到ZookeeperRegistry的構造方法中,

zkClient.addStateListener(new StateListener() {
           public void stateChanged(int state) {
               if (state == RECONNECTED) {
                   try {
                       recover();
                   } catch (Exception e) {
                       logger.error(e.getMessage(), e);
                   }
               }
           }
       });
複製代碼

recover方法做用是鏈接失敗 重連
回到ResigstryProtocol.export()方法,執行registry.register(registedProviderUrl); 調用 FailbackRegistry 類中的 register. 爲何呢?由於 ZookeeperRegistry 這個類中並無 register 這個方法,可是他的父類 FailbackRegistry 中存在 register 方法,而這個類又重寫了 AbstractRegistry 類中的 register 方法。因此咱們能夠直接定位大 FailbackRegistry 這個類 中的 register 方法中

public FailbackRegistry(URL url) {
       super(url);
       int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
       this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
           public void run() {
               // 檢測並鏈接註冊中心
               try {
               //失敗重連
                   retry();
               } catch (Throwable t) { // 防護性容錯
                   logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
               }
           }
       }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
   }
複製代碼

FailbackRegistry,從名字上來看,是一個失敗重試機制
調用父類的register方法,講當前url添加到緩存集合中
調用 doRegister 方法,這個方法很明顯,是一個抽象方法,會由ZookeeperRegistry 子類實現

protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
}
複製代碼

在這裏想zk註冊服務,最終實現註冊的是AbstractZookeeperClient.create方法

public void create(String path, boolean ephemeral) {
        int i = path.lastIndexOf('/');
        if (i > 0) {
            create(path.substring(0, i), false);
        }
        if (ephemeral) {
            createEphemeral(path);
        } else {
            createPersistent(path);
        }
    }
複製代碼

後續的註冊監聽的代碼就不分析了,服務端去zk註冊一個監聽,當zk節點發生變化時,通知服務端處理
至此,Dubbo服務的發佈源碼分析完成

總結

從Dubbo官網上找到一張服務暴露的時序圖

相關文章
相關標籤/搜索