源碼分析Dubbo服務消費端啓動流程

經過前面文章詳解,咱們知道Dubbo服務消費者標籤dubbo:reference最終會在Spring容器中建立一個對應的ReferenceBean實例,而ReferenceBean實現了Spring生命週期接口:InitializingBean,接下來應該看一下其afterPropertiesSet方法的實現。spring

一、源碼分析ReferenceBean#afterPropertiesSet

ReferenceBean#afterPropertiesSet緩存

if (getConsumer() == null) {
            Map<string, consumerconfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, 
                 ConsumerConfig.class, false, false);
            if (consumerConfigMap != null &amp;&amp; consumerConfigMap.size() &gt; 0) {
                ConsumerConfig consumerConfig = null;
                for (ConsumerConfig config : consumerConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (consumerConfig != null) {
                            throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
                        }
                        consumerConfig = config;
                    }
                }
                if (consumerConfig != null) {
                    setConsumer(consumerConfig);
                }
            }
        }

Step1:若是consumer爲空,說明dubbo:reference標籤未設置consumer屬性,若是一個dubbo:consumer標籤,則取該實例,若是存在多個dubbo:consumer 配置,則consumer必須設置,不然會拋出異常:"Duplicate consumer configs"。架構

Step2:若是application爲空,則嘗試從BeanFactory中查詢dubbo:application實例,若是存在多個dubbo:application配置,則拋出異常:"Duplicate application configs"。併發

Step3:若是ServiceBean的module爲空,則嘗試從BeanFactory中查詢dubbo:module實例,若是存在多個dubbo:module,則拋出異常:"Duplicate module configs: "。app

Step4:嘗試從BeanFactory中加載全部的註冊中心,注意ServiceBean的List< RegistryConfig&gt; registries屬性,爲註冊中心集合。jvm

Step5:嘗試從BeanFacotry中加載一個監控中心,填充ServiceBean的MonitorConfig monitor屬性,若是存在多個dubbo:monitor配置,則拋出"Duplicate monitor configs: "。分佈式

ReferenceBean#afterPropertiesSetide

Boolean b = isInit();
if (b == null &amp;&amp; getConsumer() != null) {
      b = getConsumer().isInit();
}
if (b != null &amp;&amp; b.booleanValue()) {
      getObject();
}

Step6:判斷是否初始化,若是爲初始化,則調用getObject()方法,該方法也是FactoryBean定義的方法,ReferenceBean是dubbo:reference所真實引用的類(interface)的實例工程,getObject發返回的是interface的實例,而不是ReferenceBean實例。高併發

1.1 源碼分析getObject()

public Object getObject() throws Exception {
        return get();
}

ReferenceBean#getObject()方法直接調用其父類的get方法,get方法內部調用init()方法進行初始化源碼分析

1.2 源碼分析ReferenceConfig#init方法

ReferenceConfig#init

if (initialized) {
     return;
 }
initialized = true;
if (interfaceName == null || interfaceName.length() == 0) {
      throw new IllegalStateException("<dubbo:reference interface="\&quot;\&quot;" /> interface not allow null!");
}

Step1:若是已經初始化,直接返回,若是interfaceName爲空,則拋出異常。

ReferenceConfig#init調用ReferenceConfig#checkDefault

private void checkDefault() {
        if (consumer == null) {
            consumer = new ConsumerConfig();
        }
        appendProperties(consumer);
    }

Step2:若是dubbo:reference標籤也就是ReferenceBean的consumer屬性爲空,調用appendProperties方法,填充默認屬性,其具體加載順序:

  1. 從系統屬性加載對應參數值,參數鍵:dubbo.consumer.屬性名,從系統屬性中獲取屬性值的方法爲:System.getProperty(key)。
  2. 加載屬性配置文件的值。屬性配置文件,可經過系統屬性:dubbo.properties.file,若是該值未配置,則默認取dubbo.properties屬性配置文件。 ReferenceConfig#init
appendProperties(this);

Step3:調用appendProperties方法,填充ReferenceBean的屬性,屬性值來源與step2同樣,固然只填充ReferenceBean中屬性爲空的屬性。

ReferenceConfig#init

if (getGeneric() == null &amp;&amp; getConsumer() != null) {
      setGeneric(getConsumer().getGeneric());
}
if (ProtocolUtils.isGeneric(getGeneric())) {
      interfaceClass = GenericService.class;
} else {
      try {
              interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
      } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
      }
      checkInterfaceAndMethods(interfaceClass, methods);
}

