Nacos Source Code Analysis: Distro Protocol Overview

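Friendly reminder:
This article summarizes what I learned from reading the Nacos 2.0.1 source code. My understanding may be off in places, so I cannot guarantee it is entirely correct. If you spot a mistake, corrections are welcome so we can learn from each other. Please credit the source when reposting.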

What Is the Distro Protocol

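This article looks at Distro, a protocol used in Nacos. Distro is a protocol developed inside Alibaba for keeping data consistent in a distributed environment. It specifies the data format, the parsing rules, and the transport carrier used when nodes communicate. It is a consistency protocol for ephemeral data: the data it manages is kept only in memory.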

What the Distro Protocol Is Used For

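Managing services in a distributed environment is one of Nacos's main functions. In a cluster, the service information on each node can be in a different state. Events such as a change in a service's availability must be reported to the other nodes, and the service list on every node must be kept in sync. The Distro protocol is what synchronizes service data between nodes. Besides synchronization in the literal sense, it also reports a node's own service state to the other nodes, which can itself be seen as a form of synchronization.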

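This article does not cover the protocol's concrete operations; it only surveys the Distro-related classes in Nacos to see what the protocol can do. Searching IDEA for classes whose names start with Distro finds 30 classes, spread across the nacos-core, nacos-naming, and nacos-config modules. Only the nacos-core module is analyzed here, because it already covers the complete flow of the Distro protocol.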

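Tip:
Keep one keyword in mind: synchronization. Synchronization is nothing more than fetching data from a remote node to the local node, or sending data from the local node to a remote one. The data being synchronized here is service data; as the official documentation puts it, "Service is a first-class citizen of the Nacos world."
Everything introduced in this article serves this concept of synchronization.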

Core Components of the Distro Protocol

The nacos-core module defines all the components of the Distro protocol.

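distro
	component						Distro components, such as data storage objects, data processors, and data transport agents
	entity							Entity objects
	exception						Exception handling
	task							Task-related classes
		delay						Delay task components
		execute						Task executor components
		load						Load task components
		verify						Verify task components
	DistroConfig.java				Distro configuration
	DistroConstants.java			Distro constants
	DistroProtocol.java 			Distro protocol entry point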

com.alibaba.nacos.core.distributed.distro.component

DistroCallback

The Distro callback interface, used where a callback is needed after asynchronous processing.

public interface DistroCallback {
    
    /**
     * Callback when distro task execute successfully.
     */
    void onSuccess();
    
    /**
     * Callback when distro task execute failed.
     *
     * @param throwable throwable if execute failed caused by exception
     */
    void onFailed(Throwable throwable);
}
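
To make the callback contract concrete, here is a minimal sketch. SketchCallback, SketchSender, and RecordingCallback are hypothetical names introduced only for illustration; SketchCallback simply mirrors the two methods of DistroCallback so the example compiles without Nacos on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mirrors the two methods of DistroCallback.
interface SketchCallback {
    void onSuccess();

    void onFailed(Throwable throwable);
}

// Hypothetical sender: reports the outcome through the callback instead of a return value.
class SketchSender {
    static void send(String payload, SketchCallback callback) {
        if (payload == null || payload.isEmpty()) {
            callback.onFailed(new IllegalArgumentException("empty payload"));
            return;
        }
        callback.onSuccess();
    }
}

// Records every outcome so the caller can inspect it afterwards.
class RecordingCallback implements SketchCallback {
    final List<String> events = new ArrayList<>();

    @Override
    public void onSuccess() {
        events.add("success");
    }

    @Override
    public void onFailed(Throwable throwable) {
        events.add("failed: " + throwable.getMessage());
    }

    public static void main(String[] args) {
        RecordingCallback callback = new RecordingCallback();
        SketchSender.send("service-a", callback);
        SketchSender.send("", callback);
        System.out.println(callback.events); // [success, failed: empty payload]
    }
}
```

The point of the pattern is that the sender decides when the outcome is known, so the caller does not have to block waiting for a return value.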

DistroComponentHolder

The Distro component holder. It defines several containers (HashMap) that store the components the Distro protocol needs, acting as a kind of steward.

@Component
public class DistroComponentHolder {
	
    // Transport agents, one per DistroData type
    private final Map<String, DistroTransportAgent> transportAgentMap = new HashMap<>();
    // Data storages, one per DistroData type
    private final Map<String, DistroDataStorage> dataStorageMap = new HashMap<>();
    // Failed-task handlers, one per type
    private final Map<String, DistroFailedTaskHandler> failedTaskHandlerMap = new HashMap<>();
    // Data processors, one per DistroData type
    private final Map<String, DistroDataProcessor> dataProcessorMap = new HashMap<>();
    
    public DistroTransportAgent findTransportAgent(String type) {
        return transportAgentMap.get(type);
    }
    
    public void registerTransportAgent(String type, DistroTransportAgent transportAgent) {
        transportAgentMap.put(type, transportAgent);
    }
    
    public DistroDataStorage findDataStorage(String type) {
        return dataStorageMap.get(type);
    }
    
    public void registerDataStorage(String type, DistroDataStorage dataStorage) {
        dataStorageMap.put(type, dataStorage);
    }
    
    public Set<String> getDataStorageTypes() {
        return dataStorageMap.keySet();
    }
    
    public DistroFailedTaskHandler findFailedTaskHandler(String type) {
        return failedTaskHandlerMap.get(type);
    }
    
    public void registerFailedTaskHandler(String type, DistroFailedTaskHandler failedTaskHandler) {
        failedTaskHandlerMap.put(type, failedTaskHandler);
    }
    
    public void registerDataProcessor(DistroDataProcessor dataProcessor) {
        dataProcessorMap.putIfAbsent(dataProcessor.processType(), dataProcessor);
    }
    
    public DistroDataProcessor findDataProcessor(String processType) {
        return dataProcessorMap.get(processType);
    }
}
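
One detail in the holder is easy to miss: the transport agent, storage, and failed-task handler are registered with put, while the data processor is registered with putIfAbsent. The sketch below illustrates the difference with a hypothetical RegistrySketch class that stores plain strings instead of real components; the type name "typeA" is also made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for DistroComponentHolder; components are plain strings here.
class RegistrySketch {
    private final Map<String, String> agents = new HashMap<>();
    private final Map<String, String> processors = new HashMap<>();

    // Mirrors registerTransportAgent: put() overwrites, so the last registration wins.
    void registerAgent(String type, String agent) {
        agents.put(type, agent);
    }

    // Mirrors registerDataProcessor: putIfAbsent() keeps the first registration.
    void registerProcessor(String type, String processor) {
        processors.putIfAbsent(type, processor);
    }

    String findAgent(String type) {
        return agents.get(type);
    }

    String findProcessor(String type) {
        return processors.get(type);
    }

    public static void main(String[] args) {
        RegistrySketch holder = new RegistrySketch();
        holder.registerAgent("typeA", "agent-1");
        holder.registerAgent("typeA", "agent-2");
        holder.registerProcessor("typeA", "processor-1");
        holder.registerProcessor("typeA", "processor-2");
        System.out.println(holder.findAgent("typeA"));     // agent-2
        System.out.println(holder.findProcessor("typeA")); // processor-1
        System.out.println(holder.findAgent("unknown"));   // null
    }
}
```

Because a lookup for an unregistered type returns null, callers need a null check before using the result.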

DistroDataProcessor

Processes the data objects of the Distro protocol.

/**
 * Distro data processor.
 *
 * @author xiweng.yy
 */
public interface DistroDataProcessor {
    
    /**
     * Process type of this processor.
     * That is, the data type this processor can handle.
     * @return type of this processor
     */
    String processType();
    
    /**
     * Process received data.
     * @param distroData received data
     * @return true if process data successfully, otherwise false
     */
    boolean processData(DistroData distroData);
    
    /**
     * Process received verify data.
     * @param distroData    verify data to process
     * @param sourceAddress address of the server the data came from; data may be fetched from it
     * @return true if the data is available, otherwise false
     */
    boolean processVerifyData(DistroData distroData, String sourceAddress);
    
    /**
     * Process snapshot data.
     * @param distroData snapshot data
     * @return true if process data successfully, otherwise false
     */
    boolean processSnapshot(DistroData distroData);
}

DistroDataStorage

Storage for DistroData.

public interface DistroDataStorage {
    
    /**
     * Set this distro data storage has finished initial step.
     * Marks that this storage has finished initializing its internal DistroData.
     */
    void finishInitial();
    
    /**
     * Whether this distro data is finished initial.
     * Reports whether this storage has finished initializing its internal DistroData.
     * <p>If not finished, this data storage should not send verify data to other node.
     *
     * @return {@code true} if finished, otherwise false
     */
    boolean isFinishInitial();
    
    /**
     * Get distro datum.
     * Returns the DistroData held by this storage for the given key.
     * @param distroKey key of distro datum
     * @return need to sync datum
     */
    DistroData getDistroData(DistroKey distroKey);
    
    /**
     * Get all distro datum snapshot.
     * Returns all DistroData held by this storage.
     * @return all datum
     */
    DistroData getDatumSnapshot();
    
    /**
     * Get verify datum.
     * Returns all DistroData to be used for verification.
     * @return verify datum
     */
    List<DistroData> getVerifyData();
}

DistroFailedTaskHandler

Used to retry failed Distro tasks.

public interface DistroFailedTaskHandler {
    
    /**
     * Build retry task when distro task execute failed.
     * A retry task can be created when a Distro task fails.
     * @param distroKey distro key of failed task
     * @param action action of task
     */
    void retry(DistroKey distroKey, DataOperation action);
}

DistroTransportAgent

The transport agent for DistroData, used to send requests.

public interface DistroTransportAgent {
    
    /**
     * Whether transporting data with a callback is supported.
     * @return true if support, otherwise false
     */
    boolean supportCallbackTransport();
    
    /**
     * Sync data.
     * @param data         data to sync
     * @param targetServer target server to sync to
     * @return true is sync successfully, otherwise false
     */
    boolean syncData(DistroData data, String targetServer);
    
    /**
     * Sync data with callback.
     * @param data         data
     * @param targetServer target server
     * @param callback     callback
     * @throws UnsupportedOperationException if method supportCallbackTransport is false, should throw {@code
     *                                       UnsupportedOperationException}
     */
    void syncData(DistroData data, String targetServer, DistroCallback callback);
    
    /**
     * Sync verify data.
     * @param verifyData   verify data
     * @param targetServer target server
     * @return true is verify successfully, otherwise false
     */
    boolean syncVerifyData(DistroData verifyData, String targetServer);
    
    /**
     * Sync verify data with callback.
     * @param verifyData   verify data
     * @param targetServer target server
     * @param callback     callback
     * @throws UnsupportedOperationException if method supportCallbackTransport is false, should throw {@code
     *                                       UnsupportedOperationException}
     */
    void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback);
    
    /**
     * Get the specified data from the target server.
     * @param key          key of the data to fetch
     * @param targetServer address of the remote node
     * @return distro data
     */
    DistroData getData(DistroKey key, String targetServer);
    
    /**
     * Get the full datum snapshot from the target server.
     * @param targetServer target server.
     * @return distro data
     */
    DistroData getDatumSnapshot(String targetServer);
}

com.alibaba.nacos.core.distributed.distro.entity

This package holds the data objects of the Distro protocol.

DistroData

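The core object of the Distro protocol. Data exchanged during protocol interactions is carried in this object. It can also be thought of as a container, and you will see it often later on.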

public class DistroData {
    // Key of the data
    private DistroKey distroKey;
    // Operation type of the data: which operation produced it, or which operation it is meant for
    private DataOperation type;
    // The data as a byte array
    private byte[] content;
    
    public DistroData() {
    }
    
    public DistroData(DistroKey distroKey, byte[] content) {
        this.distroKey = distroKey;
        this.content = content;
    }
    
    public DistroKey getDistroKey() {
        return distroKey;
    }
    
    public void setDistroKey(DistroKey distroKey) {
        this.distroKey = distroKey;
    }
    
    public DataOperation getType() {
        return type;
    }
    
    public void setType(DataOperation type) {
        this.type = type;
    }
    
    public byte[] getContent() {
        return content;
    }
    
    public void setContent(byte[] content) {
        this.content = content;
    }
}

DistroKey

The key object for DistroData; it can carry several attributes.

public class DistroKey {
    // Key of the data itself
    private String resourceKey;
    // Type of the data
    private String resourceType;
    // Target server the data is sent to
    private String targetServer;
    
    public DistroKey() {
    }
    
    public DistroKey(String resourceKey, String resourceType) {
        this.resourceKey = resourceKey;
        this.resourceType = resourceType;
    }
    
    public DistroKey(String resourceKey, String resourceType, String targetServer) {
        this.resourceKey = resourceKey;
        this.resourceType = resourceType;
        this.targetServer = targetServer;
    }
    
    public String getResourceKey() {
        return resourceKey;
    }
    
    public void setResourceKey(String resourceKey) {
        this.resourceKey = resourceKey;
    }
    
    public String getResourceType() {
        return resourceType;
    }
    
    public void setResourceType(String resourceType) {
        this.resourceType = resourceType;
    }
    
    public String getTargetServer() {
        return targetServer;
    }
    
    public void setTargetServer(String targetServer) {
        this.targetServer = targetServer;
    }
    
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DistroKey distroKey = (DistroKey) o;
        return Objects.equals(resourceKey, distroKey.resourceKey) && Objects
                .equals(resourceType, distroKey.resourceType) && Objects.equals(targetServer, distroKey.targetServer);
    }
    
    @Override
    public int hashCode() {
        return Objects.hash(resourceKey, resourceType, targetServer);
    }
    
    @Override
    public String toString() {
        return "DistroKey{" + "resourceKey='" + resourceKey + '\'' + ", resourceType='" + resourceType + '\''
                + ", targetServer='" + targetServer + '\'' + '}';
    }
}
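
Since equals and hashCode cover all three fields, the same resource addressed to two different target servers produces two distinct keys. The sketch below shows this with a hypothetical KeySketch class that copies only the fields and equality rules of DistroKey; the resource names and addresses are made up.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in with the same three fields and equality rules as DistroKey.
final class KeySketch {
    private final String resourceKey;
    private final String resourceType;
    private final String targetServer;

    KeySketch(String resourceKey, String resourceType, String targetServer) {
        this.resourceKey = resourceKey;
        this.resourceType = resourceType;
        this.targetServer = targetServer;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        KeySketch other = (KeySketch) o;
        return Objects.equals(resourceKey, other.resourceKey)
                && Objects.equals(resourceType, other.resourceType)
                && Objects.equals(targetServer, other.targetServer);
    }

    @Override
    public int hashCode() {
        return Objects.hash(resourceKey, resourceType, targetServer);
    }

    // Counts how many distinct keys a hash-based collection would see.
    static int countDistinct(KeySketch... keys) {
        Set<KeySketch> distinct = new HashSet<>();
        for (KeySketch key : keys) {
            distinct.add(key);
        }
        return distinct.size();
    }

    public static void main(String[] args) {
        KeySketch toNodeA = new KeySketch("svc", "typeA", "10.0.0.1:8848");
        KeySketch toNodeB = new KeySketch("svc", "typeA", "10.0.0.2:8848");
        KeySketch toNodeAAgain = new KeySketch("svc", "typeA", "10.0.0.1:8848");
        System.out.println(countDistinct(toNodeA, toNodeB));      // 2
        System.out.println(countDistinct(toNodeA, toNodeAAgain)); // 1
    }
}
```

This matters wherever a DistroKey is used as a map key or task tag: tasks for different target servers do not collapse into one entry.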

com.alibaba.nacos.core.distributed.distro.exception

com.alibaba.nacos.core.distributed.distro.task

DistroTaskEngineHolder

The Distro task engine holder, which manages the execute engines for the different kinds of tasks.

@Component
public class DistroTaskEngineHolder {
    // Execute engine for delay tasks
    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
    // Execute engine for non-delay tasks
    private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
    
    public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
        // Install the default task processor for the delay task execute engine
        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
    }
    
    public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
        return delayTaskExecuteEngine;
    }
    
    public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
        return executeWorkersManager;
    }
    
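    /**
     * Register a task processor with the delay task execute engine under the given key.
     * @param key                key under which the processor is stored
     * @param nacosTaskProcessor processor instance
     */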
    public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
        this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
    }
}

com.alibaba.nacos.core.distributed.distro.task.delay

DistroDelayTask

The Distro delay task.

public class DistroDelayTask extends AbstractDelayTask {
    // Key of the data this task handles
    private final DistroKey distroKey;
    // Operation type of the data this task handles
    private DataOperation action;
    // Time at which this task was created
    private long createTime;
    
    public DistroDelayTask(DistroKey distroKey, long delayTime) {
        this(distroKey, DataOperation.CHANGE, delayTime);
    }
    
    // Build a delay task
    public DistroDelayTask(DistroKey distroKey, DataOperation action, long delayTime) {
        this.distroKey = distroKey;
        this.action = action;
        this.createTime = System.currentTimeMillis();
        // Initialize the last-process time to the creation time
        setLastProcessTime(createTime);
        // Set the interval to wait before the task runs
        setTaskInterval(delayTime);
    }
    
    public DistroKey getDistroKey() {
        return distroKey;
    }
    
    public DataOperation getAction() {
        return action;
    }
    
    public long getCreateTime() {
        return createTime;
    }
    
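    /**
     * Literally "merge tasks"; in practice it refreshes a stale task.
     * When a new task is added to the task list, the list is queried with the new task's key. A non-null result
     * means a task with that key already exists. Adding the same task again would duplicate it, so the two are
     * merged here.
     * Why can the new task (this instance) be stale? See
     * {@link com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#addTask(Object,
     *  AbstractDelayTask)}: adding a task takes a lock, so the order in which tasks are added is not guaranteed.
     * @param task the task that already exists
     */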
    @Override
    public void merge(AbstractDelayTask task) {
        if (!(task instanceof DistroDelayTask)) {
            return;
        }
        DistroDelayTask oldTask = (DistroDelayTask) task;
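        // If the two tasks have different actions and the new task was created before the old one,
        // the new task lost the race to be added and is already stale. Its own action should no longer run,
        // so it takes over the old task's action and creation time.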
        if (!action.equals(oldTask.getAction()) && createTime < oldTask.getCreateTime()) {
            action = oldTask.getAction();
            createTime = oldTask.getCreateTime();
        }
        setLastProcessTime(oldTask.getLastProcessTime());
    }
}
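
The merge rule above can be sketched in isolation. MergeSketch is a hypothetical class that keeps only the three fields involved and uses plain strings for the action instead of DataOperation; the timestamps are made-up values chosen to exercise both branches.

```java
// Hypothetical stand-in that keeps only the fields DistroDelayTask.merge touches.
class MergeSketch {
    String action;
    long createTime;
    long lastProcessTime;

    MergeSketch(String action, long createTime) {
        this.action = action;
        this.createTime = createTime;
        this.lastProcessTime = createTime;
    }

    // Same rule as the merge method above: a newly added task that is older than the
    // existing one and carries a different action adopts the existing action and time.
    void merge(MergeSketch existing) {
        if (!action.equals(existing.action) && createTime < existing.createTime) {
            action = existing.action;
            createTime = existing.createTime;
        }
        lastProcessTime = existing.lastProcessTime;
    }

    public static void main(String[] args) {
        MergeSketch existing = new MergeSketch("DELETE", 200L);

        // Created earlier than the existing task but added later: stale, so it adopts DELETE.
        MergeSketch stale = new MergeSketch("CHANGE", 100L);
        stale.merge(existing);
        System.out.println(stale.action + " " + stale.createTime); // DELETE 200

        // Created after the existing task: genuinely newer, so it keeps CHANGE.
        MergeSketch fresh = new MergeSketch("CHANGE", 300L);
        fresh.merge(existing);
        System.out.println(fresh.action + " " + fresh.createTime); // CHANGE 300
    }
}
```

In both cases the last-process time is inherited from the existing task, so merging does not restart the delay.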

DistroDelayTaskExecuteEngine

The execute engine for delay tasks.

public class DistroDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
    
    public DistroDelayTaskExecuteEngine() {
        super(DistroDelayTaskExecuteEngine.class.getName(), Loggers.DISTRO);
    }
    
    @Override
    public void addProcessor(Object key, NacosTaskProcessor taskProcessor) {
        // Resolve the key actually used for lookup (the resource type when the key is a DistroKey)
        Object actualKey = getActualKey(key);
        super.addProcessor(actualKey, taskProcessor);
    }
    
    @Override
    public NacosTaskProcessor getProcessor(Object key) {
        Object actualKey = getActualKey(key);
        return super.getProcessor(actualKey);
    }
    
    private Object getActualKey(Object key) {
        return key instanceof DistroKey ? ((DistroKey) key).getResourceType() : key;
    }
}

DistroDelayTaskProcessor

The delay task processor.

/**
 * Distro delay task processor.
 *
 * @author xiweng.yy
 */
public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    // Distro task engine holder
    private final DistroTaskEngineHolder distroTaskEngineHolder;
    // Distro component holder
    private final DistroComponentHolder distroComponentHolder;
    
    public DistroDelayTaskProcessor(DistroTaskEngineHolder distroTaskEngineHolder,
            DistroComponentHolder distroComponentHolder) {
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        this.distroComponentHolder = distroComponentHolder;
    }
    
    @Override
    public boolean process(NacosTask task) {
        // Ignore anything that is not a DistroDelayTask
        if (!(task instanceof DistroDelayTask)) {
            return true;
        }
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        // Create the concrete task according to the operation type
        switch (distroDelayTask.getAction()) {
            case DELETE:
                DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                return true;
            case CHANGE:
            case ADD:
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            default:
                return false;
        }
    }
}

com.alibaba.nacos.core.distributed.distro.task.execute

AbstractDistroExecuteTask

The abstract execute task, which defines the task-processing flow.

public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {
    
    private final DistroKey distroKey;
    
    private final DistroComponentHolder distroComponentHolder;
    
    protected AbstractDistroExecuteTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
        this.distroKey = distroKey;
        this.distroComponentHolder = distroComponentHolder;
    }
    
    protected DistroKey getDistroKey() {
        return distroKey;
    }
    
    protected DistroComponentHolder getDistroComponentHolder() {
        return distroComponentHolder;
    }
    
    @Override
    public void run() {
        // Resource type of the data being handled
        String type = getDistroKey().getResourceType();
        // Look up the transport agent for that type
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
            return;
        }
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        // Check whether the transport agent supports callbacks
        if (transportAgent.supportCallbackTransport()) {
            doExecuteWithCallback(new DistroExecuteCallback());
        } else {
            executeDistroTask();
        }
    }
    
    // Execute the task without a callback
    private void executeDistroTask() {
        try {
            boolean result = doExecute();
            if (!result) {
                // The task reported failure, so run the failure handling
                handleFailedTask();
            }
            Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
        } catch (Exception e) {
            Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
            // The task threw an exception, so run the failure handling
            handleFailedTask();
        }
    }
    
    /**
     * Get {@link DataOperation} for current task.
     *
     * @return data operation
     */
    protected abstract DataOperation getDataOperation();
    
    /**
     * Do execute for different sub class.
     *
     * @return result of execute
     */
    protected abstract boolean doExecute();
    
    /**
     * Do execute with callback for different sub class.
     *
     * @param callback callback
     */
    protected abstract void doExecuteWithCallback(DistroCallback callback);
    
    /**
     * Handle failed task.
     */
    protected void handleFailedTask() {
        String type = getDistroKey().getResourceType();
        // Retry through the failed-task handler registered for this type
        DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);
        if (null == failedTaskHandler) {
            Loggers.DISTRO.warn("[DISTRO] Can't find failed task for type {}, so discarded", type);
            return;
        }
        failedTaskHandler.retry(getDistroKey(), getDataOperation());
    }
    
    private class DistroExecuteCallback implements DistroCallback {
        
        @Override
        public void onSuccess() {
            Loggers.DISTRO.info("[DISTRO-END] {} result: true", getDistroKey().toString());
        }
        
        @Override
        public void onFailed(Throwable throwable) {
            if (null == throwable) {
                Loggers.DISTRO.info("[DISTRO-END] {} result: false", getDistroKey().toString());
            } else {
                Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", throwable);
            }
            handleFailedTask();
        }
    }
}
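
The flow of the non-callback path is a template method: run fixes the sequence, subclasses supply doExecute, and both a false result and an exception lead to the failure handler. The sketch below reproduces only that shape. ExecuteTaskSketch and its log list are hypothetical; the real class logs through Loggers.DISTRO and retries through DistroFailedTaskHandler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of AbstractDistroExecuteTask to its non-callback control flow.
abstract class ExecuteTaskSketch implements Runnable {
    // Stands in for the log output and the retry call of the real class.
    final List<String> log = new ArrayList<>();

    // Subclasses do the actual work and report success or failure.
    protected abstract boolean doExecute();

    protected void handleFailedTask() {
        log.add("retry scheduled");
    }

    @Override
    public void run() {
        try {
            boolean result = doExecute();
            if (!result) {
                handleFailedTask();
            }
            log.add("end result=" + result);
        } catch (Exception e) {
            log.add("exception: " + e.getMessage());
            handleFailedTask();
        }
    }

    // Builds a task whose doExecute either throws or returns the given result.
    static ExecuteTaskSketch of(boolean result, boolean shouldThrow) {
        return new ExecuteTaskSketch() {
            @Override
            protected boolean doExecute() {
                if (shouldThrow) {
                    throw new IllegalStateException("boom");
                }
                return result;
            }
        };
    }

    public static void main(String[] args) {
        ExecuteTaskSketch failing = of(false, false);
        failing.run();
        System.out.println(failing.log); // [retry scheduled, end result=false]

        ExecuteTaskSketch throwing = of(true, true);
        throwing.run();
        System.out.println(throwing.log); // [exception: boom, retry scheduled]
    }
}
```

Keeping the failure handling in the base class means each concrete task only has to say what to send, not what to do when sending fails.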

DistroExecuteTaskExecuteEngine

The execute engine that runs Distro protocol tasks.

package com.alibaba.nacos.core.distributed.distro.task.execute;


public class DistroExecuteTaskExecuteEngine extends NacosExecuteTaskExecuteEngine {
    
    // Simply delegates to the NacosExecuteTaskExecuteEngine constructor
    public DistroExecuteTaskExecuteEngine() {
        super(DistroExecuteTaskExecuteEngine.class.getSimpleName(), Loggers.DISTRO);
    }
}

NacosExecuteTaskExecuteEngine

package com.alibaba.nacos.common.task.engine;

/**
 * Nacos execute task execute engine.
 * @author xiweng.yy
 */
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
    
    // Workers that execute the tasks
    private final TaskExecuteWorker[] executeWorkers;
    
    public NacosExecuteTaskExecuteEngine(String name, Logger logger) {
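        // The number of workers depends on the CPU core count (by default 1.5 to 2 times the core count);
        // the argument says how many times the core count the number of threads should be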
        this(name, logger, ThreadUtils.getSuitableThreadCount(1));
    }
    
    public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
        super(logger);
        // Create the group of workers
        executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
        for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
            executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
        }
    }
    
    @Override
    public int size() {
        int result = 0;
        for (TaskExecuteWorker each : executeWorkers) {
            result += each.pendingTaskCount();
        }
        return result;
    }
    
    @Override
    public boolean isEmpty() {
        return 0 == size();
    }
    
    @Override
    public void addTask(Object tag, AbstractExecuteTask task) {
        // Look up a task processor registered in the parent class
        NacosTaskProcessor processor = getProcessor(tag);
        // If a processor exists, let it handle the task
        if (null != processor) {
            processor.process(task);
            return;
        }
        // Otherwise hand the task to a worker
        TaskExecuteWorker worker = getWorker(tag);
        worker.process(task);
    }
    
    private TaskExecuteWorker getWorker(Object tag) {
        // Work out which worker should handle this task
        int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
        return executeWorkers[idx];
    }
    
    private int workersCount() {
        return executeWorkers.length;
    }
    
    @Override
    public AbstractExecuteTask removeTask(Object key) {
        throw new UnsupportedOperationException("ExecuteTaskEngine do not support remove task");
    }
    
    @Override
    public Collection<Object> getAllTaskKeys() {
        throw new UnsupportedOperationException("ExecuteTaskEngine do not support get all task keys");
    }
    
    @Override
    public void shutdown() throws NacosException {
        for (TaskExecuteWorker each : executeWorkers) {
            each.shutdown();
        }
    }
    
    /**
     * Get workers status.
     *
     * @return workers status string
     */
    public String workersStatus() {
        StringBuilder sb = new StringBuilder();
        for (TaskExecuteWorker worker : executeWorkers) {
            sb.append(worker.status()).append("\n");
        }
        return sb.toString();
    }
}
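
The worker selection in getWorker deserves a closer look. The sketch below isolates the index calculation in a hypothetical WorkerIndexSketch class; the tags and the worker count of 8 are made-up inputs.

```java
// Hypothetical helper isolating the index calculation used by getWorker.
class WorkerIndexSketch {

    // Clearing the sign bit makes the hash non-negative before the modulo,
    // so the result is always a valid array index in [0, workerCount).
    static int workerIndex(Object tag, int workerCount) {
        return (tag.hashCode() & Integer.MAX_VALUE) % workerCount;
    }

    public static void main(String[] args) {
        // A plain modulo on a negative hash would give a negative index.
        System.out.println(-7 % 8);                                     // -7
        System.out.println(workerIndex(Integer.valueOf(-7), 8));        // 1
        // Math.abs(Integer.MIN_VALUE) is still negative; the mask handles that case too.
        System.out.println(workerIndex(Integer.valueOf(Integer.MIN_VALUE), 8)); // 0
        // The same tag always lands on the same worker.
        System.out.println(workerIndex("a", 8) == workerIndex("a", 8)); // true
    }
}
```

Because equal tags map to the same worker and each worker drains its queue on a single thread, tasks submitted under the same tag are executed in submission order.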

TaskExecuteWorker

package com.alibaba.nacos.common.task.engine;

/**
 * Nacos execute task execute worker.
 * Each worker starts an InnerWorker thread when it is created; that thread keeps taking
 * tasks to process from the worker's internal queue.
 * @author xiweng.yy
 */
public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {

    /**
     * Max task queue size 32768.
     */
    private static final int QUEUE_CAPACITY = 1 << 15;

    private final Logger log;

    /**
     * Name of this worker
     */
    private final String name;

    /**
     * Queue of tasks waiting to be processed
     */
    private final BlockingQueue<Runnable> queue;

    /**
     * Whether this worker has been closed
     */
    private final AtomicBoolean closed;

    public TaskExecuteWorker(final String name, final int mod, final int total) {
        this(name, mod, total, null);
    }

    public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
        /**
         * Worker name, using DistroExecuteTaskExecuteEngine with 8 workers as an example:
         * DistroExecuteTaskExecuteEngine_0%8
         * DistroExecuteTaskExecuteEngine_1%8
         * DistroExecuteTaskExecuteEngine_2%8
         * DistroExecuteTaskExecuteEngine_3%8
         * DistroExecuteTaskExecuteEngine_4%8
         * DistroExecuteTaskExecuteEngine_5%8
         * DistroExecuteTaskExecuteEngine_6%8
         * DistroExecuteTaskExecuteEngine_7%8
         */
        this.name = name + "_" + mod + "%" + total;
        this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        this.closed = new AtomicBoolean(false);
        this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
        // Start a new thread to consume the queue
        new InnerWorker(name).start();
    }

    public String getName() {
        return name;
    }

    @Override
    public boolean process(NacosTask task) {
        if (task instanceof AbstractExecuteTask) {
            putTask((Runnable) task);
        }
        return true;
    }

    private void putTask(Runnable task) {
        try {
            queue.put(task);
        } catch (InterruptedException ire) {
            log.error(ire.toString(), ire);
        }
    }

    public int pendingTaskCount() {
        return queue.size();
    }

    /**
     * Worker status.
     */
    public String status() {
        return name + ", pending tasks: " + pendingTaskCount();
    }

    @Override
    public void shutdown() throws NacosException {
        queue.clear();
        closed.compareAndSet(false, true);
    }

    /**
     * Inner execute worker.
     */
    private class InnerWorker extends Thread {

        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }

        @Override
        public void run() {
            // Keep running until the worker is closed
            while (!closed.get()) {
                try {
                    // Take a task from the queue
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    // Run the task on this InnerWorker thread
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    // Warn if the task took longer than 1 second
                    if (duration > 1000L) {
                        log.warn("task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[TASK-FAILED] " + e.toString(), e);
                }
            }
        }
    }
}
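
The worker pattern above, one bounded queue drained by one dedicated thread, can be sketched on its own. QueueWorkerSketch is a hypothetical class, not the Nacos implementation. It differs from TaskExecuteWorker in two labeled ways so the demo can terminate: the thread is a daemon, and shutdown interrupts it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a single-threaded queue worker.
class QueueWorkerSketch {
    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1 << 15);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Thread thread;

    QueueWorkerSketch(String name) {
        thread = new Thread(this::drain, name);
        thread.setDaemon(true); // sketch-only choice so a demo JVM can exit
        thread.start();
    }

    // Runs on the worker thread: take one task at a time and run it.
    private void drain() {
        while (!closed.get()) {
            try {
                queue.take().run();
            } catch (InterruptedException e) {
                return; // interrupted by shutdown(), a sketch-only addition
            } catch (RuntimeException e) {
                // A failing task must not kill the worker thread.
            }
        }
    }

    // Blocks when the queue is full, like queue.put in the original.
    void submit(Runnable task) throws InterruptedException {
        queue.put(task);
    }

    void shutdown() {
        closed.set(true);
        queue.clear();
        thread.interrupt();
    }

    // Submits n numbered tasks and returns the order in which they ran.
    static List<Integer> runInOrder(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(n);
        QueueWorkerSketch worker = new QueueWorkerSketch("sketch-worker");
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                worker.submit(() -> {
                    seen.add(id);
                    done.countDown();
                });
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.shutdown();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```

With a single consumer thread per queue, tasks run strictly in the order they were queued, which is why no extra locking is needed around the task bodies.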

DistroSyncChangeTask

The Distro sync-change task, which sends this node's data to other nodes.

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    
    // This task's operation type is CHANGE
    private static final DataOperation OPERATION = DataOperation.CHANGE;
    
    public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
        super(distroKey, distroComponentHolder);
    }
    
    @Override
    protected DataOperation getDataOperation() {
        return OPERATION;
    }
    
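    /**
     * Execute the task without a callback.
     * @return true if the sync succeeded or there was no data to sync, otherwise false
     */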
    @Override
    protected boolean doExecute() {
        // Resource type of the data to sync
        String type = getDistroKey().getResourceType();
        // Fetch the data to sync
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return true;
        }
        // Sync the data through the DistroTransportAgent
        return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
    }
    
    /**
     * Execute the task with a callback.
     * @param callback callback
     */
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    
    @Override
    public String toString() {
        return "DistroSyncChangeTask for " + getDistroKey().toString();
    }
    
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}

DistroSyncDeleteTask

The Distro sync-delete task, which tells other nodes about data deleted on this node.

public class DistroSyncDeleteTask extends AbstractDistroExecuteTask {
    
    // This task's operation type is DELETE
    private static final DataOperation OPERATION = DataOperation.DELETE;
    
    public DistroSyncDeleteTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
        super(distroKey, distroComponentHolder);
    }
    
    @Override
    protected DataOperation getDataOperation() {
        return OPERATION;
    }
    
    /**
     * Execute the task without a callback.
     * @return true if the sync succeeded, otherwise false
     */
    @Override
    protected boolean doExecute() {
        // Build the request payload
        String type = getDistroKey().getResourceType();
        DistroData distroData = new DistroData();
        distroData.setDistroKey(getDistroKey());
        distroData.setType(OPERATION);
        // Sync the data through the DistroTransportAgent
        return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
    }
    
    /**
     * Execute the task with a callback.
     * @param callback callback
     */
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = new DistroData();
        distroData.setDistroKey(getDistroKey());
        distroData.setType(OPERATION);
        getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    
    @Override
    public String toString() {
        return "DistroSyncDeleteTask for " + getDistroKey().toString();
    }
}

Tip:

  • DistroSyncChangeTask sends this node's service data to the other nodes
  • DistroSyncDeleteTask sends the services deleted on this node to the other nodes

com.alibaba.nacos.core.distributed.distro.task.load

DistroLoadDataTask

The Distro full-data sync task, used after a node starts to pull service data from the other nodes into the current node for the first time.

public class DistroLoadDataTask implements Runnable {

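    // Cluster member manager
    private final ServerMemberManager memberManager;
    // Distro component holder
    private final DistroComponentHolder distroComponentHolder;
    // Distro configuration
    private final DistroConfig distroConfig;
    // Callback invoked when loading finishes
    private final DistroCallback loadCallback;
    // Load status per resource type (true once that type has been loaded)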
    private final Map<String, Boolean> loadCompletedMap;

    public DistroLoadDataTask(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder, DistroConfig distroConfig, DistroCallback loadCallback) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroConfig = distroConfig;
        this.loadCallback = loadCallback;
        loadCompletedMap = new HashMap<>(1);
    }

    @Override
    public void run() {
        try {
            // First load attempt
            load();
            // If loading has not completed, try again later
            if (!checkCompleted()) {
                // Resubmit this task so it runs again after the retry delay
                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
            } else {
                // Fire the callback
                loadCallback.onSuccess();
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
            }
        } catch (Exception e) {
            loadCallback.onFailed(e);
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
        }
    }

    private void load() throws Exception {
        // If there are no members other than this node, sleep for 1 second; the other nodes may not have finished starting
        while (memberManager.allMembersWithoutSelf().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
            TimeUnit.SECONDS.sleep(1);
        }
        // An empty set of data types means the component registry of distroComponentHolder has not finished initializing (DistroHttpRegistry for v1, DistroClientComponentRegistry for v2)
        while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
            TimeUnit.SECONDS.sleep(1);
        }
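        // Load the data of every type that has not been loaded successfully yet
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
                // Load from the remote nodes and record whether this type succeeded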
                loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
            }
        }
    }

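    /**
     * Load the full snapshot of one resource type from the other nodes.
     *
     * @param resourceType resource type to load
     * @return true once the snapshot from any one node has been processed successfully
     */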
    private boolean loadAllDataSnapshotFromRemote(String resourceType) {
        // Find the transport agent for this type
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
        // Find the data processor for this type
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == transportAgent || null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}", resourceType, transportAgent, dataProcessor);
            return false;
        }
        // Request the snapshot from each remote node in turn, stopping at the first success
        for (Member each : memberManager.allMembersWithoutSelf()) {
            try {
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
                // Fetch the snapshot
                DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
                // Process (deserialize and apply) the snapshot
                boolean result = dataProcessor.processSnapshot(distroData);
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(), result);
                // On success, mark the storage of this type as initialized
                if (result) {
                    distroComponentHolder.findDataStorage(resourceType).finishInitial();
                    return true;
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
            }
        }
        return false;
    }

    // Check whether loading has completed
    private boolean checkCompleted() {
        // If the number of data types differs from the number of recorded results, loading cannot be complete
        if (distroComponentHolder.getDataStorageTypes().size() != loadCompletedMap.size()) {
            return false;
        }
        // A false entry means that loading this type failed (for example, processing failed), so it must be loaded again
        for (Boolean each : loadCompletedMap.values()) {
            if (!each) {
                return false;
            }
        }
        return true;
    }
}
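
The bookkeeping behind this retry loop can be sketched in isolation. The following is a minimal illustration, not the Nacos implementation: the class SnapshotLoadSketch, the fetcher function, and the maxPasses limit are all hypothetical names introduced here, and the sketch loops immediately instead of resubmitting itself to an executor after a delay.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Simplified sketch of the load-and-retry bookkeeping (hypothetical names throughout).
class SnapshotLoadSketch {

    private final List<String> types;
    private final List<String> members;
    // Hypothetical fetcher: returns true if the snapshot of (type) was loaded from (member).
    private final BiFunction<String, String, Boolean> fetcher;
    // Load status per type, mirroring the role of loadCompletedMap.
    private final Map<String, Boolean> loadCompleted = new HashMap<>();

    SnapshotLoadSketch(List<String> types, List<String> members,
            BiFunction<String, String, Boolean> fetcher) {
        this.types = types;
        this.members = members;
        this.fetcher = fetcher;
    }

    // One pass: try every type that has not been loaded successfully yet.
    void loadOnce() {
        for (String type : types) {
            if (!loadCompleted.getOrDefault(type, false)) {
                loadCompleted.put(type, loadFromAnyMember(type));
            }
        }
    }

    // Ask each member in turn and stop at the first success.
    private boolean loadFromAnyMember(String type) {
        for (String member : members) {
            try {
                if (fetcher.apply(type, member)) {
                    return true;
                }
            } catch (RuntimeException e) {
                // A failing member is skipped; the next member is tried.
            }
        }
        return false;
    }

    // Complete only if every type has a recorded result and none of them is false.
    boolean isCompleted() {
        return types.size() == loadCompleted.size() && !loadCompleted.containsValue(false);
    }

    // Repeats passes until everything is loaded or maxPasses is reached; returns the passes used.
    int loadUntilCompleted(int maxPasses) {
        int passes = 0;
        while (passes < maxPasses) {
            passes++;
            loadOnce();
            if (isCompleted()) {
                break;
            }
        }
        return passes;
    }
}
```

A type that failed in one pass is the only one retried in the next pass, because successful types keep their true entry in the map.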

com.alibaba.nacos.core.distributed.distro.task.verify

DistroVerifyExecuteTask

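The executor of a Distro verify task. It reports to another node the state of the Clients that the current node is responsible for, telling the peer that each Client is still being served normally. It works at the granularity of DistroData.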

/**
 * Execute distro verify task: send one asynchronous RPC request for each DistroData.
 * @author xiweng.yy
 */
public class DistroVerifyExecuteTask extends AbstractExecuteTask {

    /**
     * Transport agent used to send the verify data
     */
    private final DistroTransportAgent transportAgent;

    /**
     * Data to be verified
     */
    private final List<DistroData> verifyData;

    /**
     * Target node
     */
    private final String targetServer;

    /**
     * Resource type of the data to be verified
     */
    private final String resourceType;

    public DistroVerifyExecuteTask(DistroTransportAgent transportAgent, List<DistroData> verifyData,
            String targetServer, String resourceType) {
        this.transportAgent = transportAgent;
        this.verifyData = verifyData;
        this.targetServer = targetServer;
        this.resourceType = resourceType;
    }

    @Override
    public void run() {
        for (DistroData each : verifyData) {
            try {
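                // Check whether the transport agent supports callbacks (the HTTP one does not). In practice it makes little difference, because in 2.0.1 the callback only writes debug logs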
                if (transportAgent.supportCallbackTransport()) {
                    doSyncVerifyDataWithCallback(each);
                } else {
                    doSyncVerifyData(each);
                }
            } catch (Exception e) {
                Loggers.DISTRO
                        .error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
            }
        }
    }

    /**
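     * Send verify data through a transport agent that supports callbacks.
     * @param data verify data to send
     */
    private void doSyncVerifyDataWithCallback(DistroData data) {
        // The callback does very little: it only writes debug logs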
        transportAgent.syncVerifyData(data, targetServer, new DistroVerifyCallback());
    }

    /**
     * Send verify data through a transport agent that does not support callbacks.
     * @param data
     */
    private void doSyncVerifyData(DistroData data) {
        transportAgent.syncVerifyData(data, targetServer);
    }

    /**
     * TODO add verify monitor.
     */
    private class DistroVerifyCallback implements DistroCallback {

        @Override
        public void onSuccess() {
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO] verify data for type {} to {} success", resourceType, targetServer);
            }
        }

        @Override
        public void onFailed(Throwable throwable) {
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO
                        .debug("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer,
                                throwable);
            }
        }
    }
}
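
The per-item dispatch in run() can be reduced to a small sketch. It is only an illustration under assumptions: the Agent interface and its method names are hypothetical stand-ins for DistroTransportAgent, and failed items are collected in a list here, whereas the task above only logs them.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "pick the callback or non-callback path per item, isolate failures per item".
class VerifyDispatchSketch {

    // Hypothetical stand-in for a transport agent.
    interface Agent {
        boolean supportsCallback();

        void sendWithCallback(String data, Runnable onSuccess);

        boolean send(String data);
    }

    // Sends every item; an exception on one item is recorded and does not stop the others.
    static List<String> verifyAll(Agent agent, List<String> items) {
        List<String> failed = new ArrayList<>();
        for (String item : items) {
            try {
                if (agent.supportsCallback()) {
                    // Callback path: the result arrives later through the callback.
                    agent.sendWithCallback(item, () -> { });
                } else {
                    // Non-callback path: the call itself returns the result.
                    agent.send(item);
                }
            } catch (RuntimeException e) {
                failed.add(item);
            }
        }
        return failed;
    }
}
```

Keeping the try/catch inside the loop is the design point: one bad DistroData must not prevent the remaining ones from being reported.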

DistroVerifyTimedTask

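The timed verify task. It first runs 5 seconds after startup and then repeats every 5 seconds (the default value of verifyIntervalMillis). Its main job is to create, for every other node, a DistroVerifyExecuteTask that performs the verification. It works at the granularity of Member.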

/**
 * Timed task that starts the distro verify process.
 * @author xiweng.yy
 */
public class DistroVerifyTimedTask implements Runnable {

    private final ServerMemberManager serverMemberManager;

    private final DistroComponentHolder distroComponentHolder;

    private final DistroExecuteTaskExecuteEngine executeTaskExecuteEngine;

    public DistroVerifyTimedTask(ServerMemberManager serverMemberManager, DistroComponentHolder distroComponentHolder,
            DistroExecuteTaskExecuteEngine executeTaskExecuteEngine) {
        this.serverMemberManager = serverMemberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.executeTaskExecuteEngine = executeTaskExecuteEngine;
    }

    @Override
    public void run() {
        try {
            List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("server list is: {}", targetServer);
            }
            for (String each : distroComponentHolder.getDataStorageTypes()) {
                verifyForDataStorage(each, targetServer);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
        }
    }

    private void verifyForDataStorage(String type, List<Member> targetServer) {
        DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
        if (!dataStorage.isFinishInitial()) {
            Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                    dataStorage.getClass().getSimpleName());
            return;
        }
        List<DistroData> verifyData = dataStorage.getVerifyData();
        if (null == verifyData || verifyData.isEmpty()) {
            return;
        }
        for (Member member : targetServer) {
            DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
            if (null == agent) {
                continue;
            }
            executeTaskExecuteEngine.addTask(member.getAddress() + type,
                    new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
        }
    }
}
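
The fan-out performed by verifyForDataStorage can be sketched without any scheduling. This is a simplified illustration: VerifyPlanSketch and its nested Storage class are hypothetical, verify data is represented as plain strings, and the returned map only stands in for the tasks handed to the execute engine.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: one verify task per (member, resource type) pair, keyed by address + type.
class VerifyPlanSketch {

    // Hypothetical stand-in for the data storage of one resource type.
    static class Storage {
        final boolean finishedInitial;
        final List<String> verifyData;

        Storage(boolean finishedInitial, List<String> verifyData) {
            this.finishedInitial = finishedInitial;
            this.verifyData = verifyData;
        }
    }

    // Returns task key -> verify payloads for every storage that is ready and has data.
    static Map<String, List<String>> plan(List<String> memberAddresses, Map<String, Storage> storages) {
        Map<String, List<String>> tasks = new LinkedHashMap<>();
        for (Map.Entry<String, Storage> entry : storages.entrySet()) {
            Storage storage = entry.getValue();
            // Skip storages that have not finished the initial load or have nothing to report.
            if (!storage.finishedInitial || storage.verifyData.isEmpty()) {
                continue;
            }
            for (String address : memberAddresses) {
                tasks.put(address + entry.getKey(), new ArrayList<>(storage.verifyData));
            }
        }
        return tasks;
    }
}
```

With two remote members and one ready storage, the plan contains two tasks, each carrying the full verify list for that type.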

DistroConfig

Configuration of the Distro protocol.

public class DistroConfig {
    
    private static final DistroConfig INSTANCE = new DistroConfig();
    // Delay before a sync task runs (milliseconds)
    private long syncDelayMillis = DistroConstants.DEFAULT_DATA_SYNC_DELAY_MILLISECONDS;
    // Timeout of a sync task (milliseconds)
    private long syncTimeoutMillis = DistroConstants.DEFAULT_DATA_SYNC_TIMEOUT_MILLISECONDS;
    // Delay before a failed sync task is retried (milliseconds)
    private long syncRetryDelayMillis = DistroConstants.DEFAULT_DATA_SYNC_RETRY_DELAY_MILLISECONDS;
    // Interval between verify task runs (milliseconds)
    private long verifyIntervalMillis = DistroConstants.DEFAULT_DATA_VERIFY_INTERVAL_MILLISECONDS;
    // Timeout of a verify task (milliseconds)
    private long verifyTimeoutMillis = DistroConstants.DEFAULT_DATA_VERIFY_TIMEOUT_MILLISECONDS;
    // Delay before the initial data load is retried (milliseconds)
    private long loadDataRetryDelayMillis = DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS;
    
    private DistroConfig() {
        try {
            // Try to read the configuration from the environment
            getDistroConfigFromEnv();
        } catch (Exception e) {
            Loggers.CORE.warn("Get Distro config from env failed, will use default value", e);
        }
    }
    
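    /**
     * Read the configuration from the environment, falling back to the default values.
     */
    private void getDistroConfigFromEnv() {
        // Both the property keys and the default values come from DistroConstants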
        syncDelayMillis = EnvUtil.getProperty(DistroConstants.DATA_SYNC_DELAY_MILLISECONDS, Long.class,
                DistroConstants.DEFAULT_DATA_SYNC_DELAY_MILLISECONDS);
        syncTimeoutMillis = EnvUtil.getProperty(DistroConstants.DATA_SYNC_TIMEOUT_MILLISECONDS, Long.class,
                DistroConstants.DEFAULT_DATA_SYNC_TIMEOUT_MILLISECONDS);
        syncRetryDelayMillis = EnvUtil.getProperty(DistroConstants.DATA_SYNC_RETRY_DELAY_MILLISECONDS, Long.class,
                DistroConstants.DEFAULT_DATA_SYNC_RETRY_DELAY_MILLISECONDS);
        verifyIntervalMillis = EnvUtil.getProperty(DistroConstants.DATA_VERIFY_INTERVAL_MILLISECONDS, Long.class,
                DistroConstants.DEFAULT_DATA_VERIFY_INTERVAL_MILLISECONDS);
        verifyTimeoutMillis = EnvUtil.getProperty(DistroConstants.DATA_VERIFY_TIMEOUT_MILLISECONDS, Long.class,
                DistroConstants.DEFAULT_DATA_VERIFY_TIMEOUT_MILLISECONDS);
        loadDataRetryDelayMillis = EnvUtil.getProperty(DistroConstants.DATA_LOAD_RETRY_DELAY_MILLISECONDS, Long.class,
                DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS);
    }
    
    public static DistroConfig getInstance() {
        return INSTANCE;
    }
    
    public long getSyncDelayMillis() {
        return syncDelayMillis;
    }
    
    public void setSyncDelayMillis(long syncDelayMillis) {
        this.syncDelayMillis = syncDelayMillis;
    }
    
    public long getSyncTimeoutMillis() {
        return syncTimeoutMillis;
    }
    
    public void setSyncTimeoutMillis(long syncTimeoutMillis) {
        this.syncTimeoutMillis = syncTimeoutMillis;
    }
    
    public long getSyncRetryDelayMillis() {
        return syncRetryDelayMillis;
    }
    
    public void setSyncRetryDelayMillis(long syncRetryDelayMillis) {
        this.syncRetryDelayMillis = syncRetryDelayMillis;
    }
    
    public long getVerifyIntervalMillis() {
        return verifyIntervalMillis;
    }
    
    public void setVerifyIntervalMillis(long verifyIntervalMillis) {
        this.verifyIntervalMillis = verifyIntervalMillis;
    }
    
    public long getVerifyTimeoutMillis() {
        return verifyTimeoutMillis;
    }
    
    public void setVerifyTimeoutMillis(long verifyTimeoutMillis) {
        this.verifyTimeoutMillis = verifyTimeoutMillis;
    }
    
    public long getLoadDataRetryDelayMillis() {
        return loadDataRetryDelayMillis;
    }
    
    public void setLoadDataRetryDelayMillis(long loadDataRetryDelayMillis) {
        this.loadDataRetryDelayMillis = loadDataRetryDelayMillis;
    }
}

DistroConstants

Distro constants. They define the configurable property names for the task timings and the corresponding default values. See DistroConfig for how they are used.

public class DistroConstants {
    
    public static final String DATA_SYNC_DELAY_MILLISECONDS = "nacos.core.protocol.distro.data.sync.delayMs";
    
    public static final long DEFAULT_DATA_SYNC_DELAY_MILLISECONDS = 1000L;
    
    public static final String DATA_SYNC_TIMEOUT_MILLISECONDS = "nacos.core.protocol.distro.data.sync.timeoutMs";
    
    public static final long DEFAULT_DATA_SYNC_TIMEOUT_MILLISECONDS = 3000L;
    
    public static final String DATA_SYNC_RETRY_DELAY_MILLISECONDS = "nacos.core.protocol.distro.data.sync.retryDelayMs";
    
    public static final long DEFAULT_DATA_SYNC_RETRY_DELAY_MILLISECONDS = 3000L;
    
    public static final String DATA_VERIFY_INTERVAL_MILLISECONDS = "nacos.core.protocol.distro.data.verify.intervalMs";
    
    public static final long DEFAULT_DATA_VERIFY_INTERVAL_MILLISECONDS = 5000L;
    
    public static final String DATA_VERIFY_TIMEOUT_MILLISECONDS = "nacos.core.protocol.distro.data.verify.timeoutMs";
    
    public static final long DEFAULT_DATA_VERIFY_TIMEOUT_MILLISECONDS = 3000L;
    
    public static final String DATA_LOAD_RETRY_DELAY_MILLISECONDS = "nacos.core.protocol.distro.data.load.retryDelayMs";
    
    public static final long DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS = 30000L;
    
}
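
The "property key plus default value" pattern can be sketched with plain java.util.Properties. This is an illustration only: DistroSettingsSketch and readLong are hypothetical, Properties stands in for the EnvUtil lookup used by DistroConfig, and falling back to the default on a malformed value is a choice made for this sketch.

```java
import java.util.Properties;

// Sketch of reading a timing property with a default, using one key and default from DistroConstants.
class DistroSettingsSketch {

    static final String SYNC_DELAY_KEY = "nacos.core.protocol.distro.data.sync.delayMs";

    static final long DEFAULT_SYNC_DELAY_MS = 1000L;

    // Returns the configured value, or defaultValue when the key is missing, blank, or not a number.
    static long readLong(Properties props, String key, long defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(SYNC_DELAY_KEY, "2000");
        System.out.println(readLong(props, SYNC_DELAY_KEY, DEFAULT_SYNC_DELAY_MS));
    }
}
```

An operator who wants a longer sync delay would set the property named by the key; every node that does not set it keeps the default.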

DistroProtocol

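The real entry point of the Distro protocol. It uses all the components defined above to implement the protocol. It carries Spring's @Component annotation, so it is managed by the Spring container, and the Distro protocol starts working as soon as its constructor runs.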

@Component
public class DistroProtocol {

    private Logger logger = LoggerFactory.getLogger(DistroProtocol.class);

    /**
     * Cluster member manager
     */
    private final ServerMemberManager memberManager;

    /**
     * Holder of the Distro components
     */
    private final DistroComponentHolder distroComponentHolder;

    /**
     * Holder of the Distro task engines
     */
    private final DistroTaskEngineHolder distroTaskEngineHolder;

    private volatile boolean isInitialized = false;

    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroTaskEngineHolder distroTaskEngineHolder) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        // Start the Distro protocol tasks
        startDistroTask();
    }

    private void startDistroTask() {
        // In standalone mode there is nothing to synchronize
        if (EnvUtil.getStandaloneMode()) {
            isInitialized = true;
            return;
        }
        // Start the periodic report of the Clients this node is responsible for
        startVerifyTask();
        // Start the initial data load
        startLoadTask();
    }

    /**
     * Load data from the other nodes into the current node
     */
    private void startLoadTask() {
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }

            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        // Submit the data load task
        GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
    }

    private void startVerifyTask() {
        // Start the timed verify (state report) task
        GlobalExecutor.schedulePartitionDataTimedSync(
            new DistroVerifyTimedTask(
                memberManager,
                distroComponentHolder,
                distroTaskEngineHolder.getExecuteWorkersManager()
            ),
        DistroConfig.getInstance().getVerifyIntervalMillis());
    }

    public boolean isInitialized() {
        return isInitialized;
    }

    /**
     * Start to sync using the configured default delay.
     * @param distroKey distro key of sync data
     * @param action    the action of data operation
     */
    public void sync(DistroKey distroKey, DataOperation action) {
        sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
    }

    /**
     * Start to sync data to all remote servers.
     * @param distroKey distro key of sync data
     * @param action    the action of data operation
     * @param delay     delay time for sync
     */
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }

    /**
     * Start to sync to target server.
     *
     * @param distroKey    distro key of sync data
     * @param action       the action of data operation
     * @param targetServer target server
     * @param delay        delay time for sync
     */
    public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }

    /**
     * Query data from the specified server.
     * @param distroKey data key
     * @return data
     */
    public DistroData queryFromRemote(DistroKey distroKey) {
        if (null == distroKey.getTargetServer()) {
            Loggers.DISTRO.warn("[DISTRO] Can't query data from empty server");
            return null;
        }
        String resourceType = distroKey.getResourceType();
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("[DISTRO] Can't find transport agent for key {}", resourceType);
            return null;
        }
        return transportAgent.getData(distroKey, distroKey.getTargetServer());
    }

    /**
     * Receive synced distro data and find the processor to handle it.
     * @param distroData Received data
     * @return true if handle receive data successfully, otherwise false
     */
    public boolean onReceive(DistroData distroData) {
        Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
                distroData.getDistroKey());
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
            return false;
        }
        return dataProcessor.processData(distroData);
    }

    /**
     * Receive verify data and find the processor to handle it.
     * @param distroData    verify data
     * @param sourceAddress source server address, might be get data from source server
     * @return true if verify data successfully, otherwise false
     */
    public boolean onVerify(DistroData distroData, String sourceAddress) {
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());
        }
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
            return false;
        }
        return dataProcessor.processVerifyData(distroData, sourceAddress);
    }

    /**
     * Query the data of the input distro key.
     * @param distroKey key of data
     * @return data
     */
    public DistroData onQuery(DistroKey distroKey) {
        String resourceType = distroKey.getResourceType();
        DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType);
        if (null == distroDataStorage) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", resourceType);
            return new DistroData(distroKey, new byte[0]);
        }
        return distroDataStorage.getDistroData(distroKey);
    }

    /**
     * Query the snapshot of all datum.
     * @param type datum type
     * @return all datum snapshot
     */
    public DistroData onSnapshot(String type) {
        DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
        if (null == distroDataStorage) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
            return new DistroData(new DistroKey("snapshot", type), new byte[0]);
        }
        return distroDataStorage.getDatumSnapshot();
    }
}

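If you have read carefully up to this point, a few keywords have probably stuck: Task, sync, processor, DistroData. Isn't the Distro protocol just data synchronization? Exactly: it synchronizes data among multiple nodes. The DistroProtocol class shows that it periodically reports state to the other nodes, loads data from the other nodes at first startup, syncs data to a specified node, and serves the snapshot data of the current node. Because every node performs the same operations, combining these functions yields multi-node synchronization.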
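
The two directions summarized above, pushing to every other node and dispatching received data to a processor by resource type, can be sketched as follows. MiniDistroSketch and all of its members are hypothetical names for illustration; the real class works with DistroKey, delay tasks, and DistroDataProcessor rather than strings and predicates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of outbound fan-out and inbound dispatch (hypothetical, heavily simplified).
class MiniDistroSketch {

    private final String self;
    private final List<String> members;
    // resource type -> processor for received content
    private final Map<String, Predicate<byte[]>> processors = new HashMap<>();

    MiniDistroSketch(String self, List<String> members) {
        this.self = self;
        this.members = members;
    }

    void registerProcessor(String resourceType, Predicate<byte[]> processor) {
        processors.put(resourceType, processor);
    }

    // Outbound: one sync entry per remote member; the local node is excluded.
    List<String> syncTargets(String resourceKey, String resourceType) {
        List<String> result = new ArrayList<>();
        for (String member : members) {
            if (!member.equals(self)) {
                result.add(resourceType + "/" + resourceKey + "->" + member);
            }
        }
        return result;
    }

    // Inbound: look up the processor by resource type; an unknown type is rejected.
    boolean onReceive(String resourceType, byte[] content) {
        Predicate<byte[]> processor = processors.get(resourceType);
        if (processor == null) {
            return false;
        }
        return processor.test(content);
    }
}
```

Since every node runs the same logic, each node is both a sender for the data it owns and a receiver for the data owned by the others.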

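Distro protocol data object

Throughout the interaction, the DistroData object is the data carrier; it can hold arbitrary data for several operation types. It consists of a DistroKey, a DataOperation type, and the content as a byte array.

A DistroKey contains the resource key, the resource type, and the node the resource is associated with (targetServer). Any DistroData can therefore tell which machine and which type of data it relates to, while DataOperation defines what the data will be used for. As for the actual payload type, the byte array keeps it generic; in practice the DistroKey and the operation are enough to determine what it is.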
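
As a rough picture of that structure, here is a minimal sketch. It is not the Nacos source: DistroDataSketch and its nested types are hypothetical simplifications, and only the field roles and the operation names that appear in the listings of this article are taken from it.

```java
import java.nio.charset.StandardCharsets;

// Simplified sketch of the carrier: key + operation + opaque bytes.
class DistroDataSketch {

    // Operation names as they appear in the listings of this article.
    enum Operation { ADD, CHANGE, DELETE, VERIFY, SNAPSHOT, QUERY }

    static final class Key {
        final String resourceKey;
        final String resourceType;
        final String targetServer;

        Key(String resourceKey, String resourceType, String targetServer) {
            this.resourceKey = resourceKey;
            this.resourceType = resourceType;
            this.targetServer = targetServer;
        }
    }

    static final class Data {
        final Key key;
        final Operation type;
        final byte[] content;

        Data(Key key, Operation type, byte[] content) {
            this.key = key;
            this.type = type;
            this.content = content;
        }

        // Human-readable summary; the payload itself stays opaque.
        String describe() {
            return type + " " + key.resourceType + "/" + key.resourceKey + " -> " + key.targetServer
                    + " (" + content.length + " bytes)";
        }
    }

    // Builds a CHANGE message whose payload is the UTF-8 bytes of the given text.
    static Data change(String resourceKey, String resourceType, String targetServer, String payload) {
        return new Data(new Key(resourceKey, resourceType, targetServer), Operation.CHANGE,
                payload.getBytes(StandardCharsets.UTF_8));
    }
}
```

The receiver never needs to inspect the bytes to route the message: the resource type selects the processor and the operation selects the branch inside it.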

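Key roles in the Distro protocol

We know that DistroData is the object exchanged in the Distro protocol. The remaining pieces are the component that stores the data, the component that processes it, and the component that sends it; together they carry out the whole protocol flow.

Storing DistroData

DistroDataStorage stores DistroData. It has several implementations for different data types, which in practice means the data of the different versions.

  • v1 implementation: DistroDataStorageImpl
  • v2 implementation: DistroClientDataProcessor

Note:
From here on, v1 and v2 are no longer called out separately; the analysis uses the v2 implementation by default.

Data is read through three methods of the DistroDataStorage interface: getDistroData(DistroKey distroKey), getDatumSnapshot(), and getVerifyData(). In v2, DistroClientDataProcessor implements the DistroDataStorage interface and provides the DistroData.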

// DistroClientDataProcessor.java

@Override
public DistroData getDistroData(DistroKey distroKey) {
	// Look up the specified Client in the Client manager
	Client client = clientManager.getClient(distroKey.getResourceKey());
	if (null == client) {
		return null;
	}
	byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
	return new DistroData(distroKey, data);
}

@Override
public DistroData getDatumSnapshot() {
	List<ClientSyncData> datum = new LinkedList<>();
	// Iterate over all Clients in the Client manager
	for (String each : clientManager.allClientId()) {
		Client client = clientManager.getClient(each);
		if (null == client || !client.isEphemeral()) {
			continue;
		}
		datum.add(client.generateSyncData());
	}
	ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
	snapshot.setClientSyncDataList(datum);
	byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot);
	return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}

@Override
public List<DistroData> getVerifyData() {
	List<DistroData> result = new LinkedList<>();
	// Iterate over all Clients in the Client manager
	for (String each : clientManager.allClientId()) {
		Client client = clientManager.getClient(each);
		if (null == client || !client.isEphemeral()) {
			continue;
		}
		if (clientManager.isResponsibleClient(client)) {
			// TODO add revision for client.
			DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
			DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
			DistroData data = new DistroData(distroKey,
					ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
			data.setType(DataOperation.VERIFY);
			result.add(data);
		}
	}
	return result;
}

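As the v2 storage implementation shows, it does not store the data itself; it reads the data from ClientManager.

Processing DistroData

DistroDataProcessor processes DistroData. Processing happens in three methods: processData(DistroData distroData), processVerifyData(DistroData distroData, String sourceAddress), and processSnapshot(DistroData distroData). In v2, DistroClientDataProcessor implements the DistroDataProcessor interface and provides the ability to process DistroData.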

// DistroClientDataProcessor.java

@Override
public boolean processData(DistroData distroData) {
	switch (distroData.getType()) {
		case ADD:
		case CHANGE:
			ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);
			handlerClientSyncData(clientSyncData);
			return true;
		case DELETE:
			String deleteClientId = distroData.getDistroKey().getResourceKey();
			Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
			clientManager.clientDisconnected(deleteClientId);
			return true;
		default:
			return false;
	}
}

@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
	DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
	if (clientManager.verifyClient(verifyData.getClientId())) {
		return true;
	}
	Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
	return false;
}

@Override
public boolean processSnapshot(DistroData distroData) {
	ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
	for (ClientSyncData each : snapshot.getClientSyncDataList()) {
		handlerClientSyncData(each);
	}
	return true;
}
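
The effect of these three methods on local state can be sketched with an in-memory map. This is a simplified illustration: ClientStoreSketch is hypothetical, the map stands in for the state held by ClientManager, and treating verification as "the client is already known locally" is an assumption of this sketch about what verifyClient checks.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: apply received operations to a local store and answer verify requests from it.
class ClientStoreSketch {

    enum Op { ADD, CHANGE, DELETE, VERIFY }

    // clientId -> last synced payload (stand-in for the state held by ClientManager)
    private final Map<String, String> clients = new HashMap<>();

    // ADD and CHANGE upsert the client, DELETE removes it, anything else is not handled here.
    boolean process(Op op, String clientId, String payload) {
        switch (op) {
            case ADD:
            case CHANGE:
                clients.put(clientId, payload);
                return true;
            case DELETE:
                clients.remove(clientId);
                return true;
            default:
                return false;
        }
    }

    // Applying a snapshot is the same upsert repeated for every entry.
    boolean processSnapshot(Map<String, String> snapshot) {
        for (Map.Entry<String, String> each : snapshot.entrySet()) {
            process(Op.CHANGE, each.getKey(), each.getValue());
        }
        return true;
    }

    // Assumption of this sketch: verification succeeds only for a client already known locally.
    boolean verify(String clientId) {
        return clients.containsKey(clientId);
    }
}
```

A failed verification is the signal for the receiving node that its copy is missing or stale, which is why the log message in processVerifyData mentions fetching the client from the source address.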

Sending DistroData

DistroTransportAgent transports DistroData. In v2, DistroClientTransportAgent implements the DistroTransportAgent interface and provides the ability to send DistroData.

/**
 * Distro transport agent for v2, used to transport DistroData.
 * @author xiweng.yy
 */
public class DistroClientTransportAgent implements DistroTransportAgent {

    private final ClusterRpcClientProxy clusterRpcClientProxy;

    private final ServerMemberManager memberManager;

    public DistroClientTransportAgent(ClusterRpcClientProxy clusterRpcClientProxy,
            ServerMemberManager serverMemberManager) {
        this.clusterRpcClientProxy = clusterRpcClientProxy;
        this.memberManager = serverMemberManager;
    }

    /**
     * This implementation supports callbacks.
     * @return
     */
    @Override
    public boolean supportCallbackTransport() {
        return true;
    }

    /**
     * Send sync data to the specified node.
     * @param data         data
     * @param targetServer target server
     * @return
     */
    @Override
    public boolean syncData(DistroData data, String targetServer) {
        if (isNoExistTarget(targetServer)) {
            return true;
        }
        DistroDataRequest request = new DistroDataRequest(data, data.getType());
        Member member = memberManager.find(targetServer);
        if (checkTargetServerStatusUnhealthy(member)) {
            Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
            return false;
        }
        try {
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            return checkResponse(response);
        } catch (NacosException e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
        }
        return false;
    }

    /**
     * Send sync data to the specified node (with callback).
     * @param data         data
     * @param targetServer target server
     * @param callback     callback
     */
    @Override
    public void syncData(DistroData data, String targetServer, DistroCallback callback) {
        if (isNoExistTarget(targetServer)) {
            callback.onSuccess();
        }
        DistroDataRequest request = new DistroDataRequest(data, data.getType());
        Member member = memberManager.find(targetServer);
        try {
            clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
        } catch (NacosException nacosException) {
            callback.onFailed(nacosException);
        }
    }

    /**
     * Send verify data to the specified node.
     * @param verifyData   verify data
     * @param targetServer target server
     * @return
     */
    @Override
    public boolean syncVerifyData(DistroData verifyData, String targetServer) {
        if (isNoExistTarget(targetServer)) {
            return true;
        }
        // replace target server as self server so that can callback.
        verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
        DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
        Member member = memberManager.find(targetServer);
        if (checkTargetServerStatusUnhealthy(member)) {
            Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy", targetServer);
            return false;
        }
        try {
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            return checkResponse(response);
        } catch (NacosException e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! ", e);
        }
        return false;
    }

    /**
     * Send verify data to the specified node (with callback).
     * @param verifyData   verify data
     * @param targetServer target server
     * @param callback     callback
     */
    @Override
    public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
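        // If the target is not in the local member list (it may be offline or expired), there is nothing to verify,
        // so success is reported. Note that the code as shown does not return here and continues below.
        if (isNoExistTarget(targetServer)) {
            callback.onSuccess();
        }
        // Build the request
        DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
        Member member = memberManager.find(targetServer);
        try {
            // Create the callback wrapper (it implements the RequestCallBack interface)
            DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                    verifyData.getDistroKey().getResourceKey(), callback, member);
            // Send the request asynchronously through the cluster RPC client proxy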
            clusterRpcClientProxy.asyncRequest(member, request, wrapper);
        } catch (NacosException nacosException) {
            callback.onFailed(nacosException);
        }
    }

    /**
     * Fetch data from the specified node.
     * @param key          key of data
     * @param targetServer target server
     * @return
     */
    @Override
    public DistroData getData(DistroKey key, String targetServer) {
        Member member = memberManager.find(targetServer);
        if (checkTargetServerStatusUnhealthy(member)) {
            throw new DistroException(
                    String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
        }
        DistroDataRequest request = new DistroDataRequest();
        DistroData distroData = new DistroData();
        distroData.setDistroKey(key);
        distroData.setType(DataOperation.QUERY);
        request.setDistroData(distroData);
        request.setDataOperation(DataOperation.QUERY);
        try {
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            if (checkResponse(response)) {
                return ((DistroDataResponse) response).getDistroData();
            } else {
                throw new DistroException(
                        String.format("[DISTRO-FAILED] Get data request to %s failed, code: %d, message: %s",
                                targetServer, response.getErrorCode(), response.getMessage()));
            }
        } catch (NacosException e) {
            throw new DistroException("[DISTRO-FAILED] Get distro data failed! ", e);
        }
    }

    /**
     * Fetch the snapshot data from the specified node.
     * @param targetServer target server.
     * @return
     */
    @Override
    public DistroData getDatumSnapshot(String targetServer) {
        Member member = memberManager.find(targetServer);
        if (checkTargetServerStatusUnhealthy(member)) {
            throw new DistroException(
                    String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
        }
        DistroDataRequest request = new DistroDataRequest();
        request.setDataOperation(DataOperation.SNAPSHOT);
        try {
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            if (checkResponse(response)) {
                return ((DistroDataResponse) response).getDistroData();
            } else {
                throw new DistroException(
                        String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
                                targetServer, response.getErrorCode(), response.getMessage()));
            }
        } catch (NacosException e) {
            throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
        }
    }

    private boolean isNoExistTarget(String target) {
        return !memberManager.hasMember(target);
    }

    private boolean checkTargetServerStatusUnhealthy(Member member) {
        return null == member || !NodeState.UP.equals(member.getState());
    }

    private boolean checkResponse(Response response) {
        return ResponseCode.SUCCESS.getCode() == response.getResultCode();
    }

    /**
     * Callback wrapper for RPC sync requests.
     */
    private class DistroRpcCallbackWrapper implements RequestCallBack<Response> {

        private final DistroCallback distroCallback;

        private final Member member;

        public DistroRpcCallbackWrapper(DistroCallback distroCallback, Member member) {
            this.distroCallback = distroCallback;
            this.member = member;
        }

        @Override
        public Executor getExecutor() {
            return GlobalExecutor.getCallbackExecutor();
        }

        @Override
        public long getTimeout() {
            return DistroConfig.getInstance().getSyncTimeoutMillis();
        }

        @Override
        public void onResponse(Response response) {
            if (checkResponse(response)) {
                NamingTpsMonitor.distroSyncSuccess(member.getAddress(), member.getIp());
                distroCallback.onSuccess();
            } else {
                NamingTpsMonitor.distroSyncFail(member.getAddress(), member.getIp());
                distroCallback.onFailed(null);
            }
        }

        @Override
        public void onException(Throwable e) {
            distroCallback.onFailed(e);
        }
    }

    /**
     * Callback wrapper for data verification requests.
     */
    private class DistroVerifyCallbackWrapper implements RequestCallBack<Response> {

        private final String targetServer;

        private final String clientId;

        private final DistroCallback distroCallback;

        private final Member member;

        private DistroVerifyCallbackWrapper(String targetServer, String clientId, DistroCallback distroCallback,
                Member member) {
            this.targetServer = targetServer;
            this.clientId = clientId;
            this.distroCallback = distroCallback;
            this.member = member;
        }

        @Override
        public Executor getExecutor() {
            return GlobalExecutor.getCallbackExecutor();
        }

        @Override
        public long getTimeout() {
            return DistroConfig.getInstance().getVerifyTimeoutMillis();
        }

        @Override
        public void onResponse(Response response) {
            if (checkResponse(response)) {
                NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
                distroCallback.onSuccess();
            } else {
                Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
                // Publish an event after verification fails.
                NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
                NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
                distroCallback.onFailed(null);
            }
        }

        @Override
        public void onException(Throwable e) {
            distroCallback.onFailed(e);
        }
    }
}

Whatever the type of sync operation, the data is ultimately sent through the ClusterRpcClientProxy object.
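Both callback wrappers in the listing above follow the same adapter pattern: they translate an RPC-level outcome (a response or an exception) into the two-method DistroCallback contract. The sketch below isolates that pattern so it can be run on its own. It is a simplified illustration, not the Nacos implementation: `SimpleResponse`, `SimpleRequestCallback`, `CallbackWrapper`, `RecordingCallback`, and the `SUCCESS_CODE` value are hypothetical stand-ins introduced here, and the monitoring and event-publishing steps are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class DistroCallbackWrapperSketch {

    /** Same shape as the DistroCallback interface shown earlier in this article. */
    interface DistroCallback {
        void onSuccess();

        void onFailed(Throwable throwable);
    }

    /** Hypothetical stand-in for an RPC response: it carries only a result code. */
    static final class SimpleResponse {
        static final int SUCCESS_CODE = 200; // assumed value, for illustration only

        final int resultCode;

        SimpleResponse(int resultCode) {
            this.resultCode = resultCode;
        }
    }

    /** Hypothetical stand-in for the RPC layer's callback type. */
    interface SimpleRequestCallback {
        void onResponse(SimpleResponse response);

        void onException(Throwable e);
    }

    /** Adapts RPC-level outcomes to the two-method DistroCallback contract. */
    static final class CallbackWrapper implements SimpleRequestCallback {
        private final DistroCallback distroCallback;

        CallbackWrapper(DistroCallback distroCallback) {
            this.distroCallback = distroCallback;
        }

        @Override
        public void onResponse(SimpleResponse response) {
            if (response.resultCode == SimpleResponse.SUCCESS_CODE) {
                distroCallback.onSuccess();
            } else {
                // A response arrived but reported failure: there is no exception to pass on.
                distroCallback.onFailed(null);
            }
        }

        @Override
        public void onException(Throwable e) {
            // The request itself failed, for example through a timeout or connection error.
            distroCallback.onFailed(e);
        }
    }

    /** Helper that records which callback methods were invoked, in order. */
    static final class RecordingCallback implements DistroCallback {
        final List<String> events = new ArrayList<>();

        @Override
        public void onSuccess() {
            events.add("success");
        }

        @Override
        public void onFailed(Throwable throwable) {
            events.add(throwable == null ? "failed:no-exception" : "failed:" + throwable.getMessage());
        }
    }

    public static void main(String[] args) {
        RecordingCallback recorder = new RecordingCallback();
        CallbackWrapper wrapper = new CallbackWrapper(recorder);
        wrapper.onResponse(new SimpleResponse(SimpleResponse.SUCCESS_CODE));
        wrapper.onResponse(new SimpleResponse(500));
        wrapper.onException(new IllegalStateException("timeout"));
        System.out.println(recorder.events);
    }
}
```

One consequence of this design is that the caller cannot tell from the Throwable alone why a sync failed: a failure response reaches `onFailed` with `null`, while a transport error reaches it with the exception. Any DistroCallback implementation therefore has to treat a `null` argument as a valid failure case.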

The three components above are a key part of the Distro protocol implementation; the protocol logic covered later revolves entirely around them.
