聊聊Dubbo(九):核心源碼-服務端啓動流程2

3 ServiceConfig#doExportUrlsFor1Protocol 重點分析

3.1 組裝URL所需參數

String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
複製代碼

Step1:用 Map 存儲該協議的全部配置參數,包括:協議名稱、Dubbo版本、當前系統時間戳、進程ID、application配置、module配置、默認服務提供者參數(ProviderConfig)、協議配置、服務提供 Dubbo:service 的屬性。java

if (methods != null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (arguments != null && !arguments.isEmpty()) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods != null && methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }
複製代碼

Step2:若是 dubbo:servicedubbo:method 子標籤,則 dubbo:method 以及其子標籤的配置屬性,都存入到 Map 中,屬性名稱加上對應的方法名做爲前綴。dubbo:method 的子標籤 dubbo:argument,其鍵爲方法名.參數序號。redis

if (ProtocolUtils.isGeneric(generic)) {
            map.put(Constants.GENERIC_KEY, generic);
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
            } else {
                map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
複製代碼

Step3:添加 methods 鍵值對,存放 dubbo:service 的全部方法名,多個方法名用 , 隔開,若是是泛化實現,填充 genric=true,methods「*」spring

if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(Constants.TOKEN_KEY, token);
            }
        }
複製代碼

Step4:根據是否開啓令牌機制,若是開啓,設置 token 鍵,值爲靜態值或 uuidbootstrap

if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
複製代碼

Step5:若是協議爲本地協議( injvm ),則設置 protocolConfig#register 屬性爲 false ,表示不向註冊中心註冊服務,在 map 中存儲鍵爲 notify,值爲 false,表示當註冊中心監聽到服務提供者發生變化(服務提供者增長、服務提供者減小等)事件時不通知。api

String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }
複製代碼

Step6:設置協議的 contextPath,若是未配置,默認爲 /interfacename緩存

String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
複製代碼

Step7:解析服務提供者的IP地址與端口。安全

private String findConfigedHosts(ProtocolConfig protocolConfig, List<URL> registryURLs, Map<String, String> map) {
        boolean anyhost = false;

        String hostToBind = getValueFromConfig(protocolConfig, Constants.DUBBO_IP_TO_BIND);
        if (hostToBind != null && hostToBind.length() > 0 && isInvalidLocalHost(hostToBind)) {
            throw new IllegalArgumentException("Specified invalid bind ip from property:" + Constants.DUBBO_IP_TO_BIND + ", value:" + hostToBind);
        }

        // if bind ip is not found in environment, keep looking up
        if (hostToBind == null || hostToBind.length() == 0) {
            hostToBind = protocolConfig.getHost();
            if (provider != null && (hostToBind == null || hostToBind.length() == 0)) {
                hostToBind = provider.getHost();
            }
            if (isInvalidLocalHost(hostToBind)) {
                anyhost = true;
                try {
                    hostToBind = InetAddress.getLocalHost().getHostAddress();
                } catch (UnknownHostException e) {
                    logger.warn(e.getMessage(), e);
                }
                if (isInvalidLocalHost(hostToBind)) {
                    if (registryURLs != null && !registryURLs.isEmpty()) {
                        for (URL registryURL : registryURLs) {
                            if (Constants.MULTICAST.equalsIgnoreCase(registryURL.getParameter("registry"))) {
                                // skip multicast registry since we cannot connect to it via Socket
                                continue;
                            }
                            try {
                                Socket socket = new Socket();
                                try {
                                    SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
                                    socket.connect(addr, 1000);
                                    hostToBind = socket.getLocalAddress().getHostAddress();
                                    break;
                                } finally {
                                    try {
                                        socket.close();
                                    } catch (Throwable e) {
                                    }
                                }
                            } catch (Exception e) {
                                logger.warn(e.getMessage(), e);
                            }
                        }
                    }
                    if (isInvalidLocalHost(hostToBind)) {
                        hostToBind = getLocalHost();
                    }
                }
            }
        }

        map.put(Constants.BIND_IP_KEY, hostToBind);

        // registry ip is not used for bind ip by default
        String hostToRegistry = getValueFromConfig(protocolConfig, Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry != null && hostToRegistry.length() > 0 && isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        } else if (hostToRegistry == null || hostToRegistry.length() == 0) {
            // bind ip is used as registry ip by default
            hostToRegistry = hostToBind;
        }

        map.put(Constants.ANYHOST_KEY, String.valueOf(anyhost));

        return hostToRegistry;
    }