Step4:若是使用返回引用,將interface值替換爲GenericService全路徑名,若是不是,則加載interfacename,並檢驗dubbo:reference子標籤dubbo:method引用的方法是否在interface指定的接口中存在。

ReferenceConfig#init

String resolve = System.getProperty(interfaceName);      // @1
String resolveFile = null;
if (resolve == null || resolve.length() == 0) {                       // @2
     resolveFile = System.getProperty("dubbo.resolve.file");    // @3 start
     if (resolveFile == null || resolveFile.length() == 0) {
          File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
          if (userResolveFile.exists()) {
               resolveFile = userResolveFile.getAbsolutePath();
          }
     }    // @3 end
     if (resolveFile != null &amp;&amp; resolveFile.length() &gt; 0) {    // @4
          Properties properties = new Properties();
          FileInputStream fis = null;
          try {
               fis = new FileInputStream(new File(resolveFile));
               properties.load(fis);
           } catch (IOException e) {
               throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
           } finally {
               try {
                    if (null != fis) fis.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
               }
           }
          resolve = properties.getProperty(interfaceName);
      }
 }
 if (resolve != null &amp;&amp; resolve.length() &gt; 0) {  // @5
      url = resolve;
      if (logger.isWarnEnabled()) {
         if (resolveFile != null &amp;&amp; resolveFile.length() &gt; 0) {
             logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
         } else {
            logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
        }
    }
}

Step5:處理dubbo服務消費端resolve機制,也就是說消息消費者只連服務提供者,繞過註冊中心。

代碼@1:從系統屬性中獲取該接口的直連服務提供者,若是存在 -Dinterface=dubbo://127.0.0.1:20880,其中interface爲dubbo:reference interface屬性的值。

代碼@2:若是未指定-D屬性,嘗試從resolve配置文件中查找,從這裏看出-D的優先級更高。

代碼@3:首先嚐試獲取resolve配置文件的路徑,其來源能夠經過-Ddubbo.resolve.file=文件路徑名來指定,若是未配置該系統參數,則默認從${user.home}/dubbo-resolve.properties,若是過文件存在,則設置resolveFile的值,不然resolveFile爲null。

代碼@4:若是resolveFile不爲空,則加載resolveFile文件中內容,而後經過interface獲取其配置的直連服務提供者URL。

代碼@5:若是resolve不爲空,則填充ReferenceBean的url屬性爲resolve(點對點服務提供者URL),打印日誌,點對點URL的來源(系統屬性、resolve配置文件)。

ReferenceConfig#init

checkApplication();
checkStubAndMock(interfaceClass);

Step6:校驗ReferenceBean的application是否爲空,若是爲空,new 一個application,並嘗試從系統屬性(優先)、資源文件中填充其屬性;同時校驗stub、mock實現類與interface的兼容性。系統屬性、資源文件屬性的配置以下: application dubbo.application.屬性名,例如 dubbo.application.name

ReferenceConfig#init

Map<string, string> map = new HashMap<string, string>();
Map<object, object> attributes = new HashMap<object, object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() &gt; 0) {
     map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}

Step7:構建Map,封裝服務消費者引用服務提供者URL的屬性,這裏主要填充side:consume(消費端)、dubbo:2.0.0(版本)、timestamp、pid:進程ID。

ReferenceConfig#init

if (!isGeneric()) {
    String revision = Version.getVersion(interfaceClass, version);
    if (revision != null &amp;&amp; revision.length() &gt; 0) {
         map.put("revision", revision);
    }
    String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
    if (methods.length == 0) {
           logger.warn("NO method found in service interface " + interfaceClass.getName());
           map.put("methods", Constants.ANY_VALUE);
     } else {
          map.put("methods", StringUtils.join(new HashSet<string>(Arrays.asList(methods)), ","));
     }

}

Step8:若是不是泛化引用,增長methods:interface的全部方法名,多個用逗號隔開。 ReferenceConfig#init

map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);

Step9:用Map存儲application配置、module配置、默認消費者參數(ConsumerConfig)、服務消費者dubbo:reference的屬性。 ReferenceConfig#init

String prefix = StringUtils.getServiceKey(map);
if (methods != null &amp;&amp; !methods.isEmpty()) {
    for (MethodConfig method : methods) {
         appendParameters(map, method, method.getName());
         String retryKey = method.getName() + ".retry";
         if (map.containsKey(retryKey)) {
              String retryValue = map.remove(retryKey);
              if ("false".equals(retryValue)) {
                  map.put(method.getName() + ".retries", "0");
              }
         }
         appendAttributes(attributes, method, prefix + "." + method.getName());
         checkAndConvertImplicitConfig(method, map, attributes);
   }
}

