[從源碼學設計]螞蟻金服SOFARegistry之存儲結構

[從源碼學設計]螞蟻金服SOFARegistry之存儲結構

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。html

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。java

本文爲第六篇,介紹SOFARegistry的存儲結構,本文與業務聯繫密切。node

0x01 業務範疇

首先,咱們從 Data Server 角度出發,看看自己可能涉及的存儲結構。算法

哪些須要存儲。緩存

  • 自己服務器狀態;
  • 其餘服務器節點狀態,好比其餘DataServer,SessionServer,MetaServer;
  • 註冊的服務狀態;

因此咱們獲得以下問題須要思考:服務器

  • 問題:DataServer如何知道/保存其餘 DataServer?
  • 問題:考慮 其餘DataServer 須要保存什麼:ip,端口,狀態,如何hash,裏面存儲的數據怎麼對應hash?
  • 問題:幾個DataServer都從MetaServer獲取數據變化,互相發送同步消息,怎麼處理?
  • 問題:與其餘 Data Server 怎麼合做,新加入一個DataServer,數據須要切換?
  • 問題:爲何要把 DataServerNodeFactory 和 DataServerCache 分開。由於 Node 的結構不一樣致使的?
  • 問題:DataServerCache內部成員分紅幾類模塊?須要保存哪些信息?爲何這樣保存?

可能有些問題脫離了本文研究範疇,可是咱們也一塊兒羅列在這裏。網絡

1.1 緩存

時間和空間之間的平衡關係能夠說是計算機系統中最爲本質的關係之一。session

時間和空間這一對矛盾關係在推薦系統中的典型表現,主要體如今對緩存的使用上。數據結構

緩存一般用來存儲一些計算代價較高以及相對靜態變化較少的數據,經常在生產者和消費者之間起到緩衝的做用,使得兩者能夠解耦,各自異步進行。架構

利用緩存來解耦系統,帶來性能上的提高以及開發的便利,是在系統架構設計中須要掌握的一種通用的思路。

1.2 DataServer 分片機制

在大部分的服務註冊中心繫統中,每臺服務器都存儲着全量的服務註冊數據,服務器之間經過一致性協議(paxos、Raft 等)實現數據的複製,或者採用只保障最終一致性的算法,來實現異步數據複製。這樣的設計對於通常業務規模的系統來講沒有問題,而當應用於有着海量服務的龐大的業務系統來講,就會遇到性能瓶頸

爲解決這一問題,SOFARegistry 採用了數據分片的方法。全量服務註冊數據再也不保存在單機裏,而是分佈於每一個節點中,每臺服務器保存必定量的服務註冊數據,同時進行多副本備份,從理論上實現了服務無限擴容,且實現了高可用,最終達到支撐海量數據的目的。

在各類數據分片算法中,SOFARegistry 採用了業界主流的一致性 Hash 算法作數據分片,當節點動態擴縮容時,數據仍能均勻分佈,維持數據的平衡。

在數據同步時,沒有采用與 Dynamo、Casandra、Tair、Codis、Redis cluster 等項目中相似的預分片機制,而是在 DataServer 內存裏以 dataInfoId 爲粒度進行操做日誌記錄,這種實現方式在某種程度上也實現了「預分片」,從而保障了數據同步的有效性

1.3 服務模型

爲了更好的說明數據類型,咱們只能從SOFA博客中大段摘取文字

1.3.1 服務發佈模型(PublisherRegister)

  • dataInfoId:服務惟一標識,由``<分組 group><租戶 instanceId>構成,例如在 SOFARPC 的場景下,一個 dataInfoId 一般是 com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名稱,00001 是租戶 id。group 和 instance 主要是方便對服務數據作邏輯上的切分,使不一樣 group 和 instance 的服務數據在邏輯上徹底獨立。模型裏有 group 和 instanceId 字段,但這裏不額外列出來,讀者只要理解 dataInfoId 的含義便可。
  • zone:是一種單元化架構下的概念,表明一個機房內的邏輯單元,一般一個物理機房(Datacenter)包含多個邏輯單元(zone)。在服務發現場景下,發佈服務時需指定邏輯單元(zone),而訂閱服務者能夠訂閱邏輯單元(zone)維度的服務數據,也能夠訂閱物理機房(datacenter)維度的服務數據,即訂閱該 datacenter 下的全部 zone 的服務數據。
  • dataList:服務註冊數據,一般包含「協議」、「地址」和「額外的配置參數」,例如 SOFARPC 所發佈的數據相似bolt://192.168.1.100:8080?timeout=2000」。這裏使用 dataList,表示一個 PublisherRegister 能夠容許同時發佈多個服務數據(可是一般只會發佈一個)。

