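SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: over several articles and from several angles, it works backwards from the code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba approaches design.
This is the fifteenth article. It analyzes how ChangeNotifier runs to tell the relevant modules: hi, new data changes have arrived, let's get moving.
When a data publisher goes online or offline, publishDataProcessor or unPublishDataHandler is triggered respectively. The handler adds a data change event to dataChangeEventCenter, which asynchronously informs the event center of the change. On receiving the event, the event center adds it to a queue.
dataChangeEventCenter then processes the online/offline data asynchronously according to the event type: the change information becomes a ChangeNotifier, then an Operator, and is placed in AbstractAcceptorStore;
At the same time, DataChangeHandler publishes the change through ChangeNotifier, notifying other nodes to synchronize their data.
Because of space limits, earlier articles only skimmed over ChangeNotifier, so this article explains it in detail. It strings the whole flow together again, which means some content from earlier articles reappears here.
First, a diagram to show the role of ChangeNotifier.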
        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
                 |
                 |
                 |  ChangeData
                 v
         +-------+----------+
         | DataChangeHandler|
         +-------+----------+
                 |
                 |
                 |  ChangeData
                 v
          +------+--------+              +------------+
          | ChangeNotifier|  +-------->  | datumCache |
          +------+--------+              +------------+
                 |
                 |
                 v
             +---+------+
             | notifier |
             +---+------+
                 |
                 v
     +-----------+---------------+
     |                           |
     v                           v
+----+----------------+   +------+----------+
|SessionServerNotifier|   |  BackUpNotifier |
+----+----------------+   +------+----------+
     |                           |
     |                           |
     |                           |
     |                           v
  +--v------------+       +------+----------------+
  | sessionServer |       | AbstractAcceptorStore |
  +---------------+       +-----------------------+
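Data changes come in two kinds:
changes of data server nodes;
changes of the data itself, i.e. changes of Publishers and Subscribers;
ChangeNotifier is responsible for notifying the relevant modules of Publisher and Subscriber changes. Change notification is a form of decoupling.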
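We first need to look at the data structures used for notification.
IDataChangeNotifier is the interface that defines a notification:
public interface IDataChangeNotifier {

    // the source types this notifier is willing to handle
    Set<DataSourceTypeEnum> getSuitableSource();

    /**
     * @param datum       the changed datum
     * @param lastVersion the version before the change; null when the datum is added for the first time
     */
    void notify(Datum datum, Long lastVersion);
}
IDataChangeNotifier has four derived classes, one for each of four possible kinds of data change; their names give a rough idea of what they are for.
public class BackUpNotifier implements IDataChangeNotifier
public class SessionServerNotifier implements IDataChangeNotifier
public class SnapshotBackUpNotifier implements IDataChangeNotifier
public class TempPublisherNotifier implements IDataChangeNotifier
The corresponding Bean is as follows (it registers three of the four notifiers; SnapshotBackUpNotifier is not in this list):
@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}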
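Let's walk through the flow from the beginning.
When a data publisher goes online or offline, publishDataProcessor or unPublishDataHandler is triggered respectively. The handler adds a data change event to dataChangeEventCenter, which asynchronously informs the event center of the change. On receiving the event, the event center adds it to a queue.
In DataServer, the concrete flow is as follows:
PublishDataHandler responds to PublishDataRequest. When a Publisher arrives, it puts a message into DataChangeEventCenter by making the following call: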
dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());
The code is as follows:
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {

    @Autowired
    private ForwardService                 forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter          dataChangeEventCenter;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DatumLeaseManager              datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor             publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        // hand the publisher over to the event center
        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}
The flow so far looks like this:
        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
The core of DataChangeEventCenter is an array of DataChangeEventQueue.
DataChangeEventCenter.onChange first computes a hash from the Publisher's DataInfoId and uses that value to decide which queue should handle the DataChangeEvent; it then calls that queue's onChange function.
public class DataChangeEventCenter {

    /**
     * queues of DataChangeEvent
     */
    private DataChangeEventQueue[] dataChangeEventQueues;

    @Autowired
    private DatumCache             datumCache;

    @PostConstruct
    public void init() {
        if (isInited.compareAndSet(false, true)) {
            queueCount = dataServerConfig.getQueueCount();
            dataChangeEventQueues = new DataChangeEventQueue[queueCount];
            for (int idx = 0; idx < queueCount; idx++) {
                dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,
                    datumCache);
                dataChangeEventQueues[idx].start();
            }
        }
    }

    /**
     * receive changed publisher, then wrap it into the DataChangeEvent and put it into dataChangeEventQueue
     *
     * @param publisher
     * @param dataCenter
     */
    public void onChange(Publisher publisher, String dataCenter) {
        int idx = hash(publisher.getDataInfoId());
        Datum datum = new Datum(publisher, dataCenter);
        if (publisher instanceof UnPublisher) {
            datum.setContainsUnPub(true);
        }
        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));
        } else {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB_TEMP, datum));
        }
    }
}
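The routing rule above can be sketched as follows. This is only a minimal illustration, not SOFARegistry's code: the excerpt calls hash(...) without showing it, so queueIndex below is a hypothetical stand-in built on Math.floorMod, and the service id in main is made up. What it shows is that events for the same dataInfoId always land in the same queue, so a single queue thread sees them in order.

```java
final class QueueRoutingSketch {

    // Hypothetical stand-in for the hash(...) helper, which the excerpt does not show.
    static int queueIndex(String dataInfoId, int queueCount) {
        // floorMod keeps the index in [0, queueCount) even when hashCode() is negative
        return Math.floorMod(dataInfoId.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4;
        String dataInfoId = "com.example.DemoService#@#DEFAULT_GROUP"; // made-up id
        int first = queueIndex(dataInfoId, queueCount);
        int second = queueIndex(dataInfoId, queueCount);
        System.out.println("same queue for same id: " + (first == second));
    }
}
```

A plain `hashCode() % queueCount` would be a bug here, because it can return a negative index for some strings.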
The main data members of DataChangeEventQueue are:
public class DataChangeEventQueue {

    /**
     * a block queue that stores all data change events
     */
    private final BlockingQueue<IDataChangeEvent>            eventQueue;

    private final Map<String, Map<String, ChangeData>>       CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

    private final DelayQueue<ChangeData>                     CHANGE_QUEUE              = new DelayQueue<>();

    private DataChangeEventCenter                            dataChangeEventCenter;

    private DatumCache                                       datumCache;
}
Its execution engine is a single thread that blocks on the BlockingQueue eventQueue. When a message arrives, the thread takes it out and handles it according to its type.
public void start() {
    Executor executor = ExecutorFactory
        .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName()));
    executor.execute(() -> {
        while (true) {
            try {
                IDataChangeEvent event = eventQueue.take();
                DataChangeScopeEnum scope = event.getScope();
                if (scope == DataChangeScopeEnum.DATUM) {
                    DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
                    //Temporary push data will be notify as soon as,and not merge to normal pub data;
                    if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
                        addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(),
                            dataChangeEvent.getSourceType());
                    } else {
                        handleDatum(dataChangeEvent.getChangeType(),
                            dataChangeEvent.getSourceType(), dataChangeEvent.getDatum());
                    }
                } else if (scope == DataChangeScopeEnum.CLIENT) {
                    handleClientOff((ClientChangeEvent) event);
                } else if (scope == DataChangeScopeEnum.SNAPSHOT) {
                    handleSnapshot((DatumSnapshotEvent) event);
                }
            } catch (Throwable e) {
                // error handling is omitted in this excerpt
            }
        }
    });
}
For Publisher messages, handleDatum behaves differently depending on whether changeType is COVER or MERGE.
During this step the ChangeData is also put into the delay queue via CHANGE_QUEUE.put(changeData);
private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {
    lock.lock();
    try {
        //get changed datum
        ChangeData changeData = getChangeData(targetDatum.getDataCenter(),
            targetDatum.getDataInfoId(), sourceType, changeType);
        Datum cacheDatum = changeData.getDatum();
        if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
            changeData.setDatum(targetDatum);
        } else {
            Map<String, Publisher> targetPubMap = targetDatum.getPubMap();
            Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();
            for (Publisher pub : targetPubMap.values()) {
                String registerId = pub.getRegisterId();
                Publisher cachePub = cachePubMap.get(registerId);
                if (cachePub != null) {
                    // if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means
                    // that pub is not the newest data, should be ignored
                    if (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {
                        continue;
                    }
                    // if pub and cachePub both are publisher, and sourceAddress of both are equal,
                    // and version of cachePub is greater than version of pub, should be ignored
                    if (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher)
                        && pub.getSourceAddress().equals(cachePub.getSourceAddress())
                        && cachePub.getVersion() > pub.getVersion()) {
                        continue;
                    }
                }
                cachePubMap.put(registerId, pub);
                cacheDatum.setVersion(targetDatum.getVersion());
            }
        }
    } finally {
        lock.unlock();
    }
}
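The MERGE branch of handleDatum boils down to a per-registerId "is the incoming publisher newer?" check. The sketch below isolates that rule so it can be run on its own. Pub is a hypothetical, simplified stand-in for Publisher (a boolean flag models UnPublisher), and the addresses and numbers in demo are made up.

```java
import java.util.HashMap;
import java.util.Map;

final class PublisherMergeSketch {

    /** Hypothetical, simplified stand-in for Publisher: only the fields the rule reads. */
    static final class Pub {
        final String registerId;
        final String sourceAddress;
        final long registerTimestamp;
        final long version;
        final boolean unPublish; // true models an UnPublisher

        Pub(String registerId, String sourceAddress, long registerTimestamp, long version,
            boolean unPublish) {
            this.registerId = registerId;
            this.sourceAddress = sourceAddress;
            this.registerTimestamp = registerTimestamp;
            this.version = version;
            this.unPublish = unPublish;
        }
    }

    /** Returns true when incoming should overwrite cached, following the two checks above. */
    static boolean shouldReplace(Pub cached, Pub incoming) {
        if (cached == null) {
            return true; // nothing cached yet
        }
        if (incoming.registerTimestamp < cached.registerTimestamp) {
            return false; // incoming is older than what we already hold
        }
        if (!incoming.unPublish && !cached.unPublish
            && incoming.sourceAddress.equals(cached.sourceAddress)
            && cached.version > incoming.version) {
            return false; // same client, but the cached version is higher
        }
        return true;
    }

    static void merge(Map<String, Pub> cache, Pub incoming) {
        if (shouldReplace(cache.get(incoming.registerId), incoming)) {
            cache.put(incoming.registerId, incoming);
        }
    }

    /** Demo helper: returns the version that survives three merges for the same registerId. */
    static long demo() {
        Map<String, Pub> cache = new HashMap<>();
        merge(cache, new Pub("r1", "10.0.0.1:9600", 100L, 5L, false));
        merge(cache, new Pub("r1", "10.0.0.1:9600", 100L, 3L, false)); // lower version, ignored
        merge(cache, new Pub("r1", "10.0.0.1:9600", 90L, 9L, false));  // older timestamp, ignored
        return cache.get("r1").version;
    }

    public static void main(String[] args) {
        System.out.println("surviving version: " + demo());
    }
}
```

Note that an unpublish with an equal or newer timestamp always replaces the cached entry, regardless of version, which is how an offline event wins over a stale online one.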
The flow so far looks like this:
        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
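The pairing of CHANGE_DATA_MAP_FOR_MERGE with a DelayQueue suggests a merge window: changes for the same key that arrive before the delay expires are folded into one pending entry, and the consumer only sees that entry once the delay is over. The sketch below illustrates that idea with plain JDK classes. It is an assumption-laden simplification, not the real ChangeData: the Change class, the 50 ms delay and the string payloads are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayedMergeSketch {

    /** Hypothetical pending change; becomes visible to take() once its delay has elapsed. */
    static final class Change implements Delayed {
        final String dataInfoId;
        final List<String> publishers = new ArrayList<>();
        private final long readyAtNanos;

        Change(String dataInfoId, long delayMs) {
            this.dataInfoId = dataInfoId;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final Map<String, Change> pending = new HashMap<>();
    private final DelayQueue<Change> queue = new DelayQueue<>();
    private final long delayMs;

    DelayedMergeSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    /** Folds the change into the pending entry for this key, creating and enqueuing one if needed. */
    synchronized void onChange(String dataInfoId, String publisher) {
        Change change = pending.get(dataInfoId);
        if (change == null) {
            change = new Change(dataInfoId, delayMs);
            pending.put(dataInfoId, change);
            queue.put(change);
        }
        change.publishers.add(publisher);
    }

    /** Blocks until the oldest pending entry is due, then stops merging into it. */
    Change take() throws InterruptedException {
        Change change = queue.take();
        synchronized (this) {
            pending.remove(change.dataInfoId, change);
        }
        return change;
    }

    /** Demo helper: two changes for one key inside the window come out as a single entry. */
    static int demo() {
        DelayedMergeSketch sketch = new DelayedMergeSketch(50);
        sketch.onChange("svc-a", "pub-1");
        sketch.onChange("svc-a", "pub-2");
        try {
            return sketch.take().publishers.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("publishers merged into one change: " + demo());
    }
}
```

The trade-off of such a window is a small added latency in exchange for fewer downstream notifications when many publishers change at once.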
DataChangeHandler consumes every DataChangeEventQueue and sends out notifications.
public class DataChangeHandler {

    @Autowired
    private DataServerConfig          dataServerConfig;

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;

    @PostConstruct
    public void start() {
        DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
        int queueCount = queues.length;
        Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
        Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());

        for (int idx = 0; idx < queueCount; idx++) {
            final DataChangeEventQueue dataChangeEventQueue = queues[idx];
            final String name = dataChangeEventQueue.getName();
            executor.execute(() -> {
                while (true) {
                    try {
                        final ChangeData changeData = dataChangeEventQueue.take();
                        notifyExecutor.execute(new ChangeNotifier(changeData, name));
                    } catch (Throwable e) {
                        // error handling is omitted in this excerpt
                    }
                }
            });
        }
    }
}
DataChangeHandler keeps pulling messages out of DataChangeEventCenter and then processes them. Its members are:
public class DataChangeHandler {

    @Autowired
    private DataServerConfig          dataServerConfig;

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;
}
This is a two-tier thread model.
executor = ExecutorFactory.newFixedThreadPool(queueCount)
notifyExecutor= ExecutorFactory.newFixedThreadPool(dataServerConfig.getQueueCount() * 5)
You can think of executor as the control threads and notifyExecutor as the worker threads; there are five times as many worker threads as control threads.
@PostConstruct
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
        .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());

    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                try {
                    // control thread: block on this queue, then hand the work to a worker
                    final ChangeData changeData = dataChangeEventQueue.take();
                    notifyExecutor.execute(new ChangeNotifier(changeData, name));
                } catch (Throwable e) {
                    // error handling is omitted in this excerpt
                }
            }
        });
    }
}
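To make the two-tier model concrete, here is a small self-contained sketch using only java.util.concurrent. It assumes nothing about ExecutorFactory: plain Executors.newFixedThreadPool stands in for it, strings stand in for ChangeData, and the CountDownLatch exists only so that the demo can stop. One dispatcher thread per queue blocks on take() and immediately hands each item to a worker pool five times as large.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class TwoTierDispatchSketch {

    /** Drains the given queues with one dispatcher per queue and returns what the workers handled. */
    static List<String> drain(List<BlockingQueue<String>> queues, int expected) {
        int queueCount = queues.size();
        ExecutorService dispatchers = Executors.newFixedThreadPool(queueCount);   // control tier
        ExecutorService workers = Executors.newFixedThreadPool(queueCount * 5);   // worker tier
        ConcurrentLinkedQueue<String> handled = new ConcurrentLinkedQueue<>();
        CountDownLatch done = new CountDownLatch(expected);

        for (BlockingQueue<String> queue : queues) {
            dispatchers.execute(() -> {
                try {
                    while (true) {
                        String change = queue.take(); // blocks until this queue has work
                        workers.execute(() -> {
                            handled.add(change);      // stands in for ChangeNotifier.run()
                            done.countDown();
                        });
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the dispatcher loop
                }
            });
        }

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatchers.shutdownNow();
            workers.shutdown();
        }
        return new ArrayList<>(handled);
    }

    /** Demo helper: three items spread over two queues. */
    static int demo() {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        queues.add(new LinkedBlockingQueue<>(Arrays.asList("a", "b")));
        queues.add(new LinkedBlockingQueue<>(Arrays.asList("c")));
        return drain(queues, 3).size();
    }

    public static void main(String[] args) {
        System.out.println("handled items: " + demo());
    }
}
```

The split keeps a slow notification from stalling the thread that reads its queue: the dispatcher only takes and submits, so the next change can be picked up while earlier ones are still being delivered.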
For each ChangeData, a ChangeNotifier is created to process it. The ChangeNotifier publishes the change to the outside, notifying other nodes to synchronize their data.
Inside ChangeNotifier, the handling depends on the type of changeData.
The details are as follows:
private class ChangeNotifier implements Runnable {

    private ChangeData changeData;
    private String     name;

    @Override
    public void run() {
        if (changeData instanceof SnapshotData) {
           ......
        } else {
            Datum datum = changeData.getDatum();

            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            DataSourceTypeEnum sourceType = changeData.getSourceType();
            DataChangeTypeEnum changeType = changeData.getChangeType();

            if (changeType == DataChangeTypeEnum.MERGE
                && sourceType != DataSourceTypeEnum.BACKUP
                && sourceType != DataSourceTypeEnum.SYNC) {
                //update version for pub or unPub merge to cache
                //if the version product before merge to cache,it may be cause small version override big one
                datum.updateVersion();
            }

            long version = datum.getVersion();

            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                      ......
                    }
                } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    notifyTempPub(datum, sourceType, changeType);
                } else {
                    MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                    Long lastVersion = mergeResult.getLastVersion();

                    if (lastVersion != null
                        && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                        return;
                    }

                    //lastVersion null means first add datum
                    if (lastVersion == null || version != lastVersion) {
                        if (mergeResult.isChangeFlag()) {
                            notify(datum, sourceType, lastVersion);
                        }
                    }
                }
            } catch (Exception e) {
                // error handling is omitted in this excerpt
            }
        }
    }
}
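The last branch of run() decides whether a notification goes out at all. Pulled out as a pure function, the decision looks like the sketch below. The value of ERROR_DATUM_VERSION here is an assumption made for illustration only; the real constant lives in LocalDatumStorage and is not shown in the excerpt.

```java
final class NotifyDecisionSketch {

    // Assumed sentinel for "the put failed"; the real value is defined in LocalDatumStorage.
    static final long ERROR_DATUM_VERSION = -2L;

    /**
     * @param lastVersion version stored before the merge, or null if the datum was new
     * @param version     version of the datum that was just merged
     * @param changed     whether the merge actually modified the cache
     */
    static boolean shouldNotify(Long lastVersion, long version, boolean changed) {
        if (lastVersion != null && lastVersion == ERROR_DATUM_VERSION) {
            return false; // the cache rejected the datum, so there is nothing to announce
        }
        boolean versionMoved = lastVersion == null || version != lastVersion;
        return versionMoved && changed;
    }

    public static void main(String[] args) {
        System.out.println("first add:      " + shouldNotify(null, 1L, true));
        System.out.println("same version:   " + shouldNotify(1L, 1L, true));
        System.out.println("nothing merged: " + shouldNotify(1L, 2L, false));
    }
}
```

Both conditions have to hold: the version must have moved, and the merge must report a real change. Either one alone would allow redundant pushes to the session servers.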
The flow so far looks like this:
        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
                 |
                 |
                 |  ChangeData
                 v
         +-------+----------+
         | DataChangeHandler|
         +-------+----------+
                 |
                 |
                 |  ChangeData
                 v
          +------+--------+              +------------+
          | ChangeNotifier|  +-------->  | datumCache |
          +------+--------+              +------------+
The notify function iterates over dataChangeNotifiers and runs every Notifier that supports the SourceType of the current Datum.
Which source types a Notifier supports is determined by its getSuitableSource.
private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}
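The filtering loop can be tried out in isolation with the sketch below. The types are hypothetical miniatures of the real ones: SourceType lists only some of the source types named in this article, Notifier mirrors the two methods of IDataChangeNotifier, and RecordingNotifier just logs its name. The two source sets are the ones that SessionServerNotifier and BackUpNotifier declare in their getSuitableSource.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class NotifierDispatchSketch {

    /** Simplified subset of the source types mentioned in the article. */
    enum SourceType { PUB, PUB_TEMP, SYNC, BACKUP, SNAPSHOT }

    /** Hypothetical miniature of IDataChangeNotifier. */
    interface Notifier {
        Set<SourceType> getSuitableSource();

        void notifyChange(String dataInfoId);
    }

    /** Test double that records its own name whenever it is notified. */
    static final class RecordingNotifier implements Notifier {
        private final String name;
        private final Set<SourceType> sources;
        private final List<String> log;

        RecordingNotifier(String name, Set<SourceType> sources, List<String> log) {
            this.name = name;
            this.sources = sources;
            this.log = log;
        }

        @Override
        public Set<SourceType> getSuitableSource() {
            return sources;
        }

        @Override
        public void notifyChange(String dataInfoId) {
            log.add(name);
        }
    }

    /** Same shape as DataChangeHandler.notify: only notifiers that accept the source type run. */
    static void dispatch(List<Notifier> notifiers, SourceType sourceType, String dataInfoId) {
        for (Notifier notifier : notifiers) {
            if (notifier.getSuitableSource().contains(sourceType)) {
                notifier.notifyChange(dataInfoId);
            }
        }
    }

    /** Demo helper: returns the names of the notifiers that ran for the given source type. */
    static List<String> demo(SourceType sourceType) {
        List<String> log = new ArrayList<>();
        List<Notifier> notifiers = new ArrayList<>();
        notifiers.add(new RecordingNotifier("session",
            EnumSet.of(SourceType.PUB, SourceType.SYNC, SourceType.SNAPSHOT), log));
        notifiers.add(new RecordingNotifier("backup", EnumSet.of(SourceType.PUB), log));
        dispatch(notifiers, sourceType, "svc-a");
        return log;
    }

    public static void main(String[] args) {
        System.out.println("PUB  -> " + demo(SourceType.PUB));
        System.out.println("SYNC -> " + demo(SourceType.SYNC));
    }
}
```

With this shape, adding a new kind of downstream consumer means adding one more notifier to the list; the dispatch loop itself never changes, which is the decoupling the article refers to.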
The corresponding Bean is:
@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}
BackUpNotifier simply calls syncDataService.appendOperator to do its notification; in effect it turns the Datum into an Operator and stores it in AbstractAcceptorStore.
public class BackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public Set<DataSourceTypeEnum> getSuitableSource() {
        Set<DataSourceTypeEnum> set = new HashSet<>();
        set.add(DataSourceTypeEnum.PUB);
        return set;
    }

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}
SessionServerNotifier is considerably more complex.
public class SessionServerNotifier implements IDataChangeNotifier {

    private AsyncHashedWheelTimer          asyncHashedWheelTimer;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private Exchange                       boltExchange;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DatumCache                     datumCache;

    @Override
    public Set<DataSourceTypeEnum> getSuitableSource() {
        Set<DataSourceTypeEnum> set = new HashSet<>();
        set.add(DataSourceTypeEnum.PUB);
        set.add(DataSourceTypeEnum.SYNC);
        set.add(DataSourceTypeEnum.SNAPSHOT);
        return set;
    }
}
It sets up a 500-millisecond time wheel (a hashed wheel timer).
@PostConstruct
public void init() {
    ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
    threadFactoryBuilder.setDaemon(true);
    asyncHashedWheelTimer = new AsyncHashedWheelTimer(threadFactoryBuilder.setNameFormat(
        "Registry-SessionServerNotifier-WheelTimer").build(), 500, TimeUnit.MILLISECONDS, 1024,
        dataServerConfig.getSessionServerNotifierRetryExecutorThreadSize(),
        dataServerConfig.getSessionServerNotifierRetryExecutorQueueSize(), threadFactoryBuilder
            .setNameFormat("Registry-SessionServerNotifier-WheelExecutor-%d").build(),
        new TaskFailedCallback() {
            @Override
            public void executionRejected(Throwable e) {
                LOGGER.error("executionRejected: " + e.getMessage(), e);
            }

            @Override
            public void executionFailed(Throwable e) {
                LOGGER.error("executionFailed: " + e.getMessage(), e);
            }
        });
}
From the business point of view, when a publisher-related message arrives,
the notify function of DataChangeHandler iterates over dataChangeNotifiers and runs every Notifier that supports the SourceType of the current Datum.
private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}
When the call reaches the notify function of SessionServerNotifier, it iterates over all currently cached Connections and notifies them one by one.
@Override
public void notify(Datum datum, Long lastVersion) {
    DataChangeRequest request = new DataChangeRequest(datum.getDataInfoId(),
        datum.getDataCenter(), datum.getVersion());
    List<Connection> connections = sessionServerConnectionFactory.getSessionConnections();
    for (Connection connection : connections) {
        doNotify(new NotifyCallback(connection, request));
    }
}
The function that actually sends the notification:
private void doNotify(NotifyCallback notifyCallback) {
    Connection connection = notifyCallback.connection;
    DataChangeRequest request = notifyCallback.request;
    try {
        //check connection active
        if (!connection.isFine()) {
            return;
        }
        Server sessionServer = boltExchange.getServer(dataServerConfig.getPort());
        sessionServer.sendCallback(sessionServer.getChannel(connection.getRemoteAddress()),
            request, notifyCallback, dataServerConfig.getRpcTimeout());
    } catch (Exception e) {
        onFailed(notifyCallback);
    }
}
The time wheel is used for retrying after a failed call.
That is, as long as the maximum number of retries has not been reached, a timed retry is scheduled.
private void onFailed(NotifyCallback notifyCallback) {

    DataChangeRequest request = notifyCallback.request;
    Connection connection = notifyCallback.connection;
    notifyCallback.retryTimes++;

    //check version, if it's fall behind, stop retry
    long _currentVersion = datumCache.get(request.getDataCenter(), request.getDataInfoId()).getVersion();
    if (request.getVersion() != _currentVersion) {
        return;
    }

    if (notifyCallback.retryTimes <= dataServerConfig.getNotifySessionRetryTimes()) {
        this.asyncHashedWheelTimer.newTimeout(timeout -> {
            //check version, if it's fall behind, stop retry
            long currentVersion = datumCache.get(request.getDataCenter(), request.getDataInfoId()).getVersion();
            if (request.getVersion() == currentVersion) {
                doNotify(notifyCallback);
            }
        }, getDelayTimeForRetry(notifyCallback.retryTimes), TimeUnit.MILLISECONDS);
    }
}
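The retry logic combines three ideas: a retry cap, a delay before each retry, and a version check both when scheduling and when the timer fires, so that a notification for an outdated version is dropped instead of retried. The sketch below reproduces that shape with a JDK ScheduledExecutorService swapped in for the time wheel. Everything in it is illustrative: the linear delay is an assumption (getDelayTimeForRetry is not shown in the excerpt), and the version source and the send attempt are passed in as plain functions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

final class VersionedRetrySketch {

    private final ScheduledExecutorService timer; // stands in for the hashed wheel timer
    private final LongSupplier currentVersion;    // stands in for the datumCache lookup
    private final int maxRetries;
    private final long baseDelayMs;

    VersionedRetrySketch(ScheduledExecutorService timer, LongSupplier currentVersion,
                         int maxRetries, long baseDelayMs) {
        this.timer = timer;
        this.currentVersion = currentVersion;
        this.maxRetries = maxRetries;
        this.baseDelayMs = baseDelayMs;
    }

    /** Tries once; on failure hands over to onFailed with the retry counter bumped. */
    void send(long requestVersion, BooleanSupplier attempt, int retryTimes) {
        if (!attempt.getAsBoolean()) {
            onFailed(requestVersion, attempt, retryTimes + 1);
        }
    }

    private void onFailed(long requestVersion, BooleanSupplier attempt, int retryTimes) {
        if (requestVersion != currentVersion.getAsLong()) {
            return; // a newer version exists, retrying this one is pointless
        }
        if (retryTimes <= maxRetries) {
            timer.schedule(() -> {
                // check again at fire time: the version may have moved while we waited
                if (requestVersion == currentVersion.getAsLong()) {
                    send(requestVersion, attempt, retryTimes);
                }
            }, baseDelayMs * retryTimes, TimeUnit.MILLISECONDS); // assumed linear delay
        }
    }

    /** Demo helper: the attempt succeeds on call number succeedOnCall; returns attempts made. */
    static int demoAttempts(int succeedOnCall, boolean versionMovesOn) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicLong version = new AtomicLong(7L);
        AtomicInteger calls = new AtomicInteger();
        CountDownLatch succeeded = new CountDownLatch(1);
        VersionedRetrySketch sketch = new VersionedRetrySketch(timer, version::get, 3, 10L);
        if (versionMovesOn) {
            version.set(8L); // the request below still carries version 7
        }
        sketch.send(7L, () -> {
            if (calls.incrementAndGet() < succeedOnCall) {
                return false;
            }
            succeeded.countDown();
            return true;
        }, 0);
        try {
            succeeded.await(versionMovesOn ? 200 : 2000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("attempts when version is current: " + demoAttempts(3, false));
        System.out.println("attempts when version moved on:   " + demoAttempts(3, true));
    }
}
```

A wheel timer is typically preferred over a general-purpose scheduler when a large number of short timeouts are created and most of them are cheap, which fits a fan-out to many session connections.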
The complete flow is as follows:
        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
                 |
                 |
                 |  ChangeData
                 v
         +-------+----------+
         | DataChangeHandler|
         +-------+----------+
                 |
                 |
                 |  ChangeData
                 v
          +------+--------+              +------------+
          | ChangeNotifier|  +-------->  | datumCache |
          +------+--------+              +------------+
                 |
                 |
                 v
             +---+------+
             | notifier |
             +---+------+
                 |
                 v
     +-----------+---------------+
     |                           |
     v                           v
+----+----------------+   +------+----------+
|SessionServerNotifier|   |  BackUpNotifier |
+----+----------------+   +------+----------+
     |                           |
     |                           |
     |                           |
     |                           v
  +--v------------+       +------+----------------+
  | sessionServer |       | AbstractAcceptorStore |
  +---------------+       +-----------------------+
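This article took one point of the registry, the change notification mechanism ChangeNotifier, and expanded it in detail, using SessionServerNotifier and BackUpNotifier as examples to explain how ChangeNotifier works and how it is used. It also went over related pieces such as dataChangeEventCenter. I hope it is helpful.
In DataServer, data changes come in two kinds:
changes of data server nodes;
changes of the data itself, i.e. changes of Publishers and Subscribers;
ChangeNotifier is responsible for notifying the relevant modules of Publisher and Subscriber changes. Change notification is a form of decoupling.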