SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on analyzing its design and architecture: across multiple articles and from multiple angles, it works backwards from the code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba approaches such designs.
This is the fourteenth article of the series. It covers service registration (a service coming online) and the operation log in SOFARegistry. The previous article looked at this from the Session Server's perspective; this one looks at it from the Data Server's perspective.
Let's first review the overall business flow; this part belongs to data sharding.
Recall how service data flows internally during "a single service registration".
Due to space constraints, the previous article discussed the first two points; this article covers the third and fourth points.
When a service comes online, the Hash value of the new service's dataInfoId is computed in order to shard the service; the nearest node on the ring is then located, and the data is stored on that node.
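To make the "hash the dataInfoId, then find the nearest node" step concrete, here is a minimal consistent-hash sketch. It is illustrative only: SOFARegistry's real ring uses its own hash function and virtual nodes, and all names below are hypothetical.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal consistent-hash sketch: maps a dataInfoId to the "nearest" node on a hash ring.
// Illustrative only; the real implementation adds virtual nodes and its own hash function.
public class SimpleHashRing {
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    public void addNode(String nodeIp) {
        ring.put(hash(nodeIp), nodeIp);
    }

    public String locate(String dataInfoId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no data nodes on the ring");
        }
        int h = hash(dataInfoId);
        // first node clockwise from the hash value; wrap around if there is none
        SortedMap<Integer, String> tail = ring.tailMap(h);
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private int hash(String key) {
        return key.hashCode() & 0x7fffffff; // keep it non-negative
    }
}
```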
When the DataServer service starts, it registers a publishDataProcessor to handle publish requests from service publishers; this publishDataProcessor is PublishDataHandler. When a new service publisher comes online, the DataServer's PublishDataHandler is triggered.
The handler first checks the current node's status: if the node is not in the working state, the request fails. If it is working, the onChange method of the data-change event center, DataChangeEventCenter, is triggered.
DataChangeEventCenter maintains an array of DataChangeEventQueue queues; each element of the array is an event queue. When the onChange method above is triggered, the Hash of the changed service's dataInfoId is computed to determine the index of the queue for that service's registration data; the changed data is then wrapped into a data-change object and put into that queue.
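The exact hash function is internal to DataChangeEventCenter; as a rough sketch of the idea (an assumed form, not the project's actual code), mapping a dataInfoId to a queue index can look like this:

```java
// Assumed shape of the queue-index computation (illustrative only): fold the
// dataInfoId's hash into [0, queueCount) so that all changes for the same
// dataInfoId always land in the same DataChangeEventQueue and keep their order.
class QueueIndexSketch {
    static int queueIndexFor(String dataInfoId, int queueCount) {
        return (dataInfoId.hashCode() & 0x7fffffff) % queueCount;
    }
}
```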
The DataChangeEventQueue#start method is invoked on a new thread when DataChangeEventCenter is initialized; it continuously takes new events from the queue and dispatches them. New data is thereby added into the node, implementing sharding.
At the same time, DataChangeHandler publishes this change event through ChangeNotifier, notifying other nodes to synchronize the data.
First, a few related data structures need to be introduced.
Publisher is the information of a data publisher.
```java
public class Publisher extends BaseInfo {
    private List<ServerDataBox> dataList;
    private PublishType publishType = PublishType.NORMAL;
}
```
Datum is the data-publisher information aggregated from SOFARegistry's own point of view. Its core is the dataInfoId, the unique identifier of a piece of service data, composed of the dataId, the group, and the tenant instanceId. For example, in a SOFARPC scenario a dataInfoId is typically com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001, where SOFA is the group name and 00001 is the tenant id. group and instanceId mainly make it convenient to partition service data logically, so that service data of different groups and instances is completely independent at the logical level. The model also contains group and instanceId fields; they are not listed separately here, and the reader only needs to understand the meaning of dataInfoId. The code is as follows:
```java
public class Datum implements Serializable {
    private String dataInfoId;
    private String dataCenter;
    private String dataId;
    private String instanceId;
    private String group;
    private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
    private long version;
    private boolean containsUnPub = false;
}
```
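As an aside, following the dataInfoId format in the example above (and only that example; the exact field order and delimiter constant are defined by the project's own DataInfo utility), assembling a dataInfoId looks roughly like this:

```java
// Illustrative helper only: builds a dataInfoId in the form shown in the example
// above, i.e. dataId#@#group#@#instanceId. The real project has its own DataInfo
// utility and delimiter constant for this.
class DataInfoIdSketch {
    static String toDataInfoId(String dataId, String group, String instanceId) {
        return dataId + "#@#" + group + "#@#" + instanceId;
    }

    public static void main(String[] args) {
        // prints "com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001"
        System.out.println(toDataInfoId("com.alipay.sofa.rpc.example.HelloService", "SOFA", "00001"));
    }
}
```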
DatumCache holds the latest Datum.
```java
public class DatumCache {
    @Autowired
    private DatumStorage localDatumStorage;
}
```
The actual storage is done in LocalDatumStorage.
```java
public class LocalDatumStorage implements DatumStorage {
    /**
     * row:    dataCenter
     * column: dataInfoId
     * value:  datum
     */
    protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>();

    /**
     * all datum index
     *
     * row:    ip:port
     * column: registerId
     * value:  publisher
     */
    protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();

    @Autowired
    private DataServerConfig dataServerConfig;
}
```
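To make the row/column layout of DATUM_MAP concrete, here is a minimal self-contained sketch (hypothetical class and method names, not the actual LocalDatumStorage API) of a lookup and an insert over such a two-level map:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: a get/put over a dataCenter -> dataInfoId -> Datum map,
// mirroring the DATUM_MAP layout above. Names here are hypothetical.
class TwoLevelDatumMap {
    private final Map<String, Map<String, DatumStub>> datumMap = new ConcurrentHashMap<>();

    DatumStub get(String dataCenter, String dataInfoId) {
        Map<String, DatumStub> byInfoId = datumMap.get(dataCenter);
        return byInfoId == null ? null : byInfoId.get(dataInfoId);
    }

    void put(String dataCenter, String dataInfoId, DatumStub datum) {
        datumMap.computeIfAbsent(dataCenter, k -> new ConcurrentHashMap<>())
                .put(dataInfoId, datum);
    }

    // Stand-in for the real Datum class, only to keep the sketch self-contained.
    static class DatumStub { }
}
```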
Operator 是每一步Datum對應的操做。
```java
public class Operator {
    private Long version;
    private Long sourceVersion;
    private Datum datum;
    private DataSourceTypeEnum sourceType;
}
```
記錄了全部的Datum操做。其中:
```java
public class Acceptor {
    private final String dataInfoId;
    private final String dataCenter;
    private int maxBufferSize;
    static final int DEFAULT_DURATION_SECS = 30;

    private final Deque<Long/*version*/> logOperatorsOrder = new ConcurrentLinkedDeque<>();
    private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();

    private final DatumCache datumCache;
}
```
To summarize how these data structures relate to each other: a Publisher is one publisher's data; a Datum aggregates all Publishers of one dataInfoId in one dataCenter (its pubMap); DatumCache, backed by LocalDatumStorage, holds the latest Datum per (dataCenter, dataInfoId); every change to a Datum is recorded as an Operator; and the Acceptor keeps a version-ordered log of Operators for one (dataCenter, dataInfoId).
Let's first review where a Datum comes from and where it goes.
First, let's look at how the Session Server obtains a Datum internally.
Inside the Session Server, Datums are stored in the SessionCacheService.
For example, inside DataChangeFetchCloudTask a Datum can be obtained like this:
```java
private Map<String, Datum> getDatumsCache() {
    Map<String, Datum> map = new HashMap<>();
    NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
    Collection<String> dataCenters = nodeManager.getDataCenters();
    if (dataCenters != null) {
        Collection<Key> keys = dataCenters.stream()
            .map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
                new DatumKey(fetchDataInfoId, dataCenter)))
            .collect(Collectors.toList());

        Map<Key, Value> values = sessionCacheService.getValues(keys);

        if (values != null) {
            values.forEach((key, value) -> {
                if (value != null && value.getPayload() != null) {
                    map.put(((DatumKey) key.getEntityType()).getDataCenter(),
                        (Datum) value.getPayload());
                }
            });
        }
    }
    return map;
}
```
The Session Server sends a PublishDataRequest to the Data Server.
Inside the DataServer, PublishDataHandler handles the PublishDataRequest.
```java
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
    @Autowired
    private ForwardService forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Autowired
    private DatumLeaseManager datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}
```
In DataChangeEventCenter's onChange function, the event is dispatched into a queue.
```java
public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}
```
Within DataChangeEventQueue, handleDatum is called to process the event; this is where the Datum is stored.
In DataChangeHandler, the ChangeData is extracted and the notification is then performed.
```java
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount,
        DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory.newFixedThreadPool(
        dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                final ChangeData changeData = dataChangeEventQueue.take();
                notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}
```
The flow so far (the original figure, summarized): on the Session Server side, DataChangeFetchCloudTask reads Datums from sessionCacheService via getValues and sends a PublishDataRequest to the Data Server; there, PublishDataHandler calls DataChangeEventCenter.onChange(Publisher), the Datum is put into a DataChangeEventQueue, and the resulting ChangeData is taken by DataChangeHandler.
So next comes the DataChangeHandler processing. As mentioned in the overview, DataChangeHandler takes this change-event information and, on the one hand, records it as an Operator in the AbstractAcceptorStore log and, on the other hand, publishes it through ChangeNotifier so that other nodes can synchronize the data.
Below we start from the first part, turning this change event into an Operator and putting it into AbstractAcceptorStore, to explain the operation log.
As the figure shows, the flow of the previous diagram continues from DataChangeHandler onwards: DataChangeFetchCloudTask -> PublishDataRequest -> PublishDataHandler -> DataChangeEventCenter -> DataChangeEventQueue -> DataChangeHandler -> ChangeNotifier -> AbstractAcceptorStore.
Who calls the Acceptor's appendOperator? It is called from the notifiers, for example:
```java
public class BackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}
```
And another one:
```java
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}
```
AbstractAcceptorStore is the log store; it is analyzed in detail below.
A bean is provided to store the operation information.
```java
@Bean
public AcceptorStore localAcceptorStore() {
    return new LocalAcceptorStore();
}
```
做用是在 storeServiceMap 中存放各類 AcceptorStore,目前只有LocalAcceptorStore 這一個。
```java
public class StoreServiceFactory implements ApplicationContextAware {

    private static Map<String/*supportType*/, AcceptorStore> storeServiceMap = new HashMap<>();

    /**
     * get AcceptorStore by storeType
     * @param storeType
     * @return
     */
    public static AcceptorStore getStoreService(String storeType) {
        return storeServiceMap.get(storeType);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);
        map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
    }
}
```
AbstractAcceptorStore is the base implementation class of the store. Its main members are:
acceptors: a matrix keyed by dataCenter and dataInfoId that stores the Acceptor for each such combination; in other words, every (dataCenter, dataInfoId) pair has one Acceptor that holds the Operators under it.
notifyAcceptorsCache: a matrix keyed by dataCenter and dataInfoId that caches the Acceptors under those dimensions which still need further processing.
delayQueue: used together with notifyAcceptorsCache. For every acceptor newly put into notifyAcceptorsCache, a message is added to this queue; once its delay expires it is taken out, the corresponding acceptor is removed from notifyAcceptorsCache, and that acceptor is processed.
In principle the cache has an entry, so it is picked up when the delay item is dequeued; if more changes for the same key arrive in the meantime, the cached entry is simply reused (putIfAbsent does not add a duplicate), and when the delay expires everything is still picked up together.
notifyAcceptorsCache is likewise organized per data center, and it is only drained by the periodic removeCache.
```java
public abstract class AbstractAcceptorStore implements AcceptorStore {
    private static final int DEFAULT_MAX_BUFFER_SIZE = 30;

    @Autowired
    protected IMetaServerService metaServerService;

    @Autowired
    private Exchange boltExchange;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Autowired
    private DataServerConnectionFactory dataServerConnectionFactory;

    @Autowired
    private DatumCache datumCache;

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors = new ConcurrentHashMap<>();

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache = new ConcurrentHashMap<>();

    private DelayQueue<DelayItem<Acceptor>> delayQueue;
}
```
As the figure shows, both acceptors and notifyAcceptorsCache are laid out as: dataCenter -> dataInfoId -> Acceptor -> Map<version, Operator>.
One point deserves explanation: why delayQueue involves a delay at all. This stems from SOFARegistry's "second-level service online/offline notification" feature.
Implementing that feature raises a connection-sensitivity issue: in SOFARegistry, every Client keeps a long-lived connection to the SessionServer with bolt-based connection heartbeats; if the connection breaks, the Client immediately reconnects, so there is always a reliable connection between the Client and the SessionServer.
Because of this strong connection sensitivity, if a connection is broken merely by a network glitch while the process itself has not crashed, the Client immediately reconnects to the SessionServer and re-registers all of its service data. Large numbers of such brief offline-then-online transitions would confuse and annoy users.
Therefore the DataServer implements delayed data merging internally, which is exactly what this DelayQueue is for.
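Here is a minimal, self-contained sketch of how a delayed item behaves on a java.util.concurrent.DelayQueue. The class name and fields are hypothetical stand-ins for the project's own DelayItem, but the take()-only-after-the-delay behavior is exactly what gives the store its merge window.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of a delayed wrapper similar in spirit to the DelayItem used
// with delayQueue: the wrapped item only becomes visible to take() after its delay.
class DelayedItem<T> implements Delayed {
    private final T item;
    private final long expireAtNanos;

    DelayedItem(T item, long delayMillis) {
        this.item = item;
        this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    T getItem() {
        return item;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedItem<String>> queue = new DelayQueue<>();
        queue.put(new DelayedItem<>("acceptor-for-dataInfoId-X", 1000));
        // take() blocks until the 1s delay has elapsed, which is what provides the
        // "merge changes within a window" behavior described above.
        System.out.println(queue.take().getItem());
    }
}
```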
The basic logic of addOperator is as follows.
putIfAbsent is used throughout, so if several inserts for the same key happen within a short period, the existing value is kept rather than replaced; this is what produces the merging effect.
```java
@Override
public void addOperator(Operator operator) {
    Datum datum = operator.getDatum();
    String dataCenter = datum.getDataCenter();
    String dataInfoId = datum.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }

        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        if (existAcceptor == null) {
            Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId, dataCenter,
                datumCache);
            existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
            if (existAcceptor == null) {
                existAcceptor = newAcceptor;
            }
        }

        if (operator instanceof SnapshotOperator) {
            //snapshot: clear the queue, Make other data retrieve the latest memory data
            existAcceptor.clearBefore();
        } else {
            existAcceptor.appendOperator(operator);
        }

        //put cache
        putCache(existAcceptor);
    } catch (Exception e) {
        // error logging trimmed in this excerpt
    }
}
```
putCache does two things: it puts the Acceptor into notifyAcceptorsCache, and only if it was not already cached does it add a delay item for it to the queue (addQueue).
Here, too, putIfAbsent is used, so repeated inserts of the same Acceptor within a short window do not create duplicate entries; again this is the merging.
```java
private void putCache(Acceptor acceptor) {
    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }
        Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
        if (existAcceptor == null) {
            addQueue(acceptor);
        }
    } catch (Exception e) {
        // error logging trimmed in this excerpt
    }
}
```
The actual consumption happens in scheduled tasks. The purpose of consuming the log is to synchronize the logged operations to the other DataServers.
The Scheduler class runs the scheduled work: it starts two executors that periodically call the AcceptorStore's functions.
```java
public class Scheduler {

    private final ScheduledExecutorService scheduler;

    public final ExecutorService versionCheckExecutor;

    private final ThreadPoolExecutor expireCheckExecutor;

    @Autowired
    private AcceptorStore localAcceptorStore;

    public Scheduler() {
        scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));

        expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

        versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory("SyncDataScheduler-versionChangeCheck"));
    }

    public void startScheduler() {
        scheduler.schedule(
            new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                TimeUnit.SECONDS, 10,
                () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
            30, TimeUnit.SECONDS);

        versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
    }
}
```
The corresponding functions in AbstractAcceptorStore are as follows.
changeDataCheck contains a while(true) loop internally, so it does not need to be resubmitted through a thread pool.
changeDataCheck is bound to the delayQueue: when a delayed item becomes available, the Acceptor is taken from it, the matching Acceptor is removed from notifyAcceptorsCache, and notifyChange(acceptor) is called to process it.
```java
@Override
public void changeDataCheck() {
    while (true) {
        try {
            DelayItem<Acceptor> delayItem = delayQueue.take();
            Acceptor acceptor = delayItem.getItem();
            removeCache(acceptor); // compare and remove
        } catch (InterruptedException e) {
            break;
        } catch (Throwable e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
```
Consuming the cache is done with removeCache.
```java
private void removeCache(Acceptor acceptor) {
    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap != null) {
            boolean result = acceptorMap.remove(dataInfoId, acceptor);
            if (result) {
                //data change notify
                notifyChange(acceptor);
            }
        }
    } catch (Exception e) {
        // error logging trimmed in this excerpt
    }
}
```
Inside removeCache, notifyChange performs the notification; the logic is as follows.
```java
private void notifyChange(Acceptor acceptor) {
    Long lastVersion = acceptor.getLastVersion();

    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());

    List<String> targetDataIps = getTargetDataIp(acceptor.getDataInfoId());
    for (String targetDataIp : targetDataIps) {

        if (DataServerConfig.IP.equals(targetDataIp)) {
            continue;
        }

        Server syncServer = boltExchange.getServer(dataServerConfig.getSyncDataPort());

        for (int tryCount = 0; tryCount < dataServerConfig.getDataSyncNotifyRetry(); tryCount++) {
            try {
                Connection connection = dataServerConnectionFactory.getConnection(targetDataIp);
                if (connection == null) {
                    TimeUtil.randomDelay(1000);
                    continue;
                }
                syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()), request,
                    1000);
                break;
            } catch (Exception e) {
                // error handling trimmed in this excerpt; the loop retries
            }
        }
    }
}
```
The call chain of this part is: versionCheckExecutor.execute -> localAcceptorStore.changeDataCheck -> removeCache -> notifyChange -> NotifyDataSyncRequest.
As the figure shows: versionCheckExecutor drives changeDataCheck (and from there removeCache / notifyChange) over the AbstractAcceptorStore's acceptors and notifyAcceptorsCache maps, and a NotifyDataSyncRequest is then sent to the other DataServers.
checkAcceptorsChangAndExpired iterates over every acceptor in acceptors, checks whether its entries have expired, and handles them.
```java
@Override
public void checkAcceptorsChangAndExpired() {
    acceptors.forEach((dataCenter, acceptorMap) -> {
        if (acceptorMap != null && !acceptorMap.isEmpty()) {
            acceptorMap.forEach((dataInfoId, acceptor) -> acceptor.checkExpired(0));
        }
    });
}
```
The logic at this point is: expireCheckExecutor periodically runs checkAcceptorsChangAndExpired over the acceptors map, while versionCheckExecutor runs changeDataCheck (removeCache / notifyChange) over notifyAcceptorsCache and then sends NotifyDataSyncRequest to the other DataServers.
This is where the log is kept, i.e., where all Datum operations are recorded.
The operation log is stored as a queue. When logs are fetched, the position of the requester's current version within the queue is located, and all operation logs after that version are synchronized over and applied.
```java
public class Acceptor {
    private final String dataInfoId;
    private final String dataCenter;
    private int maxBufferSize;
    static final int DEFAULT_DURATION_SECS = 30;

    private final Deque<Long/*version*/> logOperatorsOrder = new ConcurrentLinkedDeque<>();
    private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();

    private final DatumCache datumCache;
}
```
The key variables are logOperatorsOrder, a deque of versions that preserves the order of the operations, and logOperators, a map from each version to its Operator.
An Operator is the Datum corresponding to a single operation step.
```java
public class Operator {
    private Long version;
    private Long sourceVersion;
    private Datum datum;
    private DataSourceTypeEnum sourceType;
}
```
此函數做用是:添加一個操做日誌。
具體代碼以下:
```java
/**
 * append operator to queue, if queue is full poll the first element and append.
 * Process will check version sequence, it must append with a consequent increase in version,
 * otherwise queue will be clean
 *
 * @param operator
 */
public void appendOperator(Operator operator) {
    write.lock();
    try {
        if (isFull()) {
            logOperators.remove(logOperatorsOrder.poll());
        }
        if (operator.getSourceVersion() == null) {
            operator.setSourceVersion(0L);
        }
        Long tailVersion = logOperatorsOrder.peekLast();
        if (tailVersion != null) {
            //operation add not by solid sequence
            if (tailVersion.longValue() != operator.getSourceVersion().longValue()) {
                clearBefore();
            }
        }
        Operator previousOperator = logOperators.put(operator.getVersion(), operator);
        if (previousOperator == null) {
            logOperatorsOrder.add(operator.getVersion());
        }
    } finally {
        write.unlock();
    }
}
```
Who calls appendOperator? The notifiers do, for example:
```java
public class BackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}
```
And:
```java
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}
```
This method removes expired log entries. The version is a timestamp, so it can be checked periodically and cleared once it has expired.
```java
public void checkExpired(int durationSEC) {
    write.lock();
    try {
        //check all expired
        Long peekVersion = logOperatorsOrder.peek();
        if (peekVersion != null && isExpired(durationSEC, peekVersion)) {
            logOperators.remove(logOperatorsOrder.poll());
            checkExpired(durationSEC);
        }
    } finally {
        write.unlock();
    }
}
```
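isExpired itself is not shown in this excerpt. Since the version is timestamp-based, a plausible sketch (an assumption, not the project's actual check) is:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the expiry check implied above (the real isExpired is not
// shown in the excerpt): treat the version as a millisecond timestamp and compare
// its age against the allowed duration.
class ExpirySketch {
    static boolean isExpired(int durationSecs, long version) {
        long ageMillis = System.currentTimeMillis() - version;
        return ageMillis > TimeUnit.SECONDS.toMillis(durationSecs);
    }
}
```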
This request notifies the receiving end to perform data synchronization.
Recall that the call chain of this part is: versionCheckExecutor.execute -> localAcceptorStore.changeDataCheck -> removeCache -> notifyChange -> NotifyDataSyncRequest.
The receiving DataServer handles it with NotifyDataSyncHandler.
```java
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest>
                                                                implements AfterWorkingProcess {
    @Autowired
    private DataServerConfig dataServerConfig;

    @Autowired
    private GetSyncDataHandler getSyncDataHandler;

    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;

    private Executor executor = ExecutorFactory.newFixedThreadPool(10,
        NotifyDataSyncHandler.class.getSimpleName());

    private ThreadPoolExecutor notifyExecutor;

    @Autowired
    private DataNodeStatus dataNodeStatus;

    @Autowired
    private DatumCache datumCache;
}
```
The doHandle method continues the processing.
```java
@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
    final Connection connection = ((BoltChannel) channel).getConnection();
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
        noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
        return CommonResponse.buildSuccessResponse();
    }
    executorRequest(connection, request);
    return CommonResponse.buildSuccessResponse();
}
```
Having received the synchronization notice NotifyDataSyncRequest from the initiating DataServer, the receiving DataServer actively pulls the data to synchronize, i.e., it uses GetSyncDataHandler to send a SyncDataRequest.
```java
private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
    executor.execute(() -> {
        fetchSyncData(connection, request);
    });
}

protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
    String dataInfoId = request.getDataInfoId();
    String dataCenter = request.getDataCenter();
    Datum datum = datumCache.get(dataCenter, dataInfoId);
    Long version = (datum == null) ? null : datum.getVersion();
    Long requestVersion = request.getVersion();

    if (version == null || requestVersion == 0L || version < requestVersion) {
        getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
            new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
            dataChangeEventCenter));
    }
}
```
GetSyncDataHandler works together with SyncDataCallback.
That is, GetSyncDataHandler sends the SyncDataRequest, and SyncDataCallback receives the synchronization result.
```
├── remoting
│   ├── dataserver
│   │   ├── DataServerConnectionFactory.java
│   │   ├── DataServerNodeFactory.java
│   │   ├── GetSyncDataHandler.java
│   │   ├── SyncDataCallback.java
│   │   ├── handler
│   │   └── task
```
The location of the two helper classes GetSyncDataHandler and SyncDataCallback is a bit odd; presumably, being utility classes, they were placed directly under the dataserver directory. Personally I think a dedicated directory might be better.
```java
public class GetSyncDataHandler {
    @Autowired
    private DataNodeExchanger dataNodeExchanger;

    public void syncData(SyncDataCallback callback) {
        int tryCount = callback.getRetryCount();
        if (tryCount > 0) {
            try {
                callback.setRetryCount(--tryCount);
                dataNodeExchanger.request(new Request() {
                    @Override
                    public Object getRequestBody() {
                        return callback.getRequest();
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(callback.getConnection().getRemoteIP(),
                            callback.getConnection().getRemotePort());
                    }

                    @Override
                    public CallbackHandler getCallBackHandler() {
                        return new CallbackHandler() {
                            @Override
                            public void onCallback(Channel channel, Object message) {
                                callback.onResponse(message);
                            }

                            @Override
                            public void onException(Channel channel, Throwable exception) {
                                callback.onException(exception);
                            }

                            @Override
                            public Executor getExecutor() {
                                return callback.getExecutor();
                            }
                        };
                    }
                });
            } catch (Exception e) {
                // error logging trimmed in this excerpt
            }
        }
    }
}
```
Here the synchronization result is received.
```java
public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR = ExecutorFactory.newFixedThreadPool(5,
        SyncDataCallback.class.getSimpleName());

    private static final int RETRY_COUNT = 3;

    private Connection connection;

    private SyncDataRequest request;

    private GetSyncDataHandler getSyncDataHandler;

    private int retryCount;

    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                //handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                //handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                }
            }
        }
    }
}
```
The logic at this point is: (1) the sender DataServer's notifyChange sends a NotifyDataSyncRequest to the other DataServer; (2) that DataServer answers by having GetSyncDataHandler send a SyncDataRequest back, with SyncDataCallback waiting for the result.
The SyncDataRequest is sent back to the notifier, i.e., here the other DataServer sends it to the sender DataServer.
```java
public class SyncDataRequest implements Serializable {

    private String dataInfoId;

    private String dataCenter;

    private String dataSourceType;

    /**
     * be null when dataInfoId not exist in local datumCache
     */
    private Long version;
}
```
Recall where the SyncDataRequest comes from: it is created in NotifyDataSyncHandler's handling of the notification, which uses the requested information to read the version of that dataInfoId from its local cache and then sends the request.
```java
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest>
                                                                implements AfterWorkingProcess {

    protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
        String dataInfoId = request.getDataInfoId();
        String dataCenter = request.getDataCenter();
        Datum datum = datumCache.get(dataCenter, dataInfoId);
        Long version = (datum == null) ? null : datum.getVersion();
        Long requestVersion = request.getVersion();

        if (version == null || requestVersion == 0L || version < requestVersion) {
            getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
                new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
                dataChangeEventCenter));
        }
    }
}
```
And on the sending side, in AbstractAcceptorStore:
```java
private void notifyChange(Acceptor acceptor) {
    Long lastVersion = acceptor.getLastVersion();
    //may be delete by expired
    if (lastVersion == null) {
        lastVersion = 0L;
    }
    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());
    // ... loop over the target DataServers omitted; see the full notifyChange above
    syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()), request, 1000);
}
```
The notification initiator handles it with SyncDataHandler.
SyncDataHandler is the handler for data synchronization between nodes. When triggered, it compares version numbers: if the data stored on the current DataServer contains the requested version, it returns all data whose versions are greater than the requested one, so that the nodes can synchronize.
```java
public class SyncDataHandler extends AbstractServerHandler<SyncDataRequest> {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public Object doHandle(Channel channel, SyncDataRequest request) {
        SyncData syncData = syncDataService.getSyncDataChange(request);
        return new GenericResponse<SyncData>().fillSucceed(syncData);
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return SyncDataRequest.class;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}
```
The concrete business service is SyncDataServiceImpl, which fetches the data from the acceptorStore via its getSyncDataChange method.
```java
public class SyncDataServiceImpl implements SyncDataService {

    @Override
    public void appendOperator(Operator operator) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(operator.getSourceType()
            .toString());
        if (acceptorStore != null) {
            acceptorStore.addOperator(operator);
        }
    }

    @Override
    public SyncData getSyncDataChange(SyncDataRequest syncDataRequest) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(syncDataRequest
            .getDataSourceType());
        if (acceptorStore != null) {
            return acceptorStore.getSyncData(syncDataRequest);
        }
        // the error branch for a missing store is trimmed in this excerpt
        return null;
    }
}
```
How appendOperator is called was described earlier.
SyncDataServiceImpl then calls into AbstractAcceptorStore.
It looks up the Acceptor by dataCenter and dataInfoId and returns the data produced by that Acceptor's process method.
```java
@Override
public SyncData getSyncData(SyncDataRequest syncDataRequest) {
    String dataCenter = syncDataRequest.getDataCenter();
    String dataInfoId = syncDataRequest.getDataInfoId();
    Long currentVersion = syncDataRequest.getVersion();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        return existAcceptor.process(currentVersion);
    } finally {
        // error handling in the original is trimmed in this excerpt
    }
}
```
Then comes the Acceptor's own processing.
It processes the requester's current version: if that version can be found in the current queue, all Operators with versions greater than it are returned; otherwise the whole current data is returned.
```java
public SyncData process(Long currentVersion) {
    read.lock();
    try {
        Collection<Operator> operators = acceptOperator(currentVersion);

        List<Datum> retList = new LinkedList<>();
        SyncData syncData;
        boolean wholeDataTag = false;
        if (operators != null) {
            //first get all data
            if (operators.isEmpty()) {
                wholeDataTag = true;
                retList.add(datumCache.get(dataCenter, dataInfoId));
            } else {
                for (Operator operator : operators) {
                    retList.add(operator.getDatum());
                }
            }
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        } else {
            //no match get all data
            wholeDataTag = true;
            retList.add(datumCache.get(dataCenter, dataInfoId));
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        }
        return syncData;
    } finally {
        read.unlock();
    }
}
```
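acceptOperator itself is not shown in the excerpt. Below is a minimal sketch of what it plausibly does, based on the description above and on how process() interprets its result (null meaning "no match, fall back to whole data"); the class and field names are hypothetical stand-ins for the Acceptor's own fields.

```java
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

// Sketch only: walk the version-ordered log and collect every Operator recorded
// after the requester's current version. Returning null signals "cannot serve an
// incremental diff, send the whole Datum instead", matching process() above.
class OperatorLogSketch {
    static class Operator { /* stand-in for the real Operator */ }

    private final Deque<Long> logOperatorsOrder = new ConcurrentLinkedDeque<>();
    private final Map<Long, Operator> logOperators = new ConcurrentHashMap<>();

    Collection<Operator> acceptOperator(Long currentVersion) {
        if (currentVersion == null || !logOperatorsOrder.contains(currentVersion)) {
            return null; // version not in the log: the caller must do a full sync
        }
        List<Operator> result = new LinkedList<>();
        boolean after = false;
        for (Long version : logOperatorsOrder) {
            if (after) {
                result.add(logOperators.get(version));
            } else if (version.equals(currentVersion)) {
                after = true;
            }
        }
        return result; // empty when the caller is already up to date
    }
}
```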
The synchronization data structure is as follows.
```java
public class SyncData implements Serializable {

    private String dataInfoId;

    private String dataCenter;

    private Collection<Datum> datums;

    private boolean wholeDataTag;
}
```
The picture at this point is: (1) notifyChange on the sender sends a NotifyDataSyncRequest; (2) the other DataServer's GetSyncDataHandler sends a SyncDataRequest back; (3) SyncDataHandler on the sender receives it and (4) calls SyncDataServiceImpl, which reads from AbstractAcceptorStore; (5) the resulting SyncData is returned to SyncDataCallback on the other DataServer.
Back on the receiving side, all received Datums are iterated and, for each one:
If it is the whole data, it calls
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
otherwise it calls
dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE, dataSourceTypeEnum, datum);
Specifically, see the SyncDataCallback#onResponse code already quoted above.
DataChangeEventCenter's sync is called as follows.
```java
public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}
```
DataChangeEventQueue then calls handleDatum to process it; that part has been covered in other articles of this series, so only the code is shown here.
```java
@Override
public void run() {
    if (changeData instanceof SnapshotData) {
        SnapshotData snapshotData = (SnapshotData) changeData;
        String dataInfoId = snapshotData.getDataInfoId();
        Map<String, Publisher> toBeDeletedPubMap = snapshotData.getToBeDeletedPubMap();
        Map<String, Publisher> snapshotPubMap = snapshotData.getSnapshotPubMap();
        Datum oldDatum = datumCache.get(dataServerConfig.getLocalDataCenter(), dataInfoId);
        long lastVersion = oldDatum != null ? oldDatum.getVersion() : 0l;
        Datum datum = datumCache.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
        long version = datum != null ? datum.getVersion() : 0l;
        notify(datum, changeData.getSourceType(), null);
    } else {
        Datum datum = changeData.getDatum();
        String dataCenter = datum.getDataCenter();
        String dataInfoId = datum.getDataInfoId();
        DataSourceTypeEnum sourceType = changeData.getSourceType();
        DataChangeTypeEnum changeType = changeData.getChangeType();
        if (changeType == DataChangeTypeEnum.MERGE && sourceType != DataSourceTypeEnum.BACKUP
            && sourceType != DataSourceTypeEnum.SYNC) {
            //update version for pub or unPub merge to cache
            //if the version product before merge to cache, it may be cause small version override big one
            datum.updateVersion();
        }
        long version = datum.getVersion();
        try {
            if (sourceType == DataSourceTypeEnum.CLEAN) {
                if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                    // logging trimmed in this excerpt
                }
            } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                notifyTempPub(datum, sourceType, changeType);
            } else {
                MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                Long lastVersion = mergeResult.getLastVersion();

                if (lastVersion != null
                    && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                    return;
                }

                //lastVersion null means first add datum
                if (lastVersion == null || version != lastVersion) {
                    if (mergeResult.isChangeFlag()) {
                        notify(datum, sourceType, lastVersion);
                    }
                }
            }
        } catch (Exception e) {
            // error handling trimmed in this excerpt
        }
    }
}
```
DataChangeHandler periodically pulls messages from DataChangeEventCenter and processes them.
The Datum is stored along the ChangeNotifier path; because the version number has already been brought up to date at this point, no further notification is triggered, and the flow ends here.
```java
MergeResult mergeResult = datumCache.putDatum(changeType, datum);

//lastVersion null means first add datum
if (lastVersion == null || version != lastVersion) {
    if (mergeResult.isChangeFlag()) {
        notify(datum, sourceType, lastVersion);
    }
}
```
The overall logic at this point is: (1) the sender's notifyChange sends a NotifyDataSyncRequest; (2) the other DataServer sends a SyncDataRequest back via GetSyncDataHandler; (3)(4)(5) SyncDataHandler and SyncDataServiceImpl read the data from AbstractAcceptorStore and return it; (6) SyncDataCallback hands the received Datums to DataChangeEventCenter, (7) which enqueues them into a DataChangeEventQueue, (8) where DataChangeHandler finally processes them.
To recap, recall how service data flows internally during "a single service registration".
Due to space constraints, the previous article covered the first two points and this article covered the third and fourth; if time permits, the last two points will be covered later.
Reference links:
Eureka Series (6): An analysis of the TimedSupervisorTask class
Eureka's TimedSupervisorTask (a periodic task that adjusts its own interval)
A detailed guide to using Java's ThreadPoolExecutor
How Java's ThreadPoolExecutor is implemented
Understanding the Java thread pool: ThreadPoolExecutor
Exploring the principles of Java's ThreadPoolExecutor
How Ant Financial's service registry implements smooth DataServer scale-out and scale-in
Ant Financial's service registry SOFARegistry explained: the road to service discovery optimization
Session storage strategy in the service registry | SOFARegistry explained
SOFARegistry architecture: a registry for massive amounts of data
Data sharding and synchronization in the service registry explained | SOFARegistry explained
Ant Financial's open-source communication framework SOFABolt: timeout control and heartbeat mechanisms
Data consistency in Ant Financial's service registry | SOFARegistry explained