dubbo源碼:provider發佈service服務一

    從上一篇文章中咱們已經知道dubbo是經過在spring中自定義標籤,將這些標籤解析到對應組件中,而後將這些組件加載到spring容器進行管理。這些組件中就包括髮布服務組件dubbo:service,在對service組件解析加載的過程其實也包含着service服務暴露發佈的過程。java

    ServiceBean就是service標籤對應加載到spring容器中bean,咱們來看該bean的源碼,發現ServiceBean實現了InitializingBean接口,則在ServiceBean加載到容器中時會調用初始化方法afterPropertiesSet。spring

public void afterPropertiesSet() throws Exception {
		// 設置生產者配置信息ProviderConfig
		if (getProvider() == null) {
			Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null
					: BeanFactoryUtils.beansOfTypeIncludingAncestors(
							applicationContext, ProviderConfig.class, false,
							false);
			if (providerConfigMap != null && providerConfigMap.size() > 0) {
				Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null
						: BeanFactoryUtils.beansOfTypeIncludingAncestors(
								applicationContext, ProtocolConfig.class,
								false, false);
				if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
						&& providerConfigMap.size() > 1) { // 兼容舊版本
					List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
					for (ProviderConfig config : providerConfigMap.values()) {
						if (config.isDefault() != null
								&& config.isDefault().booleanValue()) {
							providerConfigs.add(config);
						}
					}
					if (providerConfigs.size() > 0) {
						setProviders(providerConfigs);
					}
				} else {
					ProviderConfig providerConfig = null;
					for (ProviderConfig config : providerConfigMap.values()) {
						if (config.isDefault() == null
								|| config.isDefault().booleanValue()) {
							if (providerConfig != null) {
								throw new IllegalStateException(
										"Duplicate provider configs: "
												+ providerConfig + " and "
												+ config);
							}
							providerConfig = config;
						}
					}
					if (providerConfig != null) {
						setProvider(providerConfig);
					}
				}
			}
		}
		// 設置應用配置信息ApplicationConfig
		if (getApplication() == null
				&& (getProvider() == null || getProvider().getApplication() == null)) {
			Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null
					: BeanFactoryUtils.beansOfTypeIncludingAncestors(
							applicationContext, ApplicationConfig.class, false,
							false);
			if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
				ApplicationConfig applicationConfig = null;
				for (ApplicationConfig config : applicationConfigMap.values()) {
					if (config.isDefault() == null
							|| config.isDefault().booleanValue()) {
						if (applicationConfig != null) {
							throw new IllegalStateException(
									"Duplicate application configs: "
											+ applicationConfig + " and "
											+ config);
						}
						applicationConfig = config;
					}
				}
				if (applicationConfig != null) {
					setApplication(applicationConfig);
				}
			}
		}
		// 設置模塊配置信息ModuleConfig
		if (getModule() == null
				&& (getProvider() == null || getProvider().getModule() == null)) {
			Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null
					: BeanFactoryUtils.beansOfTypeIncludingAncestors(
							applicationContext, ModuleConfig.class, false,
							false);
			if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
				ModuleConfig moduleConfig = null;
				for (ModuleConfig config : moduleConfigMap.values()) {
					if (config.isDefault() == null
							|| config.isDefault().booleanValue()) {
						if (moduleConfig != null) {
							throw new IllegalStateException(
									"Duplicate module configs: " + moduleConfig
											+ " and " + config);
						}
						moduleConfig = config;
					}
				}
				if (moduleConfig != null) {
					setModule(moduleConfig);
				}
			}
		}
		// 設置註冊服務器配置信息RegistryConfig
		if ((getRegistries() == null || getRegistries().size() == 0)
				&& (getProvider() == null
						|| getProvider().getRegistries() == null || getProvider()
						.getRegistries().size() == 0)
				&& (getApplication() == null
						|| getApplication().getRegistries() == null || getApplication()
						.getRegistries().size() == 0)) {
			Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null
					: BeanFactoryUtils.beansOfTypeIncludingAncestors(
							applicationContext, RegistryConfig.class, false,
							false);
			if (registryConfigMap != null && registryConfigMap.size() > 0) {
				List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
				for (RegistryConfig config : registryConfigMap.values()) {
					if (config.isDefault() == null
							|| config.isDefault().booleanValue()) {
						registryConfigs.add(config);
					}
				}
				if (registryConfigs != null && registryConfigs.size() > 0) {
					super.setRegistries(registryConfigs);
				}
			}
		}
		// 設置監控配置信息MonitorConfig
		if (getMonitor() == null
				&& (getProvider() == null || getProvider().getMonitor() == null)
				&& (getApplication() == null || getApplication().getMonitor() == null)) {
			Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null
					: BeanFactoryUtils.beansOfTypeIncludingAncestors(
							applicationContext, MonitorConfig.class, false,
							false);
			if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
				MonitorConfig monitorConfig = null;
				for (MonitorConfig config : monitorConfigMap.values()) {
					if (config.isDefault() == null
							|| config.isDefault().booleanValue()) {
						if (monitorConfig != null) {
							throw new IllegalStateException(
									"Duplicate monitor configs: "
											+ monitorConfig + " and " + config);
						}
						monitorConfig = config;
					}
				}
				if (monitorConfig != null) {
					setMonitor(monitorConfig);
				}
			}
		}
		// 設置服務協議配置信息
		if ((getProtocols() == null || getProtocols().size() == 0)
				&& (getProvider() == null
						|| getProvider().getProtocols() == null || getProvider()
						.getProtocols().size() == 0)) {
			Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null
					: BeanFactoryUtils.beansOfTypeIncludingAncestors(
							applicationContext, ProtocolConfig.class, false,
							false);
			if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
				List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
				for (ProtocolConfig config : protocolConfigMap.values()) {
					if (config.isDefault() == null
							|| config.isDefault().booleanValue()) {
						protocolConfigs.add(config);
					}
				}
				if (protocolConfigs != null && protocolConfigs.size() > 0) {
					super.setProtocols(protocolConfigs);
				}
			}
		}
		// 設置服務名稱path
		if (getPath() == null || getPath().length() == 0) {
			if (beanName != null && beanName.length() > 0
					&& getInterface() != null && getInterface().length() > 0
					&& beanName.startsWith(getInterface())) {
				setPath(beanName);
			}
		}
		// 是否延遲發佈
		if (!isDelay()) {
			export();// 發佈該service
		}
	}

