[從源碼學設計]螞蟻金服SOFARegistry之Data節點變動

[從源碼學設計]螞蟻金服SOFARegistry之Data節點變動

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。java

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。服務器

本文爲第十一篇,介紹SOFARegistry如何處理Data節點變化,即處理DataServerChangeEvent消息。網絡

0x02 引子

上文中咱們提到,MetaServerChangeEvent也會轉化爲 DataServerChangeEvent,投放到EventCenter。session

這是由於Meta Server的這個推送,也許是在告訴data Server,"hi,目前data server也有變更"。因此本期咱們介紹如何處理DataServerChangeEvent,此處須要結合上文。架構

0x03 業務範疇

咱們在這裏首先要講講幾個業務範疇。併發

3.1 DataServer 數據一致性

爲支持海量數據,SOFARegistry 採用了一致性 Hash 來分片存儲 Publisher 數據,避免了單個服務器存儲全量數據時產生的容量瓶頸問題。而在這個模型中,每一個數據分片擁有多個副本,當存儲註冊數的 DataServer 進行擴容、縮容時,MetaServer 會把這個變動通知到 DataServer 和 SessionServer,數據分片會在集羣內部進行數據遷移與同步,此時就出現了 DataServer 內部數據的一致性問題。框架

3.2 節點變動時的數據同步

MetaServer 會經過網絡鏈接感知到新節點上線或者下線,全部的 DataServer 中運行着一個定時刷新鏈接的任務 ConnectionRefreshTask,該任務定時去輪詢 MetaServer,獲取數據節點的信息。須要注意的是,除了 DataServer 主動去 MetaServer 拉取節點信息外,MetaServer 也會主動發送 NodeChangeResult 請求到各個節點,通知節點信息發生變化,推拉獲取信息的最終效果是一致的。dom

0x04 整體邏輯

這部分整體邏輯以下:異步

當輪詢信息返回數據節點有變化時,會向 EventCenter 投遞一個 DataServerChangeEvent 事件,在該事件的處理器中,若是判斷出是當前機房節點信息有變化,則會投遞新的事件 LocalDataServerChangeEventide

該事件的處理器 LocalDataServerChangeEventHandler 中會判斷當前節點是否爲新加入的節點,若是是新節點則會向其它節點發送 NotifyOnlineRequest 請求,如圖所示:

圖 DataServer 節點上線時新節點的邏輯

本文就主要講解從DataServerChangeEvent到LocalDataServerChangeEvent這部分的邏輯

0x05 DataServerChangeEvent

5.1 消息來源

DataServerChangeEvent有三種來源:啓動主動獲取,按期,推送。這三種具體以下

  • 啓動主動獲取:這個主動查詢而且拉取的過程,這個過程基本上相似一個同步過程,體現爲客戶端一次查詢結果的同步返回。
  • 版本變動推送:爲了肯定服務發佈數據的變動,對於這個服務感興趣的全部客戶端訂閱方都須要推送,進行推送。因爲性能要求必須併發執行而且異步肯定推送成功。
  • 按期輪訓:這樣避免了某次變動通知沒有通知到全部訂閱方的狀況。

由於有了上文的知識,咱們應該知道,啓動主動獲取 和 推送 這兩種方式是經過MetaServerChangeEvent完成的,結合上文邏輯圖,如今簡述以下:

