Nacos源碼分析-Distro協議詳解

舒適提示:
本文內容基於我的學習Nacos 2.0.1版本代碼總結而來,因我的理解差別,不保證徹底正確。若有理解錯誤之處歡迎各位拍磚指正,相互學習;轉載請註明出處。html

《Distro協議概覽》這篇文章內簡要的從全局角度來分析了Distro協議的總體面貌。若還未閱讀過的同窗能夠去看一下,大腦中對相關的組件有個映像。在DistroProtocol章節介紹了Distro協議是從這裏開始的,本文將圍繞這個入口展開分析。java

Distro協議的初始任務

DistroProtocol章節介紹道,從構造方法中它啓動了一個startDistroTask()任務,內部又分爲驗證任務和同步任務。編程

private void startDistroTask() {
	if (EnvUtil.getStandaloneMode()) {
		isInitialized = true;
		return;
	}
	startVerifyTask();
	startLoadTask();
}

在真正開始分析任務的具體操做以前,請容許我先介紹【DistroProtocol】內部的一些屬性:c#

/**
 * 節點管理器
 */
private final ServerMemberManager memberManager;

/**
 * Distro組件持有者
 */
private final DistroComponentHolder distroComponentHolder;

/**
 * Distro任務引擎持有者
 */
private final DistroTaskEngineHolder distroTaskEngineHolder;

其中【DistroComponentHolder】【DistroTaskEngineHolder】的基本概念已經在《Distro協議概覽》一文中介紹過,請點擊名稱跳轉查看。
此處介紹一下他們的初始化。api

初始化Distro協議的組件

DistroHttpRegistry 是v1版本中的組件,這裏介紹v1相關內容是想讓各位讀者直觀的對比一下v1和v2的巨大差別。由於此篇文章先於《Distro協議概覽》完成,而且當初學習的時候並未注意即使使用了2.0.1版本的代碼,它仍是默認開啓了對v1的兼容。所以花了大量時間研究的倒是v1相關的操做。不過經過對v1和v2的相關處理的分析能夠更加明確v2的提高點。緩存

DistroHttpRegistry

v1版本的組件註冊器服務器

package com.alibaba.nacos.naming.consistency.ephemeral.distro;

@Component
public class DistroHttpRegistry {

	// 一些Distro組件的集合
    private final DistroComponentHolder componentHolder;
	// 一些任務執行引擎的集合
    private final DistroTaskEngineHolder taskEngineHolder;
	// Distro協議http請求方式的數據對象
    private final DataStore dataStore;
	// 協議映射器
    private final DistroMapper distroMapper;
	// 一些全局配置
    private final GlobalConfig globalConfig;
	// Distro 一致性協議服務
    private final DistroConsistencyServiceImpl consistencyService;
	// Nacos節點管理器
    private final ServerMemberManager memberManager;

    public DistroHttpRegistry(DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder,
            DataStore dataStore, DistroMapper distroMapper, GlobalConfig globalConfig,
            DistroConsistencyServiceImpl consistencyService, ServerMemberManager memberManager) {
        this.componentHolder = componentHolder;
        this.taskEngineHolder = taskEngineHolder;
        this.dataStore = dataStore;
        this.distroMapper = distroMapper;
        this.globalConfig = globalConfig;
        this.consistencyService = consistencyService;
        this.memberManager = memberManager;
    }

    /**
     * Register necessary component to distro protocol for HTTP implement.
     */
    @PostConstruct
    public void doRegister() {
		// 註冊com.alibaba.nacos.naming.iplist.類型數據的數據倉庫實現
        componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroDataStorageImpl(dataStore, distroMapper));
		// 註冊com.alibaba.nacos.naming.iplist.類型數據的數據傳輸代理對象實現
        componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent(memberManager));
		// 註冊com.alibaba.nacos.naming.iplist.類型的失敗任務處理器
        componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpCombinedKeyTaskFailedHandler(taskEngineHolder));
		// 註冊com.alibaba.nacos.naming.iplist.類型的任務處理器
        taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
		// 註冊com.alibaba.nacos.naming.iplist.類型的DistroData數據處理器
        componentHolder.registerDataProcessor(consistencyService);
    }
}

DistroClientComponentRegistry

v2版本的組件註冊器,請對比v1的異同。併發

package com.alibaba.nacos.naming.consistency.ephemeral.distro.v2;

@Component
public class DistroClientComponentRegistry {
    
	// Nacos節點管理器
    private final ServerMemberManager serverMemberManager;
    
	// Distro協議對象
    private final DistroProtocol distroProtocol;
    
	// 一些Distro組件的集合
    private final DistroComponentHolder componentHolder;
	
    // 一些任務執行引擎的集合
    private final DistroTaskEngineHolder taskEngineHolder;
    
	// Nacos 客戶端管理器
    private final ClientManager clientManager;
	
    // 集羣rpc客戶端代理對象
    private final ClusterRpcClientProxy clusterRpcClientProxy;
	
    // 版本升級判斷
    private final UpgradeJudgement upgradeJudgement;
    
    public DistroClientComponentRegistry(ServerMemberManager serverMemberManager, DistroProtocol distroProtocol,
            DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder,
            ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy,
            UpgradeJudgement upgradeJudgement) {
        this.serverMemberManager = serverMemberManager;
        this.distroProtocol = distroProtocol;
        this.componentHolder = componentHolder;
        this.taskEngineHolder = taskEngineHolder;
        this.clientManager = clientManager;
        this.clusterRpcClientProxy = clusterRpcClientProxy;
        this.upgradeJudgement = upgradeJudgement;
    }
    
    /**
     * Register necessary component to distro protocol for v2 {@link com.alibaba.nacos.naming.core.v2.client.Client}
     * implement.
     */
    @PostConstruct
    public void doRegister() {
        DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol, upgradeJudgement);
        DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy, serverMemberManager);
        DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder);
		// 註冊Nacos:Naming:v2:ClientData類型數據的數據倉庫實現
        componentHolder.registerDataStorage(DistroClientDataProcessor.TYPE, dataProcessor);
		// 註冊Nacos:Naming:v2:ClientData類型的DistroData數據處理器
        componentHolder.registerDataProcessor(dataProcessor);
		// 註冊Nacos:Naming:v2:ClientData類型數據的數據傳輸代理對象實現
        componentHolder.registerTransportAgent(DistroClientDataProcessor.TYPE, transportAgent);
		// 註冊Nacos:Naming:v2:ClientData類型的失敗任務處理器
        componentHolder.registerFailedTaskHandler(DistroClientDataProcessor.TYPE, taskFailedHandler);
    }
}

DistroClientComponentRegistry註冊了一些Nacos:Naming:v2:ClientData類型的操做對象。他們的功能和上一節的DistroHttpRegistry同樣。區別就是實現類不一樣。根據DistroComponentHolder註冊的內容來看,它所操做的數據和《Distro協議概覽》中介紹的【Distro協議重要角色】恰好對應。它明確了v2版本中數據保存在哪裏,由什麼數據處理器來處理,用什麼DistroTransportAgent來發送。同時請注意這幾個註冊方法在v1和v2版本註冊時傳遞的key,在v1中它是com.alibaba.nacos.naming.iplist.,v2中它是Nacos:Naming:v2:ClientData,後面你會常常看見他們的。app

節點數據驗證