1.3.2 服務訂閱模型(SubscriberRegister)

  • dataInfoId:服務惟一標識,上面已經解釋過了。
  • scope: 訂閱維度,共有 3 種訂閱維度:zone、dataCenter 和 global。zone 和 datacenter 的意義,在上述有關「zone」的介紹裏已經解釋。global 維度涉及到機房間數據同步的特性,目前暫未開源。

關於「zone」和「scope」的概念理解,這裏再舉個例子。以下圖所示,物理機房內有 ZoneA 和 ZoneB 兩個單元,PublisherA 處於 ZoneA 裏,因此發佈服務時指定了 zone=ZoneA,PublisherB 處於 ZoneB 裏,因此發佈服務時指定了 zone=ZoneB;此時 Subscriber 訂閱時指定了 scope=datacenter 級別,所以它能夠獲取到 PublisherA 和 PublisherB (若是 Subscriber 訂閱時指定了 scope=zone 級別,那麼它只能獲取到 PublisherA)。

1.3.3 dataInfoId

Data 層是數據服務器集羣。Data 層經過分片存儲的方式保存着所用應用的服務註冊數據。數據按照 dataInfoId(每一份服務數據的惟一標識)進行一致性 Hash 分片,多副本備份,保證數據的高可用

SOFARegistry 最先選擇了一致性哈希分片,因此一樣遇到了數據分佈不固定帶來的數據同步難題。咱們如何解決的呢?咱們經過在 DataServer 內存中以 dataInfoId 的粒度記錄操做日誌,而且在 DataServer 之間也是以 dataInfoId 的粒度去作數據同步(一個服務就由一個 dataInfoId 惟標識)。其實這種日誌記錄的思想和虛擬桶是一致的,只是每一個 datainfoId 就至關於一個 slot 了,這是一種因歷史緣由而採起的妥協方案。在服務註冊中心的場景下,datainfoId 每每對應着一個發佈的服務,因此總量仍是比較有限的,以螞蟻金服目前的規模,每臺 DataServer 中承載的 dataInfoId 數量也僅在數萬的級別,勉強實現了 dataInfoId 做爲 slot 的數據多副本同步方案。

最終一致性

SOFARegistry 在數據存儲層面採用了相似 Eureka 的最終一致性的過程,可是存儲內容上和 Eureka 在每一個節點存儲相同內容特性不一樣,採用每一個節點上的內容按照一致性 Hash 數據分片來達到數據容量無限水平擴展能力。

SOFARegistry 是一個 AP 分佈式系統,代表了在已有條件 P 的前提下,選擇了 A 可用性。當數據進行同步時,獲取到的數據與實際數據不一致。但由於存儲的信息爲服務的註冊節點,儘管會有短暫的不一致產生,但對於客戶端來講,大機率仍是能從這部分數據中找到可用的節點,不會由於數據暫時的不一致對業務系統帶來致命性的傷害。

集羣內部數據遷移過程

SOFARegistry 的 DataServer 選擇了「一致性 Hash分片」來存儲數據。在「一致性 Hash分片」的基礎上,爲了不「分片數據不固定」這個問題,SOFARegistry 選擇了在 DataServer 內存裏以 dataInfoId 的粒度記錄操做日誌,而且在 DataServer 之間也是以 dataInfoId 的粒度去作數據同步。

img

圖 DataServer 之間進行異步數據同步

數據和副本分別分佈在不一樣的節點上,進行一致性 Hash 分片,當時對主副本進行寫操做以後,主副本會把數據異步地更新到其餘副本中,實現了集羣內部不一樣副本之間的數據遷移工做。

1.3.4 版本號

爲了肯定服務發佈數據的變動,SOFA對於一個服務不只定義了服務 ID,還對一個服務 ID 定義了對應的版本信息。