複製代碼

服務IP地址解析順序:(序號越小越優先)bash

  1. 系統環境變量,變量名:DUBBO_DUBBO_IP_TO_BIND
  2. 系統屬性,變量名:DUBBO_DUBBO_IP_TO_BIND
  3. 系統環境變量,變量名:DUBBO_IP_TO_BIND
  4. 系統屬性,變量名:DUBBO_IP_TO_BIND
  5. dubbo:protocol 標籤的 host 屬性 --> dubbo:provider 標籤的 host 屬性
  6. 默認網卡IP地址,經過 InetAddress.getLocalHost().getHostAddress() 獲取,若是IP地址不符合要求,繼續下一個匹配。
// 判斷IP地址是否符合要求的標準
   public static boolean isInvalidLocalHost(String host) {
       return host == null
               || host.length() == 0
               || host.equalsIgnoreCase("localhost")
               || host.equals("0.0.0.0")
               || (LOCAL_IP_PATTERN.matcher(host).matches());
   }
複製代碼
  1. 選擇第一個可用網卡,其實現方式是創建 socket,鏈接註冊中心,獲取 socket 的IP地址
Socket socket = new Socket();
       try {
             SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
             socket.connect(addr, 1000);
             hostToBind = socket.getLocalAddress().getHostAddress();
             break;
        } finally {
             try {
                     socket.close();
             } catch (Throwable e) {
             }
       }
複製代碼
private Integer findConfigedPorts(ProtocolConfig protocolConfig, String name, Map<String, String> map) {
        Integer portToBind = null;

        // parse bind port from environment
        String port = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_BIND);
        portToBind = parsePort(port);

        // if there's no bind port found from environment, keep looking up.
        if (portToBind == null) {
            portToBind = protocolConfig.getPort();
            if (provider != null && (portToBind == null || portToBind == 0)) {
                portToBind = provider.getPort();
            }
            final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
            if (portToBind == null || portToBind == 0) {
                portToBind = defaultPort;
            }
            if (portToBind == null || portToBind <= 0) {
                portToBind = getRandomPort(name);
                if (portToBind == null || portToBind < 0) {
                    portToBind = getAvailablePort(defaultPort);
                    putRandomPort(name, portToBind);
                }
                logger.warn("Use random available port(" + portToBind + ") for protocol " + name);
            }
        }

        // save bind port, used as url's key later
        map.put(Constants.BIND_PORT_KEY, String.valueOf(portToBind));

        // registry port, not used as bind port by default
        String portToRegistryStr = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_REGISTRY);
        Integer portToRegistry = parsePort(portToRegistryStr);
        if (portToRegistry == null) {
            portToRegistry = portToBind;
        }

        return portToRegistry;
    }
複製代碼

服務提供者端口解析順序:(序號越小越優先)服務器

  1. 系統環境變量,變量名:DUBBO_DUBBO_PORT_TO_BIND
  2. 系統屬性,變量名:DUBBO_DUBBO_PORT_TO_BIND
  3. 系統環境變量,變量名:DUBBO_PORT_TO_BIND
  4. 系統屬性,變量名:DUBBO_PORT_TO_BIND
  5. dubbo:protocol 標籤 port 屬性 --> dubbo:provider 標籤的 port 屬性。
  6. 隨機選擇一個端口。

3.2 封裝URL實例

URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
複製代碼