如今正式開始分析DistroProtocol在構造方法中啓動的任務。dom

驗證的執行流程

前面說道有v1和v2的實現,即使在2.0.*版本中也依然會兼容v1的邏輯,除非你關閉了這個兼容性。從這點來看,接下來的代碼結構的良好設計爲升級也提供了很好的基礎。本節內容主要分析的就是面向接口編程

private void startVerifyTask() {
	
	GlobalExecutor.schedulePartitionDataTimedSync(
		new DistroVerifyTimedTask(memberManager, distroComponentHolder, distroTaskEngineHolder.getExecuteWorkersManager()), 
		DistroConfig.getInstance().getVerifyIntervalMillis()
	);
}

驗證功能從startVerifyTask()方法開始啓動,此處它構建了一個名爲DistroVerifyTimedTask的定時任務,延遲5秒開始,間隔5秒輪詢。

// DistroVerifyTimedTask.java

@Override
public void run() {
	try {
		// 獲取除自身節點以外的其餘節點
		List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
		if (Loggers.DISTRO.isDebugEnabled()) {
			Loggers.DISTRO.debug("server list is: {}", targetServer);
		}
		// 每一種類型的數據,都要向其餘節點發起驗證
		for (String each : distroComponentHolder.getDataStorageTypes()) {
			// 對dataStorage內的數據進行驗證
			verifyForDataStorage(each, targetServer);
		}
	} catch (Exception e) {
		Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
	}
}

private void verifyForDataStorage(String type, List<Member> targetServer) {
	// 獲取數據類型
	DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
	// 若數據還未同步完畢則不處理
	if (!dataStorage.isFinishInitial()) {
		Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data", dataStorage.getClass().getSimpleName());
		return;
	}
	// ① 獲取驗證數據
	List<DistroData> verifyData = dataStorage.getVerifyData();
	if (null == verifyData || verifyData.isEmpty()) {
		return;
	}
	// 對每一個節點開啓一個異步的線程來執行
	for (Member member : targetServer) {
		DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
		if (null == agent) {
			continue;
		}
		executeTaskExecuteEngine.addTask(member.getAddress() + type, new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
	}
}

DistroVerifyTimedTask任務中,對每個節點的全部驗證數據都建立了一個新的任務 DistroVerifyExecuteTask,由它來執行具體的驗證工做。

重點:
被驗證的數據集合從接口DistroDataStorage中被獲取,並被傳入DistroVerifyExecuteTask任務中。

// DistroVerifyExecuteTask.java 

@Override
public void run() {
	for (DistroData each : verifyData) {
		try {
			// 判斷傳輸對象是否支持回調(如果http的則不支持,實際上沒區別,當前2.0.1版本沒有實現回調的實質內容)
			if (transportAgent.supportCallbackTransport()) {
				doSyncVerifyDataWithCallback(each);
			} else {
				doSyncVerifyData(each);
			}
		} catch (Exception e) {
			Loggers.DISTRO
					.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
		}
	}
}

/**
 * 支持回調的同步數據驗證
 * @param data
 */
private void doSyncVerifyDataWithCallback(DistroData data) {
	// 回調實際上,也沒啥。。。基本算是空對象
	transportAgent.syncVerifyData(data, targetServer, new DistroVerifyCallback());
}

/**
 * 不支持回調的同步數據驗證
 * @param data
 */
private void doSyncVerifyData(DistroData data) {
	transportAgent.syncVerifyData(data, targetServer);
}

每個DistroVerifyExecuteTask任務都持有一組驗證數據List<DistroData>和數據發送的目的地ServertargetServer,它將使用DistroTransportAgent爲每個DistroData執行一次驗證操做。這裏僅僅描述它的執行流程,不對細節做過多描述。

數據驗證流程:
從DistroDataStorage獲取驗證數據 -> 使用DistroTransportAgent開始驗證。
請注意他們都是接口,對應不一樣類型的數據將會有不一樣的實現。

經過下圖能夠比較直觀的看到驗證任務的建立流程。

Distro協議驗證數據任務流程圖

驗證流程中的任務產生說明:

  • 當前節點的DistroVerifyTimedTask 會根據節點的數量來建立DistroVerifyExecuteTask,並向其傳遞自身負責的全部Client的clientId集合(clientId最終被包裝成DistroData)。
  • 每個DistroVerifyExecuteTask 會爲傳入的List 中的每個DistroData建立一個異步的rpc請求。

疑問:
這裏是將爲本節點中的全部Client單獨都建立一個rpc請求,爲什麼不一次性將全部client發送出去?難道是爲了性能考量?

小技巧:
當你看到紅色部分的Task它是被放入一個阻塞隊列裏面的時候,你就應該要想到它必定會有一個從隊列不斷獲取任務來執行的操做。這樣在你要分析這些產生的任務如何執行的時候就變的至關容易了

驗證數據的類型

前面咱們知道Distro協議數據交互的對象是DistroData。其內部的content保存的數據纔是真正的須要驗證的對象。在v1版本中的DistroData.content保存的是序列化後的Map<String, String>, key爲serviceName,value爲Service下的全部Instance的checksum值;v2版本中的DistroData.content保存的是序列化後的DistroClientVerifyInfo

public class DistroClientVerifyInfo implements Serializable {
    
    private static final long serialVersionUID = 2223964944788737629L;
    // 客戶端ID
    private String clientId;
    // 修訂版本號,驗證時固定爲0
    private long revision;
    
    // ...省略getter/setter
}

執行驗證任務

相信經過前面的章節你已經知道驗證任務會處理不一樣的數據類型,而不一樣的數據類型的處理方式也不相同。這裏假定你已經充分理解不一樣類型數據的處理流程和實際的處理對象。本節的分析將直接使用對應的處理方法而再也不重複它屬於哪一個具體的實例(這裏的實例指的是接口的實例並不是業務中的那個Instance)。

注意:
當你使用2.0.X版本的代碼,而且關閉了雙寫服務,就表示你的Nacos集羣會完全使用2.0版本的特性,它將再也不兼容1.版本,同時1.版本中的檢查邏輯將再也不被執行,也就是com.alibaba.nacos.naming.iplist.*類型的數據檢查將不被執行。取而代之的是Nacos:Naming:v2:ClientData

v1版本的數據驗證

在v1版本中數據驗證是驗證com.alibaba.nacos.naming.iplist.*類型, 驗證主要目的是在v1版本中向其餘節點發送checksum請求。用於報告自身節點內部的服務列表狀態。發送方不用關注,只管將節點內的服務信息發送出去便可。但當前節點也會接收到其餘節點發送來的checksum請求。

從DistroDataStorage獲取驗證數據
// DistroDataStorageImpl.java

@Override
public List<DistroData> getVerifyData() {
	// 用於保存屬於當前節點處理的,且數據真實有效的Service
	Map<String, String> keyChecksums = new HashMap<>(64);
	// 遍歷當前節點已有的datastore
	for (String key : dataStore.keys()) {
		// 若當前的服務不是本機處理,則排除
		if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
			continue;
		}
		// 若當key對應的數據爲空,則排除
		Datum datum = dataStore.get(key);
		if (datum == null) {
			continue;
		}
		keyChecksums.put(key, datum.value.getChecksum());
	}
	if (keyChecksums.isEmpty()) {
		return Collections.emptyList();
	}
	// 構建DistroData
	DistroKey distroKey = new DistroKey("checksum", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
	DistroData data = new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(keyChecksums));
	// 設置當前操做類型爲Verify
	data.setType(DataOperation.VERIFY);
	return Collections.singletonList(data);
}