+-------------------------------+
|[DataServerBootstrap]          |   MetaServerChangeEvent
|                               |
|                               +-------------------------+
|       startRaftClient         |           a             |
|                               |                         |              +---------------+
|                               |                         |              |               |
+-------------------------------+                         |              |               |
+-------------------------------+                         |              |               |
| [Timer]                       |                         |              v               |
|                               |            b            |  1   +-------+-----+         |
|  ConnectionRefreshMetaTask    +------------------------------> | EventCenter +----+    |
|                               | MetaServerChangeEvent   |      +-------+-----+    |    |
+-------------------------------+                         |              ^          |    |
+-------------------------------+                         |              |          |    |
|                               |                         |              |          |    |
| [Push<NodeChangeResult>]      |                         |              |          |    |
|                               |            c            |              |          |    |
|                               +-------------------------+              |          |    |
|                               |  MetaServerChangeEvent                 |          |    |
|      ServerChangeHandler      |                               2        |          |    |
|                               +----------------------------------------+          |    |
+-------------------------------+      DataServerChangeEvent                        |    |
                                                                                    |    |
                                                                                    |    |
                                 MetaServerChangeEvent                              |    |
                                                                   3                |    |
                               +----------------------------------------------------+    |
                               |                                                         |
                               v                                                         |
             +-----------------+--------------+         DataServerChangeEvent            |
             |                                |                                   4      |
             |  MetaServerChangeEventHandler  +------------------------------------------+
             |                                |
             +--------------------------------+

5.1.1 啓動

5.1.1.1 產生消息

當 DataServer 節點初始化成功後,會啓動任務自動去鏈接 MetaServer。啓動時,會從配置裏面讀取meta server配置,metaServerService.getMetaServerMap();據此構建MetaServerChangeEvent,投放到EventCenter之中。

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
5.1.1.2 MetaServerChangeEventHandler

MetaServerChangeEventHandler 用來響應 MetaServerChangeEvent 消息。由於其繼承了AbstractEventHandler,因此 MetaServerChangeEventHandler 已經註冊到了EventCenter之上。

在處理MetaServerChangeEvent以後,該任務會往事件中心 EventCenter 註冊一個 DataServerChangeEvent 事件,該事件註冊後會被觸發,以後將對新增節點計算 Hash 值,同時進行納管分片。

就是對應上圖 a,1,3,4這條線,是DataServerChangeEvent的來源1

5.1.2 推送

這個來源是其餘消息的轉換,即NodeChangeResult的轉換。並且有兩個轉換過程。

5.1.2.1 推送

除了 DataServer 主動去 MetaServer 拉取節點信息外,MetaServer 也會主動發送 NodeChangeResult 請求到各個節點,通知節點信息發生變化,推拉獲取信息的最終效果是一致的。

5.1.2.2 第一層轉換

ServerChangeHandler 是 metaClientHandler 的一部分,是MetaNodeExchanger 的響應函數。

在ServerChangeHandler之中,拿到了NodeChangeResult以後,會判斷變動節點類型,這裏會根據 Note 類型不一樣,決定產生 DataServerChangeEvent 仍是 MetaServerChangeEvent。

若是是NodeType.META,就發送消息給eventCenter,即eventCenter.post(new MetaServerChangeEvent(map));

這就是MetaServerChangeEvent和DataServerChangeEvent來源之一就是對應上圖2這條線,是DataServerChangeEvent的來源2

public class ServerChangeHandler extends AbstractClientHandler<NodeChangeResult> {

    @Autowired
    private EventCenter         eventCenter;

    @Autowired
    private DataServerConfig    dataServerConfig;

    @Override
    public Object doHandle(Channel channel, NodeChangeResult request) {
        ExecutorFactory.getCommonExecutor().execute(() -> {
          
            if (request.getNodeType() == NodeType.DATA) {
              
                eventCenter.post(new DataServerChangeEvent(request.getNodes(),
                        request.getDataCenterListVersions(), FromType.META_NOTIFY));
              
            } else if (request.getNodeType() == NodeType.META) {
              
                Map<String, Map<String, MetaNode>> metaNodesMap = request.getNodes();
                if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                    Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig.getLocalDataCenter());
                    if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                        HashMap<String, Set<String>> map = new HashMap<>();
                        map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                        eventCenter.post(new MetaServerChangeEvent(map));
                    }
                }
            }
        });
        return CommonResponse.buildSuccessResponse();
    }

    @Override
    public Class interest() {
        return NodeChangeResult.class;
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}
5.1.2.3 第二層轉換