Step8:根據協議名稱、協議 host、協議端口、contextPath、相關配置屬性(applicationmoduleproviderprotocolConfigservice 及其子標籤)構建服務提供者URI。網絡

URL運行效果圖,以下:

URL運行效果圖

3.3 構建Invoker實例

String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { // @ 代碼1

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { // @ 代碼2
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) { // @ 代碼3
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); // @ 代碼4
                        URL monitorUrl = loadMonitor(registryURL); // @ 代碼5
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // @ 代碼6
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker); // 代碼7
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
複製代碼

Step9:獲取 dubbo:service 標籤的 scope 屬性,其可選值爲 none (不暴露)、local (本地)、remote (遠程),若是配置爲 none,則不暴露。默認爲 local

Step10:根據 scope 來暴露服務,若是 scope 不配置,則默認本地與遠程都會暴露,若是配置成 localremote,那就只能是二選一。

代碼1:若是 scope 不爲 remote,則先在本地暴露( injvm ),具體暴露服務的具體實現,將在remote 模式中詳細分析。

代碼2:若是 scope 不爲 local,則將服務暴露在遠程。

代碼3remote 方式,檢測當前配置的全部註冊中心,若是註冊中心不爲空,則遍歷註冊中心,將服務依次在不一樣的註冊中心進行註冊。

代碼4:若是 dubbo:servicedynamic 屬性未配置, 嘗試取 dubbo:registrydynamic 屬性,該屬性的做用是否啓用動態註冊,若是設置爲 false,服務註冊後,其狀態顯示爲 disable,須要人工啓用,當服務不可用時,也不會自動移除,一樣須要人工處理,此屬性不要在生產環境上配置。

代碼5:根據註冊中心URL,構建監控中心的URL,若是監控中心URL不爲空,則在服務提供者URL上追加 monitor,其值爲監控中心URL(已編碼)。

1)若是dubbo spring xml配置文件中沒有配置監控中心(dubbo:monitor),就從系統屬性-Ddubbo.monitor.address,-Ddubbo.monitor.protocol構建MonitorConfig對象,不然從dubbo的properties配置文件中尋找這個兩個參數,若是沒有配置,則返回null。 
2)若是有配置,則追加相關參數,dubbo:monitor標籤只有兩個屬性:address、protocol,其次會追加interface(MonitorService)、協議等。
複製代碼

代碼6:經過動態代理機制建立 Invoker,Dubbo的遠程調用實現類。

經過動態代理機制建立Invoker

Dubbo遠程調用器如何構建,這裏不詳細深刻,重點關注WrapperInvoker的url爲:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider
&dubbo=2.0.0
&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6328%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527255510215
&pid=6328
&qos.port=22222
&registry=zookeeper
&timestamp=1527255510202
複製代碼

這裏有兩個重點值得關注:

  1. path屬性com.alibaba.dubbo.registry.RegistryService,註冊中心也相似於服務提供者。
  2. export屬性:值爲服務提供者的URL,爲何須要關注這個URL呢?請看代碼7,protocol 屬性爲 Protocol$Adaptive,Dubbo在加載組件實現類時採用SPI(關於SPI細節,可參閱《☆聊聊Dubbo(五):核心源碼-SPI擴展》 ),在這裏咱們只須要知道,根據URL冒號以前的協議名將會調用相應的方法。

Protocol 適配實現類實例

其映射關係(列出與服務啓動相關協議實現類):

dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol // 文件位於dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol // 文件位於dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 
複製代碼

代碼7:根據代碼6的分析,將調用 RegistryProtocol#export 方法。

這裏很重要的是 Invoker 實例,做爲Dubbo的核心模型,其它模型都向它靠擾,或轉換成它,它表明一個可執行體,可向它發起invoke調用,它有多是一個本地的實現,也多是一個遠程的實現,也可能一個集羣實現

因此,下面重點分析 代碼6 & 代碼7 兩處代碼實現,源碼以下:

