思考的過程和設計思想以下:html
一、咱們想要進行遠程服務的調用,那麼確定要創建網絡鏈接,不妨改用TCP長鏈接,並設計通訊協議,並封裝爲一個類,不妨叫作ExchangeClient。用它來進行網絡通訊。算法
二、有了能夠進行遠程通訊的服務對象ExchangeClient後,咱們能夠把遠程服務封裝爲一個Invoker對象,這個Invoker對象內部採用自已定義的協議與遠程服務器通訊,不妨叫作DubboInvoker,由於採用了dubbo協議來進行網絡通訊的。spring
三、有了這個DubboInvoker 我就能夠根據dubbo協議與遠程服務通訊了,可是我還想在本地增長一些過濾器Filter,或者監聽器Listener。不要緊,直接經過責任鏈模式,把這些Filter與這個DubboInvoker進行連接。返回的一個ProtocolFilterWrapper對象。apache
四、同理,若是須要一些監聽器的功能怎麼辦,一樣進行一次封裝。把ProtocolFilterWraper封裝到Listener類型的Invoker對象,不妨叫作ListenerInvokerWrapper。服務器
五、如今考慮遠程服務提供者有不少個,那麼我對每一個遠程服務都須要有一個ListenerInvokerWrapper的對象。以下:
Demoservice::196.254.324.1 ListenerInvokerWrapper1
Demoservice::196.254.324.2 ListenerInvokerWrapper2
Demoservice::196.254.324.3 ListenerInvokerWrapper3
Demoservice::196.254.324.4 ListenerInvokerWrapper4
Demoservice::196.254.324.5 ListenerInvokerWrapper5
.....網絡
六、服務太多了,在本地這樣建立太費事了。引入了註冊中心,直接把服務註冊到服務中心上,而後客戶端直接從註冊中心拉取。咱們把拉取到的服務,統稱爲服務目錄。而且它是從註冊中心拉取到的,那麼不妨名字就叫作RegistryDirectory。那麼這個服務目錄裏確定包含了上面的遠程服務調用對象ListenerInvokerWrapper。咱們把這些對象放到服務目錄的成員上,名字就叫作urlInvokerMap。key: Demoservice::xxxx。value:ListenerInvokerWrapper。app
七、如今咱們能夠在本地調用RegistryDirectory對象,與遠程服務通訊了,想調哪一個服務就從urlInvokerMap取出一個進行調用便可。可是每次指定一個遠程服務器,不只太麻煩了,並且也會形成流量不均勻,負載不平衡。那麼咱們就經過經過負載均衡策略來選擇一個服務調用。就取名LoadBalance吧。他有個方法select。入參就是咱們的服務目錄RegistryDirectory。那麼經過LoadBalance.select(RegistryDirectory) 獲得一個咱們想要的通訊的遠程服務便可。目前負載均衡算法有一致性Hash算法,隨機算法、權重輪訓算法、最短響應時間算法、最少活躍數算法。負載均衡
八、有了負載均衡算法LoadBalance後,我想要這樣的功能,當服務調用失敗的時候,我能夠重試,或者直接直接失敗。那我就把有這種能力服務調用,稱爲一個集羣Cluster。他有一個方法叫作join。入參仍是服務目錄RegistryDirectory。返回一個具備快速失敗、或者重試的服務調用,不妨叫AbstractClusterInvoker。每一個不一樣的策略都去實現它。而且這個對象內部經過LoadBalance來選擇一個服務進行調用,失敗後的策略(是否重試或失敗)由我決定。異步
九、目前咱們已經有了一個XXXclusterInvoker 對象,它具備快速失敗或者重試等功能,且具備負載均衡算法的遠程服務調用對象。可是有時,這些遠程服務提供者這的qps不達標,或者新上線的服務有問題,或者遠程服務調用失敗後,能夠在本地模擬的調用,返回一個mock對象。那麼咱們從新對XXXclusterInvoker進行封裝,就命名爲MockClusterInvoker,具備Mock功能,且具備集羣能力。它持有咱們的服務目錄RegistryDirectory和XXXclusterInvoker對象。jvm
十、目前咱們已經有了一個MockClusterInvoker對象。可是這個invoker對象和咱們像本地同樣調用服務仍是有點差異,最後咱們直接經過Java的動態代理計算Proxy.newInstance()來建立一個具體的服務對象DemoService,而且在InvokeHandler內部調用咱們的MockClusterInvoker對象的invoke 方法。
十一、好比咱們的DubboInvoker是經過Java 異步線程CompletableFuture實現的話,若是須要轉爲同步,還能夠對其封裝從異步轉爲同步的Invoker,不妨命名爲AsyncToSyncInvoker。
則最終在服務消費端呈現給咱們以下一個遠程服務代理對象。
在上一章節,已經說明了getObject()對象的調用時機,內部調用的ReferenceConfig#init方法,該init()方法主要作了以下幾件事情:
一、缺省的配置進行填充,好比registry,application等屬性。
二、校驗配置是否填寫正確,好比<dubbo:reference />中的stub 和mock 是否配置,配置了是否正確。
三、經過SPI機制獲取Protocol$Adaptive自適應協議,經過Protocol$Adaptive#refer()方法獲得一個MockClusterInvoker對象。該方法的調用內容基本和上面的猜測設計一致。
1)和註冊中心創建tcp鏈接。
2)把當前的訂閱服務註冊到註冊中心上的consumer節點上。
3)從註冊中心中把訂閱的服務列表拉取到本地,即RegistryDirectory。
4)根據上面相似猜測建立MockClusterInvoker返回。
四、經過SPI機制獲取ProxyFactory$Adaptive自適應代理工廠,而後經過這個代理工廠建立動態代理對象,並把這個代理對象賦值給ref屬性。
服務的訂閱核心就是這條語句,這條語句博大精深。僅僅一條語句把全部的訂閱工做完成了。
一、首先根據SPI機制獲取自適應的協議對象。語句以下:
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
該語句建立了Protocol$Apdative。它有個自適應refer方法以下:
@Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (type == null) throw new IllegalArgumentException("url == null"); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])"); Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); return extension.refer(type, url); }
二、Protocol$Apdative#refer()方法內部又經過參數的url的協議頭和SPI機制獲取一個具體的協議。顯而易見,url.getProtocol()返回的是registry。由於當前是服務訂閱。因此是registry打頭。那麼返回的Protocol具體類型就是RegistryProtocol。可是Protocol擴展點有包裹類型:ProtocolListenerWrapper、ProtocolFilterWrapper。因此最終返回的是ProtocolListenerWrapper類型的協議。查看這個2個包裹類型的refer()方法:
類ProtocolListenerWrapper
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, INVOKER_LISTENER_KEY))); }
類ProtocolFilterWrapper
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); }
三、因此Protocol$Apdative#refer()內部的getExtension返回的是ProtocolListenerWrapper的Protocol。又由於url是註冊url,因此知足UrlUtils.isRegistry(url)==true.直接進行一次傳遞調用。
四、最終調到RegistryProtocol#refer()。代碼以下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = getRegistryUrl(url); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); }
即獲得註冊中心Registry,通常是ZookeeperRegistry。獲取註冊中心的內容在以前的章節已見過,就不在多說了。接着會調用doRefer()方法。
在看doRefer()方法以前,咱們來看下其定義:
Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url);
出參:
返回值就是咱們須要的Invoker對象。
入參:
cluster:集羣對象Cluster$Adaptive,經過Spi獲取.內部getExtension獲取Cluster,默認爲FailoverCluster。
registry:註冊中心
type:訂閱的接口類型
url:服務註冊連接註冊中心URL。
五、Cluster的join 接口以下:
Cluster$Adaptive#join()內部實際是默認調用的是FailoverCluster#join()。
而且Cluster擴展點也有其Wrapper類,即MockClusterWrapper。因此Cluster$Adaptive#join()的方法調用
Cluster extension = ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(extName);
返回的extension是MockClusterWrapper,MockClusterWrapper#join()代碼以下:
return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); }
因此Cluster$Adaptive#join()返回的Invoker類型是MockClusterInvoker。MockClusterWrapper持有的cluster是FailoverCluster,因此MockClusterInvoker內部持有invoker類型是FailoverClusterInvoker。
六、源碼doRefer()
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // new 一個服務目錄,訂閱服務類型爲type 的 RegistryDirectory RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 設置註冊中心 directory.setRegistry(registry); //設置協議,即Protocol$Adaptive directory.setProtocol(protocol); // all attributes of REFER_KEY //獲取訂閱參數 Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters()); //構建訂閱URL ,以consumer//打頭 URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); //把該url註冊到註冊中心上 if (directory.isShouldRegister()) { directory.setRegisteredConsumerUrl(subscribeUrl); registry.register(directory.getRegisteredConsumerUrl()); } //設置路由鏈 directory.buildRouterChain(subscribeUrl); //重點,重中之重。這裏訂閱服務,而且會拉取遠程服務invoker 到directory對象的urlInvokerMap成員中。 directory.subscribe(toSubscribeUrl(subscribeUrl)); //由上面分析,獲得是MockClusterInvoker Invoker<T> invoker = cluster.join(directory); //查找註冊協議監聽器,沒有設置爲空 List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url); if (CollectionUtils.isEmpty(listeners)) { return invoker; } // 若是有其監聽器進行監聽器onRefer()調用,並返回RegistryInvokerWrapper包裹類型。 RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl); for (RegistryProtocolListener listener : listeners) { listener.onRefer(this, registryInvokerWrapper); } return registryInvokerWrapper; }
/** * 核心,經過配置的元信息,建立一個代理對象 * @param map * @return */ @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map) { // 首先判斷本地是否有Service提供者, if (shouldJvmRefer(map)) { //若是有,導出jvm導出refer URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = REF_PROTOCOL.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { urls.clear(); //指定服務提供者URL。點對點好比在<dubbo:reference url="dubbo://xxxxx:12222"> if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. String[] us = SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (StringUtils.isEmpty(url.getPath())) { url = url.setPath(interfaceName); } if (UrlUtils.isRegistry(url)) { urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center's configuration // if protocols not injvm checkRegistry //若是不是jvm 協議,通常是dubbo if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) { //檢測註冊中心 checkRegistry(); //根據註冊中心地址,獲得註冊服務 //registry://106.52.187.48:2181/org.apache.dubbo.registry.RegistryService // ?application=dubbo-demo-annotation-consumer&dubbo=2.0.2&pid=9757®istry=zookeeper×tamp=1597380362736 List<URL> us = ConfigValidationUtils.loadRegistries(this, false); if (CollectionUtils.isNotEmpty(us)) { for (URL u : us) { //對每一個註冊中心URL,獲得監控URL URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u); if (monitorUrl != null) { map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } } //若是註冊中心之一一個的話,通常就一個註冊中心 if (urls.size() == 1) { invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); } else { //多個註冊中心時,Protocol$Adaptive List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { //把其獲得的Invoker 填入invokers invokers.add(REF_PROTOCOL.refer(interfaceClass, url)); if (UrlUtils.isRegistry(url)) { registryURL = url; // use last registry url } } //多註冊中心,多訂閱場景 if (registryURL != null) { // registry url is available // for multi-subscription scenario, use 'zone-aware' policy by default URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME); // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker //經過集羣,返回一個invoker invoker = CLUSTER.join(new StaticDirectory(u, invokers)); } else { // not a registry url, must be direct invoke. invoker = CLUSTER.join(new StaticDirectory(invokers)); } } } if (shouldCheck() && !invoker.isAvailable()) { invoker.destroy(); throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } /** * @since 2.7.0 * ServiceData Store */ /** * * 這裏是發佈元數據信息 */ String metadata = map.get(METADATA_KEY); WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata); if (metadataService != null) { URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map); metadataService.publishServiceDefinition(consumerURL); } // create service proxy //經過動態代理把invoker 轉化爲具體的服務類型 return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic)); }
上面核心的代碼invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0))已分析,接下下來就是經過PROXY_FACTORY.getProxy()建立活動,以後服務調用上進行分析。其餘元數據的註冊,等以後講解配置中心時進行講解。
接下來,以一個圖解來描述服務訂閱的過程。在下一章節來描述如何具體的拉取遠程服務invoker到服務目錄RegistryDirectory上的urlInvokerMap。