注意:
keyChecksums內部存儲的是每一個服務所對應實例列表的checksum驗證字符串

使用DistroTransportAgent發送驗證數據
// DistroHttpAgent.java

@Override
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
	// 若本機節點緩存中沒有targetServer,說明此節點已不具有服務能力,也沒有報告的必要。
	if (!memberManager.hasMember(targetServer)) {
		return true;
	}
	// 發送checksum請求
	NamingProxy.syncCheckSums(verifyData.getContent(), targetServer);
	return true;
}

// NamingProxy.java

/**
 * Synchronize check sums.
 *
 * @param checksums checksum map bytes
 * @param server    server address
 */
public static void syncCheckSums(byte[] checksums, String server) {
	try {
		Map<String, String> headers = new HashMap<>(128);
		
		headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
		headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
		headers.put(HttpHeaderConsts.CONNECTION, "Keep-Alive");
		// 請求示例:http://10.53.126.16:8848/nacos/v1/ns/distro/checksum?source=10.53.155.22:8848
		HttpClient.asyncHttpPutLarge("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL + "?source=" + NetUtils.localServer(), headers, checksums,
				new Callback<String>() {
					@Override
					public void onReceive(RestResult<String> result) {
						if (!result.ok()) {
							Loggers.DISTRO.error("failed to req API: {}, code: {}, msg: {}", "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL, result.getCode(), result.getMessage());
						}
					}
					
					@Override
					public void onError(Throwable throwable) {
						Loggers.DISTRO.error("failed to req API:" + "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL, throwable);
					}
					
					@Override
					public void onCancel() {
					
					}
				});
	} catch (Exception e) {
		Loggers.DISTRO.warn("NamingProxy", e);
	}
}

提示:
請求路徑爲:http://其餘節點的IP地址:其餘節點的端口號/nacos/v1/ns/distro/checksum?source=本機的IP地址:本機的端口號
參數爲:DistroData,DistroData內部包裝的則是一個Map<服務名稱,服務下實例的checksum驗證字符串>

處理DistroTransportAgent的驗證請求

naming模塊下的DistroController用於接收HTTP方式的請求。請注意,此處的驗證請求是處理其餘節點發送來的請求,寫在這裏是爲了便於理解具體的驗證功能。必定要將此小節的內容看作是接收方,由於他們的角色不一樣,內部緩存的服務數據歸屬也不一樣。發送方發送的服務數據是屬於在發送方註冊的,而此處做爲接收方它本身內部也有
本身負責的服務。弄清楚角色的關係才能理解接下來的驗證流程。

// DistroController.java

/**
 * Checksum.
 *
 * @param source  source server
 * @param dataMap checksum map
 * @return 'ok'
 */
@PutMapping("/checksum")
public ResponseEntity syncChecksum(@RequestParam String source, @RequestBody Map<String, String> dataMap) {
	// 構建驗證數據對象
	DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(source), dataMap);
	// 開始驗證
	distroProtocol.onVerify(distroHttpData, source);
	return ResponseEntity.ok("ok");
}

// DistroProtocol.java

/**
 * Receive verify data, find processor to process.
 * @param distroData    verify data
 * @param sourceAddress source server address, might be get data from source server
 * @return true if verify data successfully, otherwise false
 */
public boolean onVerify(DistroData distroData, String sourceAddress) {
	if (Loggers.DISTRO.isDebugEnabled()) {
		Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());
	}
	// 根據不一樣類型獲取不一樣的數據處理器
	String resourceType = distroData.getDistroKey().getResourceType();
	DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
	if (null == dataProcessor) {
		Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
		return false;
	}
	return dataProcessor.processVerifyData(distroData, sourceAddress);
}

// DistroConsistencyServiceImpl.java

@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
	DistroHttpData distroHttpData = (DistroHttpData) distroData;
	Map<String, String> verifyData = (Map<String, String>) distroHttpData.getDeserializedContent();
	onReceiveChecksums(verifyData, sourceAddress);
	return true;
}

篩選須要處理的服務

服務篩選的目的能夠用7個字總結:我的自掃門前雪。

/**
 * Check sum when receive checksums request.
 * Service檢查服務,用於從其餘節點更新已變動的Service
 * @param checksumMap map of checksum   某個服務對應的checksum
 * @param server      source server request checksum    checksum請求的來源
 */
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
	// 若已包含此節點的信息,說明正在處理(處理完畢以後會清空)
	if (syncChecksumTasks.containsKey(server)) {
		// Already in process of this server:
		Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
		return;
	}
	// 將當前要處理的節點信息暫存,表示當前正在處理
	syncChecksumTasks.put(server, "1");

	try {

		List<String> toUpdateKeys = new ArrayList<>();
		List<String> toRemoveKeys = new ArrayList<>();

		// 第一個for循環
		
		/**
		 * 判斷當前節點中哪些服務須要去遠程更新
		 * 須要更新的服務將被添加至toUpdateKeys
		 */
		for (Map.Entry<String, String> entry : checksumMap.entrySet()) {

			/**
			 * 判斷當前的entry(指的是服務Service)是否由本節點處理, 如果本機負責的則不必從其餘節點發送過來
			 * 由於在本節點註冊的服務最終任何新的操做都會被路由到本機,那麼它的狀態在本機就是最新的
			 */
			if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
				// this key should not be sent from remote server:
				Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
				// abort the procedure:
				return;
			}

			// 若當前節點不存在此服務,或服務是空的,或服務的實例列表(checksum驗證字符串)跟傳入的不一致,則標記此服務須要更新
			if (!dataStore.contains(entry.getKey()) ||
				dataStore.get(entry.getKey()).value == null ||
				!dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
				// 添加到待更新列表
				toUpdateKeys.add(entry.getKey());
			}
		}

		
		// 第二個for循環
		
		/**
		 * 此處用於判斷當前節點已有的服務是否是當前接收的請求的來源節點負責處理的,若是是的話,就說明他是最新的不該該被刪除
		 */
		for (String key : dataStore.keys()) {
			// 不是請求方處理的服務就不處理
			if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
				continue;
			}

			// 是請求方處理的服務但不在請求列表中,說明此服務在請求方已經被刪除了
			if (!checksumMap.containsKey(key)) {
				toRemoveKeys.add(key);
			}
		}

		Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);

		// 刪掉已經變動的服務
		for (String key : toRemoveKeys) {
			// 移除服務,併發送變動通知
			onRemove(key);
		}

		// 若沒有須要更新的終止此流程
		if (toUpdateKeys.isEmpty()) {
			return;
		}

		// 通過上述流程的處理,目前本機緩存的DataStorage中保留的服務是:在其餘節點註冊的、服務。

		try {
			DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server);
			distroKey.getActualResourceTypes().addAll(toUpdateKeys);
			// 更新變動了的服務
			DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
			if (null != remoteData) {
				processData(remoteData.getContent());
			}
		} catch (Exception e) {
			Loggers.DISTRO.error("get data from " + server + " failed!", e);
		}
	} finally {
		// 所有處理完畢以後,將syncChecksumTasks中標記正在處理的節點移除
		// Remove this 'in process' flag:
		syncChecksumTasks.remove(server);
	}
}