// 使用ProxyFactory將服務實現封裝成一個Invoker對象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // @ 代碼6

// 根據指定協議本地暴露和向註冊中心註冊服務
Exporter<?> exporter = protocol.export(invoker); // @ 代碼7

//用於unexport
exporters.add(exporter);
複製代碼

上面 proxyFactoryprotocol 兩個變量,具體定義以下:

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();  
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 
複製代碼

Invoker 實例從 proxyFactory 獲取,而 proxyFactory 在這裏實際是個適配器,經過調用 getAdaptiveExtension() 方法,會以拼接源碼的方式動態生成目標ProxyFactory Class,生成的Class方法中會獲取 url 中的參數來構建合適的具體實現對象,若是 url 中未配置,則使用 @SPI 配置的默認值。

查看 ProxyFactoryProtocol 接口,默認 ProxyFactory 實現爲 JavassistProxyFactory,默認 Protocol 實現爲 DubboProtocol。源碼以下:

// 默認javassist
@SPI("javassist")
public interface ProxyFactory {  
    ...
}

// 默認dubbo
@SPI("dubbo")
public interface Protocol {  
    ...
}
複製代碼

ExtensionLoader#getAdaptiveExtension() 調用棧,以下:

ExtensionLoader<T>.getAdaptiveExtension()  
    ExtensionLoader<T>.createAdaptiveExtension()
        ExtensionLoader<T>.getAdaptiveExtensionClass()
            ExtensionLoader<T>.createAdaptiveExtensionClass()
                ExtensionLoader<T>.createAdaptiveExtensionClassCode()
複製代碼

最終,生成目標ProxyFactory Class,源碼以下:

public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
        if (arg2 == null) 
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}
複製代碼

能夠看到,在上面的 getInvoker 方法中,會優先獲取 proxy 擴展,不然默認獲取 javassist 擴展。通常狀況下,咱們未主動擴展配置代理工廠的話,使用 JavassistProxyFactory,源碼以下:

public class JavassistProxyFactory extends AbstractProxyFactory {
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper類不能正確處理帶$的類名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}
複製代碼

注意到這裏的入參包括 proxy 服務實例和其接口類型,由於須要對服務進行代理封裝,最終是生成一個 AbstractProxyInvoker 實例,其 doInvoker 方法成爲服務調用的入口。如下是具體的封裝過程:

public static Wrapper getWrapper(Class<?> c) {
    while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class.
        c = c.getSuperclass();
    if( c == Object.class )
        return OBJECT_WRAPPER;
    Wrapper ret = WRAPPER_MAP.get(c);
    if( ret == null )
    {
        ret = makeWrapper(c);
        WRAPPER_MAP.put(c,ret);
    }
    return ret;
}
複製代碼

具體的 makeWrapper 方法是利用 javassist 技術動態構造 Wapper 類型並建立實例,源碼較長這裏再也不列出,如下是 Wapper 類型的 invokeMethod 方法源碼(注意是 javasssit 語法形式):

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { 
    indi.cesc.inno.learn.dubbo.HelloService w; 
    try{ 
        w = ((indi.cesc.inno.learn.dubbo.HelloService)$1); 
    }catch(Throwable e){ 
        throw new IllegalArgumentException(e); 
    } 
    try{ 
        if( "sayHello".equals( $2 )  &&  $3.length == 1 ) {  
            return ($w)w.sayHello((indi.cesc.inno.learn.dubbo.HelloRequest)$4[0]);  // 真實方法調用
        } 
    } catch(Throwable e) {      
        throw new java.lang.reflect.InvocationTargetException(e);  
    } 
    throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class indi.cesc.inno.learn.dubbo.HelloService."); 
}
複製代碼

能夠看到 w.sayHello() 這就是直接經過服務的實現對象調用具體方法,並非經過反射,效率會高些。默認使用Javassist而不是JDK動態代理也是出於效率的考慮