Step10:獲取服務鍵值 /{group}/interface:版本,若是group爲空,則爲interface:版本,其值存爲prifex,而後將dubbo:method的屬性名稱也填入map中,鍵前綴爲dubbo.method.methodname.屬性名。dubbo:method的子標籤dubbo:argument標籤的屬性也追加到attributes map中,鍵爲 prifex + methodname.屬性名。

ReferenceConfig#init

String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
      hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
      throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + 
           hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

Step11:填充register.ip屬性,該屬性是消息消費者鏈接註冊中心的IP,並非註冊中心自身的IP。 ReferenceConfig#init

ref = createProxy(map);

Step12:調用createProxy方法建立消息消費者代理,下面詳細分析其實現細節。 ReferenceConfig#init

ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);

Step13:將消息消費者緩存在ApplicationModel中。

1.2.1 源碼分析ReferenceConfig#createProxy方法

ReferenceConfig#createProxy

URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
    if (url != null &amp;&amp; url.length() &gt; 0) { // if a url is specified, don't do local reference
          isJvmRefer = false;
    } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
         // by default, reference local service if there is
         isJvmRefer = true;
    } else {
         isJvmRefer = false;
    }
} else {
    isJvmRefer = isInjvm().booleanValue();
}

Step1:判斷該消費者是不是引用本(JVM)內提供的服務。 若是dubbo:reference標籤的injvm(已過時,被local屬性替換)若是不爲空,則直接取該值,若是該值未配置,則判斷ReferenceConfig的url屬性是否爲空,若是不爲空,則isJvmRefer =false,代表該服務消費者將直連該URL的服務提供者;若是url屬性爲空,則判斷該協議是不是isInjvm,其實現邏輯:獲取dubbo:reference的scop屬性,根據其值判斷:

  • 若是爲空,isJvmRefer爲false。
  • 若是協議爲injvm,就是表示爲本地協議,既然提供了本地協議的實現,則無需配置isJvmRefer該標籤爲true,故,isJvmRerfer=false。
  • 若是scope=local或injvm=true,isJvmRefer=true。
  • 若是scope=remote,isJvmRefer設置爲false。
  • 若是是泛化引用,isJvmRefer設置爲false。
  • 其餘默認狀況,isJvmRefer設置爲true。

ReferenceConfig#createProxy

if (isJvmRefer) {
   URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
   invoker = refprotocol.refer(interfaceClass, url);
   if (logger.isInfoEnabled()) {
         logger.info("Using injvm service " + interfaceClass.getName());
   }
}

Step2:若是消費者引用本地JVM中的服務,則利用InjvmProtocol建立Invoker,dubbo中的invoker主要負責服務調用的功能,是其核心實現,後續會在專門的章節中詳細分析,在這裏咱們須要知道,會建立於協議相關的Invoker便可。

ReferenceConfig#createProxy

if (url != null &amp;&amp; url.length() &gt; 0) { // user specified URL, could be peer-to-peer address, or register center's address.
     String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);   // @1
     if (us != null &amp;&amp; us.length &gt; 0) {
           for (String u : us) {
                  URL url = URL.valueOf(u);
                  if (url.getPath() == null || url.getPath().length() == 0) {
                       url = url.setPath(interfaceName);
                  }
                 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {   // @2
                      urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                  } else {
                      urls.add(ClusterUtils.mergeUrl(url, map));  // @3
                 }
             }
       }
}

Step3:處理直連狀況,與step2互斥。

代碼@1:對直連URL進行分割,多個直連URL用分號隔開,若是URL中不包含path屬性,則爲URL設置path屬性爲interfaceName。

代碼@2:若是直連提供者的協議爲registry,則對url增長refer屬性,其值爲消息消費者全部的屬性。(表示從註冊中心發現服務提供者)

代碼@3:若是是其餘協議提供者,則合併服務提供者與消息消費者的屬性,並移除服務提供者默認屬性。以default開頭的屬性。

ReferenceConfig#createProxy

List<url> us = loadRegistries(false);   // @1
if (us != null &amp;&amp; !us.isEmpty()) {
     for (URL u : us) {
           URL monitorUrl = loadMonitor(u);   // @2
           if (monitorUrl != null) {
                 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));   // @3
            }
            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));  // @4
       }
}
if (urls == null || urls.isEmpty()) {
       throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="\&quot;...\&quot;" /> to your spring config.");
}

