

在第一篇 我寫了一些bean的加載過程。這個過程也是包含在啓動過程當中的。spring

one、spring 加載dubbo文件,開始解析consumer 配置文件。目的 就是注入。但這時候尚未對象能夠注入。只是有這個操做express



        二、再加載consumer的ip到容器(消費者本身的ip ,非zk  ip)app

        三、調用createProxy(map); 獲取代理對象。這個方法裏面有太多的東西。。。less


private T createProxy(Map<String, String> map) {
		URL tmpUrl = new URL("temp", "localhost", 0, map);
		final boolean isJvmRefer;
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { //指定URL的狀況下,不作本地引用
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
        } else {
            isJvmRefer = isInjvm().booleanValue();
		if (isJvmRefer) {
			URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
			invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
		} else {
            if (url != null && url.length() > 0) { // 用戶指定URL,指定的URL多是對點對直連地址,也多是註冊中心URL
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
            } else { // 經過註冊中心配置拼裝URL
            	List<URL> us = loadRegistries(false);
            	if (us != null && us.size() > 0) {
                	for (URL u : us) {
                	    URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                	    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
            	if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");

            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最後一個registry url
                if (registryURL != null) { // 有 註冊中心協議的URL
                    // 對有註冊中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                }  else { // 不是 註冊中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        if (c == null) {
            c = true; // default true
        if (c && ! invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        // 建立服務代理
        return (T) proxyFactory.getProxy(invoker);

            也是跟init方法相似。先作了一堆驗證,比較有特色就是【判斷該消費者是不是引用本(JVM)內提供的服務】,若是是,具體作了啥。後續分析。這裏主要的分析 仍是consumer 的啓動過程。ide

            3.二、驗證是否爲直連的,String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);這句。ui


List<URL> us = loadRegistries(false);


protected List<URL> loadRegistries(boolean provider) {
        List<URL> registryList = new ArrayList<URL>();
        if (registries != null && registries.size() > 0) {
            for (RegistryConfig config : registries) {
                String address = config.getAddress();
                if (address == null || address.length() == 0) {
                	address = Constants.ANYHOST_VALUE;
                String sysaddress = System.getProperty("dubbo.registry.address");
                if (sysaddress != null && sysaddress.length() > 0) {
                    address = sysaddress;
                if (address != null && address.length() > 0 
                        && ! RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                    Map<String, String> map = new HashMap<String, String>();
                    appendParameters(map, application);
                    appendParameters(map, config);
                    map.put("path", RegistryService.class.getName());
                    map.put("dubbo", Version.getVersion());
                    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                    if (ConfigUtils.getPid() > 0) {
                        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                    if (! map.containsKey("protocol")) {
                        if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                            map.put("protocol", "remote");
                        } else {
                            map.put("protocol", "dubbo");
                    List<URL> urls = UrlUtils.parseURLs(address, map);
                    for (URL url : urls) {
                        url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                        url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                        if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                                || (! provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
        return registryList;

            觀察後發現,List<URL> us = loadRegistries(false); 這句是將原來的url進行了替換 ,換成可以註冊到zk的url並返回一個url集合,那麼 鏈接zk 的地方在哪?

             refer()這個方法纔是鏈接請求zk 的方法,接着看RegistryProtocol的dorefer()方法--registry.register(FailbackRegistry)方法--ZookeeperRegistry的doRegister方法,找到create()方法這裏就是建立了一個zk的臨時節點,並將獲取的提供者列表封裝成invoker返回,

            在doRefer()下還有一個方法subscribe()這裏傳的參數就比較猛了,該方法下doSubscribe(url, listener);會調用zk監聽器實現發佈和訂閱功能,該監聽器也能監聽提供者節點變化等。並在獲取提供者列表後封裝成invoker返回。

    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // 向服務器端發送訂閱請求
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && urls.size() > 0) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // 若是開啓了啓動時檢測,則直接拋出異常
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if(skipFailback) {
                        t = t.getCause();
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);

            // 將失敗的訂閱請求記錄到失敗列表,定時重試
            addFailedSubscribed(url, listener);

            3.四、獲取了zk 裏的提供者url就要處理這些className ,而後封裝成invoker對象

            3.五、最後將使用ProxyFactory建立出代理對象,也就是這句getProxy(invoker, interfaces)   並將代理對象返回。反給誰了?

                    private transient volatile T ref  給這個ref了最後就注入了對應的@AutoWired 類中。

                    @AutoWired 注入的類,調用方法時,也就是調用InvokerInvocationHandler 的invoke方法

package com.alibaba.dubbo.rpc.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcInvocation;

 * InvokerHandler
 * @author william.liangf
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;
    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        return invoker.invoke(new RpcInvocation(method, args)).recreate();





歸納的說,consumer啓動時,spring 解析xml文件成ReferenceBean對象--調用init()方法--調用getObject(),獲取zk中提供者列表,並解析成invoker對象,調用getProxy()方法獲取代理對象--注入到指定的controller的@autoWired下的類中。



注意:生產者鏈接zk 的過程 是ServiceConfig#doExportUrlsFor1Protocol#protocol.export(invoker);到 RegistryProtocol#export方法的registry.register(registedProviderUrl);  再到 FailbackRegistry#register 的doRegister(url); 方法  方法裏就是具體的主測邏輯。很深。有監聽等等。


生產者打開netty通道 的過程爲 ServiceConfig#doExportUrlsFor1Protocol#protocol.export(invoker); 默認爲DubboProtocol#export 方法,到openServer(url);方法,中一直找bind方法,最後找到了NettyTransporter#bind方法。這裏明顯就能看到是開啓nettyServer的方法,順着NettyServer的構造器找到doOpen()方法,這裏就是開發netty通道的方法了。