MetaServerChangeEventHandler 用來響應 MetaServerChangeEvent 消息。由於其繼承了AbstractEventHandler,因此 MetaServerChangeEventHandler 已經註冊到了EventCenter之上。

注意,這裏有一個再次轉換DataServerChangeEvent的過程,即MetaServerChangeEventHandler這裏會再主動和MetaServer交互,這是由於Meta Server的這個推送,也許是在告訴data Server,"hi,目前data server也有變更"。

若是返回消息是NodeChangeResult,就轉換爲DataServerChangeEvent,投放DataServerChangeEvent到Event Center。

就是對應上圖 b,1,3,4這條線,是DataServerChangeEvent的來源3

public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {
    private void registerMetaServer(String dataCenter, String ip) {
        ......
                    if (obj instanceof NodeChangeResult) {
                        NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                        Map<String, Long> versionMap = result.getDataCenterListVersions();
    
                        //send renew after first register dataNode
                        Set<StartTaskTypeEnum> set = new HashSet<>();
                        set.add(StartTaskTypeEnum.RENEW);
                        eventCenter.post(new StartTaskEvent(set));
    
                        eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
                                DataServerChangeEvent.FromType.REGISTER_META));
                        break;
                    }
    }
}

0x06 輪訓拉

咱們這裏要重點講解DataServerChangeEvent的來源「輪訓拉」

MetaServer 會經過網絡鏈接感知到新節點上線或者下線,全部的 DataServer 中運行着一個定時刷新鏈接的任務 ConnectionRefreshTask,該任務定時去輪詢 MetaServer,獲取數據節點的信息。

6.1 Bean

ConnectionRefreshTask 在 tasks 這個 Bean中啓動。

@Bean(name = "tasks")
public List<AbstractTask> tasks() {
    List<AbstractTask> list = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());
    return list;
}

6.2 啓動

tasks是在startScheduler間接啓動的。

eventCenter.post(new StartTaskEvent(
        Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW).collect(Collectors.toSet())));

StartTaskEventHandler響應StartTaskEvent,其會逐一啓動tasks。

public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {
 
    @Resource(name = "tasks")
    private List<AbstractTask>       tasks;

    private ScheduledExecutorService executor     = null;

    @Override
    public void doHandle(StartTaskEvent event) {
        if (executor == null || executor.isShutdown()) {
            getExecutor();
        }

        for (AbstractTask task : tasks) {
            if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
                    task.getTimeUnit());
            }
        }
    }

    private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}

這裏有一個技巧。

ConnectionRefreshTask裏面指定了支持CONNECT_DATA,StartTaskEventHandler在啓動時判斷支持類型,發現是CONNECT_DATA,就啓動了ConnectionRefreshTask

AbstractEventHandler 其中註冊了eventCenter.register,這樣它的繼承類都默認註冊到了EventCenter 之上。

public abstract class AbstractEventHandler<Event> implements InitializingBean {
    @Autowired
    private EventCenter         eventCenter;

    @Override
    public void afterPropertiesSet() throws Exception {
        eventCenter.register(this);
    }

    /**
     * event handle func
     * @param event
     */
    public void handle(Event event) {
            doHandle(event);
    }

    public abstract List<Class<? extends Event>> interest();

    public abstract void doHandle(Event event);
}

因而,connectionRefreshTask就啓動了。

6.3 ConnectionRefreshTask

ConnectionRefreshTask負責輪詢與meta Server交互,能夠看到,也發送了DataServerChangeEvent。

public class ConnectionRefreshTask extends AbstractTask {

    @Autowired
    private IMetaServerService metaServerService;

    @Autowired
    private EventCenter        eventCenter;

    @Override
    public void handle() {
        DataServerChangeItem dataServerChangeItem = metaServerService.getDateServers();
        if (dataServerChangeItem != null) {
            eventCenter
                .post(new DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK));
        }
    }

    @Override
    public int getDelay() {
        return 30;
    }

    @Override
    public int getInitialDelay() {
        return 0;
    }

    @Override
    public TimeUnit getTimeUnit() {
        return TimeUnit.SECONDS;
    }

    @Override
    public StartTaskTypeEnum getStartTaskTypeEnum() {
        return StartTaskTypeEnum.CONNECT_DATA;
    }
}