服務發佈數據變動主動通知到 Session 時,Session 對服務 ID 版本變動比較,高版本覆蓋低版本數據,而後進行推送。

由於有了服務 ID 的版本號,Session 能夠按期發起版本號比較,若是Session 存儲的的服務 ID 版本號高於dataServer存儲的 ,Session再次拉取新版本數據進行推送,這樣避免了某次變動通知沒有通知到全部訂閱方的狀況。

0x02 基本概念

首先,咱們講講一些基本概念。

2.1 物理機房DataCenter

DataCenter表明一個物理機房。一個數據中心包括多個DataNode,這些DataNode就是同機房數據節點。

nodeList.add(new DataNode(new URL("192.168.0.1", 9632), "DefaultDataCenter"));
nodeList.add(new DataNode(new URL("192.168.0.2", 9632), "DefaultDataCenter"));
nodeList.add(new DataNode(new URL("192.168.0.3", 9632), "DefaultDataCenter"));

2.2 Server節點DataNode

DataNode是Server節點,能夠表明任意類型的Server,不管是meta,data,session。

public class DataNode implements Node, HashNode {
    private final URL    nodeUrl;
    private final String nodeName;
    private final String dataCenter;
    private String       regionId;
    private NodeStatus   nodeStatus;
    private long         registrationTimestamp;
}

2.3 數據節點DataServerNode

這是 Data Server 概念。

爲何要有DataNode和DataServerNode兩個相似的數據結構類型

原來這是分屬於不一樣的包,或者模塊。

  • DataNode 是從 MetaServer 傳來的,被 DataServerCache 使用,而 DataServerCache 放在 cache 包。
  • DataServerNode 是 DataServer 自己本身依據信息構建的,被DataServerNodeFactory 使用,放在 remoting 包。

具體定義以下:

public class DataServerNode implements HashNode {

    private String     ip;

    private String     dataCenter;

    private Connection connection;
}

2.4 服務Publisher

Publisher 是服務概念,具體以下。

public class Publisher extends BaseInfo {
    private List<ServerDataBox> dataList;
    private PublishType         publishType      = PublishType.NORMAL;
}

2.5 服務聚合Datum

Datum類定義以下,能夠看到裏面有一個ConcurrentHashMap,其中放入了Datum所包括的Publisher。

Publisher 只是表明發佈者本身業務服務器。Datum則是從SOFARegistry總體角度作了整理,就是一個Session Server包括的服務聚合

public class Datum implements Serializable {
    private String                                dataInfoId;
    private String                                dataCenter;
    private String                                dataId;
    private String                                instanceId;
    private String                                group;
    private Map<String/*registerId*/, Publisher> pubMap           = new ConcurrentHashMap<>();
    private long                                  version;
    private boolean                               containsUnPub    = false;
}

0x03 本機 Data Server

如下是關於本Data Server服務器的數據結構。

3.1 自己Data Server狀態

DataNodeStatus表明自己Data Server的狀態

com.alipay.sofa.registry.server.data.node.DataNodeStatus

public enum LocalServerStatusEnum {
    INITIAL,
    WORKING
}

public class DataNodeStatus {
    private volatile LocalServerStatusEnum status = LocalServerStatusEnum.INITIAL;

    public LocalServerStatusEnum getStatus() {
        return status;
    }

    public void setStatus(LocalServerStatusEnum status) {
        LocalServerStatusEnum originStatus = this.status;
        this.status = status;
    }
}

3.1.1 設置狀態

DataServerCache . updateDataServerStatus 中有設置DataNodeStatus的狀態,好比:dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING)。

private void updateDataServerStatus() {
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {

        dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING);

        //after working update current dataCenter list to old DataServerChangeItem
        updateItem(
            newDataServerChangeItem.getServerMap().get(dataServerConfig.getLocalDataCenter()),
            newVersion, dataServerConfig.getLocalDataCenter());
    }
}

另外 addNotWorkingServer 有 addStatus 操做。

public void addNotWorkingServer(long version, String ip) {
    synchronized (DataServerCache.class) {
        if (version >= curVersion.get()) {
            addStatus(version, ip, LocalServerStatusEnum.INITIAL);
            if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
                updateDataServerStatus();
            }
        }
    }
}

