[從源碼學設計]螞蟻金服SOFARegistry之消息總線異步處理

[從源碼學設計]螞蟻金服SOFARegistry之消息總線異步處理

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。html

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。java

本文爲第五篇,介紹SOFARegistry消息總線的異步處理。node

0x01 爲什麼分離

前文咱們講述了SOFARegistry的消息總線,本文咱們講講一個變種 DataChangeEventCenter。數組

DataChangeEventCenter 是被獨立出來的,專門處理數據變化相關的消息。緩存

爲何要分離呢?由於:網絡

  • 從架構說,DataChangeEventCenter 是專門處理數據變化消息,這是一種解耦;
  • 從技術上來講,DataChangeEventCenter 也和 EventCenter 有具體實現技巧的不一樣,因此須要分開處理;
  • 但更深刻的緣由是業務場景不一樣,下面分析中咱們能夠看出,DataChangeEventCenter 和業務耦合的至關緊密;

0x02 業務領域

2.1 應用場景

DataChangeEventCenter 的獨特業務場景以下:session

  • 須要提供歸併功能。即短時間內會有多個通知來到,不須要逐一處理,只處理最後一個便可;
  • 異步處理消息;
  • 須要保證消息順序;
  • 有延遲操做;
  • 須要提升處理能力,並行處理;

所以,DataChangeEventCenter 代碼和業務聯繫至關緊密,前文的 EventCenter 已經不適合了。架構

2.2 延遲和歸併

關於延遲和歸併操做,咱們單獨說明下。異步

2.2.1 業務特色

螞蟻金服業務的一個特色是:經過鏈接敏感的特性對服務宕機作到秒級發現ide

所以 SOFARegistry 在健康檢測的設計方面決定「服務數據與服務發佈者的實體鏈接綁定在一塊兒,斷連立刻清數據」,簡稱此特色叫作鏈接敏感性。鏈接敏感性是指在 SOFARegistry 裏全部 Client 都與 SessionServer 保持長鏈接,每條長鏈接都設置基於 SOFABolt 的鏈接心跳,若是長鏈接斷連客戶端當即發起從新建連,時刻保持 Client 與 SessionServer 之間可靠的鏈接。

2.2.2 問題

但帶來了一個問題就是:可能由於網絡問題,短時間內會出現大量從新建連操做。好比只是網絡問題致使鏈接斷開,實際的服務進程沒有宕機,此時客戶端當即發起從新鏈接 SessionServer 而且從新註冊全部服務數據。

可是 假如此過程耗時足夠短暫(例如 500ms 內發生斷連和重連),服務訂閱者應該感覺不到服務下線。從而 SOFARegistry 內部應該作相應處理

2.2.3 解決

SOFARegistry 內部作了歸併和延遲操做來保證用戶不受影響。好比 DataServer 內部的數據經過 mergeDatum 延遲合併變動的 Publisher 服務信息,version 是合併後最新的版本號。

對於 DataChangeEventCenter,就是經過消息的延遲和歸併來協助完成這個功能

2.3 螞蟻金服實現

下面是 DataChangeEventCenter 整體的功能描述:

  • 當有數據發佈者 publisher 上下線時,會分別觸發 publishDataProcessor 或 unPublishDataHandler;
  • Handler 首先會判斷當前節點的狀態:
    • 如果非工做狀態則返回請求失敗;
    • 如果工做狀態,Handler 會往 dataChangeEventCenter 中添加一個數據變動事件,則觸發數據變化事件中心 DataChangeEventCenter 的 onChange 方法。用於異步地通知事件變動中心數據的變動;
  • 事件變動中心收到該事件以後,會往隊列中加入事件。此時 dataChangeEventCenter 會根據不一樣的事件類型異步地對上下線數據進行相應的處理;
  • 與此同時,DataChangeHandler 會把這個事件變動信息經過 ChangeNotifier 對外發布,通知其餘節點進行數據同步;

0x03 DataChangeEventCenter

3.1 總述

DataChangeEventCenter具體分紅四部分:

  • Event Center:組織成消息中心;
  • Event Queue:用於多路分別處理,增長處理能力;
  • Event Task:每個Queue內部啓動一個線程,用於異步處理,增長處理能力;
  • Event Handler:用於處理內部ChangeData;

接下來咱們一一介紹,由於 DataChangeEventCenter 和業務結合緊密,因此咱們會深刻結合業務進行講解。