請各位看官務必詳細閱讀onReceiveChecksums(Map<String, String> checksumMap, String server)方法內的註釋。當前的方法主要處理服務的3種場景:不須要處理的、須要更新的、須要刪除的。

checksum的請求發送方發送的是它自身節點負責的數據,那麼接收方接收到的也都不是本身負責的數據。所以在onReciveChecksums方法的第一個for循環中就對數據進行篩選,避免本機負責的數據由其餘節點誤發過來(這種可能性也有可能存在,那就是當本機暫時不可用,某服務原本是本機負責,又被轉移到其餘節點的時候,此觀點暫未實際測試),這裏處理的是不須要處理的數據。

接着在第一個for循環內部的第二個if語句中判斷傳遞過來的服務信息是否存在,是否爲空,是否和傳遞過來的不一致。若條件成立則將其放入須要更新toUPdateKeys集合內。第二個for循環則處理的是須要刪除的服務,這裏是處理本機內部緩存的服務,首先判斷本機內的哪些服務不是由傳遞請求的這個節點處理的,對於這類服務忽略掉,由於它所屬的節點也會來發送checksum請求,放在那次處理便可。如果傳遞請求的節點處理的但又沒有在此次請求中帶過來則須要標記刪除,接着對標記刪除的服務進行移除。

服務篩選示意圖:

  • 假設如今有3個節點,A、B、C他們以不一樣顏色區分,每一個節點內部DataStore緩存的服務列表假設都是一致每一個節點的服務名稱和顏色都與節點相關聯。
  • 這裏虛擬了一個服務的名稱,用於描述它所屬的節點、狀態、以及它的實例數量以#分隔。實例數量用於表示某個服務在某一時刻的狀態。
  • 每一個節點內部保留着本節點的服務和其餘節點的服務,但每一個節點只保證本身的服務是最新的(和節點顏色相同的服務)。
  • 在發送checksum請求的時候,以服務名稱做爲key。

Checksum的篩選

上圖表示了節點A向節點B發送checksum請求的時候所帶參數和接收方的篩選結果

  • 節點A請求的時候攜帶了自身負責處理的紅色服務,有多少發多少。
  • 節點B接收到請求以後發現節點A的三、四、5服務實例數量有變動,而且A節點傳遞過來的6服務節點B中沒有,所以將三、四、五、6服務添加到toUpdateKeys列表中稍後用於更新
  • 節點B同時發現節點A這次沒有攜帶2服務,說明2服務在節點A已經被刪除了,所以將其添加到toRemoveKeys列表中稍後要從本機的DataStore中移除
  • 節點B中保存的C節點的服務,無論他,由於它們須要等待節點C向節點B發送checksum。

更新已變動服務

在對傳入的服務進行篩選以後就開始更新。

// 組裝DistroData查詢對象
DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server);
distroKey.getActualResourceTypes().addAll(toUpdateKeys);
// 從請求發送方獲取最新數據
DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
if (null != remoteData) {
	// 處理查詢結果
	processData(remoteData.getContent());
}

// DistroProtocol.java

/**
 * Query data from specified server.
 * 從指定的節點獲取DistroData
 * @param distroKey data key
 * @return data
 */
public DistroData queryFromRemote(DistroKey distroKey) {
	if (null == distroKey.getTargetServer()) {
		Loggers.DISTRO.warn("[DISTRO] Can't query data from empty server");
		return null;
	}
	String resourceType = distroKey.getResourceType();
	DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
	if (null == transportAgent) {
		Loggers.DISTRO.warn("[DISTRO] Can't find transport agent for key {}", resourceType);
		return null;
	}
	// 使用DistroTransportAgent獲取數據
	return transportAgent.getData(distroKey, distroKey.getTargetServer());
}

根據當前處理的服務類型,能夠得知這裏的DistroTransportAgent爲DistroHttpAgent。

// DistroTransportAgent.java

@Override
public DistroData getData(DistroKey key, String targetServer) {
	try {
		List<String> toUpdateKeys = null;
		if (key instanceof DistroHttpCombinedKey) {
			toUpdateKeys = ((DistroHttpCombinedKey) key).getActualResourceTypes();
		} else {
			toUpdateKeys = new ArrayList<>(1);
			toUpdateKeys.add(key.getResourceKey());
		}
		// 使用NamingProxy獲取數據
		byte[] queriedData = NamingProxy.getData(toUpdateKeys, key.getTargetServer());
		return new DistroData(key, queriedData);
	} catch (Exception e) {
		throw new DistroException(String.format("Get data from %s failed.", key.getTargetServer()), e);
	}
}

// NamingProxy.java
/**
 * Get Data from other server.
 *
 * @param keys   keys of datum
 * @param server target server address
 * @return datum byte array
 * @throws Exception exception
 */
public static byte[] getData(List<String> keys, String server) throws Exception {

	// 組裝http請求參數
	Map<String, String> params = new HashMap<>(8);
	params.put("keys", StringUtils.join(keys, ","));
	// 發送http請求
	RestResult<String> result = HttpClient.httpGetLarge("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL, new HashMap<>(8), JacksonUtils.toJson(params));

	// 處理請求結果
	if (result.ok()) {
		return result.getData().getBytes();
	}

	throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()
			+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL + ". code: " + result.getCode() + " msg: "
			+ result.getMessage());
}

成功獲取遠端節點的DistroData數據以後將進行解析, 對於Service類型的數據這裏作了兩個操做:一是將獲取的最新數據放入當前節點的DataStore內;二是對獲取的最新服務開啓健康檢查。

// DistroConsistencyServiceImpl.java

private boolean processData(byte[] data) throws Exception {
	if (data.length > 0) {
		// 序列化將要解析的值
		Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
		
		// 此for循環主要功能是用於將獲取到的Service添加到監聽器列表
		for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
			// 遍歷並放入數據倉庫中
			dataStore.put(entry.getKey(), entry.getValue());
			// ① 判斷是否有當前服務的監聽器,如有則觸發監聽器用於更新服務信息
			if (!listeners.containsKey(entry.getKey())) {
				// 由於當前的類是用於處理臨時節點的同步信息的(詳情請看v1版本ServiceManager內部的邏輯)
				// pretty sure the service not exist:
				if (switchDomain.isDefaultInstanceEphemeral()) {
					// 根據獲取的數據信息構建一個v1版本的Service對象
					// create empty service
					Loggers.DISTRO.info("creating service {}", entry.getKey());
					Service service = new Service();
					String serviceName = KeyBuilder.getServiceName(entry.getKey());
					String namespaceId = KeyBuilder.getNamespace(entry.getKey());
					service.setName(serviceName);
					service.setNamespaceId(namespaceId);
					service.setGroupName(Constants.DEFAULT_GROUP);
					// now validate the service. if failed, exception will be thrown
					service.setLastModifiedMillis(System.currentTimeMillis());
					service.recalculateChecksum();
					// 尤爲是要注意此處的key類型,由於listener集合內部可能存放多種類型的監聽器
					// ② 獲取ServiceManager,由於其也是一個監聽器
					// The Listener corresponding to the key value must not be empty
					RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
					if (Objects.isNull(listener)) {
						return false;
					}
					// 觸發ServiceManager的onChange事件,
					listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
				}
			}
		}
		
		// 此for循環主要用於觸發監聽器列表中的Service監聽器
		for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
			
			if (!listeners.containsKey(entry.getKey())) {
				// Should not happen:
				Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
				continue;
			}
			
			try {
				for (RecordListener listener : listeners.get(entry.getKey())) {
					listener.onChange(entry.getKey(), entry.getValue().value);
				}
			} catch (Exception e) {
				Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
				continue;
			}
			
			// Update data store if listener executed successfully:
			dataStore.put(entry.getKey(), entry.getValue());
		}
	}
	return true;
}

