首先咱們將引用dubbo官方文檔和其它一些博客中發佈的dubbo的流程圖來對dubbo的流程有個歸納的瞭解。html
參考博客:http://shiyanjun.cn/archives/325.htmljava
接下來咱們將經過調試源代碼的方式來追蹤核心流程,讓咱們更加細緻和深刻的瞭解dubbo的核心流程。git
接口ProcessService定義spring
package com.alibaba.dubbo.test; public interface ProcessService { public Object test(Object object); }
接口實現類ProcessServiceImplapi
package com.alibaba.dubbo.test; public class ProcessServiceImpl implements ProcessService { public Object test(Object object) { //測試方法,直接返回參數值。 return object; } }
服務發佈入口類ServerMain服務器
package com.alibaba.dubbo.test; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ServerMain { public static void main(String[] args) { ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("SpringApplicationContext.xml"); ctx.start(); synchronized (ServerMain.class) { try { ServerMain.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
服務發佈配置文件SpringApplicationContext.xml架構
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd "> <bean id="processServiceImpl" class="com.alibaba.dubbo.test.ProcessServiceImpl" /> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <!-- 服務應用配置 --> <dubbo:application name="dubbo_provider" /> <!-- 服務提供者全局配置 --> <dubbo:protocol name="dubbo" port="20885"/> <!-- 服務提供者暴露服務配置 --> <dubbo:service id="processService" interface="com.alibaba.dubbo.test.ProcessService" ref="processServiceImpl"/> </beans>
客戶應用及調用主類ClientMainapp
package com.alibaba.dubbo.test; import com.alibaba.dubbo.config.ApplicationConfig; import com.alibaba.dubbo.config.ReferenceConfig; import com.alibaba.dubbo.config.RegistryConfig; public class ClientMain { public static void main(String[] args) { RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181"); ApplicationConfig application = new ApplicationConfig(); application.setName("dubbo_consumer"); ReferenceConfig<ProcessService> referenceConfig = new ReferenceConfig<ProcessService>(); referenceConfig.setRegistry(registryConfig); referenceConfig.setApplication(application); referenceConfig.setInterface(ProcessService.class); ProcessService processService = referenceConfig.get(); System.out.println(processService.test("hello, dubbo")); } }
這裏用了一個最簡單的接口和方法,目的是將咱們的關注點放在dubbo自己的實現及流程上,而無須關注業務邏輯上。負載均衡
服務發佈首先進入的是dubbo-config-api模塊的ServiceConfig類,進入該類的export方法。dom
從圖中能夠看出,咱們在Spring容器中發佈dubbo,在Spring容器啓動的時候發佈事件的時候,由於ServiceBean做爲ApplicationListener的一個實現類,可以監聽到容器應用的事件,在處理事件的時候會調用父類ServiceConfig的export方法,而該方法真正實現了服務的發佈。
該方法源碼以下:
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && ! export.booleanValue()) { return; } if (delay != null && delay > 0) { Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep(delay); } catch (Throwable e) { } doExport(); } }); thread.setDaemon(true); thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport(); } }
能夠看出發佈發佈是支持延遲異步發佈服務的,這樣能夠用於當咱們發佈的服務很是多,影響到應用啓動的問題,前提是應用容許服務發佈的延遲特性。
接下來就進入到內部方法doExport。
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!"); } checkDefault(); if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) { registries = provider.getRegistries(); } if (monitor == null) { monitor = provider.getMonitor(); } if (protocols == null) { protocols = provider.getProtocols(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } if (ref instanceof GenericService) { interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); } } else { try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } checkInterfaceAndMethods(interfaceClass, methods); checkRef(); generic = Boolean.FALSE.toString(); } if(local !=null){ if(local=="true"){ local=interfaceName+"Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if(!interfaceClass.isAssignableFrom(localClass)){ throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName); } } if(stub !=null){ if(stub=="true"){ stub=interfaceName+"Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if(!interfaceClass.isAssignableFrom(stubClass)){ throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName); } } checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } doExportUrls(); }
咱們能夠看出該方法的實現的邏輯包含了根據配置的優先級將ProviderConfig,ModuleConfig,MonitorConfig,ApplicaitonConfig等一些配置信息進行組裝和合並。還有一些邏輯是檢查配置信息的合法性。最後又調用了doExportUrls方法。
private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
該方法第一步是加載註冊中心列表,第二部是將服務發佈到多種協議的url上,而且攜帶註冊中心列表的參數,從這裏咱們能夠看出dubbo是支持同時將一個服務發佈成爲多種協議的,這個需求也是很正常的,客戶端也須要支持多協議,根據不一樣的場景選擇合適的協議。
進入方法loadRegistries看看實現邏輯。
protected List<URL> loadRegistries(boolean provider) { checkRegistry(); List<URL> registryList = new ArrayList<URL>(); if (registries != null && registries.size() > 0) { for (RegistryConfig config : registries) { String address = config.getAddress(); if (address == null || address.length() == 0) { address = Constants.ANYHOST_VALUE; } String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } if (address != null && address.length() > 0 && ! RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); appendParameters(map, application); appendParameters(map, config); map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } if (! map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); url = url.setProtocol(Constants.REGISTRY_PROTOCOL); if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (! provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
該方法先檢查了註冊中心地址的合法性,而後將註冊中心配置轉化爲URL列表,其中若地址爲'N/A' 則表示無效註冊中心地址,則會跳過。最後返回的URL列表的協議名稱爲registry,而且會在URL參數registry的值中保存原始的註冊中心地址協議名稱值。後面會使用到該值。
而後咱們回到方法doExportUrls,下一步是將獲得註冊中心URL列表做爲一個參數,另一個參數是協議配置信息,調用方法doExportUrlsFor1Protocol繼續實現發佈服務邏輯。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); if (name == null || name.length() == 0) { name = "dubbo"; } String host = protocolConfig.getHost(); if (provider != null && (host == null || host.length() == 0)) { host = provider.getHost(); } boolean anyhost = false; if (NetUtils.isInvalidLocalHost(host)) { anyhost = true; try { host = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { logger.warn(e.getMessage(), e); } if (NetUtils.isInvalidLocalHost(host)) { if (registryURLs != null && registryURLs.size() > 0) { for (URL registryURL : registryURLs) { try { Socket socket = new Socket(); try { SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort()); socket.connect(addr, 1000); host = socket.getLocalAddress().getHostAddress(); break; } finally { try { socket.close(); } catch (Throwable e) {} } } catch (Exception e) { logger.warn(e.getMessage(), e); } } } if (NetUtils.isInvalidLocalHost(host)) { host = NetUtils.getLocalHost(); } } } Integer port = protocolConfig.getPort(); if (provider != null && (port == null || port == 0)) { port = provider.getPort(); } final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort(); if (port == null || port == 0) { port = defaultPort; } if (port == null || port <= 0) { port = getRandomPort(name); if (port == null || port < 0) { port = NetUtils.getAvailablePort(defaultPort); putRandomPort(name, port); } logger.warn("Use random available port(" + port + ") for protocol " + name); } Map<String, String> map = new HashMap<String, String>(); if (anyhost) { map.put(Constants.ANYHOST_KEY, "true"); } map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); if (methods != null && methods.size() > 0) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List<ArgumentConfig> arguments = method.getArguments(); if (arguments != null && arguments.size() > 0) { for (ArgumentConfig argument : arguments) { //類型自動轉換. if(argument.getType() != null && argument.getType().length() >0){ Method[] methods = interfaceClass.getMethods(); //遍歷全部方法 if(methods != null && methods.length > 0){ for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); //匹配方法名稱,獲取方法簽名. if(methodName.equals(method.getName())){ Class<?>[] argtypes = methods[i].getParameterTypes(); //一個方法中單個callback if (argument.getIndex() != -1 ){ if (argtypes[argument.getIndex()].getName().equals(argument.getType())){ appendParameters(map, argument, method.getName() + "." + argument.getIndex()); }else { throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType()); } } else { //一個方法中多個callback for (int j = 0 ;j<argtypes.length ;j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())){ appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j){ throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType()); } } } } } } } }else if(argument.getIndex() != -1){ appendParameters(map, argument, method.getName() + "." + argument.getIndex()); }else { throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for } if (ProtocolUtils.isGeneric(generic)) { map.put("generic", generic); map.put("methods", Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if(methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put("methods", Constants.ANY_VALUE); } else { map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (! ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put("token", UUID.randomUUID().toString()); } else { map.put("token", token); } } if ("injvm".equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // 導出服務 String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); //配置爲none不暴露 if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的狀況下作本地暴露 (配置爲remote,則表示只暴露遠程服務) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } //若是配置不是local則暴露爲遠程服務.(配置爲local,則表示只暴露遠程服務) if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){ if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } } this.urls.add(url); }
該方法的邏輯是先根據服務配置、協議配置、發佈服務的服務器信息、方法列表、dubbo版本等等信息組裝成一個發佈的URL對象。
若沒有配置協議的host,則會自動生成一個host,這對於有多個網卡的系統會,好比本機上部署了虛擬機的狀況下,生成的host可能不是開發者指望的地址,這種狀況下須要開發者本身指定一個指望的host地址。
服務配置的scope是發佈範圍,配置爲「none」表示不發佈服務,則會中止發佈操做;
若配置爲非「remote」(包括null),則會調用exportLocal()方法繼續發佈服務;
若配置爲非「local」(包含null),則會將服務發佈到遠程協議上,這裏又2種狀況:
第一種是有效的註冊中心列表和無註冊中心列表,若是有註冊中心列表則會逐一將url的協議替換爲regitrsy,並且dubbo支持多個註冊中心,會註冊到多個註冊中心去,根據SPI會調用實現類RegistryProtocol的export方法發佈服務;RegistryProtocol中主要的邏輯請參考另一篇博文:http://my.oschina.net/ywbrj042/blog/690342
第二種是無有效的註冊中心則會調用配置的協議類型對應的實現類,好比本例中使用的dubbo,則會調用實現類DubboProtocol的export方法方法服務。該協議的詳細實現邏輯請參考博文:http://my.oschina.net/ywbrj042/blog/684718
exportLocal方法源碼以下:
private void exportLocal(URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(NetUtils.LOCALHOST) .setPort(0); // modified by lishen ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry"); } }
該方法的實現時將url發佈在jvm中,在本地同一個jvm中調用服務,因爲url的協議改成injvm,則咱們根據SPI機制能夠知道最終會調用實現類InjvmProtocol的export方法實現發佈服務在injvm中。該協議的實現類很是簡單,僅僅是用一個map記錄了serviceKey和service bean的對應關係,則調用的時候直接拿到服務對應的bean調用目標方法便可。
咱們經過啓動客戶端應用遠程服務的代碼進入調試,進入ReferenceConfig.get()方法。
經過代碼能夠看出來,先檢查狀態,而後看是否已經有「ref」,若是已經說明已經引用過,則直接返回,不然會調用init()方法進行初始化。
咱們進入該方法看看它的源碼和流程。
private void init() { if (initialized) { return; } initialized = true; if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!"); } // 獲取消費者全局配置 checkDefault(); appendProperties(this); if (getGeneric() == null && getConsumer() != null) { setGeneric(getConsumer().getGeneric()); } if (ProtocolUtils.isGeneric(getGeneric())) { interfaceClass = GenericService.class; } else { try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } checkInterfaceAndMethods(interfaceClass, methods); } String resolve = System.getProperty(interfaceName); String resolveFile = null; if (resolve == null || resolve.length() == 0) { resolveFile = System.getProperty("dubbo.resolve.file"); if (resolveFile == null || resolveFile.length() == 0) { File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); if (userResolveFile.exists()) { resolveFile = userResolveFile.getAbsolutePath(); } } if (resolveFile != null && resolveFile.length() > 0) { Properties properties = new Properties(); FileInputStream fis = null; try { fis = new FileInputStream(new File(resolveFile)); properties.load(fis); } catch (IOException e) { throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e); } finally { try { if(null != fis) fis.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } resolve = properties.getProperty(interfaceName); } } if (resolve != null && resolve.length() > 0) { url = resolve; if (logger.isWarnEnabled()) { if (resolveFile != null && resolveFile.length() > 0) { logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service."); } else { logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service."); } } } if (consumer != null) { if (application == null) { application = consumer.getApplication(); } if (module == null) { module = consumer.getModule(); } if (registries == null) { registries = consumer.getRegistries(); } if (monitor == null) { monitor = consumer.getMonitor(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } checkApplication(); checkStubAndMock(interfaceClass); Map<String, String> map = new HashMap<String, String>(); Map<Object, Object> attributes = new HashMap<Object, Object>(); map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } if (! isGeneric()) { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if(methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put("methods", Constants.ANY_VALUE); } else { map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } map.put(Constants.INTERFACE_KEY, interfaceName); appendParameters(map, application); appendParameters(map, module); appendParameters(map, consumer, Constants.DEFAULT_KEY); appendParameters(map, this); String prifix = StringUtils.getServiceKey(map); if (methods != null && methods.size() > 0) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } appendAttributes(attributes, method, prifix + "." + method.getName()); checkAndConvertImplicitConfig(method, map, attributes); } } //attributes經過系統context進行存儲. StaticContext.getSystemContext().putAll(attributes); ref = createProxy(map); }
程序的流程是這樣的:
1.先檢查狀態,是否已經銷燬過。
2.配置各類參數,將默認參數值、generic值、cosumerConfig參數值進行合併。
3.dubbo支持通用遠程服務GenericService接口。應該是無須發佈服務,能夠經過該接口遠程調用動態方法。
4.加載接口類並檢查。若接口類不存在,則會拋出異常。檢查類和方法,主要是檢查接口類的合法性,接口方法的合法性等。
5.dubbo支持經過resolveFile的方式直接制定調用URL,實現p2p直接調用,當註冊中心出現故障的時候這樣直接調用能夠容錯。
6.合併application、consumerConfig和module等參數配置。
7.生成合並以後的參數map對象。將引用須要的各項參數進行構造、合併。
8.調用方法ref = createProxy(map);建立代理的引用對象。
咱們要進入createProxy方法繼續跟蹤。
private T createProxy(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { if (url != null && url.length() > 0) { //指定URL的狀況下,不作本地引用 isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { //默認狀況下若是本地有服務暴露,則引用本地服務. isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { if (url != null && url.length() > 0) { // 用戶指定URL,指定的URL多是對點對直連地址,也多是註冊中心URL String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 經過註冊中心配置拼裝URL List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 用了最後一個registry url } } if (registryURL != null) { // 有 註冊中心協議的URL // 對有註冊中心的Cluster 只用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // 不是 註冊中心的URL invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; // default true } if (c && ! invoker.isAvailable()) { throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } // 建立服務代理 return (T) proxyFactory.getProxy(invoker); }
首先判斷是否要作本地jvm應用,即直接調用本地jvm中對象的方法。直接制定了url地址不作本地應用,沒有設置爲inJvm協議不作本地引用。
若是作本地應用則會構造一個協議爲injvm的URL,而後最終會調用實現類InjvmProtocol的refer方法得到調用本地jvm對象方法的代理對象。返回的是InjvmInvoker的對象。該方法會直接從map中找到目標對象,而後能夠直接調用該對象的方法。
若是不作本地應用則會構造一個registry協議的URL,而後會加載註冊中心列表,這個與發佈服務類似,不作詳細描述了。這裏有3種狀況。
若是沒有註冊中心列表。由於又沒有配置url,則沒法找到服務發佈,則會拋出異常信息。
若是有一個註冊中心url,則會直接經過該url的註冊中心得到帶來。
若是是多個則會循環每個註冊中心,則會得到多個invokers列表,最後將多個invoker列表使用cluster.join(new StaticDirectory(u, invokers));聚合成一個集羣。
每一個註冊中心都會調用refprotocol.refer(interfaceClass, url)應用遠程服務,因爲url是registry協議,則最終會調用RegistryProtocol實現類的refer方法。詳細說明參考http://my.oschina.net/ywbrj042/blog/690342。
獲得服務的代理對象後,咱們要調用該代理對象的方法了,如圖所示。
咱們看到獲得的是一個動態代理類,對象名稱爲:com.alibaba.dubbo.common.bytecode.proxy0@69fb6037的動態代理對象。F5進入到代理對象的方法,發現進入了InvokerInvocationHandler.invoke方法開始執行代理方法。
若是是在Object類中定義的方法則直接調用invoker的該方法;
若是是toString,hashCode和equals等特殊方法,則也直接調用invoker對象的該方法。
不然其它方法則調用return invoker.invoke(new RpcInvocation(method, args)).recreate();執行。正常狀況下都不是那些特殊方法,通常都是自定義接口的自定義方法,走這條路徑。
咱們進入了實現類MockClusterInvoker的invoke方法,該類事一個支持模擬特性和集羣特性的Invoker實現類。
先檢查方法參數是否開啓模擬,若是開啓模擬則進入執行模擬調用方法的分之;
不然進入調用FailoverClusterInvoker實現類的invoke方法,由於咱們沒有配置集羣策略,模擬就是自動故障轉移模式的集羣。
而後進入抽象類AbstractClusterInvoker的invoke方法,實際類型是FailoverClusterInvoker;以下圖所示:
最後會調用執行的代碼是:return doInvoke(invocation, invokers, loadbalance);
其中三個參數的值以下:
invocation:參數調用信息,封裝了調用的類、方法,參數類型及值等信息。
invokers:從directory中獲取到的可用的invoker列表,實際值是size=1,是一個被多個過濾器、裝飾器封裝的一個invoker對象。
loadbalance:是類型爲RandomLoadBalance的負載均衡器,這是默認的負載均衡策略。
而後進入到FailoverClusterInvoker類的doInvoke方法,以下圖。
執行的是自動故障轉移的集羣執行器,先會得到重試次數,而後從列表中選擇一個invoker對象,使用了負載均衡器,會排出掉已經執行過的invoker,由於咱們這個例子中自由一個invoker對象,所以會一直重試改對象,當某個一個invoker對象執行拋出了非業務的RpcException異常後,則會從新調用下一個。
咱們這裏調用的是類名爲com.alibaba.dubbo.registry.integration.RegistryDirectory$InvokerDelegete@5552768b的對象。進入該方法,最後進入的是類InvokerWrapper.invoke方法。