這裏就將真實服務加入到總體調用鏈條之中,後續再將 Invoker 往上層傳遞,打通整個鏈條。

繼續上面 代碼7 處的代碼,protocol 實例調用 export 方法進入後續流程。這裏的 protocol 類型實際依舊是個適配器,export 方法源碼以下:

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
    if (arg0 == null) 
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null) 
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) 
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
}
複製代碼

注意 invokerurl 不是服務暴露的 url,而是協議註冊的 url,所以 url 裏面的協議是 registry。嘗試獲取名爲 registryProtocol 擴展,但進入 ExtensionLoader 後被攔截,實際拿到了其封裝類 ProtocolFilterWrapper,其負責組裝過濾器鏈

/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 中配置有:

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper  
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper  
複製代碼

則獲取 RegistryProtocol 實例會被 ProtocolFilterWrapperProtocolListenerWrapper 裝飾,分別用來實現攔截器和監聽器功能,查看這兩個Wrapper的代碼能夠看出,對於註冊url都作了特別處理,向註冊中心發佈url不會觸發攔截器和監聽器功能,只有在真正暴露服務時纔會註冊攔截器,觸發監聽器

ProtocolFilterWrapper#export 方法,源碼以下:

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        // 此處,將直接進入 RegistryProtocol 的 export 方法
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
複製代碼

此處,將直接進入 RegistryProtocolexport 方法。

3.4 註冊發佈服務

依據上面分析,最終註冊發佈服務調用鏈:ServiceBean#afterPropertiesSet —> ServiceConfig#export —> ServiceConfig#doExport —> ServiceConfig#doExportUrlsFor1Protocol —> RegistryProtocol#export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // @ 代碼1

        URL registryUrl = getRegistryUrl(originInvoker); // @ 代碼2

        //registry provider
        final Registry registry = getRegistry(originInvoker); // @ 代碼3
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);  // @ 代碼4 start

        //to judge to delay publish whether or not
        boolean register = registeredProviderUrl.getParameter("register", true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

        if (register) {
            register(registryUrl, registeredProviderUrl); // @ 代碼4 end
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); // @ 代碼5 start
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // @ 代碼5 end
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }
複製代碼

代碼1:啓動服務提供者服務,監聽指定端口,準備服務消費者的請求,這裏其實就是從 WrapperInvoker 中的 url (註冊中心 url )中提取 export 屬性,描述服務提供者的 url,而後啓動服務提供者。

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
複製代碼

調用DubboProtocol#export完成Dubbo服務的啓動

從上圖中,能夠看出,將調用 DubboProtocol#export 完成Dubbo服務的啓動,利用netty構建一個微型服務端,監聽端口,準備接受服務消費者的網絡請求,而後將 dubbo:service 的服務handler加入到命令處理器中,當有消息消費者鏈接該端口時,經過網絡解包,將須要調用的服務和參數等信息解析處理後,轉交給對應的服務實現類處理便可

代碼2:獲取真實註冊中心的URL,例如:zookeeper註冊中心的URL。

zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider
&dubbo=2.0.0
&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D10252%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527263060882
&pid=10252
&qos.port=22222
&timestamp=1527263060867
複製代碼

代碼3:根據註冊中心URL,從註冊中心工廠中獲取指定的註冊中心實現類:zookeeper註冊中心的實現類爲:ZookeeperRegistry

代碼4:獲取服務提供者URL中的 register 屬性,若是爲 true,則調用註冊中心的 ZookeeperRegistry#register 方法向註冊中心註冊服務(實際由其父類 FailbackRegistry 實現)。

RegistryProtocol#register 方法,源碼以下:

public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }
複製代碼

FailbackRegistry#register 方法,源碼以下:

@Override
    public void register(URL url) {
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            failedRegistered.add(url);
        }
    }
複製代碼

ZookeeperRegistry#doRegister 方法,源碼以下:

@Override
    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
複製代碼