Step4:普通消息消費者,從註冊中心訂閱服務。

代碼@1:獲取全部註冊中心URL,其中參數false表示消費端,須要排除dubbo:registry subscribe=false的註冊中心,其值爲false表示不接受訂閱。

代碼@2:根據註冊中心URL,構建監控中心URL。

代碼@3:若是監控中心不爲空,在註冊中心URL後增長屬性monitor。

代碼@4:在註冊中心URL中,追加屬性refer,其值爲消費端的全部配置組成的URL。

ReferenceConfig#createProxy

if (urls.size() == 1) {
    invoker = refprotocol.refer(interfaceClass, urls.get(0));     // @1
} else {
    List<invoker<?>&gt; invokers = new ArrayList<invoker<?>&gt;();    // @2,多個服務提供者URL,集羣模式
    URL registryURL = null;
    for (URL url : urls) {
         invokers.add(refprotocol.refer(interfaceClass, url));    // @2
         if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
               registryURL = url; // use last registry url
          }
     }
     if (registryURL != null) { // registry url is available
          // use AvailableCluster only when register's cluster is available
          URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
          invoker = cluster.join(new StaticDirectory(u, invokers));    // @3
     } else { // not a registry url
          invoker = cluster.join(new StaticDirectory(invokers));
     }
 }

Step5:根據URL獲取對應協議的Invoker。 代碼@1:若是隻有一個服務提供者URL,則直接根據協議構建Invoker,具體有以下協議: 這裏寫圖片描述

代碼@2:若是有多個服務提供者,則衆多服務提供者構成一個集羣。 首先根據協議構建服務Invoker,默認Dubbo基於服務註冊於發現,在服務消費端不會指定url屬性,從註冊中心獲取服務提供者列表,此時的URL:registry://開頭,url中會包含register屬性,其值爲註冊中心的類型,例如zookeeper,將使用RedisProtocol構建Invoker,該方法將自動發現註冊在註冊中心的服務提供者,後續文章將會zookeeper註冊中心爲例,詳細分析其實現原理。 代碼@3:返回集羣模式實現的Invoker,Dubbo中的Invoker類繼承體系以下: 這裏寫圖片描述

集羣模式的Invoker和單個協議Invoker同樣實現Invoker接口,而後在集羣Invoker中利用Directory保證一個一個協議的調用器,十分的巧妙,在後續章節中將重點分析Dubbo Invoker實現原理,包含集羣實現機制。

ReferenceConfig#createProxy

Boolean c = check;
if (c == null &amp;&amp; consumer != null) {
      c = consumer.isCheck();
}
if (c == null) {
      c = true; // default true
}
if (c &amp;&amp; !invoker.isAvailable()) {
       throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group   
              == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + 
             NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}

代碼@4:若是dubbo:referecnce的check=true或默認爲空,則須要判斷服務提供者是否存在。

ReferenceConfig#createProxy

return (T) proxyFactory.getProxy(invoker);
AbstractProxyFactory#getProxy
public <t> T getProxy(Invoker<t> invoker) throws RpcException {
        Class<!--?-->[] interfaces = null;
        String config = invoker.getUrl().getParameter("interfaces");     // @1
        if (config != null &amp;&amp; config.length() &gt; 0) {
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null &amp;&amp; types.length &gt; 0) {
                interfaces = new Class<!--?-->[types.length + 2];
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;        // @2
                for (int i = 0; i &lt; types.length; i++) {
                    interfaces[i + 1] = ReflectUtils.forName(types[i]);
                }
            }
        }
        if (interfaces == null) {
            interfaces = new Class<!--?-->[]{invoker.getInterface(), EchoService.class};
        }
        return getProxy(invoker, interfaces);    // @3
    }

根據invoker獲取代理類,其實現邏輯以下:

代碼@1:從消費者URL中獲取interfaces的值,用,分隔出單個服務應用接口。

代碼@2:增長默認接口EchoService接口。

代碼@3:根據須要實現的接口,使用jdk或Javassist建立代理類。 最後給出消息消費者啓動時序圖: 這裏寫圖片描述

本節關於Dubbo服務消費者(服務調用者)的啓動流程就梳理到這裏,下一篇將重點關注Invoker(服務調用相關的實現細節)。


做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。

</t></t></invoker<?></invoker<?></url></string></object,></object,></string,></string,></string,>

相關文章
相關標籤/搜索