ConnectionRefreshTask 調用 metaServerService.getDateServers();getDateServers 的做用是:

  • 從 metaServerConnectionFactory 獲取connectionMap;
  • 經過raft來獲取raft leader;
  • 從 connectionMap 獲取 leader 的connection,這是一個 bolt Connection;
  • 利用 bolt Connection 進行請求,GetNodesRequest(NodeType.DATA);
  • 從請求結果構建 DataServerChangeItem;
  • 在 EventCenter 中放一個消息 DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK);

具體以下:

@Override
public DataServerChangeItem getDateServers() {
    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    String leader = getLeader().getIp();
    if (connectionMap.containsKey(leader)) {
        Connection connection = connectionMap.get(leader);
        if (connection.isFine()) {
            try {
                GetNodesRequest request = new GetNodesRequest(NodeType.DATA);
                Object obj = metaNodeExchanger.request(new Request() {
                    @Override
                    public Object getRequestBody() {
                        return request;
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection.getRemoteIP(), connection.getRemotePort());
                    }
                }).getResult();
                if (obj instanceof NodeChangeResult) {
                    NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                    Map<String, Long> versionMap = result.getDataCenterListVersions();
                    versionMap.put(result.getLocalDataCenter(), result.getVersion());
                    return new DataServerChangeItem(result.getNodes(), versionMap);
                }
            } 
        }
    }
    String newip = refreshLeader().getIp();
    return null;
}

0x07 DataServerChangeEventHandler

7.1 整體邏輯

DataServerChangeEvent 事件被觸發後,由 DataServerChangeEventHandler 來進行相應的處理,分別分爲以下一些步驟:

  • 初始化當前數據節點的一致性 Hash 值,把當前節點添加進一致性的 Hash 環中。
  • 獲取變動了的 DataServer 節點,這些節點在啓動 DataServer 服務的時候從 MetaServer 中獲取到的,而且經過 DataServerChangeEvent 事件中的 DataServerChangeItem 傳入。
  • 獲取了當前的 DataServer 節點以後,若節點列表非空,則遍歷每一個節點,創建當前節點與其他數據節點之間的鏈接,同時刪除本地維護的不在節點列表中的節點數據,更新dataServerCache。同時,若當前節點是 DataCenter 節點,則觸發 LocalDataServerChangeEvent 事件

SOFA這裏主要是處理LocalDataServerChangeEvent,異地機房的部分沒有開源

7.2 LocalDataServerChangeEvent

關於上面第三點,詳細說明以下:

從DataServerChangeEvent中提取DataServerChangeItem,若是發現有一個DataCenter就是本機,則使用以下語句獲取新加入的DataServer。

Set<String> newjoined = new HashSet<>(ips);
newjoined.removeAll(localDataServers);

而後使用這些新加入的DataServer來構建 LocalDataServerChangeEvent。

參見以下片斷:

//get changed dataservers
Map<String, Set<String>> changedMap = dataServerCache.compareAndSet(
                dataServerChangeItem, event.getFromType());
