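SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: across several articles and from several angles, it works backwards from the code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba designs such systems.
This is the eleventh article. It describes how SOFARegistry handles changes to Data nodes, that is, how it processes the DataServerChangeEvent message.
As mentioned in the previous article, a MetaServerChangeEvent is also converted into a DataServerChangeEvent and posted to the EventCenter.
The reason is that a push from the MetaServer may really be telling the DataServer, "hi, the data servers have changed too". This article therefore covers how DataServerChangeEvent is handled, and it should be read together with the previous one.
We first need to introduce a few business concepts.
To support massive amounts of data, SOFARegistry uses consistent hashing to shard the stored Publisher data, which avoids the capacity bottleneck of a single server holding the full data set. In this model every data shard has multiple replicas. When the DataServers that store the registration data are scaled out or in, the MetaServer notifies the DataServers and SessionServers of the change, and the shards are migrated and synchronized inside the cluster. This is where the problem of data consistency within the DataServers arises.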
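To make the sharding idea concrete, here is a minimal sketch of a consistent-hash ring. It is only an illustration: the class name `ConsistentHashRing`, the CRC32 hash, and the number of virtual nodes are assumptions made for this example and are not taken from SOFARegistry's code.

```java
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Hypothetical minimal consistent-hash ring; not SOFARegistry's actual implementation. */
class ConsistentHashRing {
    // Assumed number of virtual nodes per server, chosen only for this sketch.
    private static final int VIRTUAL_NODES = 100;

    // Sorted map from hash position on the ring to the owning server IP.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    private static long hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** Places the server on the ring at VIRTUAL_NODES positions. */
    void addNode(String ip) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            // putIfAbsent keeps an existing owner if two positions collide.
            ring.putIfAbsent(hash(ip + "#" + i), ip);
        }
    }

    /** Removes only the positions that are actually owned by this server. */
    void removeNode(String ip) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.remove(hash(ip + "#" + i), ip);
        }
    }

    /** Returns the server owning the key: the first position clockwise from the key's hash. */
    String nodeFor(String dataInfoId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        Long position = ring.ceilingKey(hash(dataInfoId));
        return ring.get(position != null ? position : ring.firstKey());
    }
}
```

With such a ring, adding a server only moves the keys that now fall on the new server's positions, which is why scaling out or in triggers migration of some shards rather than of all data.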
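The MetaServer detects nodes going online or offline through its network connections. Every DataServer runs a scheduled connection-refresh task, ConnectionRefreshTask, which periodically polls the MetaServer for data-node information. Note that besides the DataServer actively pulling node information from the MetaServer, the MetaServer also actively sends NodeChangeResult requests to each node to notify it that node information has changed. Push and pull end up with the same result.
The overall logic of this part is as follows:
When polling shows that the data nodes have changed, a DataServerChangeEvent is posted to the EventCenter. If the handler of that event determines that the node information of the current data center has changed, it posts a new event, LocalDataServerChangeEvent.
The handler of that event, LocalDataServerChangeEventHandler, checks whether the current node is a newly joined node. If it is, it sends a NotifyOnlineRequest to the other nodes, as shown in the figure:
Figure: logic of the new node when a DataServer node comes online
This article mainly explains the logic from DataServerChangeEvent to LocalDataServerChangeEvent.
DataServerChangeEvent has three kinds of sources: active fetch at startup, periodic polling, and push. They are described below.
From the previous article we know that active fetch at startup and push both go through MetaServerChangeEvent. Combined with the logic diagram of the previous article, this can be summarized as follows: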
Figure: sources of MetaServerChangeEvent and DataServerChangeEvent
a. [DataServerBootstrap] startRaftClient --MetaServerChangeEvent--> EventCenter
b. [Timer] ConnectionRefreshMetaTask --MetaServerChangeEvent--> EventCenter
c. [Push<NodeChangeResult>] ServerChangeHandler --MetaServerChangeEvent--> EventCenter
1. MetaServerChangeEvent arrives at EventCenter
2. ServerChangeHandler --DataServerChangeEvent--> EventCenter
3. EventCenter --MetaServerChangeEvent--> MetaServerChangeEventHandler
4. MetaServerChangeEventHandler --DataServerChangeEvent--> EventCenter
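After a DataServer node has been initialized successfully, it starts a task that connects to the MetaServer automatically. At startup it reads the meta server configuration via metaServerService.getMetaServerMap(), builds a MetaServerChangeEvent from it, and posts the event to the EventCenter.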
private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
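MetaServerChangeEventHandler responds to MetaServerChangeEvent messages. Because it extends AbstractEventHandler, it is already registered with the EventCenter.
After processing the MetaServerChangeEvent, the handler posts a DataServerChangeEvent to the EventCenter. Once posted, that event is triggered; the hash value of the new node is then computed and the node takes over its shards.
This corresponds to path a, 1, 3, 4 in the figure above and is source 1 of DataServerChangeEvent.
The next source is a conversion of another message, namely NodeChangeResult, and there are two conversion paths.
Besides the DataServer actively pulling node information from the MetaServer, the MetaServer also actively sends NodeChangeResult requests to each node to notify it that node information has changed. Push and pull end up with the same result.
ServerChangeHandler is part of metaClientHandler and is the response handler of MetaNodeExchanger.
Once ServerChangeHandler has received the NodeChangeResult, it checks the type of the changed node and, depending on the node type, produces either a DataServerChangeEvent or a MetaServerChangeEvent.
If the type is NodeType.DATA, it posts a DataServerChangeEvent with FromType.META_NOTIFY to the eventCenter. If the type is NodeType.META, it posts a MetaServerChangeEvent, i.e. eventCenter.post(new MetaServerChangeEvent(map));
This is one of the sources of both MetaServerChangeEvent and DataServerChangeEvent. It corresponds to line 2 in the figure above and is source 2 of DataServerChangeEvent.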
public class ServerChangeHandler extends AbstractClientHandler<NodeChangeResult> {

    @Autowired
    private EventCenter      eventCenter;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Override
    public Object doHandle(Channel channel, NodeChangeResult request) {
        ExecutorFactory.getCommonExecutor().execute(() -> {
            if (request.getNodeType() == NodeType.DATA) {
                // data nodes changed: post DataServerChangeEvent directly
                eventCenter.post(new DataServerChangeEvent(request.getNodes(),
                    request.getDataCenterListVersions(), FromType.META_NOTIFY));
            } else if (request.getNodeType() == NodeType.META) {
                // meta nodes changed: post MetaServerChangeEvent for the local data center
                Map<String, Map<String, MetaNode>> metaNodesMap = request.getNodes();
                if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                    Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig
                        .getLocalDataCenter());
                    if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                        HashMap<String, Set<String>> map = new HashMap<>();
                        map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                        eventCenter.post(new MetaServerChangeEvent(map));
                    }
                }
            }
        });
        return CommonResponse.buildSuccessResponse();
    }

    @Override
    public Class interest() {
        return NodeChangeResult.class;
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}
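MetaServerChangeEventHandler responds to MetaServerChangeEvent messages. Because it extends AbstractEventHandler, it is already registered with the EventCenter.
Note that a second conversion into DataServerChangeEvent happens here: MetaServerChangeEventHandler actively contacts the MetaServer again, because the push from the MetaServer may really be telling the DataServer, "hi, the data servers have changed too".
If the reply is a NodeChangeResult, it is converted into a DataServerChangeEvent, which is posted to the EventCenter.
This corresponds to path b, 1, 3, 4 in the figure above and is source 3 of DataServerChangeEvent.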
public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {

    private void registerMetaServer(String dataCenter, String ip) {
        ......
        if (obj instanceof NodeChangeResult) {
            NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
            Map<String, Long> versionMap = result.getDataCenterListVersions();

            //send renew after first register dataNode
            Set<StartTaskTypeEnum> set = new HashSet<>();
            set.add(StartTaskTypeEnum.RENEW);
            eventCenter.post(new StartTaskEvent(set));

            eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
                DataServerChangeEvent.FromType.REGISTER_META));
            break;
        }
    }
}
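The source of DataServerChangeEvent that we want to focus on here is polling (pull).
The MetaServer detects nodes going online or offline through its network connections. Every DataServer runs a scheduled connection-refresh task, ConnectionRefreshTask, which periodically polls the MetaServer for data-node information.
ConnectionRefreshTask is started from the tasks Bean.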
@Bean(name = "tasks")
public List<AbstractTask> tasks() {
    List<AbstractTask> list = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());
    return list;
}
The tasks are started indirectly by startScheduler, which posts a StartTaskEvent for every task type except RENEW.
eventCenter.post(new StartTaskEvent(
    Arrays.stream(StartTaskTypeEnum.values())
        .filter(type -> type != StartTaskTypeEnum.RENEW)
        .collect(Collectors.toSet())));
StartTaskEventHandler responds to StartTaskEvent and starts the matching tasks one by one.
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {

    @Resource(name = "tasks")
    private List<AbstractTask>       tasks;

    private ScheduledExecutorService executor = null;

    @Override
    public void doHandle(StartTaskEvent event) {
        if (executor == null || executor.isShutdown()) {
            getExecutor();
        }
        for (AbstractTask task : tasks) {
            // only schedule tasks whose type is listed in the event
            if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
                    task.getTimeUnit());
            }
        }
    }

    private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}
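There is a trick here.
ConnectionRefreshTask declares that its type is CONNECT_DATA. When StartTaskEventHandler starts tasks, it checks each task's type against the types carried by the event, finds CONNECT_DATA among them, and therefore starts ConnectionRefreshTask.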
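The selection rule can be illustrated in isolation. The sketch below is a simplified stand-in, not the project's code: `SketchTaskType` and `TaskSelector` are hypothetical names, tasks are represented only by their names, and the enum constant `CONNECT_META` is an assumption (the text above only mentions CONNECT_DATA and RENEW).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical stand-in for StartTaskTypeEnum; CONNECT_META is an assumed constant. */
enum SketchTaskType { CONNECT_META, CONNECT_DATA, RENEW }

class TaskSelector {

    /** Every type except RENEW, mirroring the filter used when the scheduler is started. */
    static Set<SketchTaskType> startupTypes() {
        return Arrays.stream(SketchTaskType.values())
                .filter(type -> type != SketchTaskType.RENEW)
                .collect(Collectors.toSet());
    }

    /** Names of the tasks whose declared type is contained in the event's suitable types. */
    static List<String> select(Map<String, SketchTaskType> tasks, Set<SketchTaskType> suitable) {
        return tasks.entrySet().stream()
                .filter(entry -> suitable.contains(entry.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

The design keeps the handler generic: each task only declares its own type, and the event decides which types should run, so the RENEW task can be started later by a separate StartTaskEvent.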
AbstractEventHandler calls eventCenter.register in afterPropertiesSet, so all of its subclasses are registered with the EventCenter by default.
public abstract class AbstractEventHandler<Event> implements InitializingBean {

    @Autowired
    private EventCenter eventCenter;

    @Override
    public void afterPropertiesSet() throws Exception {
        // every subclass registers itself once Spring has injected its fields
        eventCenter.register(this);
    }

    /**
     * event handle func
     * @param event
     */
    public void handle(Event event) {
        doHandle(event);
    }

    public abstract List<Class<? extends Event>> interest();

    public abstract void doHandle(Event event);
}
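The EventCenter itself is not shown in this article. As a rough mental model only, the sketch below shows one way such a register/post mechanism could work, and how one handler can convert an event into another by posting again. All names (`SketchEventCenter`, `SketchHandler`, `MetaChanged`, `DataChanged` and the two handlers) are hypothetical, and `init()` merely plays the role that afterPropertiesSet() plays under Spring.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical event center: dispatches an event to handlers registered for its exact class. */
class SketchEventCenter {
    private final Map<Class<?>, List<SketchHandler<Object>>> handlers = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <E> void register(SketchHandler<E> handler) {
        for (Class<? extends E> type : handler.interest()) {
            handlers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                    .add((SketchHandler<Object>) (SketchHandler<?>) handler);
        }
    }

    void post(Object event) {
        List<SketchHandler<Object>> interested =
                handlers.getOrDefault(event.getClass(), Collections.emptyList());
        for (SketchHandler<Object> handler : interested) {
            handler.handle(event);
        }
    }
}

/** Hypothetical base class: subclasses are registered by calling init() once. */
abstract class SketchHandler<E> {
    protected final SketchEventCenter center;

    SketchHandler(SketchEventCenter center) { this.center = center; }

    /** Stands in for afterPropertiesSet() in this sketch. */
    final void init() { center.register(this); }

    void handle(E event) { doHandle(event); }

    abstract List<Class<? extends E>> interest();

    abstract void doHandle(E event);
}

class MetaChanged { }

class DataChanged {
    final String fromType;
    DataChanged(String fromType) { this.fromType = fromType; }
}

/** Converts a MetaChanged event into a DataChanged event by posting again. */
class MetaChangedHandler extends SketchHandler<MetaChanged> {
    MetaChangedHandler(SketchEventCenter center) { super(center); }

    @Override
    List<Class<? extends MetaChanged>> interest() {
        return Collections.singletonList(MetaChanged.class);
    }

    @Override
    void doHandle(MetaChanged event) { center.post(new DataChanged("REGISTER_META")); }
}

/** Records the origin of every DataChanged event it receives. */
class DataChangedHandler extends SketchHandler<DataChanged> {
    final List<String> seen = new ArrayList<>();

    DataChangedHandler(SketchEventCenter center) { super(center); }

    @Override
    List<Class<? extends DataChanged>> interest() {
        return Collections.singletonList(DataChanged.class);
    }

    @Override
    void doHandle(DataChanged event) { seen.add(event.fromType); }
}
```

In this sketch dispatch is synchronous and keyed by the event's exact class; whether the real EventCenter behaves the same way is not stated in the article.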
As a result, connectionRefreshTask is started.
ConnectionRefreshTask polls the MetaServer. As the code shows, it also posts a DataServerChangeEvent.
public class ConnectionRefreshTask extends AbstractTask {

    @Autowired
    private IMetaServerService metaServerService;

    @Autowired
    private EventCenter        eventCenter;

    @Override
    public void handle() {
        DataServerChangeItem dataServerChangeItem = metaServerService.getDateServers();
        if (dataServerChangeItem != null) {
            eventCenter
                .post(new DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK));
        }
    }

    @Override
    public int getDelay() {
        return 30;
    }

    @Override
    public int getInitialDelay() {
        return 0;
    }

    @Override
    public TimeUnit getTimeUnit() {
        return TimeUnit.SECONDS;
    }

    @Override
    public StartTaskTypeEnum getStartTaskTypeEnum() {
        return StartTaskTypeEnum.CONNECT_DATA;
    }
}
ConnectionRefreshTask calls metaServerService.getDateServers(). This method contacts the MetaServer leader and builds a DataServerChangeItem from the response.
The code is as follows:
@Override
public DataServerChangeItem getDateServers() {
    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    String leader = getLeader().getIp();
    if (connectionMap.containsKey(leader)) {
        Connection connection = connectionMap.get(leader);
        if (connection.isFine()) {
            try {
                // ask the leader for the current list of DATA nodes
                GetNodesRequest request = new GetNodesRequest(NodeType.DATA);
                Object obj = metaNodeExchanger.request(new Request() {
                    @Override
                    public Object getRequestBody() {
                        return request;
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection.getRemoteIP(), connection.getRemotePort());
                    }
                }).getResult();
                if (obj instanceof NodeChangeResult) {
                    NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                    Map<String, Long> versionMap = result.getDataCenterListVersions();
                    versionMap.put(result.getLocalDataCenter(), result.getVersion());
                    return new DataServerChangeItem(result.getNodes(), versionMap);
                }
            } catch (Exception e) {
                // the excerpt omits the original error handling; rethrown here so that the snippet compiles
                throw new RuntimeException(e);
            }
        }
    }
    // no healthy connection to the leader: refresh the leader and return nothing for this round
    String newip = refreshLeader().getIp();
    return null;
}
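After a DataServerChangeEvent is triggered, DataServerChangeEventHandler processes it. Judging from the handler code shown later in this section, the steps are roughly:
- initialize the consistent hash for the current node (DataServerNodeFactory.initConsistent);
- obtain the changed DataServers through dataServerCache.compareAndSet;
- for each changed data center with a non-empty node list, connect to every other DataServer that has no healthy connection, and remove the nodes that are no longer in the list;
- post a LocalDataServerChangeEvent if the data center is the local one, otherwise a RemoteDataServerChangeEvent.
SOFA mainly handles LocalDataServerChangeEvent here; the part that deals with remote data centers is not open-sourced.
The step that posts LocalDataServerChangeEvent works as follows in more detail:
The DataServerChangeItem is extracted from the DataServerChangeEvent. If one of the changed DataCenters is the local data center, the newly joined DataServers are obtained with the following statements.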
Set<String> newjoined = new HashSet<>(ips); newjoined.removeAll(localDataServers);
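A small worked example may help here. The sketch below wraps the same two statements in a hypothetical helper (`NewJoinedExample.newJoined` is a name invented for this illustration) and applies them to made-up IP addresses.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper illustrating the set difference used to find newly joined servers. */
class NewJoinedExample {

    /**
     * Returns the IPs that appear in the changed list but were not known locally before.
     * The inputs are left untouched because the difference is computed on a copy.
     */
    static Set<String> newJoined(Set<String> changedIps, Set<String> knownLocalServers) {
        Set<String> result = new HashSet<>(changedIps);
        result.removeAll(knownLocalServers);
        return result;
    }
}
```

For instance, if the changed list contains 10.0.0.1, 10.0.0.2 and 10.0.0.3 while only 10.0.0.1 and 10.0.0.2 were known before, the result contains only 10.0.0.3. Copying into a new HashSet matters because removeAll mutates the set it is called on.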
These newly joined DataServers are then used to build the LocalDataServerChangeEvent.
See the following excerpt:
// excerpt: localDataServers, map and newVersion are defined in the full handler
//get changed dataservers
Map<String, Set<String>> changedMap = dataServerCache.compareAndSet(
    dataServerChangeItem, event.getFromType());
if (!changedMap.isEmpty()) {
    for (Entry<String, Set<String>> changeEntry : changedMap.entrySet()) {
        String dataCenter = changeEntry.getKey();
        Set<String> ips = changeEntry.getValue();
        //if the dataCenter is self, post LocalDataServerChangeEvent
        if (dataServerConfig.isLocalDataCenter(dataCenter)) {
            Set<String> newjoined = new HashSet<>(ips);
            newjoined.removeAll(localDataServers);
            eventCenter.post(new LocalDataServerChangeEvent(map, newjoined,
                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
        } else {
            dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
            eventCenter.post(new RemoteDataServerChangeEvent(dataCenter, map,
                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
        }
    }
}
The full code is as follows:
public class DataServerChangeEventHandler extends AbstractEventHandler<DataServerChangeEvent> {

    private static final int TRY_COUNT = 5;

    @Autowired
    private DataServerConfig  dataServerConfig;

    @Autowired
    private DataServerCache   dataServerCache;

    @Autowired
    private DataNodeExchanger dataNodeExchanger;

    @Autowired
    private EventCenter       eventCenter;

    @Override
    public List<Class<? extends DataServerChangeEvent>> interest() {
        return Lists.newArrayList(DataServerChangeEvent.class);
    }

    @Override
    public void doHandle(DataServerChangeEvent event) {
        synchronized (this) {
            //register self first,execute once
            DataServerNodeFactory.initConsistent(dataServerConfig);

            DataServerChangeItem dataServerChangeItem = event.getDataServerChangeItem();
            Set<String> localDataServers = dataServerCache.getDataServers(
                dataServerConfig.getLocalDataCenter()).keySet();
            //get changed dataservers
            Map<String, Set<String>> changedMap = dataServerCache.compareAndSet(
                dataServerChangeItem, event.getFromType());
            if (!changedMap.isEmpty()) {
                for (Entry<String, Set<String>> changeEntry : changedMap.entrySet()) {
                    String dataCenter = changeEntry.getKey();
                    Set<String> ips = changeEntry.getValue();
                    Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter);
                    if (!CollectionUtils.isEmpty(ips)) {
                        for (String ip : ips) {
                            if (!StringUtils.equals(ip, DataServerConfig.IP)) {
                                DataServerNode dataServerNode = DataServerNodeFactory
                                    .getDataServerNode(dataCenter, ip);
                                if (dataServerNode == null
                                    || dataServerNode.getConnection() == null
                                    || !dataServerNode.getConnection().isFine()) {
                                    connectDataServer(dataCenter, ip);
                                }
                            }
                        }
                        //remove all old DataServerNode not in change map
                        Set<String> ipSet = DataServerNodeFactory.getIps(dataCenter);
                        for (String ip : ipSet) {
                            if (!ips.contains(ip)) {
                                DataServerNodeFactory.remove(dataCenter, ip, dataServerConfig);
                            }
                        }

                        Map<String, DataNode> newDataNodes = dataServerCache
                            .getNewDataServerMap(dataCenter);
                        //avoid input map reference operation DataServerNodeFactory MAP
                        Map<String, DataNode> map = new ConcurrentHashMap<>(newDataNodes);

                        //if the dataCenter is self, post LocalDataServerChangeEvent
                        if (dataServerConfig.isLocalDataCenter(dataCenter)) {
                            // open question: why is updateItem not called in the local case?
                            Set<String> newjoined = new HashSet<>(ips);
                            newjoined.removeAll(localDataServers);
                            eventCenter.post(new LocalDataServerChangeEvent(map, newjoined,
                                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
                        } else {
                            dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
                            eventCenter.post(new RemoteDataServerChangeEvent(dataCenter, map,
                                dataServerChangeItem.getVersionMap().get(dataCenter), newVersion));
                        }
                    } else {
                        //if the dataCenter which has no dataServers is not self, remove it
                        if (!dataServerConfig.isLocalDataCenter(dataCenter)) {
                            removeDataCenter(dataCenter);
                            eventCenter.post(new RemoteDataServerChangeEvent(dataCenter,
                                Collections.EMPTY_MAP, dataServerChangeItem.getVersionMap().get(
                                    dataCenter), newVersion));
                        }
                        Map<String, DataNode> newDataNodes = dataServerCache
                            .getNewDataServerMap(dataCenter);
                        dataServerCache.updateItem(newDataNodes, newVersion, dataCenter);
                    }
                }
            } else {
                //refresh for keep connect other dataServers
                // if no DataServer has changed, reconnect to the existing DataServers
                Set<String> allDataCenter = new HashSet<>(dataServerCache.getAllDataCenters());
                for (String dataCenter : allDataCenter) {
                    Map<String, DataNode> dataNodes = dataServerCache
                        .getNewDataServerMap(dataCenter);
                    if (dataNodes != null) {
                        for (DataNode dataNode : dataNodes.values()) {
                            if (!StringUtils.equals(dataNode.getIp(), DataServerConfig.IP)) {
                                DataServerNode dataServerNode = DataServerNodeFactory
                                    .getDataServerNode(dataCenter, dataNode.getIp());
                                Connection connection = dataServerNode != null ? dataServerNode
                                    .getConnection() : null;
                                if (connection == null || !connection.isFine()) {
                                    connectDataServer(dataCenter, dataNode.getIp());
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * connect specific dataserver
     *
     * @param dataCenter
     * @param ip
     */
    private void connectDataServer(String dataCenter, String ip) {
        Connection conn = null;
        for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
            try {
                conn = ((BoltChannel) dataNodeExchanger.connect(new URL(ip, dataServerConfig
                    .getSyncDataPort()))).getConnection();
                break;
            } catch (Exception e) {
                TimeUtil.randomDelay(3000);
            }
        }
        if (conn == null || !conn.isFine()) {
            throw new RuntimeException(
                String
                    .format(
                        "[DataServerChangeEventHandler] connect dataServer %s in %s failed five times,dataServer will not work,please check connect!",
                        ip, dataCenter));
        }
        //maybe get dataNode from metaServer,current has not start! register dataNode info to factory,wait for connect task next execute
        DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig);
    }

    /**
     * remove dataCenter, and close connections of dataServers in this dataCenter
     *
     * @param dataCenter
     */
    private void removeDataCenter(String dataCenter) {
        DataServerNodeFactory.getDataServerNodes(dataCenter).values().stream()
            .map(DataServerNode::getConnection)
            .filter(connection -> connection != null && connection.isFine())
            .forEach(Connection::close);
        DataServerNodeFactory.remove(dataCenter);
    }
}
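The connectDataServer method above retries the connection a fixed number of times with a random pause between attempts. The same pattern can be sketched in isolation as follows. `RetrySketch` and its parameters are hypothetical names introduced for this illustration; the project's own TimeUtil.randomDelay is replaced here by a plain Thread.sleep with a random duration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical retry helper: bounded attempts with a random delay after each failure. */
class RetrySketch {

    /**
     * Runs the action up to maxTries times and returns its first successful result.
     * After each failure it sleeps for a random time below maxDelayMillis (no sleep if 0).
     */
    static <T> T retry(Callable<T> action, int maxTries, int maxDelayMillis) {
        Exception last = null;
        for (int attempt = 0; attempt < maxTries; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (maxDelayMillis > 0) {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(maxDelayMillis));
                    } catch (InterruptedException ie) {
                        // restore the interrupt flag and stop retrying
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("interrupted while retrying", ie);
                    }
                }
            }
        }
        throw new RuntimeException("failed after " + maxTries + " attempts", last);
    }
}
```

A call such as `RetrySketch.retry(() -> connect(ip), 5, 3000)` would correspond to the five attempts and the 3000 ms upper bound seen in the handler, where `connect` is a placeholder for the real connection call. The random delay spreads out reconnection attempts when many nodes react to the same change at once.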
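Our logic diagram is therefore extended as follows.
Counting the individual paths, DataServerChangeEvent has four sources in total.
The first three are related to MetaServerChangeEvent:
- Source 1: at startup, startRaftClient posts a MetaServerChangeEvent, which MetaServerChangeEventHandler converts into a DataServerChangeEvent. This corresponds to path a, 1, 3, 4 in the figure below.
- Source 2: ServerChangeHandler receives a pushed NodeChangeResult. If the node type is NodeType.DATA, it posts a DataServerChangeEvent directly. This corresponds to line 2 in the figure below.
- Source 3: if the node type is NodeType.META, ServerChangeHandler calls eventCenter.post(new MetaServerChangeEvent(map)). Note that a second conversion into DataServerChangeEvent happens here: MetaServerChangeEventHandler actively contacts the MetaServer again, and if the reply is a NodeChangeResult, it converts it into a DataServerChangeEvent and posts that to the EventCenter. This corresponds to path b, 1, 3, 4 in the figure below.
The fourth source is periodic polling.
Every DataServer runs a scheduled connection-refresh task, ConnectionRefreshTask, which periodically polls the MetaServer for data-node information.
ConnectionRefreshTask calls metaServerService.getDateServers(), which contacts the MetaServer and builds a DataServerChangeItem from the response. The task then posts DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK) to the EventCenter. This corresponds to line 5 in the figure below and is source 4 of DataServerChangeEvent.
Finally, once a DataServerChangeEvent is triggered, DataServerChangeEventHandler processes it. This corresponds to lines 6 and 7 in the figure below.
If the changed data center is the local data center, a LocalDataServerChangeEvent is triggered. SOFA mainly handles LocalDataServerChangeEvent here; the part that deals with remote data centers is not open-sourced.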
Figure: sources and handling of DataServerChangeEvent
a. [DataServerBootstrap] startRaftClient --MetaServerChangeEvent--> EventCenter
b. [Timer] ConnectionRefreshMetaTask --MetaServerChangeEvent--> EventCenter
c. [Push<NodeChangeResult>] ServerChangeHandler --MetaServerChangeEvent--> EventCenter
1. MetaServerChangeEvent arrives at EventCenter
2. ServerChangeHandler --DataServerChangeEvent--> EventCenter
3. EventCenter --MetaServerChangeEvent--> MetaServerChangeEventHandler
4. MetaServerChangeEventHandler --DataServerChangeEvent--> EventCenter
5. ConnectionRefreshTask --DataServerChangeEvent--> EventCenter
6. EventCenter --DataServerChangeEvent--> DataServerChangeEventHandler
7. DataServerChangeEventHandler --LocalDataServerChangeEvent / RemoteDataServerChangeEvent--> EventCenter
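This article explained how SOFARegistry handles changes to Data nodes.
The focus was the logic from DataServerChangeEvent to LocalDataServerChangeEvent. SOFA mainly handles LocalDataServerChangeEvent here; the part that deals with remote data centers is not open-sourced. The next article therefore covers LocalDataServerChangeEvent.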