在afterPropertiesSet方法中先進行了應用、註冊、監控等信息設置,最後在方法export方法中進行發佈。咱們繼續看export,export是在其父類ServiceConfig中進行實現。服務器

public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && ! export.booleanValue()) {//是否暴露
            return;
        }
        if (delay != null && delay > 0) {//是否延遲暴露
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(delay);
                    } catch (Throwable e) {
                    }
                    doExport();//若是延遲暴露,在多線程中等待特定時間而後暴露
                }
            });
            thread.setDaemon(true);
            thread.setName("DelayExportServiceThread");
            thread.start();
        } else {
            doExport();//直接暴露
        }
    }

export方法是一個同步方法,在方法中咱們沒有看到具體的發佈過程,該方法主要處理延遲發佈。咱們接着繼續看doExport多線程

protected synchronized void doExport() {
		if (unexported) {
			throw new IllegalStateException("Already unexported!");
		}
		if (exported) {// 已經發布
			return;
		}
		exported = true;
		if (interfaceName == null || interfaceName.length() == 0) {
			throw new IllegalStateException(
					"<dubbo:service interface=\"\" /> interface not allow null!");
		}
		checkDefault();
		if (provider != null) {
			if (application == null) {
				application = provider.getApplication();
			}
			if (module == null) {
				module = provider.getModule();
			}
			if (registries == null) {
				registries = provider.getRegistries();
			}
			if (monitor == null) {
				monitor = provider.getMonitor();
			}
			if (protocols == null) {
				protocols = provider.getProtocols();
			}
		}
		if (module != null) {
			if (registries == null) {
				registries = module.getRegistries();
			}
			if (monitor == null) {
				monitor = module.getMonitor();
			}
		}
		if (application != null) {
			if (registries == null) {
				registries = application.getRegistries();
			}
			if (monitor == null) {
				monitor = application.getMonitor();
			}
		}
		if (ref instanceof GenericService) {// 判斷是否爲泛接口實現方式
			interfaceClass = GenericService.class;
			if (StringUtils.isEmpty(generic)) {
				generic = Boolean.TRUE.toString();
			}
		} else {
			try {
				// 加載service對已接口字節碼
				interfaceClass = Class.forName(interfaceName, true, Thread
						.currentThread().getContextClassLoader());
			} catch (ClassNotFoundException e) {
				throw new IllegalStateException(e.getMessage(), e);
			}
			// 合法性加成,interfaceClass不能爲空、interfaceClass必須爲接口類型、methdos必須爲接口中的方法
			checkInterfaceAndMethods(interfaceClass, methods);
			checkRef();// 檢查配置的引用不能爲空且引用必須實現接口(ref="demoService"該引用是必須在容器中存在,由spring負責加載)
			generic = Boolean.FALSE.toString();
		}
		if (local != null) {
			if ("true".equals(local)) {
				local = interfaceName + "Local";
			}
			Class<?> localClass;
			try {
				localClass = ClassHelper
						.forNameWithThreadContextClassLoader(local);
			} catch (ClassNotFoundException e) {
				throw new IllegalStateException(e.getMessage(), e);
			}
			if (!interfaceClass.isAssignableFrom(localClass)) {
				throw new IllegalStateException("The local implemention class "
						+ localClass.getName() + " not implement interface "
						+ interfaceName);
			}
		}
		if (stub != null) {// 本地存根
			if ("true".equals(stub)) {
				stub = interfaceName + "Stub";
			}
			Class<?> stubClass;
			try {
				stubClass = ClassHelper
						.forNameWithThreadContextClassLoader(stub);
			} catch (ClassNotFoundException e) {
				throw new IllegalStateException(e.getMessage(), e);
			}
			if (!interfaceClass.isAssignableFrom(stubClass)) {
				throw new IllegalStateException("The stub implemention class "
						+ stubClass.getName() + " not implement interface "
						+ interfaceName);
			}
		}
		checkApplication();// 檢查應用信息
		checkRegistry();// 檢查註冊中心
		checkProtocol();// 檢查協議
		appendProperties(this);
		checkStubAndMock(interfaceClass);// 檢查是否爲stub或者mock方式
		if (path == null || path.length() == 0) {
			path = interfaceName;
		}
		doExportUrls();// 在該方法中進行發佈
	}