3.2 本DataServer配置

DataServerConfig包括了本DataServer所有配置,這裏只摘錄了相關信息。

public class DataServerConfig {
    public static final String IP    = NetUtil.getLocalAddress().getHostAddress();
    private CommonConfig       commonConfig; 
    private int                numberOfReplicas = 1000;
  
    public int getNumberOfReplicas() {
        return numberOfReplicas;
    }	
  
    public String getLocalDataCenter() {
        return commonConfig.getLocalDataCenter();
    }  
}

3.3 Beans

對於 DataServerConfig 和 DataNodeStatus,系統作了beans。

@Configuration
protected static class DataServerBootstrapConfigConfiguration {

    ...

    @Bean
    @ConditionalOnMissingBean
    public DataServerConfig dataServerConfig(CommonConfig commonConfig) {
        return new DataServerConfig(commonConfig);
    }

    @Bean
    public DataNodeStatus dataNodeStatus() {
        return new DataNodeStatus();
    }

    ...
}

具體以下:

+-----------------------------------------------+
|                                               |
|  [Data Server]                                |
|                                               |
|                                               |
| +---------------+       +------------------+  |
| | DataNodeStatus|       | DataServerConfig |  |
| +---------------+       +------------------+  |
|                                               |
+-----------------------------------------------+

0x04 Meta Server

由於不涉及到 Meta Server 內部架構,因此從 Data Server 角度看,只要存儲 Meta Server 對應的網絡 Connection 便可

其中邏輯意義是:Map<dataCenter, Map<ip, Connection>>,就是哪些dataCenter中包括哪些Meta Server,對應於哪些ip

public class MetaServerConnectionFactory {
    private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>();
}

具體以下:

+-----------------------------------------+
|       MetaServerConnectionFactory       |
|                                         |
|  Map<dataCenter, Map<ip, Connection> >  |
|                                         |
+-----------------------------------------+

0x05 Session Server

由於不涉及到 Session Server 內部架構,因此從 Data Server 角度看,只要存儲 Session Server 對應的網絡 Connection 便可

其中邏輯含義從註釋便可瞭解。

public class SessionServerConnectionFactory {
    private static final int               DELAY                       = 30 * 1000;
  
    private static final Map               EMPTY_MAP                   = new HashMap(0);
  
    /**
     * key  :   SessionServer address
     * value:   SessionServer processId
     */
    private final Map<String, String>      SESSION_CONN_PROCESS_ID_MAP = new ConcurrentHashMap<>();

    /**
     * key  :   SessionServer processId
     * value:   ip:port of clients
     */
    private final Map<String, Set<String>> PROCESS_ID_CONNECT_ID_MAP   = new ConcurrentHashMap<>();

    /**
     * key  :   SessionServer processId
     * value:   pair(SessionServer address, SessionServer connection)
     */
    private final Map<String, Pair>        PROCESS_ID_SESSION_CONN_MAP = new ConcurrentHashMap<>();

    @Autowired
    private DisconnectEventHandler         disconnectEventHandler;  
}

具體以下:

+------------------------------------------------------------------------------------+
|[SessionServerConnectionFactory]                                                    |
|                                                                                    |
|                                                                                    |
|                                                                                    |
|EMPTY_MAP                                                                           |
|                                                                                    |
|Map<SessionServer address, SessionServer processId>                                 |
|                                                                                    |
|Map<SessionServer processId, Set<ip:port of clients> >                              |
|                                                                                    |
|Map<SessionServer processId, pair(SessionServer address, SessionServer connection)> |
|                                                                                    |
+------------------------------------------------------------------------------------+

0x06 其餘Data Server

由於涉及到與其餘 Data Server 的深度交互,因此須要對其餘 Data Server 的深度信息做存儲

分爲兩個部分:DataServerNodeFactory和DataServerCache。

爲何要有DataServerNodeFactory和DataServerCache兩個相似的數據結構類型

從註釋來看,是爲了把功能分離細化,DataServerNodeFactory專一鏈接管理,DataServerCache注重dataServer的變化與版本管理

DataServerNodeFactory

the factory to hold other dataservers and connection connected to them