3.2 DataChangeEventCenter

3.2.1 定義

DataChangeEventCenter 中維護着一個 DataChangeEventQueue 隊列數組,這是核心。數組中的每一個元素是一個事件隊列。具體定義以下:

public class DataChangeEventCenter {

    /**
     * count of DataChangeEventQueue
     */
    private int                    queueCount;

    /**
     * queues of DataChangeEvent
     */
    private DataChangeEventQueue[] dataChangeEventQueues;

    @Autowired
    private DataServerConfig       dataServerConfig;

    @Autowired
    private DatumCache             datumCache;
}

3.2.2 消息類型

DataChangeEventCenter 專門處理 IDataChangeEvent 類型消息,其具體實現爲三種:

  • public class ClientChangeEvent implements IDataChangeEvent
  • public class DataChangeEvent implements IDataChangeEvent
  • public class DatumSnapshotEvent implements IDataChangeEvent

這些不一樣類型的消息能夠放入同一個隊列,具體放入哪一個隊列,是根據特定判別方式來決定,好比根據Publisher的DataInfoId來作hash,以此決定放入哪一個Queue。

即,當對應 handler 的 onChange 方法被觸發時,會計算該變化服務的 dataInfoId 的 Hash 值,從而進一步肯定出該服務註冊數據所在的隊列編號,進而把該變化的數據封裝成一個數據變化對象,傳入到隊列中。

3.2.3 初始化

在初始化函數中,構建了EventQueue,每個Queue啓動了一個線程,用來處理消息。

@PostConstruct
public void init() {
    if (isInited.compareAndSet(false, true)) {
        queueCount = dataServerConfig.getQueueCount();
        dataChangeEventQueues = new DataChangeEventQueue[queueCount];
        for (int idx = 0; idx < queueCount; idx++) {
            dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,datumCache);
            dataChangeEventQueues[idx].start();
        }
    }
}

3.2.4 Put 消息

put消息比較簡單,具體如何判別應該把Event放入哪個Queue是根據具體方式來判斷,好比根據Publisher的DataInfoId來作hash,以此決定放入哪一個Queue:

int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));

3.2.5 如何處理消息

具體是經過 dataChangeEventQueues.onChange 來作處理,好比以下幾個函數,分別處理不一樣的消息類型。具體都是找到queue,而後調用:

public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}

public void onChange(ClientChangeEvent event) {
    for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}

public void onChange(DatumSnapshotEvent event) {
    for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}

public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}

3.3 DataChangeEvent

由於 DataChangeEvent 最經常使用,因此咱們單獨拿出來講明。

DataChangeEvent會根據DataChangeTypeEnum和DataSourceTypeEnum來進行區分,就是處理類型和消息來源。

DataChangeTypeEnum具體分爲:

  • MERGE,若是變動類型是MERGE,則會更新緩存中須要更新的新Datum,而且更新版本號;
  • COVER,若是變動類型是 COVER,則會覆蓋原有的緩存;

DataSourceTypeEnum 具體分爲:

  • PUB :pub by client;
  • PUB_TEMP :pub temporary data;
  • SYNC:sync from dataservers in other datacenter;
  • BACKUP:from dataservers in the same datacenter;
  • CLEAN:local dataInfo check,not belong this node schedule remove;
  • SNAPSHOT:Snapshot data, after renew finds data inconsistent;

具體定義以下:

public class DataChangeEvent implements IDataChangeEvent {

    /**
     * type of changed data, MERGE or COVER
     */
    private DataChangeTypeEnum changeType;

    private DataSourceTypeEnum sourceType;

    /**
     * data changed
     */
    private Datum              datum;
}

3.4 DataChangeEventQueue

DataChangeEventQueue 是這個子模塊的核心,用於多路分別處理,增長處理能力。每個Queue內部啓動一個線程,用於異步處理,也能增長處理能力

3.4.1 核心變量

這裏的核心是:

  • BlockingQueue eventQueue;

  • Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

  • DelayQueue CHANGE_QUEUE = new DelayQueue();