提示:

  • 方法中標記的①②處主要就是用於在拿到其餘節點的數據時判斷當前節點有沒有處理過這個服務(Service),如有處理過那麼ServiceManager就會將本服務放入ConsistencyService的監聽器列表屬性中(當前類型的數據處理器是DistroConsistencyServiceImpl)。由於Service自己它也是一個監聽器,請結合代碼中的註釋來理解此段代碼的兩個for循環。
  • ServiceManager 自己做爲監聽器加入listener時的key爲:com.alibaba.nacos.naming.domains.meta.
  • Service 自己做爲監聽器加入listener時的key爲:com.alibaba.nacos.naming.iplist.ephemeral.命名空間##分組名稱@@服務名稱

監聽器列表示例數據:

listeners = {ConcurrentHashMap@13820}  size = 2
	"com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@TO_REMOTE_65_T001" -> {ConcurrentLinkedQueue@13869}  size = 1
		key = "com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@TO_REMOTE_65_T001"
		value = {ConcurrentLinkedQueue@13869}  size = 1
			0 = {Service@10668} "Service{name='DEFAULT_GROUP@@TO_REMOTE_65_T001', protectThreshold=0.0, appName='null', groupName='DEFAULT_GROUP', metadata={}}"
	"com.alibaba.nacos.naming.domains.meta." -> {ConcurrentLinkedQueue@13871}  size = 1
		key = "com.alibaba.nacos.naming.domains.meta."
		value = {ConcurrentLinkedQueue@13871}  size = 1
			0 = {ServiceManager@10671}

ServiceManager.onChange()

// ServiceManager.java

@Override
public void onChange(String key, Service service) throws Exception {
	try {
		if (service == null) {
			Loggers.SRV_LOG.warn("received empty push from raft, key: {}", key);
			return;
		}
		
		// 檢查namespace
		if (StringUtils.isBlank(service.getNamespaceId())) {
			service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID);
		}
		
		Loggers.RAFT.info("[RAFT-NOTIFIER] datum is changed, key: {}, value: {}", key, service);
		// 從ServiceManager內部的serviceMap緩存中獲取
		Service oldDom = getService(service.getNamespaceId(), service.getName());
		// 若不爲空,則更新
		if (oldDom != null) {
			oldDom.update(service);
			// 從新將其放入監聽器列表
			// re-listen to handle the situation when the underlying listener is removed:
			consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), oldDom);
			consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), oldDom);
		} else {
			// 若以前沒有緩存,添加並初始化
			putServiceAndInit(service);
		}
	} catch (Throwable e) {
		Loggers.SRV_LOG.error("[NACOS-SERVICE] error while processing service update", e);
	}
}

private void putServiceAndInit(Service service) throws NacosException {
	// 插入緩存列表
	putService(service);
	service = getService(service.getNamespaceId(), service.getName());
	// 初始化
	service.init();
	// 放入監聽器列表
	consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
	consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
	Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

public void putService(Service service) {
	// 判斷namespace是否存在,若不存在建立一個namespace,並new一個新的Map用於存放namespace對應的service
	if (!serviceMap.containsKey(service.getNamespaceId())) {
		synchronized (putServiceLock) {
			if (!serviceMap.containsKey(service.getNamespaceId())) {
				serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
			}
		}
	}
	// 將service存放於它所屬的namespace容器內部
	serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}

在ServiceManager的onChange方法內部主要就是更新本次獲取的Service信息,並將其添加到監聽器列表內部。最重要的是,當這次傳遞進來的Service不在緩存中的時候它觸發了一個Service.init()方法。它將會開啓Service和Cluster的心跳檢查。不過他們開啓的是針對v1版本的心跳檢查,若版本是2.0.x的話,心跳檢查將不會執行。

// Service.java

private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

public void init() {
	HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
	for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
		entry.getValue().setService(this);
		// 還開啓了Cluster的健康檢查
		entry.getValue().init();
	}
}

// Cluster.java

/**
 * Init cluster.
 */
public void init() {
	if (inited) {
		return;
	}
	checkTask = new HealthCheckTask(this);
	HealthCheckReactor.scheduleCheck(checkTask);
	inited = true;
}

// ClientBeatCheckTask.java

public void run() {
	try {
		// If upgrade to 2.0.X stop health check with v1
		if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
			return;
		}
		// 省略部分代碼...

	} catch (Exception e) {
		Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
	}
}

// HealthCheckTask.java

@Override
public void run() {
	
	try {
		// If upgrade to 2.0.X stop health check with v1
		if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
			return;
		}
	} catch (Throwable e) {
		Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}:{}", cluster.getService().getName(), cluster.getName(), e);
	}
}

Service.onChange()

v2版本的數據驗證

在v2版本中Nacos:Naming:v2:ClientData類型做爲驗證數據類型,相較於v1版本以Service爲維度的數據驗證在總體處理流程上簡潔了許多,一次驗證一個客戶端提供的全部服務在邏輯上的完整性也很強。

從DistroDataStorage獲取驗證數據
// DistroClientDataProcessor.java

@Override
public List<DistroData> getVerifyData() {
	List<DistroData> result = new LinkedList<>();
	// 遍歷當前節點緩存的全部client
	for (String each : clientManager.allClientId()) {
		Client client = clientManager.getClient(each);
		if (null == client || !client.isEphemeral()) {
			continue;
		}
		// 是本機負責的Client才進行處理
		if (clientManager.isResponsibleClient(client)) {
			// TODO add revision for client.
			DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
			DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
			DistroData data = new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
			data.setType(DataOperation.VERIFY);
			result.add(data);
		}
	}
	return result;
}

注意:

  • 在當前節點執行Nacos:Naming:v2:ClientData類型數據的驗證任務時,它只會向集羣中的其餘節點發送本身負責的,且未被移除的數據。
使用DistroTransportAgent發送驗證數據

提示:

  • 執行發送數據的這個方法調用的自己是在上層DistroVerifyExecuteTask的run方法的for循環內部的。也就是說爲每一個Client發送一次驗證請求。
// DistroClientTransportAgent.java

@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
	// 若此節點不在當前節點緩存中,直接返回,由於可能下線、或者過時,不須要驗證了
	if (isNoExistTarget(targetServer)) {
		callback.onSuccess();
	}
	// 構建請求對象
	DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
	Member member = memberManager.find(targetServer);
	try {
		// 建立一個回調對象(Wrapper實現了RequestCallBack接口)
		DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer, verifyData.getDistroKey().getResourceKey(), callback, member);
		// 使用集羣Rpc請求對象發送異步任務
		clusterRpcClientProxy.asyncRequest(member, request, wrapper);
	} catch (NacosException nacosException) {
		callback.onFailed(nacosException);
	}
}

// ClusterRpcClientProxy.java