咱們再doExport方法中只看到了一些檢查判斷,仍是沒有看到具體的暴露發佈,咱們接着繼續看doExportUrls中的代碼app

private void doExportUrls() {
		// 根據註冊中心構造url路徑
		// registry://10.118.242.90:2181/com.alibaba.dubbo.registry.RegistryService?application=jiafeng-provider
		//&client=curator&dubbo=2.0.0&group=china&pid=6220&registry=zookeeper&timestamp=1488351817625
		List<URL> registryURLs = loadRegistries(true);
		//由於能夠支持多個協議發佈服務
		for (ProtocolConfig protocolConfig : protocols) {
			//真正的服務暴露發佈過程
			doExportUrlsFor1Protocol(protocolConfig, registryURLs);
		}
	}

在該方法中咱們看到了整理註冊中心信息爲URL路徑方式,而後分佈對每種協議進行發佈(能夠配置多個註冊中心,多個協議類型)。咱們繼續往下看doExportUrlsFor1Protocol方法dom

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig,
			List<URL> registryURLs) {
		String name = protocolConfig.getName();// 服務協議,不配置時默認dubbo
		if (name == null || name.length() == 0) {
			name = "dubbo";
		}

		String host = protocolConfig.getHost();// 服務ip地址
		if (provider != null && (host == null || host.length() == 0)) {
			host = provider.getHost();
		}
		boolean anyhost = false;
		if (NetUtils.isInvalidLocalHost(host)) {// 是否合法IP地址
			anyhost = true;
			try {
				host = InetAddress.getLocalHost().getHostAddress();
			} catch (UnknownHostException e) {
				logger.warn(e.getMessage(), e);
			}
			if (NetUtils.isInvalidLocalHost(host)) {
				if (registryURLs != null && registryURLs.size() > 0) {
					for (URL registryURL : registryURLs) {
						try {
							Socket socket = new Socket();// 經過套接字進行數據通訊
							try {
								SocketAddress addr = new InetSocketAddress(
										registryURL.getHost(),
										registryURL.getPort());
								socket.connect(addr, 1000);// 設置超時時間1秒
								host = socket.getLocalAddress()
										.getHostAddress();
								break;
							} finally {
								try {
									socket.close();
								} catch (Throwable e) {
								}
							}
						} catch (Exception e) {
							logger.warn(e.getMessage(), e);
						}
					}
				}
				if (NetUtils.isInvalidLocalHost(host)) {
					host = NetUtils.getLocalHost();
				}
			}
		}
		// 服務端口設置
		Integer port = protocolConfig.getPort();
		if (provider != null && (port == null || port == 0)) {
			port = provider.getPort();
		}
		final int defaultPort = ExtensionLoader
				.getExtensionLoader(Protocol.class).getExtension(name)
				.getDefaultPort();
		if (port == null || port == 0) {
			port = defaultPort;
		}
		if (port == null || port <= 0) {
			port = getRandomPort(name);
			if (port == null || port < 0) {
				port = NetUtils.getAvailablePort(defaultPort);
				putRandomPort(name, port);
			}
			logger.warn("Use random available port(" + port + ") for protocol "
					+ name);
		}

		Map<String, String> map = new HashMap<String, String>();
		if (anyhost) {
			map.put(Constants.ANYHOST_KEY, "true");
		}
		map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
		map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
		map.put(Constants.TIMESTAMP_KEY,
				String.valueOf(System.currentTimeMillis()));
		if (ConfigUtils.getPid() > 0) {
			map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
		}
		appendParameters(map, application);
		appendParameters(map, module);
		appendParameters(map, provider, Constants.DEFAULT_KEY);
		appendParameters(map, protocolConfig);
		appendParameters(map, this);
		// 下面爲設置每一個方法的暴露屬性
		if (methods != null && methods.size() > 0) {
			for (MethodConfig method : methods) {
				appendParameters(map, method, method.getName());
				String retryKey = method.getName() + ".retry";
				if (map.containsKey(retryKey)) {
					String retryValue = map.remove(retryKey);
					if ("false".equals(retryValue)) {
						map.put(method.getName() + ".retries", "0");
					}
				}
				// 方法中參數設置
				List<ArgumentConfig> arguments = method.getArguments();
				if (arguments != null && arguments.size() > 0) {
					for (ArgumentConfig argument : arguments) {
						// 類型自動轉換.
						if (argument.getType() != null
								&& argument.getType().length() > 0) {
							Method[] methods = interfaceClass.getMethods();
							// 遍歷全部方法
							if (methods != null && methods.length > 0) {
								for (int i = 0; i < methods.length; i++) {
									String methodName = methods[i].getName();
									// 匹配方法名稱,獲取方法簽名.
									if (methodName.equals(method.getName())) {
										Class<?>[] argtypes = methods[i]
												.getParameterTypes();
										// 一個方法中單個callback
										if (argument.getIndex() != -1) {
											if (argtypes[argument.getIndex()]
													.getName().equals(
															argument.getType())) {
												appendParameters(
														map,
														argument,
														method.getName()
																+ "."
																+ argument
																		.getIndex());
											} else {
												throw new IllegalArgumentException(
														"argument config error : the index attribute and type attirbute not match :index :"
																+ argument
																		.getIndex()
																+ ", type:"
																+ argument
																		.getType());
											}
										} else {
											// 一個方法中多個callback
											for (int j = 0; j < argtypes.length; j++) {
												Class<?> argclazz = argtypes[j];
												if (argclazz.getName().equals(
														argument.getType())) {
													appendParameters(map,
															argument,
															method.getName()
																	+ "." + j);
													if (argument.getIndex() != -1
															&& argument
																	.getIndex() != j) {
														throw new IllegalArgumentException(
																"argument config error : the index attribute and type attirbute not match :index :"
																		+ argument
																				.getIndex()
																		+ ", type:"
																		+ argument
																				.getType());
													}
												}
											}
										}
									}
								}
							}
						} else if (argument.getIndex() != -1) {
							appendParameters(map, argument, method.getName()
									+ "." + argument.getIndex());
						} else {
							throw new IllegalArgumentException(
									"argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
						}

					}
				}
			} // end of methods for
		}

		if (ProtocolUtils.isGeneric(generic)) {
			map.put("generic", generic);
			map.put("methods", Constants.ANY_VALUE);
		} else {
			String revision = Version.getVersion(interfaceClass, version);
			if (revision != null && revision.length() > 0) {
				map.put("revision", revision);
			}

			String[] methods = Wrapper.getWrapper(interfaceClass)
					.getMethodNames();
			if (methods.length == 0) {
				logger.warn("NO method found in service interface "
						+ interfaceClass.getName());
				map.put("methods", Constants.ANY_VALUE);
			} else {
				map.put("methods", StringUtils.join(
						new HashSet<String>(Arrays.asList(methods)), ","));
			}
		}
		// 是否使用口令
		if (!ConfigUtils.isEmpty(token)) {
			if (ConfigUtils.isDefault(token)) {
				map.put("token", UUID.randomUUID().toString());
			} else {
				map.put("token", token);
			}
		}
		// 若是爲本地調用就不往註冊中心註冊
		if ("injvm".equals(protocolConfig.getName())) {
			protocolConfig.setRegister(false);
			map.put("notify", "false");
		}
		// 導出服務
		String contextPath = protocolConfig.getContextpath();
		if ((contextPath == null || contextPath.length() == 0)
				&& provider != null) {
			contextPath = provider.getContextpath();
		}
		// 拼裝url,協議名稱、ip地址、端口、上下文路徑、服務名稱、屬性
		URL url = new URL(name, host, port, (contextPath == null
				|| contextPath.length() == 0 ? "" : contextPath + "/")
				+ path, map);

		if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
				.hasExtension(url.getProtocol())) {
			url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
					.getExtension(url.getProtocol()).getConfigurator(url)
					.configure(url);
		}

		String scope = url.getParameter(Constants.SCOPE_KEY);
		// 配置爲none不暴露
		if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

			// 配置不是remote的狀況下作本地暴露 (配置爲remote,則表示只暴露遠程服務)
			if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
				exportLocal(url);// 暴露本地服務
			}
			// 若是配置不是local則暴露爲遠程服務.(配置爲local,則表示只暴露本地服務)
			if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
				if (logger.isInfoEnabled()) {
					logger.info("Export dubbo service "
							+ interfaceClass.getName() + " to url " + url);
				}
				if (registryURLs != null && registryURLs.size() > 0
						&& url.getParameter("register", true)) {
					// 多個協議循環進行暴露發佈
					for (URL registryURL : registryURLs) {
						url = url.addParameterIfAbsent("dynamic",
								registryURL.getParameter("dynamic"));

						URL monitorUrl = loadMonitor(registryURL);// 加載監控服務路徑
						if (monitorUrl != null) {
							url = url.addParameterAndEncoded(
									Constants.MONITOR_KEY,
									monitorUrl.toFullString());
						}
						if (logger.isInfoEnabled()) {
							logger.info("Register dubbo service "
									+ interfaceClass.getName() + " url " + url
									+ " to registry " + registryURL);
						}

						// 代理工廠建立invoker,服務代理,經過該代理進行遠程調用
						Invoker<?> invoker = proxyFactory.getInvoker(ref,
								(Class) interfaceClass, registryURL
										.addParameterAndEncoded(
												Constants.EXPORT_KEY,
												url.toFullString()));
						// 暴露遠程服務
						Exporter<?> exporter = protocol.export(invoker);
						exporters.add(exporter);

					}
				} else {
					Invoker<?> invoker = proxyFactory.getInvoker(ref,
							(Class) interfaceClass, url);

					Exporter<?> exporter = protocol.export(invoker);
					exporters.add(exporter);
				}
			}
		}
		this.urls.add(url);
	}