代碼5:服務提供者向註冊中心訂閱本身,主要是爲了服務提供者URL發生變化的時候,會觸發 overrideSubscribeListenernotify 方法從新暴露服務。固然,會將 dubbo:referencecheck 屬性設置爲 false

爲了感知註冊中心的一些配置變化,提供者會監聽註冊中心路徑 /dubbo/${interfaceClass}/configurators 的節點,監聽該節點在註冊中心的一些配置信息變動。Zookeeper註冊中心經過zookeeper框架的監聽回調接口進行監聽(redis註冊中心經過訂閱命令(subscribe)監聽),服務器緩存註冊中心的配置,當配置發生變動時,服務會刷新本地緩存。

FailbackRegistry#subscribe 訂閱方法,源碼以下:

@Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && !urls.isEmpty()) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }
複製代碼

ZookeeperRegistry#doSubscribe 訂閱方法,源碼以下:

@Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
                                child = URL.decode(child);
                                if (!anyServices.contains(child)) {
                                    anyServices.add(child);
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList<URL>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
複製代碼

3.5 打通服務網絡

本節是切實最最核心的,重點關注 RegistryProtocol#export 中調用 doLocalExport 方法,其實主要是 根據各自協議,服務提供者創建網絡服務器,在特定端口創建監聽,監聽來自消息消費端服務的請求

RegistryProtocol#doLocalExport,源碼以下:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // @ 代碼1
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // @ 代碼2
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
複製代碼

代碼1:若是服務提供者以 dubbo 協議暴露服務,getProviderUrl(originInvoker)返回的URL將以 dubbo:// 開頭。

代碼2:根據Dubbo內置的SPI機制,將調用 DubboProtocol#export方法。

DubboProtocol#export,源碼以下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl(); // @ 代碼1

        // export service.
        String key = serviceKey(url); // @ 代碼2
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); // @ 代碼3 start
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        } // @ 代碼3 end

        openServer(url); // @ 代碼4
        optimizeSerialization(url); // @ 代碼5
        return exporter;
    }
複製代碼

代碼1:獲取服務提供者URL,以協議名稱,這裏是 dubbo:// 開頭。

代碼2:從服務提供者URL中獲取服務名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880

代碼3:是否將轉發事件導出成 stub

代碼4:根據url打開服務。

代碼5:根據url優化器序列化方式。

DubboProtocol#openServer,源碼以下:

private void openServer(URL url) {
        // find server.
        String key = url.getAddress(); // @ 代碼1
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key); // @ 代碼2
                    if (server == null) {
                        serverMap.put(key, createServer(url)); // @ 代碼3
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url); // @代碼4
            }
        }
    }
複製代碼

代碼1:根據url獲取網絡地址:ip:port,例如:192.168.56.1:20880,服務提供者IP與暴露服務端口號。

代碼2:根據key從服務器緩存中獲取,若是存在,則執行代碼4,若是不存在,則執行代碼3.

代碼3:根據URL建立一服務器,Dubbo服務提供者服務器實現類爲 ExchangeServer

代碼4:若是服務器已經存在,用當前URL重置服務器,這個不難理解,由於一個Dubbo服務中,會存在多個 dubbo:service 標籤,這些標籤都會在服務檯提供者的同一個IP地址、端口號上暴露服務

DubboProtocol#createServer,源碼以下:

private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // @ 代碼1
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @ 代碼2
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @ 代碼3

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) // @ 代碼4
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // @ 代碼5
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler); // @ 代碼6
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY); // @ 代碼7
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
複製代碼

代碼1:爲服務提供者url增長 channel.readonly.sent 屬性,默認爲 true,表示在發送請求時,是否等待將字節寫入socket後再返回,默認爲 true

代碼2:爲服務提供者url增長 heartbeat 屬性,表示心跳間隔時間,默認爲 60*1000,表示60s。

代碼3:爲服務提供者url增長 server 屬性,可選值爲 netty,mina 等等,默認爲 netty