/**
 * aync send request to member with callback.
 * 其餘節點發送異步請求並回調
 * @param member   member of server.
 * @param request  request.
 * @param callBack RequestCallBack.
 * @throws NacosException exception may throws.
 */
public void asyncRequest(Member member, Request request, RequestCallBack callBack) throws NacosException {
	// 獲取目標節點對應的RpcClient
	RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
	if (client != null) {
		// 經過Client發送異步請求
		client.asyncRequest(request, callBack);
	} else {
		throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
	}
}

// RpcClient.java

/**
 * send async request.
 *
 * @param request request.
 */
public void asyncRequest(Request request, RequestCallBack callback) throws NacosException {
	
	int retryTimes = 0;

	Exception exceptionToThrow = null;
	long start = System.currentTimeMillis();
	// 重試次數少於3次,且總執行時間小於3秒
	while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < start + callback.getTimeout()) {
		boolean waitReconnect = false;
		try {
			if (this.currentConnection == null || !isRunning()) {
				waitReconnect = true;
				throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected.");
			}
			// 使用GrpcConnection發送異步請求
			this.currentConnection.asyncRequest(request, callback);
			return;
		} catch (Exception e) {
			if (waitReconnect) {
				try {
					//wait client to re connect.
					Thread.sleep(Math.min(100, callback.getTimeout() / 3));
				} catch (Exception exception) {
					//Do nothing.
				}
			}
			LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Send request fail, request={}, retryTimes={},errorMessage={}", name, request, retryTimes, e.getMessage());
			exceptionToThrow = e;

		}
		retryTimes++;
	}

	// 判斷RpcClientStatus 是否是RUNNING狀態,若不是則設置其爲UNHEALTHY
	if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
		// 若成功設置了UNHEALTHY,則置爲失敗
		switchServerAsyncOnRequestFail();
	}
	if (exceptionToThrow != null) {
		throw (exceptionToThrow instanceof NacosException) ? (NacosException) exceptionToThrow : new NacosException(SERVER_ERROR, exceptionToThrow);
	} else {
		throw new NacosException(SERVER_ERROR, "AsyncRequest fail, unknown error");
	}
}

// GrpcConnection.java

@Override
public void asyncRequest(Request request, final RequestCallBack requestCallBack) throws NacosException {
	// ① 轉換爲Grpc的請求載體對象
	Payload grpcRequest = GrpcUtils.convert(request);
	// 發送Grpc請求
	ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);

	//set callback .
	Futures.addCallback(requestFuture, new FutureCallback<Payload>() {
		@Override
		public void onSuccess(@Nullable Payload grpcResponse) {
			Response response = (Response) GrpcUtils.parse(grpcResponse);

			if (response != null) {
				if (response instanceof ErrorResponse) {
					requestCallBack.onException(new NacosException(response.getErrorCode(), response.getMessage()));
				} else {
					// 若成功返回,觸發回調的onResponse對象,由回調函數處理後續邏輯
					requestCallBack.onResponse(response);
				}
			} else {
				requestCallBack.onException(new NacosException(ResponseCode.FAIL.getCode(), "response is null"));
			}
		}

		@Override
		public void onFailure(Throwable throwable) {
			if (throwable instanceof CancellationException) {
				requestCallBack.onException(
						new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " milliseconds."));
			} else {
				requestCallBack.onException(throwable);
			}
		}
	}, requestCallBack.getExecutor() != null ? requestCallBack.getExecutor() : this.executor);
	// set timeout future.
	ListenableFuture<Payload> payloadListenableFuture = Futures
			.withTimeout(requestFuture, requestCallBack.getTimeout(), TimeUnit.MILLISECONDS,
					RpcScheduledExecutor.TIMEOUT_SCHEDULER);

}

這裏不一樣於v1版本的checksum,在v2版本的驗證任務中使用了rpc請求方式。和v1版本的請求邏輯相同,都是發送當前節點所負責的Client信息到其餘節點。

注意:
在①標註的地方,它在轉換request的時候做了一些必要的操做,包括構建Grpc請求對象(Payload)所須要的元數據,其中最須要注意的就是Metadata內部的type屬性,接收方將根據這個屬性進行判斷,分別處理不一樣類型的請求。

處理DistroTransportAgent的驗證請求

其餘節點接收到驗證請求,是如何處理呢?GrpcRequestAcceptor用於接收rpc請求。須要注意的是,它將接收全部類型的rpc請求,例如:ServerCheckRequestDistroDataRequestHealthCheckRequest等,具體可查看com.alibaba.nacos.api.remote.request包下的請求對象。此處咱們只關注驗證請求DistroDataRequest

// GrpcRequestAcceptor.java

@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
	// 若是有必要跟蹤的話
	traceIfNecessary(grpcRequest, true);
	
	// 獲取請求對象中的請求類型
	String type = grpcRequest.getMetadata().getType();

	// 若當前節點還未啓動完畢,則直接返回
	//server is on starting.
	if (!ApplicationUtils.isStarted()) {
		Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.INVALID_SERVER_STATUS, "Server is starting,please try later."));
		traceIfNecessary(payloadResponse, false);
		responseObserver.onNext(payloadResponse);

		responseObserver.onCompleted();
		return;
	}

	// 判斷請求是否爲ServerCheckRequest
	// server check.
	if (ServerCheckRequest.class.getSimpleName().equals(type)) {
		// 構建一個ServerCheckResponse做爲返回對象,幷包裝爲Payload返回給調用方
		Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(CONTEXT_KEY_CONN_ID.get()));
		traceIfNecessary(serverCheckResponseP, false);
		responseObserver.onNext(serverCheckResponseP);
		responseObserver.onCompleted();
		return;
	}

	// 根據請求的類型,找到對應的處理器
	RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
	//no handler found.
	if (requestHandler == null) {
		// 若未找到類型對應的處理器,返回給調用方一個ErrorResponse
		Loggers.REMOTE_DIGEST.warn(String.format("[%s] No handler for request type : %s :", "grpc", type));
		Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
		traceIfNecessary(payloadResponse, false);
		responseObserver.onNext(payloadResponse);
		responseObserver.onCompleted();
		return;
	}

	// 檢查鏈接狀態
	//check connection status.
	String connectionId = CONTEXT_KEY_CONN_ID.get();
	boolean requestValid = connectionManager.checkValid(connectionId);
	if (!requestValid) {
		Loggers.REMOTE_DIGEST.warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
		Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.UN_REGISTER, "Connection is unregistered."));
		traceIfNecessary(payloadResponse, false);
		responseObserver.onNext(payloadResponse);
		responseObserver.onCompleted();
		return;
	}

	// 接下來開始真正的處理請求邏輯

	// 解析請求對象
	Object parseObj = null;
	try {
		parseObj = GrpcUtils.parse(grpcRequest);
	} catch (Exception e) {
		// 解析失敗返回
		Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
		Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
		traceIfNecessary(payloadResponse, false);
		responseObserver.onNext(payloadResponse);
		responseObserver.onCompleted();
		return;
	}

	// 請求對象爲空,返回
	if (parseObj == null) {
		Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive  ,parse request is null", connectionId);
		Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.BAD_GATEWAY, "Invalid request"));
		traceIfNecessary(payloadResponse, false);
		responseObserver.onNext(payloadResponse);
		responseObserver.onCompleted();
	}

	// 請求對象貨不對板,返回
	if (!(parseObj instanceof Request)) {
		Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive  ,parsed payload is not a request,parseObj={}", connectionId, parseObj);
		Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.BAD_GATEWAY, "Invalid request"));
		traceIfNecessary(payloadResponse, false);
		responseObserver.onNext(payloadResponse);
		responseObserver.onCompleted();
		return;
	}

	// 將接收的對象轉換爲請求對象
	Request request = (Request) parseObj;
	try {
		Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
		RequestMeta requestMeta = new RequestMeta();
		requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
		requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
		requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
		requestMeta.setLabels(connection.getMetaInfo().getLabels());
		connectionManager.refreshActiveTime(requestMeta.getConnectionId());
		// 使用對應的處理器進行處理
		Response response = requestHandler.handleRequest(request, requestMeta);
		Payload payloadResponse = GrpcUtils.convert(response);
		traceIfNecessary(payloadResponse, false);
		responseObserver.onNext(payloadResponse);
		responseObserver.onCompleted();
	} catch (Throwable e) {
		Loggers.REMOTE_DIGEST.error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId, e);
		Payload payloadResponse = GrpcUtils.convert(buildErrorResponse((e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(), e.getMessage()));
		traceIfNecessary(payloadResponse, false);
		responseObserver.onNext(payloadResponse);
		responseObserver.onCompleted();
		return;
	}

}