DataServerCache

cache of dataservers

因此也分別在不一樣的包,或者模塊。

  • DataServerCache 放在 cache 包。
  • DataServerNodeFactory 放在 remoting 包。

6.1 DataServerNodeFactory

DataServerNodeFactory 存儲了其餘 Data Server 的 DataServerNode,由於 DataServerNode 自己就包括了 Connection,因此 DataServerNodeFactory 也間接的包含了 Connection,這從其類定義註釋能夠看出,並且其定義是在remoting.dataserver包之中。

the factory to hold other dataservers and connection connected to them

DataServerNodeFactory 裏面按照兩個維度存儲同一類東西,就是其餘DataServer :

  • Map<dataCenter, Map<ip, DataServerNode>> MAP;
  • Map<dataCenter, ConsistentHash >這裏會計算 DataServerNode 的一致性hash

具體定義以下:

public class DataServerNodeFactory {
    /**
     * row:     dataCenter
     * column:  ip
     * value    dataServerNode
     */
    private static final Map<String, Map<String, DataServerNode>>    MAP                 = new ConcurrentHashMap<>();

    /**
     * key:     dataCenter
     * value:   consistentHash
     */
    private static final Map<String, ConsistentHash<DataServerNode>> CONSISTENT_HASH_MAP = new ConcurrentHashMap<>();

    private static AtomicBoolean  init        = new AtomicBoolean(false);
}

6.1.1 添加

在DataServerChangeEventHandler.doHandle裏面會調用connectDataServer,其又會調用 DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig); 來添加,其裏面又會生成一致性Hash。

6.1.2 使用

DefaultMetaServiceImpl 會調用 DataServerNodeFactory 來計算 consistentHash 來獲取 DataServerNode。

public class DefaultMetaServiceImpl implements IMetaServerService {
    @Override
    public DataServerNode getDataServer(String dataCenter, String dataInfoId) {
        return DataServerNodeFactory.computeDataServerNode(dataCenter, dataInfoId);
    }  
}

6.2 DataServerCache

由註釋能夠知道,這是其餘dataservers的緩存

cache of dataservers

幾個關鍵變量:

nodeStatusMap 是本 Data Center 中全部 Data Server 的狀態。

  • dataServerChangeItem 是當前節點列表;
  • newDataServerChangeItem 是新加入的節點列表;有兩個變量的緣由是由於有一個變化過程,因此加入一個New....;

具體以下:

com.alipay.sofa.registry.server.data.cache.DataServerCache
  
public class DataServerCache {

    @Autowired
    private DataNodeStatus                                dataNodeStatus;

    @Autowired
    private DataServerConfig                              dataServerConfig;

    @Autowired
    private AfterWorkingProcessHandler                    afterWorkingProcessHandler;

    /** current dataServer list and version */
    private volatile DataServerChangeItem                 dataServerChangeItem    = new DataServerChangeItem();

    /** new input dataServer list and version */
    private volatile DataServerChangeItem                 newDataServerChangeItem = new DataServerChangeItem();

    private final AtomicBoolean                           HAS_NOTIFY_ALL          = new AtomicBoolean(false);

    private AtomicLong                                    curVersion              = new AtomicLong(-1L);

    /** version -> Map(serverIp, serverStatus) */
    private Map<Long, Map<String, LocalServerStatusEnum>> nodeStatusMap           = new ConcurrentHashMap<>();
}

下面介紹幾個DataServerCache的函數。

6.2.1 updateItem

updateItem會被幾個不一樣地方調用,進行更新dataServerChangeItem,就是插入一個new DataServer。

好比:LocalDataServerChangeEvent 和 DataServerChangeEvent 的響應函數就會調用。

public void updateItem(Map<String, DataNode> localDataNodes, Long version, String dataCenter) {
    synchronized (DataServerCache.class) {
        Long oldVersion = dataServerChangeItem.getVersionMap().get(dataCenter);
        Map<String, DataNode> oldList = dataServerChangeItem.getServerMap().get(dataCenter);
        Set<String> oldIps = oldList == null ? new HashSet<>() : oldList.keySet();
        Set<String> newIps = localDataNodes == null ? new HashSet<>() : localDataNodes.keySet();
        dataServerChangeItem.getServerMap().put(dataCenter, localDataNodes);
        dataServerChangeItem.getVersionMap().put(dataCenter, version);
   }
}