講解以下:

  • 能夠看到,這裏操做的數據類型是ChangeData,把Datum轉換成 ChangeData 能夠把消息處理方式 或者 來源統一塊兒來處理
  • eventQueue 用來存儲投放的消息,全部消息block在queue上,這能夠保證消息的順序處理;
  • CHANGE_DATA_MAP_FOR_MERGE。顧名思義,主要處理消息歸併。這是按照 dataCenter,dataInfoId 做爲維度,分別存儲 ChangeData,能夠理解爲一個矩陣Map,使用putIfAbsent方法添加鍵值對,若是map集合中沒有該key對應的值,則直接添加,並返回null,若是已經存在對應的值,則依舊爲原來的值。這樣若是短時間內向map中添加多個消息,這樣就對多餘的消息作了歸併
  • CHANGE_QUEUE 的做用是用於統一處理投放的ChangeData,不管是哪一個 data center的數據,都會統一在這裏處理;這裏須要注意的是使用了DelayQueue來進行延遲操做,就是咱們以前業務中提到的延遲操做;

具體定義以下:

public class DataChangeEventQueue {

    private final String                               name;

    /**
     * a block queue that stores all data change events
     */
    private final BlockingQueue<IDataChangeEvent>      eventQueue;

    private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

    private final DelayQueue<ChangeData>               CHANGE_QUEUE              = new DelayQueue();

    private final int                                  notifyIntervalMs;

    private final int                                  notifyTempDataIntervalMs;

    private final ReentrantLock                        lock                      = new ReentrantLock();

    private final int                                  queueIdx;

    private DataServerConfig                           dataServerConfig;

    private DataChangeEventCenter                      dataChangeEventCenter;

    private DatumCache                                 datumCache;
}

3.4.2 啓動和引擎

DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的時候被一個新的線程調用,該線程會源源不斷地從隊列中獲取新增事件,而且進行分發。新增數據會由此添加進節點內,實現分片。由於 eventQueue 是一個 BlockingQueue,因此可使用while (true)來控制。

當event被取出以後,會根據 DataChangeScopeEnum.DATUM 的不一樣,會作不一樣的處理。

  • 若是是DataChangeScopeEnum.DATUM,則判斷dataChangeEvent.getSourceType();
    • 若是是 DataSourceTypeEnum.PUB_TEMP,則addTempChangeData,就是往CHANGE_QUEUE添加ChangeData;
    • 若是不是,則handleDatum;
  • 若是是DataChangeScopeEnum.CLIENT,則handleClientOff((ClientChangeEvent) event);
  • 若是是DataChangeScopeEnum.SNAPSHOT,則handleSnapshot((DatumSnapshotEvent) event);

具體代碼以下:

public void start() {
    Executor executor = ExecutorFactory
            .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName()));
    executor.execute(() -> {
        while (true) {
            try {
                IDataChangeEvent event = eventQueue.take();
                DataChangeScopeEnum scope = event.getScope();
                if (scope == DataChangeScopeEnum.DATUM) {
                    DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
                    //Temporary push data will be notify as soon as,and not merge to normal pub data;
                    if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
                        addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(),
                                dataChangeEvent.getSourceType());
                    } else {
                        handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(),
                                dataChangeEvent.getDatum());
                    }
                } else if (scope == DataChangeScopeEnum.CLIENT) {
                    handleClientOff((ClientChangeEvent) event);
                } else if (scope == DataChangeScopeEnum.SNAPSHOT) {
                    handleSnapshot((DatumSnapshotEvent) event);
                }
            } 
        }
    });
}

具體以下圖:

+----------------------------+
      |   DataChangeEventCenter    |
      |                            |
      | +-----------------------+  |
      | | DataChangeEventQueue[]|  |
      | +-----------------------+  |
      +----------------------------+
                   |
                   |
                   v
+------------------+------------------------+
|          DataChangeEventQueue             |
|                                           |
| +---------------------------------------+ |
| |                                       | |
| |    BlockingQueue<IDataChangeEvent> +-------------+
| |                                       | |        |
| |                                       | |      +-v---------+
| | Map<String, Map<String, ChangeData<>  | | <--> |           |
| |                                       | |      | Executor  |
| |                                       | |      |           |
| |         start +------------------------------> |           |
| |                                       | |      +-+---------+
| |                                       | |        |
| |      DelayQueue<ChangeData>  <-------------------+
| |                                       | |
| +---------------------------------------+ |
+-------------------------------------------+

3.4.3 ChangeData

handleDatum 具體處理是把Datum轉換爲 ChangeData來處理,

爲何要轉換成 ChangeData來存儲呢。

