[從源碼學設計]螞蟻金服SOFARegistry 之 服務註冊和操做日誌

[從源碼學設計]螞蟻金服SOFARegistry之服務註冊和操做日誌

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。html

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。java

本文爲第十四篇,介紹SOFARegistry服務上線和操做日誌。上文是從Session Server角度,本文從 Data Server 角度介紹。node

0x01 總體業務流程

咱們首先回顧整體業務流程,這部分屬於數據分片。數組

1.1 服務註冊過程

回顧下「一次服務註冊過程」的服務數據在內部流轉過程。緩存

  1. Client 調用 publisher.register 向 SessionServer 註冊服務。
  2. SessionServer 收到服務數據 (PublisherRegister) 後,將其寫入內存 (SessionServer 會存儲 Client 的數據到內存,用於後續能夠跟 DataServer 作按期檢查),再根據 dataInfoId 的一致性 Hash 尋找對應的 DataServer,將 PublisherRegister 發給 DataServer。
  3. DataServer 接收到 PublisherRegister 數據,首先也是將數據寫入內存 ,DataServer 會以 dataInfoId 的維度彙總全部 PublisherRegister。同時,DataServer 將該 dataInfoId 的變動事件通知給全部 SessionServer,變動事件的內容是 dataInfoId 和版本號信息 version。
  4. 同時,異步地,DataServer 以 dataInfoId 維度增量地同步數據給其餘副本。由於 DataServer 在一致性 Hash 分片的基礎上,對每一個分片保存了多個副本(默認是3個副本)。
  5. SessionServer 接收到變動事件通知後,對比 SessionServer 內存中存儲的 dataInfoId 的 version,若發現比 DataServer 發過來的小,則主動向 DataServer 獲取 dataInfoId 的完整數據,即包含了全部該 dataInfoId 具體的 PublisherRegister 列表。
  6. 最後,SessionServer 將數據推送給相應的 Client,Client 就接收到這一次服務註冊以後的最新的服務列表數據。

由於篇幅所限,上文討論的是前兩點,本文介紹第三,第四點網絡

1.2 數據分片

當服務上線時,會計算新增服務的 dataInfoId Hash 值,從而對該服務進行分片,最後尋找最近的一個節點,存儲到相應的節點上。session

DataServer 服務在啓動時添加了 publishDataProcessor 來處理相應的服務發佈者數據發佈請求,該 publishDataProcessor 就是 PublishDataHandler。當有新的服務發佈者上線,DataServer 的 PublishDataHandler 將會被觸發。數據結構

該 Handler 首先會判斷當前節點的狀態,如果非工做狀態則返回請求失敗。如果工做狀態,則觸發數據變化事件中心 DataChangeEventCenter 的 onChange 方法。架構

DataChangeEventQueue 中維護着一個 DataChangeEventQueue 隊列數組,數組中的每一個元素是一個事件隊列。當上文中的 onChange 方法被觸發時,會計算該變化服務的 dataInfoId 的 Hash 值,從而進一步肯定出該服務註冊數據所在的隊列編號,進而把該變化的數據封裝成一個數據變化對象,傳入到隊列中。app

DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的時候被一個新的線程調用,該方法會源源不斷地從隊列中獲取新增事件,而且進行分發。新增數據會由此添加進節點內,實現分片。

與此同時,DataChangeHandler 會把這個事件變動信息經過 ChangeNotifier 對外發布,通知其餘節點進行數據同步

0x02 基礎數據結構

這裏須要首先講解幾個相關數據結構。

2.1 Publisher

Publisher是數據發佈者信息

public class Publisher extends BaseInfo {
    private List<ServerDataBox> dataList;
    private PublishType         publishType      = PublishType.NORMAL;
}

2.2 Datum

從SOFARegistry自己出發而聚集的數據發佈者信息,裏面核心是 :

  • dataInfoId:服務惟一標識,由``<分組 group><租戶 instanceId>構成,例如在 SOFARPC 的場景下,一個 dataInfoId 一般是 com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名稱,00001 是租戶 id。group 和 instance 主要是方便對服務數據作邏輯上的切分,使不一樣 group 和 instance 的服務數據在邏輯上徹底獨立。模型裏有 group 和 instanceId 字段,但這裏不額外列出來,讀者只要理解 dataInfoId 的含義便可;
  • dataCenter:一個物理機房,包含多個邏輯單元(zone)。zone:是一種單元化架構下的概念,表明一個機房內的邏輯單元。在服務發現場景下,發佈服務時需指定邏輯單元(zone),而訂閱服務者能夠訂閱邏輯單元(zone)維度的服務數據,也能夠訂閱物理機房(datacenter)維度的服務數據,即訂閱該 datacenter 下的全部 zone 的服務數據。;
  • pubMap:包括的Publisher;
  • version:對應的版本

具體代碼以下:

public class Datum implements Serializable {
    private String                                dataInfoId;
    private String                                dataCenter;
    private String                                dataId;
    private String                                instanceId;
    private String                                group;
    private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
    private long                                  version;
    private boolean                               containsUnPub    = false;
}

2.3 DatumCache

DatumCache 是最新的Datum。

public class DatumCache {
    @Autowired
    private DatumStorage localDatumStorage;
}

具體存儲是在LocalDatumStorage中完成。