6.2.2 newDataServerChangeItem

newDataServerChangeItem 用這個來獲取全部的datacenters的全部DataServer

/**
 * get all datacenters
 *
 * @return
 */
public Set<String> getAllDataCenters() {
    return newDataServerChangeItem.getVersionMap().keySet();
}

6.2.3 dataServerChangeItem

dataServerChangeItem 被用來獲取某一個特定 data center的全部DataServer。

public Map<String, DataNode> getDataServers(String dataCenter) {
    return getDataServers(dataCenter, dataServerChangeItem);
}

public Map<String, DataNode> getDataServers(String dataCenter,
                                            DataServerChangeItem dataServerChangeItem) {
    return doGetDataServers(dataCenter, dataServerChangeItem);
}

private Map<String, DataNode> doGetDataServers(String dataCenter,
                                                   DataServerChangeItem dataServerChangeItem) {
        synchronized (DataServerCache.class) {
            Map<String, Map<String, DataNode>> dataserverMap = dataServerChangeItem.getServerMap();
            if (dataserverMap.containsKey(dataCenter)) {
                return dataserverMap.get(dataCenter);
            } else {
                return new HashMap<>();
            }
        }
}

6.2.4 DataServerChangeItem

此數據結構實際只用來獲取local data center的data servers

/**
 * change info of datacenters
 */
public class DataServerChangeItem {

    /** datacenter -> Map<ip, DataNode> */
    private Map<String, Map<String, DataNode>> serverMap;

    /** datacenter -> version */
    private Map<String, Long>                  versionMap;
}

6.2.5 使用

有些類會間接使用dataServerCache。

好比:DefaultMetaServiceImpl . dataServerCache 會被NotifyOnlineHandler調用。

public class NotifyOnlineHandler extends AbstractServerHandler<NotifyOnlineRequest> {
    @Autowired
    private DataServerCache dataServerCache;

    @Override
    public Object doHandle(Channel channel, NotifyOnlineRequest request) {
        long version = request.getVersion();
        if (version >= dataServerCache.getCurVersion()) {
            dataServerCache.addNotWorkingServer(version, request.getIp());
        }
        return CommonResponse.buildSuccessResponse();
    }
}

NotifyOnlineHandler其配置在

@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler()); //在這裏
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());
    return list;
}

屬於 dataSyncServer 響應函數的一部分。

private void openDataSyncServer() {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
}

具體以下:

+---------------------------------------------------+
|                                                   |
| [DataServerNodeFactory]                           |
|                                                   |
|                                                   |
| Map<dataCenter, Map<ip, dataServerNode> >         |
|                                                   |
| Map<dataCenter, ConsistentHash<DataServerNode> >  |
|                                                   |
+---------------------------------------------------+

+---------------------------------------------------+
| [DataServerCache]                                 |
|                                                   |
|                                                   |
| DataServerChangeItem dataServerChangeItem         |
|                                                   |
| DataServerChangeItem newDataServerChangeItem      |
|                                                   |
| Map<>ersion, Map<serverIp, serverStatus> >        |
|                                                   |
+---------------------------------------------------+

6.2.6 DataServer版本

從以前的MetaServer分析以及DataServerChangeItem,可知DataSever也是有版本的。

public class DataServerChangeItem {

    /** datacenter -> Map<ip, DataNode> */
    private Map<String, Map<String, DataNode>> serverMap;

    /** datacenter -> version */
    private Map<String, Long>                  versionMap;
}

在DataServer中,DefaultMetaServiceImpl中有設置版本號。

if (obj instanceof NodeChangeResult) {
    NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
    Map<String, Long> versionMap = result.getDataCenterListVersions();
    versionMap.put(result.getLocalDataCenter(), result.getVersion());
    return new DataServerChangeItem(result.getNodes(), versionMap);
}

其來源是MetaServer的DataStoreService。