最終的DistroData數據將會由對應的Handler來處理,並返回結果給調用方。

/**
 * Distro data request handler.
 * Distro協議數據的請求處理器,用於處理客戶端發送來的Distro協議 rpc 請求
 * @author xiweng.yy
 */
@Component
public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {

    private final DistroProtocol distroProtocol;

    public DistroDataRequestHandler(DistroProtocol distroProtocol) {
        this.distroProtocol = distroProtocol;
    }

    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                case VERIFY:
                    return handleVerify(request.getDistroData(), meta);
                case SNAPSHOT:
                    return handleSnapshot();
                case ADD:
                case CHANGE:
                case DELETE:
                    return handleSyncData(request.getDistroData());
                case QUERY:
                    return handleQueryData(request.getDistroData());
                default:
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }

    /**
     * 處理驗證信息
     * @param distroData    請求方發送來的驗證信息
     * @param meta          請求元數據
     * @return
     */
    private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
        DistroDataResponse result = new DistroDataResponse();
		// 使用DistroProtocol來處理
        if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
            result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
        }
        return result;
    }

    private DistroDataResponse handleSnapshot() {
        DistroDataResponse result = new DistroDataResponse();
        DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
        result.setDistroData(distroData);
        return result;
    }

    private DistroDataResponse handleSyncData(DistroData distroData) {
        DistroDataResponse result = new DistroDataResponse();
        if (!distroProtocol.onReceive(distroData)) {
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("[DISTRO-FAILED] distro data handle failed");
        }
        return result;
    }

    private DistroDataResponse handleQueryData(DistroData distroData) {
        DistroDataResponse result = new DistroDataResponse();
        DistroKey distroKey = distroData.getDistroKey();
        DistroData queryData = distroProtocol.onQuery(distroKey);
        result.setDistroData(queryData);
        return result;
    }
}

根據DistroDataRequestHandler.handle()方法能夠看出來,它不止能夠處理VERIFY請求,這裏咱們只分析VERIFY類型的請求處理。能夠發現它調用了DistroProtocol.onVerify()來處理。

// DistroProtocol.java

/**
 * Receive verify data, find processor to process.
 * @param distroData    verify data
 * @param sourceAddress source server address, might be get data from source server
 * @return true if verify data successfully, otherwise false
 */
public boolean onVerify(DistroData distroData, String sourceAddress) {
	if (Loggers.DISTRO.isDebugEnabled()) {
		Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());
	}
	// 根據這次處理的數據類型獲取對應的處理器,此處咱們處理的類型是Client類型(Nacos:Naming:v2:ClientData)
	String resourceType = distroData.getDistroKey().getResourceType();
	DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
	if (null == dataProcessor) {
		Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
		return false;
	}
	return dataProcessor.processVerifyData(distroData, sourceAddress);
}

// DistroClientDataProcessor.java

@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
	DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
	// 調用ClientManager來驗證
	if (clientManager.verifyClient(verifyData.getClientId())) {
		return true;
	}
	Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
	return false;
}

// EphemeralIpPortClientManager.java

@Override
public boolean verifyClient(String clientId) {
	// 從客戶端管理器中獲取客戶端實例
	IpPortBasedClient client = clients.get(clientId);
	if (null != client) {
		// 若不爲空,啓動一個心跳更新任務
		NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
		return true;
	}
	return false;
}


public class ClientBeatUpdateTask extends AbstractExecuteTask {

    private final IpPortBasedClient client;

    public ClientBeatUpdateTask(IpPortBasedClient client) {
        this.client = client;
    }

    @Override
    public void run() {
        // 獲取當前時間,更新Client和Client下的Instance的最新活躍時間
        long currentTime = System.currentTimeMillis();
        for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
            ((HealthCheckInstancePublishInfo) each).setLastHeartBeatTime(currentTime);
        }
		// 更新client的最新更新時間
        client.setLastUpdatedTime();
    }
}

經過最後一步的ClientBeatUpdateTask能夠發現,其餘節點發送來的verify請求,實際上就是發送端的狀態報告請求。Client.getAllInstancePublishInfo()返回的是某一個客戶端下的全部實例信息列表,最後再更新Client自身的最新活躍時間。

驗證任務功能總結:

  • 每一個節點在應用啓動完畢以後延遲5秒,以後間隔5秒向其餘節點發送Verify請求。
  • 每一個節點只發送本身負責的、且未被移除的Client,且每個Client都會發送一次請求,請求參數裏面只附帶了Client的clientId屬性。(意味着當前節點只會告訴其餘節點我目前有這個Client正在提供服務,並未提供服務的具體信息。)
  • 接收Verify請求的節點從請求參數中獲取clientId,並檢查自身是否有這個Client,若此Client存在,則更新Client下的全部Instance、以及Client自身的最新活躍時間爲當前時間。

節點數據同步

除了開啓驗證任務以外,還開啓了一個數據加載的任務startLoadTask(),用於從其餘節點同步數據到本節點。同時它也維護着本機節點是否初始化完成的一個狀態isInitialized,當前節點從其餘節點加載數據成功以後,將這個狀態設置爲true。表示當前節點已經同步完畢數據,能夠參與正常服務;同時又設置了DistroDataStorageisFinishInitial屬性爲true,表示數據已準備好,驗證任務能夠執行了。

數據同步的執行流程

// DistroProtocol.java

private void startLoadTask() {
	DistroCallback loadCallback = new DistroCallback() {
		@Override
		public void onSuccess() {
			isInitialized = true;
		}

		@Override
		public void onFailed(Throwable throwable) {
			isInitialized = false;
		}
	};
	GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
}

加載任務的一開始建立了一個 DistroLoadDataTask 任務,並傳入了一個在加載完畢以後更改當前節點Distro協議完成狀態的回調函數。

// DistroLoadDataTask.java 

