在Dubbo jar包目錄下咱們能夠找到html
其中配置了處理接口DubboNamespaceHandler,能夠看到,ServiceBean是用來處理service的。java
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware
它實現了ApplicationListener接口,因此它能夠監聽容器事件,當容器發出刷新完畢事件後,ServiceBean捕獲事件,執行發佈服務的動做。apache
public void export() { // 暴露服務交給了父類:org.apache.dubbo.config.ServiceConfig#export super.export(); // 暴露完成後發佈對應的事件。由於實現了ApplicationEventPublisherAware接口,因此能獲取到ApplicationEventPublisher對象來完成事件的發佈。 this.publishExportEvent(); }
進入ServiceConfigbootstrap
public synchronized void export() { // 檢查更新子配置 checkAndUpdateSubConfigs(); // 不須要暴露,則返回 if (!shouldExport()) { return; } // 延遲暴露服務 if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { // 執行暴露 doExport(); } }
進入doExport緩存
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } // 走這裏 doExportUrls(); }
進入doExportUrls服務器
private void doExportUrls() { // 獲取註冊中心地址,URL內容:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=service-provider&client=curator&dubbo=2.0.2&pid=285268&qos.enable=false®istry=zookeeper&release=2.7.3&timeout=5000×tamp=1583994352918 List<URL> registryURLs = loadRegistries(true); // 在每一個協議下導出服務 for (ProtocolConfig protocolConfig : protocols) { // pathKey好比:com.demo.common.HelloService:1.0 String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); // ProviderModel存儲服務提供者信息 ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); // 註冊服務提供者信息,存儲到內部的一個ConcurrentHashMap。 // ApplicationModel表示使用Dubbo的應用程序,並存儲基本元數據信息,以便在處理RPC調用時使用。 // ApplicationModel包括許多關於發佈服務的提供者模型和許多關於訂閱服務的消費者模型。 ApplicationModel.initProviderModel(pathKey, providerModel); // 在當前協議下發布服務 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
這裏拉一下大致步驟:併發
0. Dubbo捕獲Spring容器刷新事件(ServiceBean),執行暴露服務邏輯(ServiceConfig)。app
1. 檢查用戶的配置是否合理,或者爲用戶補充缺省配置。dom
2. 發佈服務(org.apache.dubbo.config.ServiceConfig#doExportUrls)異步
- 獲取註冊中心地址
- 遍歷協議,在每一個協議下緩存服務提供者信息並執行發佈邏輯
進入doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 獲取協議名稱,好比:dubbo String name = protocolConfig.getName(); if (StringUtils.isEmpty(name)) { name = DUBBO; } Map<String, String> map = new HashMap<String, String>(); // 添加 side、版本、時間戳以及進程號等信息到 map 中 map.put(SIDE_KEY, PROVIDER_SIDE); appendRuntimeParameters(map); // 經過反射將對象的字段信息添加到 map 中 appendParameters(map, metrics); appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider); appendParameters(map, protocolConfig); appendParameters(map, this); // 處理Method配置 // 方法級配置。對應的配置類: org.apache.dubbo.config.MethodConfig。同時該標籤爲 <dubbo:service> 或 <dubbo:reference> 的子標籤,用於控制到方法級。 if (CollectionUtils.isNotEmpty(methods)) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { for (ArgumentConfig argument : arguments) { // convert argument type if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); // visit all methods if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // target the method, and get its signature if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for } // 若是是個泛型服務 if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { // 不是泛型服務 // 獲取服務提供者的版本,並添加到map中 String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } // 獲取接口的方法集合,並添加到map中 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // 添加 token 到 map 中 if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } // 獲取host,這裏的是服務主機地址,好比 10.204.241.66 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); // 獲取post,好比 20880 Integer port = this.findConfigedPorts(protocolConfig, name, map); // 組裝成URL:dubbo://10.204.241.66:20880/com.demo.common.HelloService?actives=5&anyhost=true&application=service-provider&bean.name=ServiceBean:com.demo.common.HelloService:1.0&bind.ip=10.204.241.66&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.demo.common.HelloService&methods=hello&pid=285268&qos.enable=false®ister=true&release=2.7.3&retries=3&revision=1.0&side=provider&timeout=3000×tamp=1583996355545&version=1.0 URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); // 上面的代碼: // 首先是將一些信息,好比版本、時間戳、方法名以及各類配置對象的字段信息放入到 map 中,map 中的內容將做爲 URL 的查詢字符串。 // 構建好 map 後,緊接着是獲取上下文路徑、主機名以及端口號等信息。最後將 map 和主機名等數據傳給 URL 構造方法建立 URL 對象。 // ------------------------------- // ExtensionLoader是Dubbo SPI的核心體現,這裏是獲取ConfiguratorFactory的具體實現類 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { // 利用Configurator實例配置 url // dubbo://10.204.241.66:20880/com.demo.common.HelloService?actives=5&anyhost=true&application=service-provider&bean.name=ServiceBean:com.demo.common.HelloService:1.0&bind.ip=10.204.241.66&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.demo.common.HelloService&methods=hello&pid=285268&qos.enable=false®ister=true&release=2.7.3&retries=3&revision=1.0&side=provider&timeout=3000×tamp=1583996355545&version=1.0 url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(SCOPE_KEY); // 若是 scope = none,則什麼都不作 if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // scope != remote,導出到本地 if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } // scope != local,導出到遠程 if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (!isOnlyInJvm() && logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (CollectionUtils.isNotEmpty(registryURLs)) { // 存在註冊中心 for (URL registryURL : registryURLs) { // 若是協議是injvm,不註冊 if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); // 加載監視器連接 URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { // 將監視器連接做爲參數添加到 url 中 url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } // 利用代理工廠根據服務提供類生成Invoker對象 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 導出服務,並生成 Exporter Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { // 不存在註冊中心,僅導出服務 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { metadataReportService.publishProvider(url); } } } this.urls.add(url); }
這麼一大段,主要作了2件事
1. 建立URL
- 首先將版本、時間戳、方法名以及各類配置對象的字段信息放入到 map 中,map 中的內容將做爲 URL 的查詢字符串。
- 構建好 map 後,緊接着是獲取主機名以及端口號等信息。
- 最後將 map 和主機名等數據傳給 URL 構造方法建立 URL 對象。
2. 暴露服務
- 根據 url 中的 scope 參數決定服務導出方式
- 把服務對象轉換成Invoker對象
- 把Invoker對象轉換成Exporter對象
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 獲取Wrapper對象 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // new一個AbstractProxyInvoker對象並覆蓋抽象方法 doInvoke,將調用請求交給wrapper.invokeMethod。 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } getWrapper public static Wrapper getWrapper(Class<?> c) { while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class. { c = c.getSuperclass(); } if (c == Object.class) { return OBJECT_WRAPPER; } // 從緩存中獲取 Wrapper ret = WRAPPER_MAP.get(c); if (ret == null) { // 緩存沒有則建立 ret = makeWrapper(c); // 放進緩存 WRAPPER_MAP.put(c, ret); } // 返回 return ret; }
看一下Wrapper對象的建立過程
// 這麼多代碼,看起來複雜,實際上很簡單,一句話:利用字節碼類庫javassist 來動態建立一個Wrapper對象。 // 仔細看一下,這裏大部分代碼都是手動拼接字符串,而後利用javassist來生成。這麼麻煩,爲何不實現實現Wrapper類呢?由於Wrapper類的結構要根據實際傳入的Class來決定。 // 這裏的代碼不作解釋,看一篇入門文章便可瞭解:https://www.cnblogs.com/rickiyang/p/11336268.html private static Wrapper makeWrapper(Class<?> c) { if (c.isPrimitive()) { throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c); } String name = c.getName(); ClassLoader cl = ClassUtils.getClassLoader(c); StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ "); StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ "); StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ "); c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); Map<String, Class<?>> pts = new HashMap<>(); // <property name, property types> Map<String, Method> ms = new LinkedHashMap<>(); // <method desc, Method instance> List<String> mns = new ArrayList<>(); // method names. List<String> dmns = new ArrayList<>(); // declaring method names. // get all public field. for (Field f : c.getFields()) { String fn = f.getName(); Class<?> ft = f.getType(); if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) { continue; } c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }"); c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }"); pts.put(fn, ft); } Method[] methods = c.getMethods(); // get all public method. boolean hasMethod = hasMethods(methods); if (hasMethod) { c3.append(" try{"); for (Method m : methods) { //ignore Object's method. if (m.getDeclaringClass() == Object.class) { continue; } String mn = m.getName(); c3.append(" if( \"").append(mn).append("\".equals( $2 ) "); int len = m.getParameterTypes().length; c3.append(" && ").append(" $3.length == ").append(len); boolean override = false; for (Method m2 : methods) { if (m != m2 && m.getName().equals(m2.getName())) { override = true; break; } } if (override) { if (len > 0) { for (int l = 0; l < len; l++) { c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"") .append(m.getParameterTypes()[l].getName()).append("\")"); } } } c3.append(" ) { "); if (m.getReturnType() == Void.TYPE) { c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;"); } else { c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");"); } c3.append(" }"); mns.add(mn); if (m.getDeclaringClass() == c) { dmns.add(mn); } ms.put(ReflectUtils.getDesc(m), m); } c3.append(" } catch(Throwable e) { "); c3.append(" throw new java.lang.reflect.InvocationTargetException(e); "); c3.append(" }"); } c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }"); // deal with get/set method. Matcher matcher; for (Map.Entry<String, Method> entry : ms.entrySet()) { String md = entry.getKey(); Method method = entry.getValue(); if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { String pn = propertyName(matcher.group(1)); c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }"); pts.put(pn, method.getReturnType()); } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) { String pn = propertyName(matcher.group(1)); c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }"); pts.put(pn, method.getReturnType()); } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { Class<?> pt = method.getParameterTypes()[0]; String pn = propertyName(matcher.group(1)); c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }"); pts.put(pn, pt); } } c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }"); c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }"); // make class long id = WRAPPER_CLASS_COUNTER.getAndIncrement(); ClassGenerator cc = ClassGenerator.newInstance(cl); cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id); cc.setSuperClass(Wrapper.class); cc.addDefaultConstructor(); cc.addField("public static String[] pns;"); // property name array. cc.addField("public static " + Map.class.getName() + " pts;"); // property type map. cc.addField("public static String[] mns;"); // all method name array. cc.addField("public static String[] dmns;"); // declared method name array. for (int i = 0, len = ms.size(); i < len; i++) { cc.addField("public static Class[] mts" + i + ";"); } cc.addMethod("public String[] getPropertyNames(){ return pns; }"); cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }"); cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }"); cc.addMethod("public String[] getMethodNames(){ return mns; }"); cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }"); cc.addMethod(c1.toString()); cc.addMethod(c2.toString()); cc.addMethod(c3.toString()); try { Class<?> wc = cc.toClass(); // setup static field. wc.getField("pts").set(null, pts); wc.getField("pns").set(null, pts.keySet().toArray(new String[0])); wc.getField("mns").set(null, mns.toArray(new String[0])); wc.getField("dmns").set(null, dmns.toArray(new String[0])); int ix = 0; for (Method m : ms.values()) { wc.getField("mts" + ix++).set(null, m.getParameterTypes()); } return (Wrapper) wc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Throwable e) { throw new RuntimeException(e.getMessage(), e); } finally { cc.release(); ms.clear(); mns.clear(); dmns.clear(); } }
小結:
1. 利用字節碼類庫Javassist手動建立Wrapper類。
2. 建立AbstractProxyInvoker對象,覆蓋doInvoke方法並直接將調用請求交給Wrapper對象的invokeMethod方法。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 獲取註冊中心 URL // 好比:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=service-provider&client=curator&dubbo=2.0.2&export=dubbo%3A%2F%2F10.204.241.66%3A20880%2Fcom.demo.common.HelloService%3Factives%3D5%26anyhost%3Dtrue%26application%3Dservice-provider%26bean.name%3DServiceBean%3Acom.demo.common.HelloService%3A1.0%26bind.ip%3D10.204.241.66%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcom.demo.common.HelloService%26methods%3Dhello%26pid%3D285268%26qos.enable%3Dfalse%26register%3Dtrue%26release%3D2.7.3%26retries%3D3%26revision%3D1.0%26side%3Dprovider%26timeout%3D3000%26timestamp%3D1583996355545%26version%3D1.0&pid=285268&qos.enable=false&release=2.7.3&timeout=5000×tamp=1583994352918 URL registryUrl = getRegistryUrl(originInvoker); // 獲取用來本地發佈的URL // 好比:dubbo://10.204.241.66:20880/com.demo.common.HelloService?actives=5&anyhost=true&application=service-provider&bean.name=ServiceBean:com.demo.common.HelloService:1.0&bind.ip=10.204.241.66&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.demo.common.HelloService&methods=hello&pid=285268&qos.enable=false®ister=true&release=2.7.3&retries=3&revision=1.0&side=provider&timeout=3000×tamp=1583996355545&version=1.0 URL providerUrl = getProviderUrl(originInvoker); // 獲取訂閱 URL // 好比:provider://10.204.241.66:20880/com.demo.common.HelloService?actives=5&anyhost=true&application=service-provider&bean.name=ServiceBean:com.demo.common.HelloService:1.0&bind.ip=10.204.241.66&bind.port=20880&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.demo.common.HelloService&methods=hello&pid=285268&qos.enable=false®ister=true&release=2.7.3&retries=3&revision=1.0&side=provider&timeout=3000×tamp=1583996355545&version=1.0 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); // 建立監聽器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // dubbo://10.204.241.66:20880/com.demo.common.HelloService?actives=5&anyhost=true&application=service-provider&bean.name=ServiceBean:com.demo.common.HelloService:1.0&bind.ip=10.204.241.66&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.demo.common.HelloService&methods=hello&pid=285268&qos.enable=false®ister=true&release=2.7.3&retries=3&revision=1.0&side=provider&timeout=3000×tamp=1583996355545&version=1.0 providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); // *** 執行本地服務的暴露,並返回Exporter final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // 獲取Registry final Registry registry = getRegistry(originInvoker); // 獲取註冊表中註冊的URL並過濾一次url參數 final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // 判斷是否須要推遲發佈 boolean register = registeredProviderUrl.getParameter("register", true); if (register) { // *** 獲取註冊中心Registry,並向註冊中心註冊registeredProviderUrl register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 給exporter設置提供者URL和訂閱URL exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }
這個作了什麼事呢?
1. 利用Invoker獲取註冊中心URL、服務提供者URL。
2. 獲取override訂閱URL,建立監聽器,監聽URL的變化。這裏爲何服務提供者也須要監聽URL的變化,在集羣環境下,某個節點改變了zk數據,其它節點可以迅速獲得更新。
3. 執行本地服務的暴露並返回Exporter。
4. 獲取註冊中心Registry,將服務地址註冊到Registry。
這部分又能提煉出兩大核心
1. 本地服務的暴露過程
2. 註冊中心註冊服務的過程
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { // 根據Invoker獲取緩存的key // dubbo://10.204.241.66:20880/com.demo.common.HelloService?actives=5&anyhost=true&application=service-provider&bean.name=ServiceBean:com.demo.common.HelloService:1.0&bind.ip=10.204.241.66&bind.port=20880&deprecated=false&dubbo=2.0.2&generic=false&interface=com.demo.common.HelloService&methods=hello&pid=285268&qos.enable=false®ister=true&release=2.7.3&retries=3&revision=1.0&side=provider&timeout=3000×tamp=1583996355545&version=1.0 String key = getCacheKey(originInvoker); // 這裏的代碼用了Java8的新特性:computeIfAbsent,代替了官網文檔中這部分的雙重檢查鎖代碼 return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { // 建立 Invoker 爲委託類對象 Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); // 調用 protocol 的 export 方法導出服務 return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); }
進入org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
// 進入org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 獲取服務地址 // dubbo://10.204.241.66:20880/com.demo.common.HelloService?actives=5&anyhost=true&application=service-provider&bean.name=ServiceBean:com.demo.common.HelloService:1.0&bind.ip=10.204.241.66&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.demo.common.HelloService&methods=hello&pid=35584&qos.enable=false®ister=true&release=2.7.3&retries=3&revision=1.0&side=provider&timeout=3000×tamp=1584011280884&version=1.0 URL url = invoker.getUrl(); // 獲取key:com.demo.common.HelloService:1.0:20880 String key = serviceKey(url); // 建立Exporter對象 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 緩存 exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 啓動服務器,監聽消費者客戶端的請求 openServer(url); // 優化序列化 optimizeSerialization(url); return exporter; }
這裏真相漸漸浮出水面
1. Exporter持有什麼呢?不過是具體服務的Invoker對象,標識服務的key,以及全部Exporter的Map集合。
2. 暴露服務呢?不過是根據服務URL解析出用於啓動Netty服務器的地址,啓動服務器監聽來自客戶端的請求。因此Dubbo服務暴露和消費不過是Netty服務器和客戶端之間的通訊罷了,而註冊中心的做用就是告訴客戶端:服務器的地址是什麼。服務器接收到客戶端的請求,解析出目標服務,調用以後返回給客戶端。
看一下啓動服務器的過程
// dubbo默認採用dubbo協議,dubbo協議採用單一長鏈接和NIO異步通信,適合於小數據量大併發的服務調用,以及服務消費者機器數遠大於服務提供者機器數的狀況 private void openServer(URL url) { // key:10.204.241.66:20880 // 這裏能夠看出,一臺機器只會啓動一個Netty服務(ip+端口惟一),下面加鎖的過程保證了這一點。 String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { // 查看緩存 ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { // 緩存沒有服務器實例,建立並放入緩存 serverMap.put(key, createServer(url)); } } } else { // 服務器已建立,則根據 url 中的配置重置服務器 server.reset(url); } } }
這裏邏輯:
根據服務URL解析出ip:端口做爲key,先查看緩存有沒有對應的ExchangeServer,沒有則建立,有則重置服務器。
建立服務器
private ExchangeServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // 添加心跳檢測配置到 url 中 .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) // 添加編碼解碼器參數 .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); // 獲取server參數,好比:netty String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); // 經過 SPI 檢測是否存在 server 參數所表明的 Transporter 拓展,不存在則拋出異常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } ExchangeServer server; try { // 建立 ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } // 獲取client參數,好比:可能爲null,或者netty或者mina str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
這部分作了啓動以前的工做:
1. 給URL配置服務器參數
2. 調用Exchangers.bind(url, requestHandler)方法來建立ExchangeServer
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 獲取 Exchanger,默認爲 HeaderExchanger。這裏利用了Dubbo SPI 獲取具體的實現類。 // 緊接着調用 HeaderExchanger 的 bind 方法建立 ExchangeServer 實例 return getExchanger(url).bind(url, handler); } // org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
1. 利用Dubbo SPI來獲取Exchanger的具體實現類,並調用其中的bind方法
2. 默認new一個HeaderExchangeServer,而真正讓服務器啓動的是Transporters.bind方法
// org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...) public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { // 若是 handlers 元素數量大於1,則建立 ChannelHandler 分發器 handler = new ChannelHandlerDispatcher(handlers); } // 獲取自適應 Transporter 實例,這裏利用了Dubbo SPI 獲取具體的實現類。 // 並調用實例方法bind。默認NettyTransporter,有兩個版本,一個在netty包下,一個在netty4包下 return getTransporter().bind(url, handler); } //org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); }
這部分和Exchanger的邏輯相似
1. 利用Dubbo SPI來獲取Transporter的具體實現類,並調用其中的bind方法
2. 默認new一個NettyServer
// org.apache.dubbo.remoting.transport.netty4.NettyServer public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants. // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } // org.apache.dubbo.remoting.transport.AbstractServer public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = ANYHOST_VALUE; } bindAddress = new InetSocketAddress(bindIp, bindPort); this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT); try { // 看這裏就能夠了 doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }
這部分不過是例行公事,設置屬性,關鍵一步在doOpen,這裏體現了模板模式
擴展
模板模式(Template Pattern)中,一個抽象類公開定義了執行它的方法的方式/模板。它的子類能夠按須要重寫方法實現,但調用將以抽象類中定義的方式進行。
下面看啓動服務器的過程
// org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
對,我沒有作註釋,有Netty入門的一看就明白:好比ServerBootstrap、NioEventLoopGroup(Boss 和 Worker)
小結服務暴露過程:
類的調用層次:DubboProtocol#export-->openServer-->createServer-->Exchangers#bind-->HeaderExchanger#bind-->Transporters#bind-->NettyTransporter#bind-->NettyServer#doOpen
1. 調用具體的協議實現類好比DubboProtocol的導出方法
2. 建立Exporter對象並緩存
3. 啓動服務器
- 根據服務URL解析出ip:端口做爲key,先查看緩存有沒有對應的ExchangeServer,沒有則建立,有則重置服務器。
4. 建立服務器
- 給URL配置服務器參數
- 利用Dubbo SPI調用Exchanger具體實現類(好比:HeaderExchanger)的bind方法來建立ExchangeServer對象
- 利用Dubbo SPI調用Transporter具體實現類(好比:NettyTransporter)的bind方法來建立Server對象
- new一個NettyServer對象,其構造方法除了初始化一些屬性外,還調用了模板方法doOpen,doOpen裏的邏輯即啓動一個Netty服務器的底層方法
// org.apache.dubbo.registry.integration.RegistryProtocol#register public void register(URL registryUrl, URL registeredProviderUrl) { // 獲取註冊中心 Registry registry = registryFactory.getRegistry(registryUrl); // 註冊服務地址 registry.register(registeredProviderUrl); }
獲取註冊中心
// org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry public Registry getRegistry(URL url) { url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); String key = url.toServiceStringWithoutResolving(); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { // 查看緩存 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // 經過Dubbo SPI或者IOC類建立 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } // 放進緩存 REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } }
這部分邏輯在export過程是否是也見過?先看緩存有沒有,有則返回無則建立。
看下建立
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(PATH_SEPARATOR)) { group = PATH_SEPARATOR + group; } this.root = group; // 建立客戶端 zkClient = zookeeperTransporter.connect(url); // 監聽狀態變化 zkClient.addStateListener(state -> { if (state == StateListener.RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } }); }
這部分也就是建立zookeeper客戶端的過程。而這一步的zkClient是Dubbo的包裝類,你繼續走下去就能看見原生代碼了,這不不作贅述。若是你瞭解ZkClient或者Curator,就能看懂。
下面看下服務註冊
// org.apache.dubbo.registry.support.FailbackRegistry#register public void register(URL url) { super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // 模板方法,具體子類實現 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly addFailedRegistered(url); } } // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister public void doRegister(URL url) { try { // 建立數據節點 zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
這裏的邏輯也很容易懂
// org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient#create(java.lang.String, boolean) public void create(String path, boolean ephemeral) { if (!ephemeral) { if (checkExists(path)) { return; } } int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); } if (ephemeral) { // 建立臨時節點 createEphemeral(path); } else { // 建立永久節點 createPersistent(path); } } // org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperClient#createPersistent(java.lang.String) public void createPersistent(String path) { try { // 這裏的client就是原生代碼了 org.apache.curator.framework.CuratorFramework client.create().forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
小結
1. 獲取註冊中心
- 上鎖,先查看緩存中是否存在。沒有則建立註冊中心並放入緩存。
2. 把服務地址寫入註冊中心
- 利用上一步獲取的註冊中心,調用內部create方法完成數據節點的建立。
0. Dubbo捕獲Spring容器刷新事件(ServiceBean),執行暴露服務邏輯(ServiceConfig)。
1. 檢查用戶的配置是否合理,或者爲用戶補充缺省配置。
2. 發佈服務(org.apache.dubbo.config.ServiceConfig#doExportUrls)
- 獲取註冊中心地址
- 遍歷協議,在每一個協議下緩存服務提供者信息並執行發佈邏輯
①. 建立URL
- 首先將版本、時間戳、方法名以及各類配置對象的字段信息放入到 map 中,map 中的內容將做爲 URL 的查詢字符串。
- 構建好 map 後,緊接着是獲取主機名以及端口號等信息。
- 最後將 map 和主機名等數據傳給 URL 構造方法建立 URL 對象。②. 暴露服務
- 根據 url 中的 scope 參數決定服務導出方式(本地仍是遠程)
* 把服務對象轉換成Invoker對象(org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker)
// Invoker對象存儲了元數據信息,能夠利用它獲取服務地址,註冊中心地址等
- 利用字節碼類庫Javassist手動建立Wrapper類
- 建立AbstractProxyInvoker對象,覆蓋doInvoke方法並直接將調用請求交給Wrapper對象的invokeMethod方法* 把Invoker對象轉換成Exporter對象(org.apache.dubbo.registry.integration.RegistryProtocol#export)
// 作了兩件大事:1. 向註冊中心註冊服務地址 2. 根據協議選擇具體的實現(好比DubboProtocol),說是export服務,不如說是啓動了服務器,監聽消費者的請求
// 所謂的Dubbo協議,就是通信邏輯的實現。Dubbo 協議的 Invoker 轉爲 Exporter 發生在 DubboProtocol 類的 export 方法,它主要是打開 socket 偵聽服務,並接收客戶端發來的各類請求,通信細節由 Dubbo 本身實現。
// 核心代碼2部分:1. 註冊服務,操做Zookeeper,寫入節點&監聽節點變化 2. 啓動Netty服務器,監聽客戶端鏈接
(一)啓動服務器
1. 調用具體的協議實現類好比DubboProtocol的導出方法
2. 建立Exporter對象並緩存
3. 啓動服務器:根據服務URL解析出ip:端口做爲key,先查看緩存有沒有對應的ExchangeServer,沒有則建立,有則重置服務器。
4. 建立服務器
- 給URL配置服務器參數
- 利用Dubbo SPI調用Exchanger具體實現類(好比:HeaderExchanger)的bind方法來建立ExchangeServer對象
- 利用Dubbo SPI調用Transporter具體實現類(好比:NettyTransporter)的bind方法來建立Server對象
- new一個NettyServer對象,其構造方法除了初始化一些屬性外,還調用了模板方法doOpen,doOpen裏的邏輯即啓動一個Netty服務器的底層方法
(二)註冊服務
* 獲取註冊中心:上鎖,先查看緩存中是否存在。沒有則建立註冊中心並放入緩存。
* 把服務地址寫入註冊中心:利用上一步獲取的註冊中心,調用內部create方法完成數據節點的建立。
另附官網圖片
==================================2020-05-22更:服務暴露流程圖