public class LocalDatumStorage implements DatumStorage {
    /**
     * row:     dataCenter
     * column:  dataInfoId
     * value:   datum
     */
    protected final Map<String, Map<String, Datum>>     DATUM_MAP            = new ConcurrentHashMap<>();

    /**
     * all datum index
     *
     * row:     ip:port
     * column:  registerId
     * value:   publisher
     */
    protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();

    @Autowired
    private DataServerConfig                            dataServerConfig;
}

2.4 Operator

Operator 是每一步Datum對應的操做

public class Operator {
    private Long               version;
    private Long               sourceVersion;
    private Datum              datum;
    private DataSourceTypeEnum sourceType;
}

2.5 Acceptor

記錄了全部的Datum操做。其中:

  • logOperatorsOrder記錄了操做的順序;
  • logOperators是全部的操做;
public class Acceptor {
    private final String                    dataInfoId;
    private final String                    dataCenter;
    private int                             maxBufferSize;
    static final int                        DEFAULT_DURATION_SECS = 30;
    private final Deque<Long/*version*/>   logOperatorsOrder = new ConcurrentLinkedDeque<>();
    private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();
    private final DatumCache                datumCache;
}

2.6 總結

總結下這幾個數據結構的聯繫:

  • Publisher是數據發佈者信息
  • Datum是從SOFARegistry自己出發而聚集的數據發佈者信息
  • DatumCache 是最新的Datum
  • Operator 是每一步Datum對應的操做
  • Acceptor記錄了全部的Datum操做

0x03 Datum的前因後果

咱們先回顧下 Datum 的前因後果。

3.1 Session Server 內部

首先,咱們講講Session Server 內部如何獲取Datum

在 Session Server 內部,Datum存儲在 SessionCacheService 之中。

好比在 DataChangeFetchCloudTask 內部,能夠這樣獲取 Datum。

private Map<String, Datum> getDatumsCache() {
    Map<String, Datum> map = new HashMap<>();
    NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
    Collection<String> dataCenters = nodeManager.getDataCenters();
    if (dataCenters != null) {
        Collection<Key> keys = dataCenters.stream().
                map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
                        new DatumKey(fetchDataInfoId, dataCenter))).
                collect(Collectors.toList());

        Map<Key, Value> values = null;
        values = sessionCacheService.getValues(keys);

        if (values != null) {
            values.forEach((key, value) -> {
                if (value != null && value.getPayload() != null) {
                    map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
                }
            });
        }
    }
    return map;
}

Session Server 會向 Data Server 發送 PublishDataRequest 請求

3.2 PublishDataHandler

在DataServer內部,PublishDataHandler 是用來處理 PublishDataRequest

public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
    @Autowired
    private ForwardService                 forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter          dataChangeEventCenter;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DatumLeaseManager              datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor             publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}

3.3 DataChangeEventCenter

在 DataChangeEventCenter 的 onChange 函數中,會進行投放

public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}

3.4 DataChangeEventQueue

在DataChangeEventQueue之中,會調用 handleDatum 來處理。在這裏對Datum進行存儲。

3.5 DataChangeHandler

在 DataChangeHandler 之中,會提取ChangeData,而後進行Notify。

public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                    final ChangeData changeData = dataChangeEventQueue.take();
                    notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}

具體以下:

+
                           Session Server  |  Data Server
                                           |
                                           |
                                           |
                                           |
+--------------------------+  PublishDataRequest   +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+               |       +------+-------------+
            ^                              |              |
            | getValues                    |              |  onChange(Publisher)
            |                              |              v
            |                              |     +--------+--------------+
  +---------+----------+                   |     | DataChangeEventCenter |
  |sessionCacheService |                   |     +--------+--------------+
  +--------------------+                   |              |
                                           |              |  Datum
                                           |              |
                                           |              v
                                           |     +--------+-------------+
                                           |     | DataChangeEventQueue |
                                           |     +--------+-------------+
                                           |              |
                                           |              |
                                           |              | ChangeData
                                           |              v
                                           |      +-------+-----------+
                                           |      | DataChangeHandler |
                                           +      +-------------------+

0x04 DataChangeHandler處理

因而咱們接着進行 DataChangeHandler 處理。即總述中提到的:DataChangeHandler 會把這個事件變動信息:

  1. 把這個事件變動信息變成Operator,放到AbstractAcceptorStore;
  2. 經過 ChangeNotifier 對外發布,通知其餘節點進行數據同步;

下面咱們從第一部分 :把這個事件變動信息變成Operator,放到AbstractAcceptorStore 出發,進行講解日誌操做。

即如圖所示:

+
                           Session Server  |  Data Server
                                           |
                                           |
                                           |
                                           +
+--------------------------+  PublishDataRequest   +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+               |       +------+-------------+
            ^                              |              |
            | getValues                    |              |  onChange(Publisher)
            |                              |              v
            |                              |     +--------+--------------+
  +---------+----------+                   |     | DataChangeEventCenter |
  |sessionCacheService |                   |     +--------+--------------+
  +--------------------+                   |              |
                                           |              |  Datum
                                           |              |
                                           |              v
                                           |     +--------+-------------+
                                           |     | DataChangeEventQueue |
                                           |     +--------+-------------+
                                           |              |
                                           |              |
                                           |              | ChangeData
                                           |              v
                                           |      +-------+-----------+
                                           |      | DataChangeHandler |
                                           |      +-------+-----------+
                                           |              |
                                           |              |
                                           |              v
                                           |      +-------+---------+
                                           |      |  ChangeNotifier |
                                           |      +-------+---------+
                                           |              |
                                           |              |
                                           |              v
                                           |   +----------+------------+
                                           |   | AbstractAcceptorStore |
                                           |   +-----------------------+
                                           +