private void load() throws Exception {
	// 若出自身以外沒有其餘節點,則休眠1秒,可能其餘節點還未啓動完畢
	while (memberManager.allMembersWithoutSelf().isEmpty()) {
		Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
		TimeUnit.SECONDS.sleep(1);
	}
	// 若數據類型爲空,說明distroComponentHolder的組件註冊器還未初始化完畢(v1版本爲DistroHttpRegistry, v2版本爲DistroClientComponentRegistry)
	while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
		Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
		TimeUnit.SECONDS.sleep(1);
	}
	// 加載每一個類型的數據
	for (String each : distroComponentHolder.getDataStorageTypes()) {
		if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
			// 調用加載方法,並標記已處理
			loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
		}
	}
}

/**
 * 從其餘節點獲取同步數據
 * @param resourceType
 * @return
 */
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
	// 獲取數據傳輸對象
	DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
	// 獲取數據處理器
	DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
	if (null == transportAgent || null == dataProcessor) {
		Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}", resourceType, transportAgent, dataProcessor);
		return false;
	}
	// 向每一個節點請求數據
	for (Member each : memberManager.allMembersWithoutSelf()) {
		try {
			Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
			// 獲取到數據
			DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
			// 解析數據
			boolean result = dataProcessor.processSnapshot(distroData);
			Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(), result);
			// 若解析成功,標記此類型數據已加載完畢
			if (result) {
				distroComponentHolder.findDataStorage(resourceType).finishInitial();
				return true;
			}
		} catch (Exception e) {
			Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
		}
	}
	return false;
}

加載的流程實際上和驗證相似,都是經過獲取須要加載的數據類型,使用DistroTransportAgent獲取數據,使用DistroDataProcessor來處理數據。

執行同步任務

提示:
對於v1版本的相關操做,後續將再也不分析,感興趣的同窗能夠自行研究。此處已經給出了數據加載流程,無非就是他們的處理邏輯不一樣。

v2版本的數據同步

真正的同步任務從DistroLoadDataTaskloadAllDataSnapshotFromRemote方法開始。

使用DistroTransportAgent獲取數據
// DistroClientTransportAgent.java

public DistroData getDatumSnapshot(String targetServer) {
	// 從節點管理器獲取目標節點信息
	Member member = memberManager.find(targetServer);
	// 判斷目標服務器是否健康
	if (checkTargetServerStatusUnhealthy(member)) {
		throw new DistroException(String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
	}
	
	// 構建請求參數
	DistroDataRequest request = new DistroDataRequest();
	// 設置請求的操做類型爲DataOperation.SNAPSHOT
	request.setDataOperation(DataOperation.SNAPSHOT);
	try {
		// 使用Rpc代理對象發送同步rpc請求
		Response response = clusterRpcClientProxy.sendRequest(member, request);
		if (checkResponse(response)) {
			return ((DistroDataResponse) response).getDistroData();
		} else {
			throw new DistroException(String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s", targetServer, response.getErrorCode(), response.getMessage()));
		}
	} catch (NacosException e) {
		throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
	}
}
使用DistroDataProcessor處理數據
// DistroClientDataProcessor.java

public boolean processSnapshot(DistroData distroData) {
	// 反序列化獲取的DistroData爲ClientSyncDatumSnapshot
	ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
	// 處理結果集,這裏將返回遠程節點負責的全部client以及client下面的service、instance信息
	for (ClientSyncData each : snapshot.getClientSyncDataList()) {
		// 每次處理一個client
		handlerClientSyncData(each);
	}
	return true;
}

private void handlerClientSyncData(ClientSyncData clientSyncData) {
	Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
	// 由於是同步數據,所以建立IpPortBasedClient,並緩存
	clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
	Client client = clientManager.getClient(clientSyncData.getClientId());
	// 升級此客戶端的服務信息
	upgradeClient(client, clientSyncData);
}

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
	// 當前處理的遠端節點中的數據集合
	// 獲取全部的namespace
	List<String> namespaces = clientSyncData.getNamespaces();
	// 獲取全部的groupNames
	List<String> groupNames = clientSyncData.getGroupNames();
	// 獲取全部的serviceNames
	List<String> serviceNames = clientSyncData.getServiceNames();
	// 獲取全部的instance
	List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
	// 已同步的服務集合
	Set<Service> syncedService = new HashSet<>();

	// ①
	for (int i = 0; i < namespaces.size(); i++) {
		// 從獲取的數據中構建一個Service對象
		Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
		Service singleton = ServiceManager.getInstance().getSingleton(service);
		// 標記此service已被處理
		syncedService.add(singleton);
		// 獲取當前的實例
		InstancePublishInfo instancePublishInfo = instances.get(i);
		// 判斷是否已經包含當前實例
		if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
			// 不包含則添加
			client.addServiceInstance(singleton, instancePublishInfo);
			// 當前節點發布服務註冊事件
			NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
		}
	}
	// 若當前client內部已發佈的service不在本次同步的列表內,說明已通過時了,要刪掉
	for (Service each : client.getAllPublishedService()) {
		if (!syncedService.contains(each)) {
			client.removeServiceInstance(each);
			// 發佈客戶端下線事件
			NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
		}
	}
}

註釋①標記處的處理邏輯和ClientSyncData這個對象存儲的對象有關係,此處是存放的以Service爲維度的信息,它將一個Service的所有信息分別保存,並保證全部列表中的數據順序一致。

ClientSyncData示例數據:

clientSyncData = {ClientSyncData@12737} 
	clientId = "10.55.56.1:8888#true"
	attributes = {ClientSyncAttributes@12740} 
	namespaces = {ArrayList@12741}  size = 2
		0 = "public"
		1 = "public"
	groupNames = {ArrayList@12742}  size = 2
		0 = "DEFAULT_GROUP"
		1 = "DEFAULT_GROUP"
	serviceNames = {ArrayList@12743}  size = 2
		0 = "SERVICE_01"
		1 = "SERVICE_02"
	instancePublishInfos = {ArrayList@12744}  size = 2
		0 = {InstancePublishInfo@12941} "InstancePublishInfo{ip='10.55.56.1', port=8888, healthy=false}"
		1 = {InstancePublishInfo@12942} "InstancePublishInfo{ip='10.55.56.1', port=8888, healthy=false}"

經過示例數據能夠看出在10.55.56.1這個客戶端中有兩個服務,他們都在同一個namespace、同一個group中,由於InstancePublishInfo是和Service一對一的關係,而一個客戶端下的服務IP必定和客戶端的IP是一致的,因此也會存在兩條instance信息。upgradeClient的主要功能就是,將從其餘節點獲取的全部註冊的服務註冊到當前節點內。

TODO 這裏有個疑問,首次同步數據只會執行一次拉取(若拉取失敗則會再次拉取,若拉取事後沒有數據也不會再次拉取),並且拉取的是某個節點負責的服務數據,爲什麼當前節點要發佈事件呢?服務的狀態維護不是應該由它負責的節點來維護嘛,好比我拉取的A服務是B節點的,我同步過來就OK了,若是A服務下線了,B節點來觸發變動不就好了。而後再通知其餘節點下線。

同步數據功能總結:
在一個節點啓動以後,會主動向集羣中的其餘節點發送數據同步請求,接收到結果以後將其註冊到自身節點內。

DistroProtocol 功能總結:

  • 節點狀態報告:能用於向其餘節點報告自身的服務狀態。
  • 數據同步: 從其餘節點拉取服務數據註冊。
相關文章
相關標籤/搜索