由於不管是消息處理方式或者來源,都有不一樣的類型。好比在 NotifyFetchDatumHandler . fetchDatum 函數中,會先從其餘 data server 獲取 Datum,而後會根據 Datum 向dataChangeEventCenter中投放消息,通知本 Data Server 進行 BACKUP 操做,類型是 COVER 類型。

轉換成 ChangeData就能夠把消息處理方式或者來源統一塊兒來處理

用戶會存儲一個包含 datum 的消息。

dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, DataSourceTypeEnum.BACKUP, datum);

DataChangeEventQueue 會從 DataChangeEvent 中獲取 Datum,而後把 Datum 轉換爲 ChangeData,存儲起來。

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {
            //get changed datum
            ChangeData changeData = getChangeData(targetDatum.getDataCenter(),
                targetDatum.getDataInfoId(), sourceType, changeType);
            Datum cacheDatum = changeData.getDatum();
            if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
                changeData.setDatum(targetDatum);
            } 
}

ChangeData 定義以下:

public class ChangeData implements Delayed {

    /** data changed */
    private Datum              datum;

    /** change time */
    private Long               gmtCreate;

    /** timeout */
    private long               timeout;

    private DataSourceTypeEnum sourceType;

    private DataChangeTypeEnum changeType;
}

3.4.4 處理Datum

3.4.4.1 加入Datum

這裏是處理真實ChangeData緩存,以及新加入的Datum。

  • 首先從 CHANGE_DATA_MAP_FOR_MERGE 獲取以前存儲的變動的ChangeData,若是沒有,就生成一個加入,此時要爲後續可能的歸併作準備;
  • 拿到ChangeData以後
    • 若是變動類型是 COVER,則會覆蓋原有的緩存。changeData.setDatum(targetDatum);
    • 不然是MERGE,則會更新緩存中須要更新的新Datum,而且更新版本號;

具體以下:

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {
    lock.lock();
    try {
        //get changed datum
        ChangeData changeData = getChangeData(targetDatum.getDataCenter(),
            targetDatum.getDataInfoId(), sourceType, changeType);
        Datum cacheDatum = changeData.getDatum();
        if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
            changeData.setDatum(targetDatum);
        } else {
            Map<String, Publisher> targetPubMap = targetDatum.getPubMap();
            Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();
            for (Publisher pub : targetPubMap.values()) {
                String registerId = pub.getRegisterId();
                Publisher cachePub = cachePubMap.get(registerId);
                if (cachePub != null) {
                    // if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means
                    // that pub is not the newest data, should be ignored
                    if (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {
                        continue;
                    }
                    // if pub and cachePub both are publisher, and sourceAddress of both are equal,
                    // and version of cachePub is greater than version of pub, should be ignored
                    if (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher)
                        && pub.getSourceAddress().equals(cachePub.getSourceAddress())
                        && cachePub.getVersion() > pub.getVersion()) {
                        continue;
                    }
                }
                cachePubMap.put(registerId, pub);
                cacheDatum.setVersion(targetDatum.getVersion());
            }
        }
    } finally {
        lock.unlock();
    }
}
3.4.4.2 提出Datum

當提取時候,使用take函數,從CHANGE_QUEUE 和 CHANGE_DATA_MAP_FOR_MERGE 提出ChangeData。

public ChangeData take() throws InterruptedException {
    ChangeData changeData = CHANGE_QUEUE.take();
    lock.lock();
    try {
        removeMapForMerge(changeData);
        return changeData;
    } finally {
        lock.unlock();
    }
}

具體提取Datum會在DataChangeHandler。

3.5 DataChangeHandler

DataChangeHandler 會按期提取DataChangeEventCenter中的消息,而後進行處理,主要功能就是執行ChangeNotifier 來通知相關模塊:hi,這裏有新數據變化來到了,兄弟們走起來。

3.5.1 類定義

public class DataChangeHandler {

    @Autowired
    private DataServerConfig          dataServerConfig;

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;
}

3.5.2 執行引擎ChangeNotifier

DataChangeHandler 會遍歷 DataChangeEventCenter 中全部 DataChangeEventQueue,而後從 DataChangeEventQueue 之中取出ChangeData,針對每個ChangeData,生成一個ChangeNotifier。

每一個ChangeNotifier都是一個處理線程。

每一個 dataChangeEventQueue 生成了 5 個 ChangeNotifier。

@PostConstruct
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
  
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                 final ChangeData changeData = dataChangeEventQueue.take();
                 notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}

3.5.3 Notify