代碼4:根據SPI機制,判斷 server 屬性是否支持。

代碼5:爲服務提供者url增長 codec 屬性,默認值爲 dubbo,協議編碼方式。

代碼6:根據服務提供者URI,服務提供者命令請求處理器 requestHandler 構建 ExchangeServer 實例。requestHandler 的實現具體在之後詳細分析Dubbo服務調用時再詳細分析。

代碼7:驗證客戶端類型是否可用。

Exchangers#bind方法,根據 URLExchangeHandler 構建服務器,源碼以下:

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
複製代碼

上述代碼不難看出,首先根據 url 獲取 Exchanger 實例,而後調用 bind 方法構建 ExchangeServerExchanger 接口方法以下:

Exchanger接口方法

  1. ExchangeServer bind(URL url, ExchangeHandler handler):服務提供者調用。
  2. ExchangeClient connect(URL url, ExchangeHandler handler):服務消費者調用。

Dubbo提供的實現類爲:HeaderExchanger,其 bind 方法以下:

@Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
複製代碼

今後處能夠看到,端口的綁定由 Transportersbind 方法實現。源碼以下:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
複製代碼

從這裏得知,Dubbo網絡傳輸的接口有 Transporter 接口實現,其繼承類圖所示:

Transporter繼承類圖

本文以netty版原本查看一下 Transporter 實現。 NettyTransporter 源碼以下:

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty3";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}
複製代碼

建立 NettyServer 實例時,其父類構造函數會調用 doOpen() 創建網絡鏈接,源碼以下:

@Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // @ 代碼1
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler); // @ 代碼2
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
複製代碼

從本方法 代碼1 & 代碼2 瞭解,首先建立 NettyServer 必須傳入一個服務提供者 URL,但從 DubboProtocol#createServer 中能夠看出,Server是基於網絡套接字 (ip:port) 緩存的,一個JVM應用中,必然會存在多個 dubbo:service 標籤,就會有多個 URL這裏爲何能夠這樣作呢?

DubboProtocol#createServer 中能夠看出,在解析第二個 dubbo:service 標籤時並不會調用 createServer,而是會調用 Server#reset 方法,是否是這個方法有什麼魔法,在reset方法時能將URL也註冊到Server上

那接下來分析 NettyServer#reset 方法是如何實現的?DubboProtocol#reset 方法最終將調用 Serverreset 方法,一樣仍是以netty版本的 NettyServer 爲例,查看reset方法的實現原理。 NettyServer#reset—>父類(AbstractServer) AbstractServer#reset,源碼以下:

@Override
    public void reset(URL url) {
        if (url == null) {
            return;
        }
        try {
            if (url.hasParameter(Constants.ACCEPTS_KEY)) {
                int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
                if (a > 0) {
                    this.accepts = a;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
                int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
                if (t > 0) {
                    this.idleTimeout = t;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.THREADS_KEY)
                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { // @ 代碼1 start
                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                int threads = url.getParameter(Constants.THREADS_KEY, 0);
                int max = threadPoolExecutor.getMaximumPoolSize();
                int core = threadPoolExecutor.getCorePoolSize();
                if (threads > 0 && (threads != max || threads != core)) {
                    if (threads < core) {
                        threadPoolExecutor.setCorePoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setMaximumPoolSize(threads);
                        }
                    } else {
                        threadPoolExecutor.setMaximumPoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setCorePoolSize(threads);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        } // @ 代碼1 end
        super.setUrl(getUrl().addParameters(url.getParameters())); // @ 代碼2 
    }
複製代碼

代碼1:首先是調整線程池的相關線程數量,這個好理解。

代碼2:而後設置調用 setUrl 覆蓋原先 NettyServerprivate volatile URL url 的屬性,那爲何不會影響原先註冊的 dubbo:service 呢?原來 NettyHandler 上加了註解: @Sharable,由該註解去實現線程安全。

掃碼入羣討論

相關文章
相關標籤/搜索