if (!changedMap.isEmpty()) {
for (Entry<String, Set<String>> changeEntry : changedMap.entrySet()) {
                    String dataCenter = changeEntry.getKey();
                    Set<String> ips = changeEntry.getValue();
                  
		String dataCenter = changeEntry.getKey();
    Set<String> ips = changeEntry.getValue();

    //if the dataCenter is self, post LocalDataServerChangeEvent
    if (dataServerConfig.isLocalDataCenter(dataCenter)) {
        Set<String> newjoined = new HashSet<>(ips);
        newjoined.removeAll(localDataServers);
        eventCenter.post(new LocalDataServerChangeEvent(map, newjoined,
            dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
    } else {
        dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
        eventCenter.post(new RemoteDataServerChangeEvent(dataCenter, map,
            dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
    }
}

具體代碼以下:

public class DataServerChangeEventHandler extends AbstractEventHandler<DataServerChangeEvent> {

    private static final int    TRY_COUNT = 5;

    @Autowired
    private DataServerConfig    dataServerConfig;

    @Autowired
    private DataServerCache     dataServerCache;

    @Autowired
    private DataNodeExchanger   dataNodeExchanger;

    @Autowired
    private EventCenter         eventCenter;

    @Override
    public List<Class<? extends DataServerChangeEvent>> interest() {
        return Lists.newArrayList(DataServerChangeEvent.class);
    }

    @Override
    public void doHandle(DataServerChangeEvent event) {
        synchronized (this) {
            //register self first,execute once
            DataServerNodeFactory.initConsistent(dataServerConfig);

            DataServerChangeItem dataServerChangeItem = event.getDataServerChangeItem();
            Set<String> localDataServers = dataServerCache.getDataServers(
                dataServerConfig.getLocalDataCenter()).keySet();
          
            //get changed dataservers 獲得變化了的dataServers
            Map<String, Set<String>> changedMap = dataServerCache.compareAndSet(
                dataServerChangeItem, event.getFromType());
          
            if (!changedMap.isEmpty()) {
                for (Entry<String, Set<String>> changeEntry : changedMap.entrySet()) {
                    String dataCenter = changeEntry.getKey();
                    Set<String> ips = changeEntry.getValue();
                    Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter);
                    if (!CollectionUtils.isEmpty(ips)) {
                        for (String ip : ips) {
                            if (!StringUtils.equals(ip, DataServerConfig.IP)) {
                                DataServerNode dataServerNode = DataServerNodeFactory
                                    .getDataServerNode(dataCenter, ip);
                                if (dataServerNode == null
                                    || dataServerNode.getConnection() == null
                                    || !dataServerNode.getConnection().isFine()) {
                                    connectDataServer(dataCenter, ip);
                                }
                            }
                        }
                        //remove all old DataServerNode not in change map
                        Set<String> ipSet = DataServerNodeFactory.getIps(dataCenter);
                        for (String ip : ipSet) {
                            if (!ips.contains(ip)) {
                                DataServerNodeFactory.remove(dataCenter, ip, dataServerConfig);
                            }
                        }

                        Map<String, DataNode> newDataNodes = dataServerCache
                            .getNewDataServerMap(dataCenter);

                        //avoid input map reference operation DataServerNodeFactory MAP
                        Map<String, DataNode> map = new ConcurrentHashMap<>(newDataNodes);

                        //if the dataCenter is self, post LocalDataServerChangeEvent
                        if (dataServerConfig.isLocalDataCenter(dataCenter)) {
                          //爲何 local 時候不作 updateItem
                            Set<String> newjoined = new HashSet<>(ips);
                            newjoined.removeAll(localDataServers);
                            eventCenter.post(new LocalDataServerChangeEvent(map, newjoined,
                                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
                        } else {
                            dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
                            eventCenter.post(new RemoteDataServerChangeEvent(dataCenter, map,
                                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
                        }
                    } else {
                        //if the dataCenter which has no dataServers is not self, remove it
                        if (!dataServerConfig.isLocalDataCenter(dataCenter)) {
                            removeDataCenter(dataCenter);
                            eventCenter.post(new RemoteDataServerChangeEvent(dataCenter,
                                Collections.EMPTY_MAP, dataServerChangeItem.getVersionMap().get(
                                    dataCenter), newVersion));
                        }
                        Map<String, DataNode> newDataNodes = dataServerCache
                            .getNewDataServerMap(dataCenter);
                        dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
                    }
                }
            } else {
                //refresh for keep connect other dataServers
                //若是沒有「有變化」的DataServer,則從新鏈接一下現有的DataServer
                Set<String> allDataCenter = new HashSet<>(dataServerCache.getAllDataCenters());
                for (String dataCenter : allDataCenter) {
                    Map<String, DataNode> dataNodes = dataServerCache
                        .getNewDataServerMap(dataCenter);
                    if (dataNodes != null) {
                        for (DataNode dataNode : dataNodes.values()) {
                            if (!StringUtils.equals(dataNode.getIp(), DataServerConfig.IP)) {
                                DataServerNode dataServerNode = DataServerNodeFactory
                                    .getDataServerNode(dataCenter, dataNode.getIp());
                                Connection connection = dataServerNode != null ? dataServerNode
                                    .getConnection() : null;
                                if (connection == null || !connection.isFine()) {
                                    connectDataServer(dataCenter, dataNode.getIp());
                                }
                            }
                        }
                    }
                }
            }
        }
    }
  
    /**
     * connect specific dataserver
     *
     * @param dataCenter
     * @param ip
     */
    private void connectDataServer(String dataCenter, String ip) {
        Connection conn = null;
        for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
            try {
                conn = ((BoltChannel) dataNodeExchanger.connect(new URL(ip, dataServerConfig
                    .getSyncDataPort()))).getConnection();
                break;
            } catch (Exception e) {
                TimeUtil.randomDelay(3000);
            }
        }
        if (conn == null || !conn.isFine()) {
            throw new RuntimeException(
                String
                    .format(
                        "[DataServerChangeEventHandler] connect dataServer %s in %s failed five times,dataServer will not work,please check connect!",
                        ip, dataCenter));
        }
        //maybe get dataNode from metaServer,current has not start! register dataNode info to factory,wait for connect task next execute
        DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig);
    }

    /**
     * remove dataCenter, and close connections of dataServers in this dataCenter
     *
     * @param dataCenter
     */
    private void removeDataCenter(String dataCenter) {
        DataServerNodeFactory.getDataServerNodes(dataCenter).values().stream().map(DataServerNode::getConnection)
                .filter(connection -> connection != null && connection.isFine()).forEach(Connection::close);
        DataServerNodeFactory.remove(dataCenter);
    }
}

7.3 邏輯圖

因而,咱們的邏輯圖拓展以下:

DataServerChangeEvent一共四個來源。

前三個來源是與MetaServerChangeEvent相關。

  • 啓動主動獲取:MetaServerChangeEventHandler 用來響應 MetaServerChangeEvent 消息。在處理MetaServerChangeEvent以後,該任務會往事件中心 EventCenter 註冊一個 DataServerChangeEvent 事件就是對應下圖 a,1,3,4這條線,是DataServerChangeEvent的來源1
  • 版本變動推送:ServerChangeHandler 是 MetaNodeExchanger 的響應函數。在ServerChangeHandler之中,拿到了NodeChangeResult以後,會判斷變動節點類型,這裏會根據 Note 類型不一樣,決定產生 DataServerChangeEvent 仍是 MetaServerChangeEvent。
    • 若是是NodeType.DATA,ServerChangeHandler 就發送消息給eventCenter,即eventCenter.post(new MetaServerChangeEvent(map));就是對應下圖2這條線,是DataServerChangeEvent的來源2
    • 若是是NodeType.DATA,ServerChangeHandler 就發送消息給eventCenter,即eventCenter.post(new MetaServerChangeEvent(map));;注意,這裏有一個再次轉換DataServerChangeEvent的過程,即MetaServerChangeEventHandler這裏會再主動和MetaServer交互,若是返回消息是NodeChangeResult,就轉換爲DataServerChangeEvent,投放DataServerChangeEvent到Event Center。就是對應下圖 b,1,3,4這條線,是DataServerChangeEvent的來源3

第四來源是按期輪訓。

全部的 DataServer 中運行着一個定時刷新鏈接的任務 ConnectionRefreshTask,該任務定時去輪詢 MetaServer,獲取數據節點的信息。

ConnectionRefreshTask 調用 metaServerService.getDateServers();與MetaServer聯繫,從請求結果構建 DataServerChangeItem;在 EventCenter 中放一個消息 DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK);就是對應下圖5這條線,是DataServerChangeEvent的來源3

最後,DataServerChangeEvent 事件被觸發後,由 DataServerChangeEventHandler 來進行相應的處理。就是對應下圖6,7這條線

若當前節點是 DataCenter 節點,則觸發 LocalDataServerChangeEvent 事件。SOFA這裏主要是處理LocalDataServerChangeEvent,異地機房的部分沒有開源

+---------------------------+
|[DataServerBootstrap]      |   MetaServerChangeEvent         +------------------------+
|                           |                                 |                        |
|                           +-------------------------+       |   +------------------+ |
|       startRaftClient     |           a             |       |   |                  | |
|                           |                         |       |   |  +-------------+ | |
|                           |                         |       |   |  |             | | |
+---------------------------+                         |       |   |  |             | | |
+---------------------------+                         |       v   |  |             | | |
| [Timer]                   |                         |           |  v             | | |
|                           |           b             |  1   +----+--+-----+       | | |
| ConnectionRefreshMetaTask +------------------------------> | EventCenter +----+  | | |
|                           | MetaServerChangeEvent   |      +-------+---+-+    |  | | |
+---------------------------+                         |              ^   ^      |  | | |
+---------------------------+                         |              |   |      |  | | |
|                           |                         |              |   |      |  | | |
| [Push<NodeChangeResult>]  |                         |              |   |      |  | | |
|                           |           c             |              |   |      |  | | |
|                           +-------------------------+              |   | 5    |  | | |
|                           |  MetaServerChangeEvent                 |   |      |  | | |
|      ServerChangeHandler  |                               2        |   |      |  | | |
|                           +----------------------------------------+   |      |  | | |
+---------------------------+      DataServerChangeEvent                 |      |  | | |
                                                                         |      |  | | |
+-------------------------+                                              |      |  | | |
|                         |                                              |      |  | | |
| ConnectionRefreshTask   +----------------------------------------------+      |  | | |
|                         |                                                     |  | | |
+-------------------------+                                                     |  | | |
                                                                                |  | | |
                             MetaServerChangeEvent                              |  | | |
                                                               3                |  | | |
                           +----------------------------------------------------+  | | |
                           |                                                       | | |
                           v                                                       | | |
         +-----------------+--------------+         DataServerChangeEvent          | | |
         |                                |                                   4    | | |
         |  MetaServerChangeEventHandler  +----------------------------------------+ | |
         |                                |                                          | |
         +--------------------------------+                                          | |
                                                                                     | |
                                                   DataServerChangeEvent             | |
          +------------------------------+                                           | |
          | DataServerChangeEventHandler |  <----------------------------------------+ |
          +---------------+--------------+              6                              |
                          |                                                            |
                          |     7                                                      |
                          +------------------------------------------------------------+
                                LocalDataServerChangeEvent / RemoteDataServerChangeEvent

0x08 總結

本文講解了SOFARegistry如何處理Data節點變化。

主要就是從DataServerChangeEvent到LocalDataServerChangeEvent這部分的邏輯。SOFA這裏主要是處理LocalDataServerChangeEvent,異地機房的部分沒有開源。因此下文咱們介紹LocalDataServerChangeEvent。

0xFF 參考

螞蟻金服服務註冊中心如何實現 DataServer 平滑擴縮容

螞蟻金服服務註冊中心 SOFARegistry 解析 | 服務發現優化之路

服務註冊中心 Session 存儲策略 | SOFARegistry 解析

海量數據下的註冊中心 - SOFARegistry 架構介紹

服務註冊中心數據分片和同步方案詳解 | SOFARegistry 解析

螞蟻金服開源通訊框架SOFABolt解析之鏈接管理剖析

螞蟻金服開源通訊框架SOFABolt解析之超時控制機制及心跳機制

螞蟻金服開源通訊框架 SOFABolt 協議框架解析

螞蟻金服服務註冊中心數據一致性方案分析 | SOFARegistry 解析

相關文章
相關標籤/搜索