又過了好久纔敢寫。本身也在反覆看,consumer在啓動時是如何建立代理並注入的呢?java
在第一篇 我寫了一些bean的加載過程。這個過程也是包含在啓動過程當中的。spring
one、spring 加載dubbo文件,開始解析consumer 配置文件。目的 就是注入。但這時候尚未對象能夠注入。只是有這個操做express
two、ReferenceBean#afterPropertiesSet方法作了一大堆的驗證,主要是驗證配置文件裏有幾個consumer配置,有幾個zk配置,有幾個監控中心配置,等等一系列的驗證。在方法的最後,還有一個隊init方法的驗證,即,是否爲初始化操做,若是是就進行getObject操做。apache
一、這個方法裏面包含了不少操做。首先就是調用ReferenceBean父類的init方法。加載配置接口的名稱(存容器裏了),檢驗dubbo.consumer標籤的其餘屬性。服務器
二、再加載consumer的ip到容器(消費者本身的ip ,非zk ip)app
三、調用createProxy(map); 獲取代理對象。這個方法裏面有太多的東西。。。less
3.一、其源碼以下:jvm
private T createProxy(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { if (url != null && url.length() > 0) { //指定URL的狀況下,不作本地引用 isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { //默認狀況下若是本地有服務暴露,則引用本地服務. isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { if (url != null && url.length() > 0) { // 用戶指定URL,指定的URL多是對點對直連地址,也多是註冊中心URL String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 經過註冊中心配置拼裝URL List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 用了最後一個registry url } } if (registryURL != null) { // 有 註冊中心協議的URL // 對有註冊中心的Cluster 只用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // 不是 註冊中心的URL invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; // default true } if (c && ! invoker.isAvailable()) { throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } // 建立服務代理 return (T) proxyFactory.getProxy(invoker); }
也是跟init方法相似。先作了一堆驗證,比較有特色就是【判斷該消費者是不是引用本(JVM)內提供的服務】,若是是,具體作了啥。後續分析。這裏主要的分析 仍是consumer 的啓動過程。ide
3.二、驗證是否爲直連的,String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);這句。ui
3.三、最後纔是普通註冊中心的鏈接。先看一段url替換代碼
List<URL> us = loadRegistries(false);
也就是上面那句。這句裏面作了啥?
protected List<URL> loadRegistries(boolean provider) { checkRegistry(); List<URL> registryList = new ArrayList<URL>(); if (registries != null && registries.size() > 0) { for (RegistryConfig config : registries) { String address = config.getAddress(); if (address == null || address.length() == 0) { address = Constants.ANYHOST_VALUE; } String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } if (address != null && address.length() > 0 && ! RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); appendParameters(map, application); appendParameters(map, config); map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } if (! map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); url = url.setProtocol(Constants.REGISTRY_PROTOCOL); if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (! provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
觀察後發現,List<URL> us = loadRegistries(false); 這句是將原來的url進行了替換 ,換成可以註冊到zk的url並返回一個url集合,那麼 鏈接zk 的地方在哪?
refer()這個方法纔是鏈接請求zk 的方法,接着看RegistryProtocol的dorefer()方法--registry.register(FailbackRegistry)方法--ZookeeperRegistry的doRegister方法,找到create()方法這裏就是建立了一個zk的臨時節點,並將獲取的提供者列表封裝成invoker返回,
在doRefer()下還有一個方法subscribe()這裏傳的參數就比較猛了,該方法下doSubscribe(url, listener);會調用zk監聽器實現發佈和訂閱功能,該監聽器也能監聽提供者節點變化等。並在獲取提供者列表後封裝成invoker返回。
@Override public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // 向服務器端發送訂閱請求 doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; List<URL> urls = getCacheUrls(url); if (urls != null && urls.size() > 0) { notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { // 若是開啓了啓動時檢測,則直接拋出異常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if(skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // 將失敗的訂閱請求記錄到失敗列表,定時重試 addFailedSubscribed(url, listener); } }
3.四、獲取了zk 裏的提供者url就要處理這些className ,而後封裝成invoker對象
3.五、最後將使用ProxyFactory建立出代理對象,也就是這句getProxy(invoker, interfaces) 並將代理對象返回。反給誰了?
private transient volatile T ref 給這個ref了最後就注入了對應的@AutoWired 類中。
@AutoWired 注入的類,調用方法時,也就是調用InvokerInvocationHandler 的invoke方法
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.proxy; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.RpcInvocation; /** * InvokerHandler * * @author william.liangf */ public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler){ this.invoker = handler; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }
總結:
歸納的說,consumer啓動時,spring 解析xml文件成ReferenceBean對象--調用init()方法--調用getObject(),獲取zk中提供者列表,並解析成invoker對象,調用getProxy()方法獲取代理對象--注入到指定的controller的@autoWired下的類中。
並且在該代理對象被調用invoke方法時,就會觸發invoke方法中的通訊方法。很深。目前還沒分析好。
注意:生產者鏈接zk 的過程 是ServiceConfig#doExportUrlsFor1Protocol#protocol.export(invoker);到 RegistryProtocol#export方法的registry.register(registedProviderUrl); 再到 FailbackRegistry#register 的doRegister(url); 方法 方法裏就是具體的主測邏輯。很深。有監聽等等。
2019-03-27修改
生產者打開netty通道 的過程爲 ServiceConfig#doExportUrlsFor1Protocol#protocol.export(invoker); 默認爲DubboProtocol#export 方法,到openServer(url);方法,中一直找bind方法,最後找到了NettyTransporter#bind方法。這裏明顯就能看到是開啓nettyServer的方法,順着NettyServer的構造器找到doOpen()方法,這裏就是開發netty通道的方法了。