dataNodeRepositoryMap.forEach((dataCenter, dataNodeRepository) -> {
    if (localDataCenter.equalsIgnoreCase(dataCenter)) {
        nodeChangeResult.setVersion(dataNodeRepository.getVersion());
    }
    versionMap.put(dataCenter, dataNodeRepository.getVersion());
    Map<String, RenewDecorate<DataNode>> dataMap = dataNodeRepository.getNodeMap();
    Map<String, DataNode> newMap = new ConcurrentHashMap<>();
    dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
    pushNodes.put(dataCenter, newMap);
});

以及DataStoreService。

metaRepositoryMap.forEach((dataCenter, metaNodeRepository) -> {
    if (localDataCenter.equalsIgnoreCase(dataCenter)) {
        nodeChangeResult.setVersion(metaNodeRepository.getVersion());
    }
    versionMap.put(dataCenter, metaNodeRepository.getVersion());
    Map<String, RenewDecorate<MetaNode>> dataMap = metaNodeRepository.getNodeMap();
    Map<String, MetaNode> newMap = new ConcurrentHashMap<>();
    dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
    pushNodes.put(dataCenter, newMap);
});

都是提取dataServer的版本號,發送出去。

0x07 服務信息

服務信息包括 Subscriber 和 Publisher,這些信息須要深度存儲,本文僅以 Publisher 爲例分析。

前面描述了,Publisher包括在Datum之中,因此咱們下面的講解以Datum爲主。

7.1 Datum

Datum類定義以下,能夠看到裏面有一個ConcurrentHashMap,其中放入了Datum所包括的Publisher。

Publisher 只是表明發佈者本身業務服務器。Datum則是從SOFARegistry總體角度作了整理,就是一個Session Server包括的服務聚合

public class Datum implements Serializable {

    private String                                dataInfoId;

    private String                                dataCenter;

    private String                                dataId;

    private String                                instanceId;

    private String                                group;

    private Map<String/*registerId*/, Publisher> pubMap           = new ConcurrentHashMap<>();

    private long                                  version;

    private boolean                               containsUnPub    = false;
}

7.2 DatumCache

DatumCache緩存了全部Datum,就是本DataServer對應全部的SessionServer中全部的服務

public class DatumCache {

    @Autowired
    private DatumStorage localDatumStorage;
}

7.3 LocalDatumStorage

LocalDatumStorage負責Datum具體的存儲。

public class LocalDatumStorage implements DatumStorage {


    /**
     * row:     dataCenter
     * column:  dataInfoId
     * value:   datum
     */
    protected final Map<String, Map<String, Datum>>     DATUM_MAP            = new ConcurrentHashMap<>();

    /**
     * all datum index
     *
     * row:     ip:port
     * column:  registerId
     * value:   publisher
     */
    protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();

    @Autowired
    private DataServerConfig                            dataServerConfig;
}

7.4 Beans

相關的Bean以下:

@Configuration
public static class DataServerStorageConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public DatumCache datumCache() {
        return new DatumCache();
    }

    @Bean
    @ConditionalOnMissingBean
    public LocalDatumStorage localDatumStorage() {
        return new LocalDatumStorage();
    }
}

0x08 Datum的前因後果

8.1 Session Server 內部

首先,咱們講講Session Server 內部如何獲取Datum

Session Server 內部,Datum存儲在 SessionCacheService 之中。

好比在 DataChangeFetchCloudTask 內部,能夠這樣獲取 Datum。

private Map<String, Datum> getDatumsCache() {
    Map<String, Datum> map = new HashMap<>();
    NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
    Collection<String> dataCenters = nodeManager.getDataCenters();
    if (dataCenters != null) {
        Collection<Key> keys = dataCenters.stream().
                map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
                        new DatumKey(fetchDataInfoId, dataCenter))).
                collect(Collectors.toList());

        Map<Key, Value> values = null;
        values = sessionCacheService.getValues(keys);

        if (values != null) {
            values.forEach((key, value) -> {
                if (value != null && value.getPayload() != null) {
                    map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
                }
            });
        }
    }
    return map;
}

Session Server 會向 Data Server 發送 PublishDataRequest 請求。

8.2 PublishDataHandler

在DataServer內部,PublishDataHandler 是用來處理 PublishDataRequest。

public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
    @Autowired
    private ForwardService                 forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter          dataChangeEventCenter;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DatumLeaseManager              datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor             publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}