Acceptor的appendOperator誰來調用?在Notifier 裏面有,好比:

public class BackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

以及另外一個:

public class SnapshotBackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

0x05 AbstractAcceptorStore存儲

AbstractAcceptorStore是日誌存儲,咱們下面詳細分析。

5.1 Bean

對於操做信息,提供了一個Bean來存儲。

@Bean
public AcceptorStore localAcceptorStore() {
    return new LocalAcceptorStore();
}

5.2 StoreServiceFactory

做用是在 storeServiceMap 中存放各類 AcceptorStore,目前只有LocalAcceptorStore 這一個。

public class StoreServiceFactory implements ApplicationContextAware {

    private static Map<String/*supportType*/, AcceptorStore> storeServiceMap = new HashMap<>();

    /**
     * get AcceptorStore by storeType
     * @param storeType
     * @return
     */
    public static AcceptorStore getStoreService(String storeType) {
        return storeServiceMap.get(storeType);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);

        map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
    }
}

5.3 AbstractAcceptorStore

AbstractAcceptorStore 是存儲的基本實現類,幾個基本成員是。

  • acceptors :是一個矩陣,按照dataCenter,dataInfoId維度來分類,存儲了此維度下的Acceptor;就是說,針對每個dataCenter,dataInfoId的組合,都有一個Acceptor,用來存儲這下面的Operator。

  • notifyAcceptorsCache :是一個矩陣,按照dataCente,dataInfoId維度來分類,緩存了此維度下須要進一步處理的Acceptor;

  • delayQueue :配合notifyAcceptorsCache使用,針對notifyAcceptorsCache的每個新acceptor,系統會添加一個消息進入queue,這個queue等延時到了,就會取出,而且從notifyAcceptorsCache取出對應的新acceptor進行相應處理;

按說應該是 cache 有東西,因此dequeue 時候就會取出來,可是若是這期間多放入了幾個進入 Cache,原有cache 的 value 只是被替換而已,等時間到了,也會取出來

notifyAcceptorsCache 也是按照 data center 來控制的,只有按期 removeCache。

public abstract class AbstractAcceptorStore implements AcceptorStore {

    private static final int               DEFAULT_MAX_BUFFER_SIZE = 30;

    @Autowired
    protected IMetaServerService           metaServerService;

    @Autowired
    private Exchange                       boltExchange;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DataServerConnectionFactory    dataServerConnectionFactory;

    @Autowired
    private DatumCache                     datumCache;

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors               = new ConcurrentHashMap<>();

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache    = new ConcurrentHashMap<>();

    private DelayQueue<DelayItem<Acceptor>>     delayQueue 
}

具體以下圖:

+-----------------------------+                      +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
|[AbstractAcceptorStore]      |                      |
|                             |   +-> dataCenter +---+
|                             |   |                  |
|     acceptors  +--------------->+                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
|                             |   |
|     notifyAcceptorsCache    |   |                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
|           +                 |   +-> dataCenter +-->+
+-----------------------------+                      |
            |                                        +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
            |
            |
            |                                        +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
            |                     +-> dataCenter +-->+
            |                     |                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
            +-------------------->+
                                  |                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
                                  +-> dataCenter +---+
                                                     +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>

手機如圖:

有一點須要說明,就是delayQueue 爲什麼要延遲隊列。這是因爲SOFA的「秒級服務上下線通知「特性形成的

由於要實現此特性,因此涉及到了一個鏈接敏感性問題,即在 SOFARegistry 裏,全部 Client 都與 SessionServer 保持長鏈接,每條長鏈接都會有基於 bolt 的鏈接心跳,若是鏈接斷開,Client 會立刻從新建連,時刻保證 Client 與 SessionServer 之間有可靠的鏈接。

由於強烈的鏈接敏感性,因此致使若是隻是網絡問題致使鏈接斷開,實際的進程並無宕機,那麼 Client 會立刻重連 SessionServer 並從新註冊全部服務數據。這種大量的短暫的服務下線後又從新上線會給用戶帶來困擾和麻煩

所以在 DataServer 內部實現了數據延遲合併的功能,就是這裏的DelayQueue

5.4 加入

addOperator的基本邏輯是:

  • 從Operator的Datum中提取dataCenter和dataInfoId;
  • 從acceptors取出dataCenter對應的Map<dataInfoId, Acceptor> acceptorMap;
  • 從acceptorMap中提取dataInfoId對應的existAcceptor;
  • 若是新operator是SnapshotOperator類型,則清除以前的 opeator queue。
  • 不然加入新operator;
  • 使用putCache(existAcceptor);把目前的Acceptor加入Cache,定時任務會處理;

在操做中,都是使用putIfAbsent,這樣短時間內如有多個一樣value插入,則不會替換原有的value,這樣 起到了歸併做用

@Override
public void addOperator(Operator operator) {

    Datum datum = operator.getDatum();
    String dataCenter = datum.getDataCenter();
    String dataInfoId = datum.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }

        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        if (existAcceptor == null) {
            Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId,
                dataCenter, datumCache);
            existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
            if (existAcceptor == null) {
                existAcceptor = newAcceptor;
            }
        }

        if (operator instanceof SnapshotOperator) {
            //snapshot: clear the queue, Make other data retrieve the latest memory data
            existAcceptor.clearBefore();
        } else {
            existAcceptor.appendOperator(operator);
        }

        //put cache
        putCache(existAcceptor);
    } 
}

putCache的做用是:

  • 從acceptor中提取dataCenter和dataInfoId;
  • 從notifyAcceptorsCache中取出dataCenter對應的Map<dataInfoId, Acceptor> acceptorMap;
  • 向acceptorMap中放入dataInfoId對應的acceptor;
  • 若是acceptorMap中以前沒有對應的value,則把acceptor放入delayQueue;

這裏也使用putIfAbsent,這樣短時間內如有多個一樣value插入,則不會替換原有的value,這樣 起到了歸併做用

private void putCache(Acceptor acceptor) {

    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();

    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }
        Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
        if (existAcceptor == null) {
            addQueue(acceptor);
        }
    } 
}

5.5 使用

具體消費是在按期任務中完成。消費日誌的目的就是同步日誌操做給其餘 DataServer。

5.5.1 Scheduler

Scheduler類是按期任務,會啓動兩個線程池按期調用AcceptorStore的函數

public class Scheduler {
    private final ScheduledExecutorService scheduler;
  
    public final ExecutorService           versionCheckExecutor;

    private final ThreadPoolExecutor       expireCheckExecutor;

    @Autowired
    private AcceptorStore                  localAcceptorStore;

   public Scheduler() {
        scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));

        expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

        versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory(
                "SyncDataScheduler-versionChangeCheck"));

    }
  
    public void startScheduler() {
        scheduler.schedule(
                new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                        TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
                30, TimeUnit.SECONDS);

        versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());

    }
}

AbstractAcceptorStore中函數以下:

5.5.2 changeDataCheck

changeDataCheck 內部是一個while true,因此不須要再使用線程池。

changeDataCheck綁定在delayQueue上,若是有新消息,則會取出Acceptor,也從notifyAcceptorsCache取出Acceptor,調用notifyChange(acceptor);進行處理 。

@Override
public void changeDataCheck() {

    while (true) {
        try {
            DelayItem<Acceptor> delayItem = delayQueue.take();
            Acceptor acceptor = delayItem.getItem();
            removeCache(acceptor); // compare and remove
        } catch (InterruptedException e) {
            break;
        } catch (Throwable e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

}

消費Cache用到的是removeCache。

private void removeCache(Acceptor acceptor) {
    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap != null) {
            boolean result = acceptorMap.remove(dataInfoId, acceptor);
            if (result) {
                //data change notify
                notifyChange(acceptor);
            }
        }
    } 
}
5.5.2.1 通知NotifyDataSyncRequest

在removeCache中,也使用notifyChange進行了通知,邏輯以下:

  • 從acceptor中提取 DataInfoId;
  • 根據DataInfoId從meta service中獲取dataServerNodes的ip;
  • 遍歷ip,經過bolt server進行通知syncServer.sendSync,就是給ip對應的data center發送 NotifyDataSyncRequest;
private void notifyChange(Acceptor acceptor) {

    Long lastVersion = acceptor.getLastVersion();

    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());

    List<String> targetDataIps = getTargetDataIp(acceptor.getDataInfoId());
    for (String targetDataIp : targetDataIps) {

        if (DataServerConfig.IP.equals(targetDataIp)) {
            continue;
        }

        Server syncServer = boltExchange.getServer(dataServerConfig.getSyncDataPort());

        for (int tryCount = 0; tryCount < dataServerConfig.getDataSyncNotifyRetry(); tryCount++) {
            try {
                Connection connection = dataServerConnectionFactory.getConnection(targetDataIp);
                if (connection == null) {
                    TimeUtil.randomDelay(1000);
                    continue;
                }
                syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
                    request, 1000);
                break;
            } 
        }
    }
}

這部分的調用邏輯爲:versionCheckExecutor.execute ------- localAcceptorStore.changeDataChheck ------ removeCache ----- notifyChange ------ NotifyDataSyncRequest

具體以下圖:

+--------------------------+
|                          |     +----------------------------------------------------------------------+
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |
|                          |     |                                                                      |
+--------+-----------------+     |                                                                      |
         |                       |                                                                      |
         |                       |                                                                      |
         |                       |                                                                      |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors            |
         |   changeDataCheck     |                                                                      |
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                    +            +----------------------------------------------------------------------+
                    |
                    |
                    |
                    | NotifyDataSyncRequest
                    |
                    v
             +------+-----------+
             | Other DataServer |
             +------------------+

手機以下圖:

5.5.3 checkAcceptorsChangAndExpired

checkAcceptorsChangAndExpired做用是遍歷acceptors每一個acceptor,看看是否expired,進行處理。

@Override
public void checkAcceptorsChangAndExpired() {
    acceptors.forEach((dataCenter, acceptorMap) -> {
        if (acceptorMap != null && !acceptorMap.isEmpty()) {
            acceptorMap.forEach((dataInfoId, acceptor) -> acceptor.checkExpired(0));
        }
    });
}

此時,邏輯以下:

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                    +            +----------------------------------------------------------------------+
                    |
                    |
                    |
                    | NotifyDataSyncRequest
                    |
                    v
             +------+-----------+
             | Other DataServer |
             +------------------+

手機以下:

0x06 Acceptor日誌操做

這裏記錄了日誌,即記錄了全部的Datum操做。

操做日誌存儲採用Queue方式,獲取日誌時候經過當前版本號在堆棧內所在位置,把全部版本以後的操做日誌同步過來執行。

public class Acceptor {
    private final String                    dataInfoId;
    private final String                    dataCenter;
    private int                             maxBufferSize;
    static final int                        DEFAULT_DURATION_SECS = 30;
    private final Deque<Long/*version*/>   logOperatorsOrder     = new ConcurrentLinkedDeque<>();
    private Map<Long/*version*/, Operator> logOperators          = new ConcurrentHashMap<>();
    private final DatumCache                datumCache;
}

關鍵變量是:

  • logOperators:按照版本號爲key存儲的map,用來存儲全部的Operator;
  • logOperatorsOrder:由於map沒有辦法排序,因此設置此queue來存儲版本號

Operator 就是每一步操做對應的Datum。

public class Operator {
    private Long               version;
    private Long               sourceVersion;
    private Datum              datum;
    private DataSourceTypeEnum sourceType;
}

6.1 appendOperator

此函數做用是:添加一個操做日誌。

  • 若是queue已經滿了,則取出第一個消息,爲了向後段插入一個新的 。
  • 若是Operator版本號爲空,則設置爲0L;
  • 若是Operator的前一個版本號與queue尾部Operator版本號不一致,說明queue裏面不對了,須要清空map和queue。
  • 向map中加入Operator;
  • 若是是新版本的Operator,則把版本加入queue;

具體代碼以下:

/**
 * append operator to queue,if queue is full poll the first element and append.
 * Process will check version sequence,it must append with a consequent increase in version,
 * otherwise queue will be clean
 *
 * @param operator
 */
public void appendOperator(Operator operator) {
    write.lock();
    try {
        if (isFull()) {
            logOperators.remove(logOperatorsOrder.poll());
        }
        if (operator.getSourceVersion() == null) {
            operator.setSourceVersion(0L);
        }
        Long tailVersion = logOperatorsOrder.peekLast();
        if (tailVersion != null) {
            //operation add not by solid sequence
            if (tailVersion.longValue() != operator.getSourceVersion().longValue()) {
                clearBefore();
            }
        }

        Operator previousOperator = logOperators.put(operator.getVersion(), operator);
        if (previousOperator == null) {
            logOperatorsOrder.add(operator.getVersion());
        } 
    } finally {
        write.unlock();
    }
}

appendOperator誰來調用?在Notifier 裏面有,好比:

public class BackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

以及

public class SnapshotBackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

6.2 checkExpired

此方法做用是去除過時日誌。version是時間戳,因此能夠按期check,若是過時,就清除。

public void checkExpired(int durationSEC) {
    write.lock();
    try {
        //check all expired
        Long peekVersion = logOperatorsOrder.peek();
        if (peekVersion != null && isExpired(durationSEC, peekVersion)) {
            logOperators.remove(logOperatorsOrder.poll());
            checkExpired(durationSEC);
        }
    } finally {
        write.unlock();
    }
}

0x07 NotifyDataSyncRequest通知數據同步

此請求做用是通知接收端進行數據同步。

回憶下這部分的調用邏輯爲:versionCheckExecutor.execute ------- localAcceptorStore.changeDataChheck ------ removeCache ----- notifyChange ------ NotifyDataSyncRequest

7.1 NotifyDataSyncHandler

接收端data server經過NotifyDataSyncHandler處理

public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements
                                                                                       AfterWorkingProcess {

    @Autowired
    private DataServerConfig                                      dataServerConfig;

    @Autowired
    private GetSyncDataHandler                                    getSyncDataHandler;

    @Autowired
    private DataChangeEventCenter                                 dataChangeEventCenter;

    private Executor                                              executor    = ExecutorFactory
                                                                                  .newFixedThreadPool(
                                                                                      10,
                                                                                      NotifyDataSyncHandler.class
                                                                                          .getSimpleName());

    private ThreadPoolExecutor                                    notifyExecutor;

    @Autowired
    private DataNodeStatus                                        dataNodeStatus;

    @Autowired
    private DatumCache                                            datumCache;
}

7.1.1 doHandle

doHandle方法用來繼續處理。

@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
    final Connection connection = ((BoltChannel) channel).getConnection();
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
        noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
        return CommonResponse.buildSuccessResponse();
    }
    executorRequest(connection, request);
    return CommonResponse.buildSuccessResponse();
}

7.1.2 executorRequest

由於接到了發起端DataServer的同步通知NotifyDataSyncRequest,因此接收端DataServer主動發起拉取,進行同步數據。即調用GetSyncDataHandler來發送SyncDataRequest

private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
    executor.execute(() -> {
        fetchSyncData(connection, request);
    });
}

protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
    String dataInfoId = request.getDataInfoId();
    String dataCenter = request.getDataCenter();
    Datum datum = datumCache.get(dataCenter, dataInfoId);
    Long version = (datum == null) ? null : datum.getVersion();
    Long requestVersion = request.getVersion();

    if (version == null || requestVersion == 0L || version < requestVersion) {
        getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
            new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
            dataChangeEventCenter));
    } 
}

7.1.3 GetSyncDataHandler

GetSyncDataHandler和SyncDataCallback配合。

即調用GetSyncDataHandler來發送SyncDataRequest,用SyncDataCallback接收同步結果。

├── remoting
│   ├── dataserver
│   │   ├── DataServerConnectionFactory.java
│   │   ├── DataServerNodeFactory.java
│   │   ├── GetSyncDataHandler.java
│   │   ├── SyncDataCallback.java
│   │   ├── handler
│   │   └── task

GetSyncDataHandler 和 SyncDataCallback 這兩個輔助類的位置比較奇怪,大概由於是功能類,因此放在dataserver目錄下,我的認爲也許單獨設置一個目錄存放更好。

public class GetSyncDataHandler {
    @Autowired
    private DataNodeExchanger   dataNodeExchanger;

    public void syncData(SyncDataCallback callback) {
        int tryCount = callback.getRetryCount();
        if (tryCount > 0) {
            try {
                callback.setRetryCount(--tryCount);
                dataNodeExchanger.request(new Request() {
                    @Override
                    public Object getRequestBody() {
                        return callback.getRequest();
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(callback.getConnection().getRemoteIP(), callback
                            .getConnection().getRemotePort());
                    }

                    @Override
                    public CallbackHandler getCallBackHandler() {
                        return new CallbackHandler() {
                            @Override
                            public void onCallback(Channel channel, Object message) {
                                callback.onResponse(message);
                            }

                            @Override
                            public void onException(Channel channel, Throwable exception) {
                                callback.onException(exception);
                            }

                            @Override
                            public Executor getExecutor() {
                                return callback.getExecutor();
                            }
                        };
                    }
                });
            }
        }
    }

}

7.1.4 SyncDataCallback

這裏接收同步結果。

public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR    = ExecutorFactory.newFixedThreadPool(5,
                                                  SyncDataCallback.class.getSimpleName());

    private static final int      RETRY_COUNT = 3;

    private Connection            connection;

    private SyncDataRequest       request;

    private GetSyncDataHandler    getSyncDataHandler;

    private int                   retryCount;

    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                //handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                //handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                } 
            }
        }
    }
}

此時邏輯以下:

[Sender DataServer]

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                     +           +----------------------------------------------------------------------+
                     |
NotifyDataSyncRequest| 1         ^ 2
                     |           |
+-------------------------------------------------------------------------------------------------------------------------------------------+
                     |           |  SyncDataRequest
                     v           |
             +-------+-----------------------------------+
             |[Other DataServer] |                       |
             |                   |                       |
             |                   |                       |
             |                   +                       |
             |  GetSyncDataHandler      SyncDataCallback |
             |                                           |
             |                                           |
             |                                           |
             |                                           |
             +-------------------------------------------+

手機如圖:

0x08 SyncDataRequest回送通知

SyncDataRequest發送回通知發送者。因此這裏是other DataServer 發送給 Sender DataServer

8.1 SyncDataRequest

public class SyncDataRequest implements Serializable {

    private String            dataInfoId;

    private String            dataCenter;

    private String            dataSourceType;

    /**
     * be null when dataInfoId not exist in local datumCache
     */
    private Long              version;
}

8.1.1 SyncDataRequest 從哪裏來

咱們回憶下,SyncDataRequest 從哪裏來?在 NotifyDataSyncHandler 的響應函數中,會產生 SyncDataRequest。這裏會根據請求的信息,從cache之中獲取infoId對應的version,而後發送請求。

public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements AfterWorkingProcess {

    protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
        String dataInfoId = request.getDataInfoId();
        String dataCenter = request.getDataCenter();
        Datum datum = datumCache.get(dataCenter, dataInfoId);
        Long version = (datum == null) ? null : datum.getVersion();
        Long requestVersion = request.getVersion();

        if (version == null || requestVersion == 0L || version < requestVersion) {
            getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
                new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
                dataChangeEventCenter));
        } 
    }
}

進而在AbstractAcceptorStore之中

private void notifyChange(Acceptor acceptor) {

    Long lastVersion = acceptor.getLastVersion();

    //may be delete by expired
    if (lastVersion == null) {
        lastVersion = 0L;
    }

    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());
    
    syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
                        request, 1000);
}

8.2 syncDataHandler

通知發起者使用 SyncDataHandler 來處理。

  • syncDataHandler

節點間數據同步 Handler,該 Handler 被觸發時,會經過版本號進行比對,若當前 DataServer 所存儲數據版本號含有當前請求版本號,則會返回全部大於當前請求數據版本號的全部數據,便於節點間進行數據同步。

public class SyncDataHandler extends AbstractServerHandler<SyncDataRequest> {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public Object doHandle(Channel channel, SyncDataRequest request) {
        SyncData syncData = syncDataService.getSyncDataChange(request);
        return new GenericResponse<SyncData>().fillSucceed(syncData);
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return SyncDataRequest.class;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}

8.3 SyncDataServiceImpl

具體業務服務是SyncDataServiceImpl。會從acceptorStore獲取data,即getSyncDataChange方法。

public class SyncDataServiceImpl implements SyncDataService {

    @Override
    public void appendOperator(Operator operator) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(operator.getSourceType()
            .toString());
        if (acceptorStore != null) {
            acceptorStore.addOperator(operator);
        } 
    }

    @Override
    public SyncData getSyncDataChange(SyncDataRequest syncDataRequest) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(syncDataRequest
            .getDataSourceType());
        if (acceptorStore != null) {
            return acceptorStore.getSyncData(syncDataRequest);
        } 
    }
}

關於appendOperator如何調用,前文有描述。

SyncDataServiceImpl會繼續調用到AbstractAcceptorStore。

8.4 AbstractAcceptorStore

根據dataCenter和dataInfoId獲取出Acceptor,而後返回其process後的數據。

@Override
public SyncData getSyncData(SyncDataRequest syncDataRequest) {

    String dataCenter = syncDataRequest.getDataCenter();
    String dataInfoId = syncDataRequest.getDataInfoId();

    Long currentVersion = syncDataRequest.getVersion();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        return existAcceptor.process(currentVersion);
    } 
}

8.5 Acceptor

而後是Acceptor的處理。

處理髮送數據的當前版本號,若是當前版本號存在於當前queue中,返回全部版本號大於當前版本號的Operator,不然全部Operator。

public SyncData process(Long currentVersion) {
    read.lock();
    try {
        Collection<Operator> operators = acceptOperator(currentVersion);
        List<Datum> retList = new LinkedList<>();
        SyncData syncData;
        boolean wholeDataTag = false;
        if (operators != null) {
            //first get all data
            if (operators.isEmpty()) {
                wholeDataTag = true;
                retList.add(datumCache.get(dataCenter, dataInfoId));
            } else {
                for (Operator operator : operators) {
                    retList.add(operator.getDatum());
                }
            }
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        } else {
            //no match get all data
            wholeDataTag = true;
            retList.add(datumCache.get(dataCenter, dataInfoId));
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        }

        return syncData;
    } finally {
        read.unlock();
    }
}

同步數據結構以下:

public class SyncData implements Serializable {

    private String            dataInfoId;

    private String            dataCenter;

    private Collection<Datum> datums;

    private boolean           wholeDataTag;
}

此時圖示以下:

[Sender DataServer]

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                     +           +------------------------------------------------+-----+---------------+
                     |                                                            ^     |
NotifyDataSyncRequest| 1   +-----------------+  3     +--------------------+   4  |     |
                     |     | syncDataHandler +------> | SyncDataServiceImpl+------+     |
                     |     +-----+-----------+        +--------------------+            |
                     |           ^ 2                                                    |
                     |           |                                                      |  5
                     |           |                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------+
                     |           |  SyncDataRequest                                     |
                     v           |                                                      |
             +-------+-----------------------------------+                              |
             |[Other DataServer] |                       |                              |
             |                   |                       |                              |
             |                   |                       |                              |
             |                   +                       |                              |
             |  GetSyncDataHandler      SyncDataCallback |  <---------------------------+
             |                                           |
             |                                           |
             |                                           |
             |                                           |
             +-------------------------------------------+

手機以下:

0x09 SyncDataCallback接受者回調

回到接受者,遍歷接受到的全部Datum,逐一調用:

若是是所有datum,調用

dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);

不然調用

dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,dataSourceTypeEnum, datum)

具體以下:

public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR    = ExecutorFactory.newFixedThreadPool(5,
                                                  SyncDataCallback.class.getSimpleName());

    private static final int      RETRY_COUNT = 3;

    private Connection            connection;

    private SyncDataRequest       request;

    private GetSyncDataHandler    getSyncDataHandler;

    private int                   retryCount;

    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                //handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                //handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                } 
            }
        }
    }
}

DataChangeEventCenter調用以下:

public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}

DataChangeEventQueue調用handleDatum處理,這部分在其餘文章中已經講述。這裏只是貼出代碼。

@Override
public void run() {
    if (changeData instanceof SnapshotData) {
        SnapshotData snapshotData = (SnapshotData) changeData;
        String dataInfoId = snapshotData.getDataInfoId();
        Map<String, Publisher> toBeDeletedPubMap = snapshotData.getToBeDeletedPubMap();
        Map<String, Publisher> snapshotPubMap = snapshotData.getSnapshotPubMap();
        Datum oldDatum = datumCache.get(dataServerConfig.getLocalDataCenter(), dataInfoId);
        long lastVersion = oldDatum != null ? oldDatum.getVersion() : 0l;
        Datum datum = datumCache.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
        long version = datum != null ? datum.getVersion() : 0l;
        notify(datum, changeData.getSourceType(), null);
    } else {
        Datum datum = changeData.getDatum();
        String dataCenter = datum.getDataCenter();
        String dataInfoId = datum.getDataInfoId();
        DataSourceTypeEnum sourceType = changeData.getSourceType();
        DataChangeTypeEnum changeType = changeData.getChangeType();

        if (changeType == DataChangeTypeEnum.MERGE
            && sourceType != DataSourceTypeEnum.BACKUP
            && sourceType != DataSourceTypeEnum.SYNC) {
            //update version for pub or unPub merge to cache
            //if the version product before merge to cache,it may be cause small version override big one
            datum.updateVersion();
        }

        long version = datum.getVersion();

        try {
            if (sourceType == DataSourceTypeEnum.CLEAN) {
                if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                }
            } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                notifyTempPub(datum, sourceType, changeType);
            } else {
                MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                Long lastVersion = mergeResult.getLastVersion();

                if (lastVersion != null
                    && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                    return;
                }

                //lastVersion null means first add datum
                if (lastVersion == null || version != lastVersion) {
                    if (mergeResult.isChangeFlag()) {
                        notify(datum, sourceType, lastVersion);
                    }
                }
            }
        } 
    }

}