在該方法中仍是大量代碼進行服務配置信息設置,以及service中每一個暴露方法的和方法參數的信息設置。其結果都是將各類信息最終拼加到URL中。在這當中咱們比較關心的代碼有兩部分:1.暴露本地服務,2.暴露遠程服務。jvm

本地服務暴露socket

private void exportLocal(URL url) {
		if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
			// 本地暴露設置協議爲injvm、ip爲127.0.0.一、端口爲0
			URL local = URL.valueOf(url.toFullString())
					.setProtocol(Constants.LOCAL_PROTOCOL)
					.setHost(NetUtils.LOCALHOST).setPort(0);
			Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref,
					(Class) interfaceClass, local));
			exporters.add(exporter);// 加入全局列表
			logger.info("Export dubbo service " + interfaceClass.getName()
					+ " to local registry");
		}
	}

遠程服務暴露,在上面doExportUrlsFor1Protocol方法中有這部分代碼,其實爲遠程暴露部分處理入口ide

// 代理工廠建立invoker,服務代理,經過該代理進行遠程調用
						Invoker<?> invoker = proxyFactory.getInvoker(ref,
								(Class) interfaceClass, registryURL
										.addParameterAndEncoded(
												Constants.EXPORT_KEY,
												url.toFullString()));
						// 暴露遠程服務
						Exporter<?> exporter = protocol.export(invoker);
						exporters.add(exporter);

      至此咱們都尚未看到dubbo發佈服務的核心代碼,前面咱們看到的基本都是對各類屬性的校驗以及整理,最終會把包括配置在內的全部相關屬性都封裝到URL中,因此URL在dubbo中仍是比較關鍵的一個元素。(通篇都在貼代碼看起來的話應該挺累的,可是話說回來還有什麼能比代碼更能說明問題的呢)。this

相關文章
相關標籤/搜索