8.3 DataChangeEventCenter

在 DataChangeEventCenter 的 onChange 函數中,會進行投放。

public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}

8.4 DataChangeEventQueue

在DataChangeEventQueue之中,會調用 handleDatum 來處理。在這裏對Datum進行存儲。

8.5 DataChangeHandler

在 DataChangeHandler 之中,會提取ChangeData,而後進行Notify。

public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                    final ChangeData changeData = dataChangeEventQueue.take();
                    notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}

具體以下:

+
                           Session Server  |  Data Server
                                           |
                                           |
                                           |
                                           |
+--------------------------+  PublishDataRequest   +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+               |       +------+-------------+
            ^                              |              |
            | getValues                    |              |  onChange(Publisher)
            |                              |              v
            |                              |     +--------+--------------+
  +---------+----------+                   |     | DataChangeEventCenter |
  |sessionCacheService |                   |     +--------+--------------+
  +--------------------+                   |              |
                                           |              |  Datum
                                           |              |
                                           |              v
                                           |     +--------+-------------+
                                           |     | DataChangeEventQueue |
                                           |     +--------+-------------+
                                           |              |
                                           |              |
                                           |              | ChangeData
                                           |              v
                                           |      +-------+-----------+
                                           |      | DataChangeHandler |
                                           +      +-------------------+

0x09 總結

至此,本文總結基本存儲結構以下:

9.1 基本概念

物理機房DataCenter

DataCenter表明一個物理機房。一個數據中心包括多個DataNode,這些DataNode就是同機房數據節點。

Server節點DataNode

DataNode是Server節點,能夠表明任意類型的Server,不管是meta,data,session。

數據節點DataServerNode

這是 Data Server 概念。

服務Publisher

Publisher 是服務概念

服務聚合Datum

Datum裏面有一個ConcurrentHashMap,其中放入了Datum所包括的Publisher。

Publisher 只是表明發佈者本身業務服務器。Datum則是從SOFARegistry總體角度作了整理,就是一個Session Server包括的服務聚合

9.2 自己Data Server狀態

DataNodeStatus表明自己Data Server的狀態DataServerConfig包括了本DataServer所有配置。

9.3 Meta Server

由於不涉及到 Meta Server 內部架構,因此從 Data Server 角度看,只要存儲 Meta Server 對應的網絡 Connection 便可

9.4 Session Server

由於不涉及到 Session Server 內部架構,因此從 Data Server 角度看,只要存儲 Session Server 對應的網絡 Connection 便可

9.5 其餘Data Server

由於涉及到與其餘 Data Server 的深度交互,因此須要對其餘 Data Server 的深度信息做存儲

分爲兩個部分:DataServerNodeFactory 和 DataServerCache。

爲何要有DataServerNodeFactory和DataServerCache兩個相似的數據結構類型

從註釋來看,是爲了把功能分離細化,DataServerNodeFactory專一鏈接管理,DataServerCache注重dataServer的變化與版本管理

0xFF 參考

螞蟻金服服務註冊中心如何實現 DataServer 平滑擴縮容

螞蟻金服服務註冊中心 SOFARegistry 解析 | 服務發現優化之路

服務註冊中心 Session 存儲策略 | SOFARegistry 解析

海量數據下的註冊中心 - SOFARegistry 架構介紹

服務註冊中心數據分片和同步方案詳解 | SOFARegistry 解析

螞蟻金服開源通訊框架SOFABolt解析之鏈接管理剖析

螞蟻金服開源通訊框架SOFABolt解析之超時控制機制及心跳機制

螞蟻金服開源通訊框架 SOFABolt 協議框架解析

螞蟻金服服務註冊中心數據一致性方案分析 | SOFARegistry 解析

螞蟻通訊框架實踐

sofa-bolt 遠程調用

sofa-bolt學習

SOFABolt 設計總結 - 優雅簡潔的設計之道

SofaBolt源碼分析-服務啓動到消息處理

SOFABolt 源碼分析

SOFABolt 源碼分析9 - UserProcessor 自定義處理器的設計

SOFARegistry 介紹

SOFABolt 源碼分析13 - Connection 事件處理機制的設計

相關文章
相關標籤/搜索