9.1 DataChangeHandler

DataChangeHandler 會按期提取DataChangeEventCenter中的消息,而後進行處理。

ChangeNotifier存儲了Datum。由於此時版本號已經更新,因此不會再次通知,至此流程結束。

MergeResult mergeResult = datumCache.putDatum(changeType, datum);


//lastVersion null means first add datum
if (lastVersion == null || version != lastVersion) {
    if (mergeResult.isChangeFlag()) {
        notify(datum, sourceType, lastVersion);
     }
}

此時邏輯以下:

[Sender DataServer]

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                     +           +------------------------------------------------+-----+---------------+
                     |                                                            ^     |
NotifyDataSyncRequest| 1   +-----------------+  3     +--------------------+   4  |     |
                     |     | syncDataHandler +------> | SyncDataServiceImpl+------+     |
                     |     +-----+-----------+        +--------------------+            |
                     |           ^ 2                                                    |
                     |           |                                                      |  5
                     |           |                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------+
                     |           |  SyncDataRequest                                     |
[Other DataServer]   |           |                                                      |
                     |           |                                                      |
                     |           |                                                      |
                     |           |              +---------------------------------------+
                     |           |              |
                     |           |              |
                     v           |              v
              +------+-----------++ +-----------+-------+  6    +-----------------------+  7   +--------------------+  8   +-----------------+
              | GetSyncDataHandler| |  SyncDataCallback +-----> | DataChangeEventCenter | +--> |DataChangeEventQueue| +--> |DataChangeHandler|
              +-------------------+ +-------------------+       +-----------------------+      +--------------------+      +-----------------+

手機上以下:

0x10 總結

回顧下「一次服務註冊過程」的服務數據在內部流轉過程。

  1. Client 調用 publisher.register 向 SessionServer 註冊服務。
  2. SessionServer 收到服務數據 (PublisherRegister) 後,將其寫入內存 (SessionServer 會存儲 Client 的數據到內存,用於後續能夠跟 DataServer 作按期檢查),再根據 dataInfoId 的一致性 Hash 尋找對應的 DataServer,將 PublisherRegister 發給 DataServer。
  3. DataServer 接收到 PublisherRegister 數據,首先也是將數據寫入內存 ,DataServer 會以 dataInfoId 的維度彙總全部 PublisherRegister。同時,DataServer 將該 dataInfoId 的變動事件通知給全部 SessionServer,變動事件的內容是 dataInfoId 和版本號信息 version。
  4. 同時,異步地,DataServer 以 dataInfoId 維度增量地同步數據給其餘副本。由於 DataServer 在一致性 Hash 分片的基礎上,對每一個分片保存了多個副本(默認是3個副本)。
  5. SessionServer 接收到變動事件通知後,對比 SessionServer 內存中存儲的 dataInfoId 的 version,若發現比 DataServer 發過來的小,則主動向 DataServer 獲取 dataInfoId 的完整數據,即包含了全部該 dataInfoId 具體的 PublisherRegister 列表。
  6. 最後,SessionServer 將數據推送給相應的 Client,Client 就接收到這一次服務註冊以後的最新的服務列表數據。

由於篇幅所限,上文討論的是前兩點,本文介紹第三,第四點。若是之後有時間,會介紹最後兩點。

0xFF 參考

Eureka系列(六) TimedSupervisorTask類解析

Eureka的TimedSupervisorTask類(自動調節間隔的週期性任務)

java線程池ThreadPoolExecutor類使用詳解

Java線程池ThreadPoolExecutor實現原理剖析

深刻理解Java線程池:ThreadPoolExecutor

深刻理解Java線程池:ThreadPoolExecutor

Java中線程池ThreadPoolExecutor原理探究

螞蟻金服服務註冊中心如何實現 DataServer 平滑擴縮容

螞蟻金服服務註冊中心 SOFARegistry 解析 | 服務發現優化之路

服務註冊中心 Session 存儲策略 | SOFARegistry 解析

海量數據下的註冊中心 - SOFARegistry 架構介紹

服務註冊中心數據分片和同步方案詳解 | SOFARegistry 解析

螞蟻金服開源通訊框架SOFABolt解析之鏈接管理剖析

螞蟻金服開源通訊框架SOFABolt解析之超時控制機制及心跳機制

螞蟻金服開源通訊框架 SOFABolt 協議框架解析

螞蟻金服服務註冊中心數據一致性方案分析 | SOFARegistry 解析

螞蟻通訊框架實踐

sofa-bolt 遠程調用

sofa-bolt學習

SOFABolt 設計總結 - 優雅簡潔的設計之道

SofaBolt源碼分析-服務啓動到消息處理

SOFABolt 源碼分析

SOFABolt 源碼分析9 - UserProcessor 自定義處理器的設計

SOFARegistry 介紹

SOFABolt 源碼分析13 - Connection 事件處理機制的設計

相關文章
相關標籤/搜索