SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。html
本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。java
本文爲第五篇,介紹SOFARegistry消息總線的異步處理。node
前文咱們講述了SOFARegistry的消息總線,本文咱們講講一個變種 DataChangeEventCenter。數組
DataChangeEventCenter 是被獨立出來的,專門處理數據變化相關的消息。緩存
爲何要分離呢?由於:網絡
DataChangeEventCenter 的獨特業務場景以下:session
所以,DataChangeEventCenter 代碼和業務聯繫至關緊密,前文的 EventCenter 已經不適合了。架構
關於延遲和歸併操做,咱們單獨說明下。異步
螞蟻金服業務的一個特色是:經過鏈接敏感的特性對服務宕機作到秒級發現。ide
所以 SOFARegistry 在健康檢測的設計方面決定「服務數據與服務發佈者的實體鏈接綁定在一塊兒,斷連立刻清數據」,簡稱此特色叫作鏈接敏感性。鏈接敏感性是指在 SOFARegistry 裏全部 Client 都與 SessionServer 保持長鏈接,每條長鏈接都設置基於 SOFABolt 的鏈接心跳,若是長鏈接斷連客戶端當即發起從新建連,時刻保持 Client 與 SessionServer 之間可靠的鏈接。
但帶來了一個問題就是:可能由於網絡問題,短時間內會出現大量從新建連操做。好比只是網絡問題致使鏈接斷開,實際的服務進程沒有宕機,此時客戶端當即發起從新鏈接 SessionServer 而且從新註冊全部服務數據。
可是 假如此過程耗時足夠短暫(例如 500ms 內發生斷連和重連),服務訂閱者應該感覺不到服務下線。從而 SOFARegistry 內部應該作相應處理。
SOFARegistry 內部作了歸併和延遲操做來保證用戶不受影響。好比 DataServer 內部的數據經過 mergeDatum 延遲合併變動的 Publisher 服務信息,version 是合併後最新的版本號。
對於 DataChangeEventCenter,就是經過消息的延遲和歸併來協助完成這個功能。
下面是 DataChangeEventCenter 整體的功能描述:
DataChangeEventCenter具體分紅四部分:
接下來咱們一一介紹,由於 DataChangeEventCenter 和業務結合緊密,因此咱們會深刻結合業務進行講解。
DataChangeEventCenter 中維護着一個 DataChangeEventQueue 隊列數組,這是核心。數組中的每一個元素是一個事件隊列。具體定義以下:
public class DataChangeEventCenter { /** * count of DataChangeEventQueue */ private int queueCount; /** * queues of DataChangeEvent */ private DataChangeEventQueue[] dataChangeEventQueues; @Autowired private DataServerConfig dataServerConfig; @Autowired private DatumCache datumCache; }
DataChangeEventCenter 專門處理 IDataChangeEvent 類型消息,其具體實現爲三種:
這些不一樣類型的消息能夠放入同一個隊列,具體放入哪一個隊列,是根據特定判別方式來決定,好比根據Publisher的DataInfoId來作hash,以此決定放入哪一個Queue。
即,當對應 handler 的 onChange 方法被觸發時,會計算該變化服務的 dataInfoId 的 Hash 值,從而進一步肯定出該服務註冊數據所在的隊列編號,進而把該變化的數據封裝成一個數據變化對象,傳入到隊列中。
在初始化函數中,構建了EventQueue,每個Queue啓動了一個線程,用來處理消息。
@PostConstruct public void init() { if (isInited.compareAndSet(false, true)) { queueCount = dataServerConfig.getQueueCount(); dataChangeEventQueues = new DataChangeEventQueue[queueCount]; for (int idx = 0; idx < queueCount; idx++) { dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,datumCache); dataChangeEventQueues[idx].start(); } } }
put消息比較簡單,具體如何判別應該把Event放入哪個Queue是根據具體方式來判斷,好比根據Publisher的DataInfoId來作hash,以此決定放入哪一個Queue:
int idx = hash(publisher.getDataInfoId()); Datum datum = new Datum(publisher, dataCenter); dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum));
具體是經過 dataChangeEventQueues.onChange 來作處理,好比以下幾個函數,分別處理不一樣的消息類型。具體都是找到queue,而後調用:
public void onChange(Publisher publisher, String dataCenter) { int idx = hash(publisher.getDataInfoId()); Datum datum = new Datum(publisher, dataCenter); if (publisher instanceof UnPublisher) { datum.setContainsUnPub(true); } if (publisher.getPublishType() != PublishType.TEMPORARY) { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum)); } else { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB_TEMP, datum)); } } public void onChange(ClientChangeEvent event) { for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) { dataChangeEventQueue.onChange(event); } } public void onChange(DatumSnapshotEvent event) { for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) { dataChangeEventQueue.onChange(event); } } public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) { int idx = hash(datum.getDataInfoId()); DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum); dataChangeEventQueues[idx].onChange(event); }
由於 DataChangeEvent 最經常使用,因此咱們單獨拿出來講明。
DataChangeEvent會根據DataChangeTypeEnum和DataSourceTypeEnum來進行區分,就是處理類型和消息來源。
DataChangeTypeEnum具體分爲:
DataSourceTypeEnum 具體分爲:
具體定義以下:
public class DataChangeEvent implements IDataChangeEvent { /** * type of changed data, MERGE or COVER */ private DataChangeTypeEnum changeType; private DataSourceTypeEnum sourceType; /** * data changed */ private Datum datum; }
DataChangeEventQueue 是這個子模塊的核心,用於多路分別處理,增長處理能力。每個Queue內部啓動一個線程,用於異步處理,也能增長處理能力。
這裏的核心是:
BlockingQueue
Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();
DelayQueue
講解以下:
具體定義以下:
public class DataChangeEventQueue { private final String name; /** * a block queue that stores all data change events */ private final BlockingQueue<IDataChangeEvent> eventQueue; private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>(); private final DelayQueue<ChangeData> CHANGE_QUEUE = new DelayQueue(); private final int notifyIntervalMs; private final int notifyTempDataIntervalMs; private final ReentrantLock lock = new ReentrantLock(); private final int queueIdx; private DataServerConfig dataServerConfig; private DataChangeEventCenter dataChangeEventCenter; private DatumCache datumCache; }
DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的時候被一個新的線程調用,該線程會源源不斷地從隊列中獲取新增事件,而且進行分發。新增數據會由此添加進節點內,實現分片。由於 eventQueue 是一個 BlockingQueue,因此可使用while (true)來控制。
當event被取出以後,會根據 DataChangeScopeEnum.DATUM 的不一樣,會作不一樣的處理。
具體代碼以下:
public void start() { Executor executor = ExecutorFactory .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName())); executor.execute(() -> { while (true) { try { IDataChangeEvent event = eventQueue.take(); DataChangeScopeEnum scope = event.getScope(); if (scope == DataChangeScopeEnum.DATUM) { DataChangeEvent dataChangeEvent = (DataChangeEvent) event; //Temporary push data will be notify as soon as,and not merge to normal pub data; if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) { addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType()); } else { handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(), dataChangeEvent.getDatum()); } } else if (scope == DataChangeScopeEnum.CLIENT) { handleClientOff((ClientChangeEvent) event); } else if (scope == DataChangeScopeEnum.SNAPSHOT) { handleSnapshot((DatumSnapshotEvent) event); } } } }); }
具體以下圖:
+----------------------------+ | DataChangeEventCenter | | | | +-----------------------+ | | | DataChangeEventQueue[]| | | +-----------------------+ | +----------------------------+ | | v +------------------+------------------------+ | DataChangeEventQueue | | | | +---------------------------------------+ | | | | | | | BlockingQueue<IDataChangeEvent> +-------------+ | | | | | | | | | +-v---------+ | | Map<String, Map<String, ChangeData<> | | <--> | | | | | | | Executor | | | | | | | | | start +------------------------------> | | | | | | +-+---------+ | | | | | | | DelayQueue<ChangeData> <-------------------+ | | | | | +---------------------------------------+ | +-------------------------------------------+
handleDatum 具體處理是把Datum轉換爲 ChangeData來處理,
爲何要轉換成 ChangeData來存儲呢。
由於不管是消息處理方式或者來源,都有不一樣的類型。好比在 NotifyFetchDatumHandler . fetchDatum 函數中,會先從其餘 data server 獲取 Datum,而後會根據 Datum 向dataChangeEventCenter中投放消息,通知本 Data Server 進行 BACKUP 操做,類型是 COVER 類型。
轉換成 ChangeData就能夠把消息處理方式或者來源統一塊兒來處理。
用戶會存儲一個包含 datum 的消息。
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, DataSourceTypeEnum.BACKUP, datum);
DataChangeEventQueue 會從 DataChangeEvent 中獲取 Datum,而後把 Datum 轉換爲 ChangeData,存儲起來。
private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) { //get changed datum ChangeData changeData = getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(), sourceType, changeType); Datum cacheDatum = changeData.getDatum(); if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) { changeData.setDatum(targetDatum); } }
ChangeData 定義以下:
public class ChangeData implements Delayed { /** data changed */ private Datum datum; /** change time */ private Long gmtCreate; /** timeout */ private long timeout; private DataSourceTypeEnum sourceType; private DataChangeTypeEnum changeType; }
這裏是處理真實ChangeData緩存,以及新加入的Datum。
具體以下:
private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) { lock.lock(); try { //get changed datum ChangeData changeData = getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(), sourceType, changeType); Datum cacheDatum = changeData.getDatum(); if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) { changeData.setDatum(targetDatum); } else { Map<String, Publisher> targetPubMap = targetDatum.getPubMap(); Map<String, Publisher> cachePubMap = cacheDatum.getPubMap(); for (Publisher pub : targetPubMap.values()) { String registerId = pub.getRegisterId(); Publisher cachePub = cachePubMap.get(registerId); if (cachePub != null) { // if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means // that pub is not the newest data, should be ignored if (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) { continue; } // if pub and cachePub both are publisher, and sourceAddress of both are equal, // and version of cachePub is greater than version of pub, should be ignored if (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher) && pub.getSourceAddress().equals(cachePub.getSourceAddress()) && cachePub.getVersion() > pub.getVersion()) { continue; } } cachePubMap.put(registerId, pub); cacheDatum.setVersion(targetDatum.getVersion()); } } } finally { lock.unlock(); } }
當提取時候,使用take函數,從CHANGE_QUEUE 和 CHANGE_DATA_MAP_FOR_MERGE 提出ChangeData。
public ChangeData take() throws InterruptedException { ChangeData changeData = CHANGE_QUEUE.take(); lock.lock(); try { removeMapForMerge(changeData); return changeData; } finally { lock.unlock(); } }
具體提取Datum會在DataChangeHandler。
DataChangeHandler 會按期提取DataChangeEventCenter中的消息,而後進行處理,主要功能就是執行ChangeNotifier 來通知相關模塊:hi,這裏有新數據變化來到了,兄弟們走起來。
public class DataChangeHandler { @Autowired private DataServerConfig dataServerConfig; @Autowired private DataChangeEventCenter dataChangeEventCenter; @Autowired private DatumCache datumCache; @Resource private List<IDataChangeNotifier> dataChangeNotifiers; }
DataChangeHandler 會遍歷 DataChangeEventCenter 中全部 DataChangeEventQueue,而後從 DataChangeEventQueue 之中取出ChangeData,針對每個ChangeData,生成一個ChangeNotifier。
每一個ChangeNotifier都是一個處理線程。
每一個 dataChangeEventQueue 生成了 5 個 ChangeNotifier。
@PostConstruct public void start() { DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues(); int queueCount = queues.length; Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName()); Executor notifyExecutor = ExecutorFactory .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName()); for (int idx = 0; idx < queueCount; idx++) { final DataChangeEventQueue dataChangeEventQueue = queues[idx]; final String name = dataChangeEventQueue.getName(); executor.execute(() -> { while (true) { final ChangeData changeData = dataChangeEventQueue.take(); notifyExecutor.execute(new ChangeNotifier(changeData, name)); } }); } }
咱們回顧下業務:
當有數據發佈者 publisher 上下線時,會分別觸發 publishDataProcessor 或 unPublishDataHandler ,Handler 會往 dataChangeEventCenter 中添加一個數據變動事件,用於異步地通知事件變動中心數據的變動。事件變動中心收到該事件以後,會往隊列中加入事件。此時 dataChangeEventCenter 會根據不一樣的事件類型異步地對上下線數據進行相應的處理。
對於 ChangeData,會生成 ChangeNotifier 進行處理。會把這個事件變動信息經過 ChangeNotifier 對外發布,通知其餘節點進行數據同步。
private class ChangeNotifier implements Runnable { private ChangeData changeData; private String name; @Override public void run() { if (changeData instanceof SnapshotData) { ...... } else { Datum datum = changeData.getDatum(); String dataCenter = datum.getDataCenter(); String dataInfoId = datum.getDataInfoId(); DataSourceTypeEnum sourceType = changeData.getSourceType(); DataChangeTypeEnum changeType = changeData.getChangeType(); if (changeType == DataChangeTypeEnum.MERGE && sourceType != DataSourceTypeEnum.BACKUP && sourceType != DataSourceTypeEnum.SYNC) { //update version for pub or unPub merge to cache //if the version product before merge to cache,it may be cause small version override big one datum.updateVersion(); } long version = datum.getVersion(); try { if (sourceType == DataSourceTypeEnum.CLEAN) { if (datumCache.cleanDatum(dataCenter, dataInfoId)) { ...... } } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) { notifyTempPub(datum, sourceType, changeType); } else { MergeResult mergeResult = datumCache.putDatum(changeType, datum); Long lastVersion = mergeResult.getLastVersion(); if (lastVersion != null && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) { return; } //lastVersion null means first add datum if (lastVersion == null || version != lastVersion) { if (mergeResult.isChangeFlag()) { notify(datum, sourceType, lastVersion); } } } } } } }
notify函數會遍歷dataChangeNotifiers
private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) { for (IDataChangeNotifier notifier : dataChangeNotifiers) { if (notifier.getSuitableSource().contains(sourceType)) { notifier.notify(datum, lastVersion); } } }
對應的Bean是:
@Bean(name = "dataChangeNotifiers") public List<IDataChangeNotifier> dataChangeNotifiers() { List<IDataChangeNotifier> list = new ArrayList<>(); list.add(sessionServerNotifier()); list.add(tempPublisherNotifier()); list.add(backUpNotifier()); return list; }
至於如何處理通知,咱們後續會撰文處理。
至此,DataChangeEventCenter 總體邏輯以下圖所示
+----------------------------+ | DataChangeEventCenter | | | | +-----------------------+ | | | DataChangeEventQueue[]| | | +-----------------------+ | +----------------------------+ | | v +------------------+------------------------+ | DataChangeEventQueue | | | | +---------------------------------------+ | | | | | | | BlockingQueue<IDataChangeEvent> +-------------+ | | | | | | | | | +-v---------+ | | Map<String, Map<String, ChangeData<> | | <--> | | | | | | | Executor | | | | | | | | | start +------------------------------> | | | | | | +-+---------+ | | | | | +----------------+ DelayQueue<ChangeData> <-------------------+ | | | | | | | +---------------------------------------+ | | +-------------------------------------------+ | | | +--------------------------+ | take | | notify +-------------------+ +-------> | DataChangeHandler | +---------> |dataChangeNotifiers| | | +-------------------+ +--------------------------+
手機以下圖:
由於獨特的業務場景,因此阿里把 DataChangeEventCenter 單獨分離出來,知足瞭如下業務需求。若是你們在實際工做中有相似的需求,能夠參考借鑑,具體處理方式以下: