SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。html
本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。java
本文爲第六篇,介紹SOFARegistry的存儲結構,本文與業務聯繫密切。node
首先,咱們從 Data Server 角度出發,看看自己可能涉及的存儲結構。算法
哪些須要存儲。緩存
因此咱們獲得以下問題須要思考:服務器
可能有些問題脫離了本文研究範疇,可是咱們也一塊兒羅列在這裏。網絡
時間和空間之間的平衡關係能夠說是計算機系統中最爲本質的關係之一。session
時間和空間這一對矛盾關係在推薦系統中的典型表現,主要體如今對緩存的使用上。數據結構
緩存一般用來存儲一些計算代價較高以及相對靜態變化較少的數據,經常在生產者和消費者之間起到緩衝的做用,使得兩者能夠解耦,各自異步進行。架構
利用緩存來解耦系統,帶來性能上的提高以及開發的便利,是在系統架構設計中須要掌握的一種通用的思路。
在大部分的服務註冊中心繫統中,每臺服務器都存儲着全量的服務註冊數據,服務器之間經過一致性協議(paxos、Raft 等)實現數據的複製,或者採用只保障最終一致性的算法,來實現異步數據複製。這樣的設計對於通常業務規模的系統來講沒有問題,而當應用於有着海量服務的龐大的業務系統來講,就會遇到性能瓶頸。
爲解決這一問題,SOFARegistry 採用了數據分片的方法。全量服務註冊數據再也不保存在單機裏,而是分佈於每一個節點中,每臺服務器保存必定量的服務註冊數據,同時進行多副本備份,從理論上實現了服務無限擴容,且實現了高可用,最終達到支撐海量數據的目的。
在各類數據分片算法中,SOFARegistry 採用了業界主流的一致性 Hash 算法作數據分片,當節點動態擴縮容時,數據仍能均勻分佈,維持數據的平衡。
在數據同步時,沒有采用與 Dynamo、Casandra、Tair、Codis、Redis cluster 等項目中相似的預分片機制,而是在 DataServer 內存裏以 dataInfoId 爲粒度進行操做日誌記錄,這種實現方式在某種程度上也實現了「預分片」,從而保障了數據同步的有效性。
爲了更好的說明數據類型,咱們只能從SOFA博客中大段摘取文字。
和
<租戶 instanceId>構成,例如在 SOFARPC 的場景下,一個 dataInfoId 一般是
com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名稱,00001 是租戶 id。group 和 instance 主要是方便對服務數據作邏輯上的切分,使不一樣 group 和 instance 的服務數據在邏輯上徹底獨立。模型裏有 group 和 instanceId 字段,但這裏不額外列出來,讀者只要理解 dataInfoId 的含義便可。bolt://192.168.1.100:8080?timeout=2000
」。這裏使用 dataList,表示一個 PublisherRegister 能夠容許同時發佈多個服務數據(可是一般只會發佈一個)。關於「zone」和「scope」的概念理解,這裏再舉個例子。以下圖所示,物理機房內有 ZoneA 和 ZoneB 兩個單元,PublisherA 處於 ZoneA 裏,因此發佈服務時指定了 zone=ZoneA,PublisherB 處於 ZoneB 裏,因此發佈服務時指定了 zone=ZoneB;此時 Subscriber 訂閱時指定了 scope=datacenter 級別,所以它能夠獲取到 PublisherA 和 PublisherB (若是 Subscriber 訂閱時指定了 scope=zone 級別,那麼它只能獲取到 PublisherA)。
Data 層是數據服務器集羣。Data 層經過分片存儲的方式保存着所用應用的服務註冊數據。數據按照 dataInfoId(每一份服務數據的惟一標識)進行一致性 Hash 分片,多副本備份,保證數據的高可用。
SOFARegistry 最先選擇了一致性哈希分片,因此一樣遇到了數據分佈不固定帶來的數據同步難題。咱們如何解決的呢?咱們經過在 DataServer 內存中以 dataInfoId 的粒度記錄操做日誌,而且在 DataServer 之間也是以 dataInfoId 的粒度去作數據同步(一個服務就由一個 dataInfoId 惟標識)。其實這種日誌記錄的思想和虛擬桶是一致的,只是每一個 datainfoId 就至關於一個 slot 了,這是一種因歷史緣由而採起的妥協方案。在服務註冊中心的場景下,datainfoId 每每對應着一個發佈的服務,因此總量仍是比較有限的,以螞蟻金服目前的規模,每臺 DataServer 中承載的 dataInfoId 數量也僅在數萬的級別,勉強實現了 dataInfoId 做爲 slot 的數據多副本同步方案。
最終一致性
SOFARegistry 在數據存儲層面採用了相似 Eureka 的最終一致性的過程,可是存儲內容上和 Eureka 在每一個節點存儲相同內容特性不一樣,採用每一個節點上的內容按照一致性 Hash 數據分片來達到數據容量無限水平擴展能力。
SOFARegistry 是一個 AP 分佈式系統,代表了在已有條件 P 的前提下,選擇了 A 可用性。當數據進行同步時,獲取到的數據與實際數據不一致。但由於存儲的信息爲服務的註冊節點,儘管會有短暫的不一致產生,但對於客戶端來講,大機率仍是能從這部分數據中找到可用的節點,不會由於數據暫時的不一致對業務系統帶來致命性的傷害。
集羣內部數據遷移過程
SOFARegistry 的 DataServer 選擇了「一致性 Hash分片」來存儲數據。在「一致性 Hash分片」的基礎上,爲了不「分片數據不固定」這個問題,SOFARegistry 選擇了在 DataServer 內存裏以 dataInfoId 的粒度記錄操做日誌,而且在 DataServer 之間也是以 dataInfoId 的粒度去作數據同步。
圖 DataServer 之間進行異步數據同步
數據和副本分別分佈在不一樣的節點上,進行一致性 Hash 分片,當時對主副本進行寫操做以後,主副本會把數據異步地更新到其餘副本中,實現了集羣內部不一樣副本之間的數據遷移工做。
爲了肯定服務發佈數據的變動,SOFA對於一個服務不只定義了服務 ID,還對一個服務 ID 定義了對應的版本信息。
服務發佈數據變動主動通知到 Session 時,Session 對服務 ID 版本變動比較,高版本覆蓋低版本數據,而後進行推送。
由於有了服務 ID 的版本號,Session 能夠按期發起版本號比較,若是Session 存儲的的服務 ID 版本號高於dataServer存儲的 ,Session再次拉取新版本數據進行推送,這樣避免了某次變動通知沒有通知到全部訂閱方的狀況。
首先,咱們講講一些基本概念。
DataCenter表明一個物理機房。一個數據中心包括多個DataNode,這些DataNode就是同機房數據節點。
nodeList.add(new DataNode(new URL("192.168.0.1", 9632), "DefaultDataCenter")); nodeList.add(new DataNode(new URL("192.168.0.2", 9632), "DefaultDataCenter")); nodeList.add(new DataNode(new URL("192.168.0.3", 9632), "DefaultDataCenter"));
DataNode是Server節點,能夠表明任意類型的Server,不管是meta,data,session。
public class DataNode implements Node, HashNode { private final URL nodeUrl; private final String nodeName; private final String dataCenter; private String regionId; private NodeStatus nodeStatus; private long registrationTimestamp; }
這是 Data Server 概念。
爲何要有DataNode和DataServerNode兩個相似的數據結構類型。
原來這是分屬於不一樣的包,或者模塊。
具體定義以下:
public class DataServerNode implements HashNode { private String ip; private String dataCenter; private Connection connection; }
Publisher 是服務概念,具體以下。
public class Publisher extends BaseInfo { private List<ServerDataBox> dataList; private PublishType publishType = PublishType.NORMAL; }
Datum類定義以下,能夠看到裏面有一個ConcurrentHashMap,其中放入了Datum所包括的Publisher。
Publisher 只是表明發佈者本身業務服務器。Datum則是從SOFARegistry總體角度作了整理,就是一個Session Server包括的服務聚合。
public class Datum implements Serializable { private String dataInfoId; private String dataCenter; private String dataId; private String instanceId; private String group; private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>(); private long version; private boolean containsUnPub = false; }
如下是關於本Data Server服務器的數據結構。
DataNodeStatus表明自己Data Server的狀態。
com.alipay.sofa.registry.server.data.node.DataNodeStatus public enum LocalServerStatusEnum { INITIAL, WORKING } public class DataNodeStatus { private volatile LocalServerStatusEnum status = LocalServerStatusEnum.INITIAL; public LocalServerStatusEnum getStatus() { return status; } public void setStatus(LocalServerStatusEnum status) { LocalServerStatusEnum originStatus = this.status; this.status = status; } }
DataServerCache . updateDataServerStatus 中有設置DataNodeStatus的狀態,好比:dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING)。
private void updateDataServerStatus() { if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) { dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING); //after working update current dataCenter list to old DataServerChangeItem updateItem( newDataServerChangeItem.getServerMap().get(dataServerConfig.getLocalDataCenter()), newVersion, dataServerConfig.getLocalDataCenter()); } }
另外 addNotWorkingServer 有 addStatus 操做。
public void addNotWorkingServer(long version, String ip) { synchronized (DataServerCache.class) { if (version >= curVersion.get()) { addStatus(version, ip, LocalServerStatusEnum.INITIAL); if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) { updateDataServerStatus(); } } } }
DataServerConfig包括了本DataServer所有配置,這裏只摘錄了相關信息。
public class DataServerConfig { public static final String IP = NetUtil.getLocalAddress().getHostAddress(); private CommonConfig commonConfig; private int numberOfReplicas = 1000; public int getNumberOfReplicas() { return numberOfReplicas; } public String getLocalDataCenter() { return commonConfig.getLocalDataCenter(); } }
對於 DataServerConfig 和 DataNodeStatus,系統作了beans。
@Configuration protected static class DataServerBootstrapConfigConfiguration { ... @Bean @ConditionalOnMissingBean public DataServerConfig dataServerConfig(CommonConfig commonConfig) { return new DataServerConfig(commonConfig); } @Bean public DataNodeStatus dataNodeStatus() { return new DataNodeStatus(); } ... }
具體以下:
+-----------------------------------------------+ | | | [Data Server] | | | | | | +---------------+ +------------------+ | | | DataNodeStatus| | DataServerConfig | | | +---------------+ +------------------+ | | | +-----------------------------------------------+
由於不涉及到 Meta Server 內部架構,因此從 Data Server 角度看,只要存儲 Meta Server 對應的網絡 Connection 便可。
其中邏輯意義是:Map<dataCenter, Map<ip, Connection>>
,就是哪些dataCenter中包括哪些Meta Server,對應於哪些ip。
public class MetaServerConnectionFactory { private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>(); }
具體以下:
+-----------------------------------------+ | MetaServerConnectionFactory | | | | Map<dataCenter, Map<ip, Connection> > | | | +-----------------------------------------+
由於不涉及到 Session Server 內部架構,因此從 Data Server 角度看,只要存儲 Session Server 對應的網絡 Connection 便可。
其中邏輯含義從註釋便可瞭解。
public class SessionServerConnectionFactory { private static final int DELAY = 30 * 1000; private static final Map EMPTY_MAP = new HashMap(0); /** * key : SessionServer address * value: SessionServer processId */ private final Map<String, String> SESSION_CONN_PROCESS_ID_MAP = new ConcurrentHashMap<>(); /** * key : SessionServer processId * value: ip:port of clients */ private final Map<String, Set<String>> PROCESS_ID_CONNECT_ID_MAP = new ConcurrentHashMap<>(); /** * key : SessionServer processId * value: pair(SessionServer address, SessionServer connection) */ private final Map<String, Pair> PROCESS_ID_SESSION_CONN_MAP = new ConcurrentHashMap<>(); @Autowired private DisconnectEventHandler disconnectEventHandler; }
具體以下:
+------------------------------------------------------------------------------------+ |[SessionServerConnectionFactory] | | | | | | | |EMPTY_MAP | | | |Map<SessionServer address, SessionServer processId> | | | |Map<SessionServer processId, Set<ip:port of clients> > | | | |Map<SessionServer processId, pair(SessionServer address, SessionServer connection)> | | | +------------------------------------------------------------------------------------+
由於涉及到與其餘 Data Server 的深度交互,因此須要對其餘 Data Server 的深度信息做存儲。
分爲兩個部分:DataServerNodeFactory和DataServerCache。
爲何要有DataServerNodeFactory和DataServerCache兩個相似的數據結構類型。
從註釋來看,是爲了把功能分離細化,DataServerNodeFactory專一鏈接管理,DataServerCache注重dataServer的變化與版本管理。
DataServerNodeFactory:
the factory to hold other dataservers and connection connected to them
DataServerCache
cache of dataservers
因此也分別在不一樣的包,或者模塊。
DataServerNodeFactory 存儲了其餘 Data Server 的 DataServerNode,由於 DataServerNode 自己就包括了 Connection,因此 DataServerNodeFactory 也間接的包含了 Connection,這從其類定義註釋能夠看出,並且其定義是在remoting.dataserver包之中。
the factory to hold other dataservers and connection connected to them
DataServerNodeFactory 裏面按照兩個維度存儲同一類東西,就是其餘DataServer :
具體定義以下:
public class DataServerNodeFactory { /** * row: dataCenter * column: ip * value dataServerNode */ private static final Map<String, Map<String, DataServerNode>> MAP = new ConcurrentHashMap<>(); /** * key: dataCenter * value: consistentHash */ private static final Map<String, ConsistentHash<DataServerNode>> CONSISTENT_HASH_MAP = new ConcurrentHashMap<>(); private static AtomicBoolean init = new AtomicBoolean(false); }
在DataServerChangeEventHandler.doHandle裏面會調用connectDataServer,其又會調用 DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig); 來添加,其裏面又會生成一致性Hash。
DefaultMetaServiceImpl 會調用 DataServerNodeFactory 來計算 consistentHash 來獲取 DataServerNode。
public class DefaultMetaServiceImpl implements IMetaServerService { @Override public DataServerNode getDataServer(String dataCenter, String dataInfoId) { return DataServerNodeFactory.computeDataServerNode(dataCenter, dataInfoId); } }
由註釋能夠知道,這是其餘dataservers的緩存。
cache of dataservers
幾個關鍵變量:
nodeStatusMap 是本 Data Center 中全部 Data Server 的狀態。
具體以下:
com.alipay.sofa.registry.server.data.cache.DataServerCache public class DataServerCache { @Autowired private DataNodeStatus dataNodeStatus; @Autowired private DataServerConfig dataServerConfig; @Autowired private AfterWorkingProcessHandler afterWorkingProcessHandler; /** current dataServer list and version */ private volatile DataServerChangeItem dataServerChangeItem = new DataServerChangeItem(); /** new input dataServer list and version */ private volatile DataServerChangeItem newDataServerChangeItem = new DataServerChangeItem(); private final AtomicBoolean HAS_NOTIFY_ALL = new AtomicBoolean(false); private AtomicLong curVersion = new AtomicLong(-1L); /** version -> Map(serverIp, serverStatus) */ private Map<Long, Map<String, LocalServerStatusEnum>> nodeStatusMap = new ConcurrentHashMap<>(); }
下面介紹幾個DataServerCache的函數。
updateItem會被幾個不一樣地方調用,進行更新dataServerChangeItem,就是插入一個new DataServer。
好比:LocalDataServerChangeEvent 和 DataServerChangeEvent 的響應函數就會調用。
public void updateItem(Map<String, DataNode> localDataNodes, Long version, String dataCenter) { synchronized (DataServerCache.class) { Long oldVersion = dataServerChangeItem.getVersionMap().get(dataCenter); Map<String, DataNode> oldList = dataServerChangeItem.getServerMap().get(dataCenter); Set<String> oldIps = oldList == null ? new HashSet<>() : oldList.keySet(); Set<String> newIps = localDataNodes == null ? new HashSet<>() : localDataNodes.keySet(); dataServerChangeItem.getServerMap().put(dataCenter, localDataNodes); dataServerChangeItem.getVersionMap().put(dataCenter, version); } }
newDataServerChangeItem 用這個來獲取全部的datacenters的全部DataServer。
/** * get all datacenters * * @return */ public Set<String> getAllDataCenters() { return newDataServerChangeItem.getVersionMap().keySet(); }
dataServerChangeItem 被用來獲取某一個特定 data center的全部DataServer。
public Map<String, DataNode> getDataServers(String dataCenter) { return getDataServers(dataCenter, dataServerChangeItem); } public Map<String, DataNode> getDataServers(String dataCenter, DataServerChangeItem dataServerChangeItem) { return doGetDataServers(dataCenter, dataServerChangeItem); } private Map<String, DataNode> doGetDataServers(String dataCenter, DataServerChangeItem dataServerChangeItem) { synchronized (DataServerCache.class) { Map<String, Map<String, DataNode>> dataserverMap = dataServerChangeItem.getServerMap(); if (dataserverMap.containsKey(dataCenter)) { return dataserverMap.get(dataCenter); } else { return new HashMap<>(); } } }
此數據結構實際只用來獲取local data center的data servers。
/** * change info of datacenters */ public class DataServerChangeItem { /** datacenter -> Map<ip, DataNode> */ private Map<String, Map<String, DataNode>> serverMap; /** datacenter -> version */ private Map<String, Long> versionMap; }
有些類會間接使用dataServerCache。
好比:DefaultMetaServiceImpl . dataServerCache 會被NotifyOnlineHandler調用。
public class NotifyOnlineHandler extends AbstractServerHandler<NotifyOnlineRequest> { @Autowired private DataServerCache dataServerCache; @Override public Object doHandle(Channel channel, NotifyOnlineRequest request) { long version = request.getVersion(); if (version >= dataServerCache.getCurVersion()) { dataServerCache.addNotWorkingServer(version, request.getIp()); } return CommonResponse.buildSuccessResponse(); } }
NotifyOnlineHandler其配置在
@Bean(name = "serverSyncHandlers") public Collection<AbstractServerHandler> serverSyncHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(getDataHandler()); list.add(publishDataProcessor()); list.add(unPublishDataHandler()); list.add(notifyFetchDatumHandler()); list.add(notifyOnlineHandler()); //在這裏 list.add(syncDataHandler()); list.add(dataSyncServerConnectionHandler()); return list; }
屬於 dataSyncServer 響應函數的一部分。
private void openDataSyncServer() { if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } }
具體以下:
+---------------------------------------------------+ | | | [DataServerNodeFactory] | | | | | | Map<dataCenter, Map<ip, dataServerNode> > | | | | Map<dataCenter, ConsistentHash<DataServerNode> > | | | +---------------------------------------------------+ +---------------------------------------------------+ | [DataServerCache] | | | | | | DataServerChangeItem dataServerChangeItem | | | | DataServerChangeItem newDataServerChangeItem | | | | Map<>ersion, Map<serverIp, serverStatus> > | | | +---------------------------------------------------+
從以前的MetaServer分析以及DataServerChangeItem,可知DataSever也是有版本的。
public class DataServerChangeItem { /** datacenter -> Map<ip, DataNode> */ private Map<String, Map<String, DataNode>> serverMap; /** datacenter -> version */ private Map<String, Long> versionMap; }
在DataServer中,DefaultMetaServiceImpl中有設置版本號。
if (obj instanceof NodeChangeResult) { NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj; Map<String, Long> versionMap = result.getDataCenterListVersions(); versionMap.put(result.getLocalDataCenter(), result.getVersion()); return new DataServerChangeItem(result.getNodes(), versionMap); }
其來源是MetaServer的DataStoreService。
dataNodeRepositoryMap.forEach((dataCenter, dataNodeRepository) -> { if (localDataCenter.equalsIgnoreCase(dataCenter)) { nodeChangeResult.setVersion(dataNodeRepository.getVersion()); } versionMap.put(dataCenter, dataNodeRepository.getVersion()); Map<String, RenewDecorate<DataNode>> dataMap = dataNodeRepository.getNodeMap(); Map<String, DataNode> newMap = new ConcurrentHashMap<>(); dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal())); pushNodes.put(dataCenter, newMap); });
以及DataStoreService。
metaRepositoryMap.forEach((dataCenter, metaNodeRepository) -> { if (localDataCenter.equalsIgnoreCase(dataCenter)) { nodeChangeResult.setVersion(metaNodeRepository.getVersion()); } versionMap.put(dataCenter, metaNodeRepository.getVersion()); Map<String, RenewDecorate<MetaNode>> dataMap = metaNodeRepository.getNodeMap(); Map<String, MetaNode> newMap = new ConcurrentHashMap<>(); dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal())); pushNodes.put(dataCenter, newMap); });
都是提取dataServer的版本號,發送出去。
服務信息包括 Subscriber 和 Publisher,這些信息須要深度存儲,本文僅以 Publisher 爲例分析。
前面描述了,Publisher包括在Datum之中,因此咱們下面的講解以Datum爲主。
Datum類定義以下,能夠看到裏面有一個ConcurrentHashMap,其中放入了Datum所包括的Publisher。
Publisher 只是表明發佈者本身業務服務器。Datum則是從SOFARegistry總體角度作了整理,就是一個Session Server包括的服務聚合。
public class Datum implements Serializable { private String dataInfoId; private String dataCenter; private String dataId; private String instanceId; private String group; private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>(); private long version; private boolean containsUnPub = false; }
DatumCache緩存了全部Datum,就是本DataServer對應全部的SessionServer中全部的服務。
public class DatumCache { @Autowired private DatumStorage localDatumStorage; }
LocalDatumStorage負責Datum具體的存儲。
public class LocalDatumStorage implements DatumStorage { /** * row: dataCenter * column: dataInfoId * value: datum */ protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>(); /** * all datum index * * row: ip:port * column: registerId * value: publisher */ protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>(); @Autowired private DataServerConfig dataServerConfig; }
相關的Bean以下:
@Configuration public static class DataServerStorageConfiguration { @Bean @ConditionalOnMissingBean public DatumCache datumCache() { return new DatumCache(); } @Bean @ConditionalOnMissingBean public LocalDatumStorage localDatumStorage() { return new LocalDatumStorage(); } }
首先,咱們講講Session Server 內部如何獲取Datum
在 Session Server 內部,Datum存儲在 SessionCacheService 之中。
好比在 DataChangeFetchCloudTask 內部,能夠這樣獲取 Datum。
private Map<String, Datum> getDatumsCache() { Map<String, Datum> map = new HashMap<>(); NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META); Collection<String> dataCenters = nodeManager.getDataCenters(); if (dataCenters != null) { Collection<Key> keys = dataCenters.stream(). map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(), new DatumKey(fetchDataInfoId, dataCenter))). collect(Collectors.toList()); Map<Key, Value> values = null; values = sessionCacheService.getValues(keys); if (values != null) { values.forEach((key, value) -> { if (value != null && value.getPayload() != null) { map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload()); } }); } } return map; }
Session Server 會向 Data Server 發送 PublishDataRequest 請求。
在DataServer內部,PublishDataHandler 是用來處理 PublishDataRequest。
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> { @Autowired private ForwardService forwardService; @Autowired private SessionServerConnectionFactory sessionServerConnectionFactory; @Autowired private DataChangeEventCenter dataChangeEventCenter; @Autowired private DataServerConfig dataServerConfig; @Autowired private DatumLeaseManager datumLeaseManager; @Autowired private ThreadPoolExecutor publishProcessorExecutor; @Override public Object doHandle(Channel channel, PublishDataRequest request) { Publisher publisher = Publisher.internPublisher(request.getPublisher()); if (forwardService.needForward()) { CommonResponse response = new CommonResponse(); response.setSuccess(false); response.setMessage("Request refused, Server status is not working"); return response; } dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter()); if (publisher.getPublishType() != PublishType.TEMPORARY) { String connectId = WordCache.getInstance().getWordCache( publisher.getSourceAddress().getAddressString()); sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(), connectId); // record the renew timestamp datumLeaseManager.renew(connectId); } return CommonResponse.buildSuccessResponse(); } }
在 DataChangeEventCenter 的 onChange 函數中,會進行投放。
public void onChange(Publisher publisher, String dataCenter) { int idx = hash(publisher.getDataInfoId()); Datum datum = new Datum(publisher, dataCenter); if (publisher instanceof UnPublisher) { datum.setContainsUnPub(true); } if (publisher.getPublishType() != PublishType.TEMPORARY) { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum)); } else { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB_TEMP, datum)); } }
在DataChangeEventQueue之中,會調用 handleDatum 來處理。在這裏對Datum進行存儲。
在 DataChangeHandler 之中,會提取ChangeData,而後進行Notify。
public void start() { DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues(); int queueCount = queues.length; Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName()); Executor notifyExecutor = ExecutorFactory .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName()); for (int idx = 0; idx < queueCount; idx++) { final DataChangeEventQueue dataChangeEventQueue = queues[idx]; final String name = dataChangeEventQueue.getName(); executor.execute(() -> { while (true) { final ChangeData changeData = dataChangeEventQueue.take(); notifyExecutor.execute(new ChangeNotifier(changeData, name)); } }); } }
具體以下:
+ Session Server | Data Server | | | | +--------------------------+ PublishDataRequest +--------------------+ | DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler | +-----------+--------------+ | +------+-------------+ ^ | | | getValues | | onChange(Publisher) | | v | | +--------+--------------+ +---------+----------+ | | DataChangeEventCenter | |sessionCacheService | | +--------+--------------+ +--------------------+ | | | | Datum | | | v | +--------+-------------+ | | DataChangeEventQueue | | +--------+-------------+ | | | | | | ChangeData | v | +-------+-----------+ | | DataChangeHandler | + +-------------------+
至此,本文總結基本存儲結構以下:
物理機房DataCenter
DataCenter表明一個物理機房。一個數據中心包括多個DataNode,這些DataNode就是同機房數據節點。
Server節點DataNode
DataNode是Server節點,能夠表明任意類型的Server,不管是meta,data,session。
數據節點DataServerNode
這是 Data Server 概念。
服務Publisher
Publisher 是服務概念,
服務聚合Datum
Datum裏面有一個ConcurrentHashMap,其中放入了Datum所包括的Publisher。
Publisher 只是表明發佈者本身業務服務器。Datum則是從SOFARegistry總體角度作了整理,就是一個Session Server包括的服務聚合。
DataNodeStatus表明自己Data Server的狀態。DataServerConfig包括了本DataServer所有配置。
由於不涉及到 Meta Server 內部架構,因此從 Data Server 角度看,只要存儲 Meta Server 對應的網絡 Connection 便可。
由於不涉及到 Session Server 內部架構,因此從 Data Server 角度看,只要存儲 Session Server 對應的網絡 Connection 便可。
由於涉及到與其餘 Data Server 的深度交互,因此須要對其餘 Data Server 的深度信息做存儲。
分爲兩個部分:DataServerNodeFactory 和 DataServerCache。
爲何要有DataServerNodeFactory和DataServerCache兩個相似的數據結構類型。
從註釋來看,是爲了把功能分離細化,DataServerNodeFactory專一鏈接管理,DataServerCache注重dataServer的變化與版本管理。
螞蟻金服服務註冊中心如何實現 DataServer 平滑擴縮容
螞蟻金服服務註冊中心 SOFARegistry 解析 | 服務發現優化之路
服務註冊中心 Session 存儲策略 | SOFARegistry 解析
海量數據下的註冊中心 - SOFARegistry 架構介紹
服務註冊中心數據分片和同步方案詳解 | SOFARegistry 解析
螞蟻金服開源通訊框架SOFABolt解析之超時控制機制及心跳機制
螞蟻金服服務註冊中心數據一致性方案分析 | SOFARegistry 解析