咱們回顧下業務:

當有數據發佈者 publisher 上下線時,會分別觸發 publishDataProcessor 或 unPublishDataHandler ,Handler 會往 dataChangeEventCenter 中添加一個數據變動事件,用於異步地通知事件變動中心數據的變動。事件變動中心收到該事件以後,會往隊列中加入事件。此時 dataChangeEventCenter 會根據不一樣的事件類型異步地對上下線數據進行相應的處理。

對於 ChangeData,會生成 ChangeNotifier 進行處理。會把這個事件變動信息經過 ChangeNotifier 對外發布,通知其餘節點進行數據同步

private class ChangeNotifier implements Runnable {

    private ChangeData changeData;
    private String     name;

    @Override
    public void run() {
        if (changeData instanceof SnapshotData) {
           ......
        } else {
            Datum datum = changeData.getDatum();

            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            DataSourceTypeEnum sourceType = changeData.getSourceType();
            DataChangeTypeEnum changeType = changeData.getChangeType();

            if (changeType == DataChangeTypeEnum.MERGE
                && sourceType != DataSourceTypeEnum.BACKUP
                && sourceType != DataSourceTypeEnum.SYNC) {
                //update version for pub or unPub merge to cache
                //if the version product before merge to cache,it may be cause small version override big one
                datum.updateVersion();
            }

            long version = datum.getVersion();

            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                      ......
                    }

                } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    notifyTempPub(datum, sourceType, changeType);
                } else {
                    MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                    Long lastVersion = mergeResult.getLastVersion();

                    if (lastVersion != null
                        && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                        return;
                    }

                    //lastVersion null means first add datum
                    if (lastVersion == null || version != lastVersion) {
                        if (mergeResult.isChangeFlag()) {
                            notify(datum, sourceType, lastVersion);
                        }
                    }
                }
            } 
        }

    }
}

notify函數會遍歷dataChangeNotifiers

private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}

對應的Bean是:

@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}

至於如何處理通知,咱們後續會撰文處理。

至此,DataChangeEventCenter 總體邏輯以下圖所示

+----------------------------+
                |   DataChangeEventCenter    |
                |                            |
                | +-----------------------+  |
                | | DataChangeEventQueue[]|  |
                | +-----------------------+  |
                +----------------------------+
                             |
                             |
                             v
          +------------------+------------------------+
          |          DataChangeEventQueue             |
          |                                           |
          | +---------------------------------------+ |
          | |                                       | |
          | |    BlockingQueue<IDataChangeEvent> +-------------+
          | |                                       | |        |
          | |                                       | |      +-v---------+
          | | Map<String, Map<String, ChangeData<>  | | <--> |           |
          | |                                       | |      | Executor  |
          | |                                       | |      |           |
          | |         start +------------------------------> |           |
          | |                                       | |      +-+---------+
          | |                                       | |        |
+----------------+ DelayQueue<ChangeData>  <-------------------+
|         | |                                       | |
|         | +---------------------------------------+ |
|         +-------------------------------------------+
|
|
|         +--------------------------+
|  take   |                          |    notify   +-------------------+
+-------> |    DataChangeHandler     | +---------> |dataChangeNotifiers|
          |                          |             +-------------------+
          +--------------------------+

手機以下圖:

0x04 結論

由於獨特的業務場景,因此阿里把 DataChangeEventCenter 單獨分離出來,知足瞭如下業務需求。若是你們在實際工做中有相似的需求,能夠參考借鑑,具體處理方式以下:

  • 須要提升處理能力,並行處理;
    • queue數組實現,每個Queue均可以處理消息,增長處理能力;
  • 異步處理消息;
    • 每個Queue內部啓動一個線程,用於異步處理;
  • 須要保證消息順序;
    • eventQueue 用來存儲投放的消息,全部消息block在queue上,這能夠保證消息的順序處理;
  • 有延遲操做;
    • 使用了DelayQueue來進行延遲操做;
  • 須要歸併操做,即短時間內會有多個通知來到,不須要逐一處理,只處理最後一個便可;
    • 使用putIfAbsent方法添加鍵值對,若是map集合中沒有該key對應的值,則直接添加,並返回null,若是已經存在對應的值,則依舊爲原來的值。這樣若是短時間內向map中添加多個消息,這樣就對多餘的消息作了歸併

0xFF 參考

Guava中EventBus分析

相關文章
相關標籤/搜索