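SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: across several articles and from several angles, it works backward from the code to summarize the implementation mechanisms and architectural ideas behind DataServer and SOFARegistry, so that readers can learn how Alibaba designs such systems.
This is the fifteenth article in the series; it covers lease renewal and eviction.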
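Renewal and eviction are essential features of service registration and discovery. For example:
A service instance does not always go offline gracefully. An out-of-memory error, a network failure, or a similar fault can leave it unable to work, and the service registry never receives a "service offline" request.
To remove such unserviceable instances from the service list, the Server creates a scheduled task at startup. At a fixed interval (60 s by default), the task removes from the current list every instance that has not renewed within the timeout (90 s by default).
After registering, a service provider maintains a heartbeat that keeps telling the Server "I am still alive", which prevents the Server's eviction task from removing the instance from the service list. This operation is called service renewal (Renew).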
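The general renew/evict contract described above can be sketched in a few lines. This is a minimal illustration, not SOFARegistry code: the class `LeaseTable` and its methods are hypothetical names, and the clock is passed in as a parameter so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a lease table: renew records a heartbeat, evictExpired drops stale entries.
class LeaseTable {
    private final Map<String, Long> lastRenewMillis = new ConcurrentHashMap<>();
    private final long ttlMillis;

    LeaseTable(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Renew: remember when this instance last sent a heartbeat. */
    void renew(String instanceId, long nowMillis) {
        lastRenewMillis.put(instanceId, nowMillis);
    }

    /** Evict: remove and return every instance whose last heartbeat is older than the TTL. */
    List<String> evictExpired(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastRenewMillis.entrySet()) {
            // remove(key, value) only succeeds if no newer heartbeat arrived in the meantime
            if (nowMillis - e.getValue() > ttlMillis
                    && lastRenewMillis.remove(e.getKey(), e.getValue())) {
                evicted.add(e.getKey());
            }
        }
        return evicted;
    }

    boolean isAlive(String instanceId) {
        return lastRenewMillis.containsKey(instanceId);
    }
}
```

With a 90 s TTL, an instance that renewed 40 s ago survives a sweep, while one that last renewed 100 s ago is returned by `evictExpired` and disappears from the table.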
On the Data Server side, DatumLeaseManager implements both eviction of expired leases and service renewal.
The main fields of DatumLeaseManager are:
connectIdRenewTimestampMap records the time of the most recent heartbeat for each connectId; Eureka has a similar data structure.
locksForConnectId is a lock per connectId: every connectId allows only one evict task to be created at a time.
The definition is as follows:

    public class DatumLeaseManager implements AfterWorkingProcess {

        /** record the latest heartbeat time for each connectId, format: connectId -> lastRenewTimestamp */
        private final Map<String, Long> connectIdRenewTimestampMap = new ConcurrentHashMap<>();

        /** lock for connectId , format: connectId -> true */
        private ConcurrentHashMap<String, Boolean> locksForConnectId = new ConcurrentHashMap<>();

        private volatile boolean serverWorking = false;

        private volatile boolean renewEnable = true;

        private AsyncHashedWheelTimer datumAsyncHashedWheelTimer;

        @Autowired
        private DataServerConfig dataServerConfig;

        @Autowired
        private DisconnectEventHandler disconnectEventHandler;

        @Autowired
        private DatumCache datumCache;

        @Autowired
        private DataNodeStatus dataNodeStatus;

        private ScheduledThreadPoolExecutor executorForHeartbeatLess;

        private ScheduledFuture<?> futureForHeartbeatLess;
    }
Within DatumLeaseManager, the following data structures drive renewal.

    private ConcurrentHashMap<String, Boolean> locksForConnectId = new ConcurrentHashMap<>();

    private AsyncHashedWheelTimer datumAsyncHashedWheelTimer;
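The "one task per connectId" rule rests on the atomicity of `ConcurrentHashMap.putIfAbsent`. A minimal sketch of the idea, where `PerKeyTaskGuard`, `tryAcquire` and `release` are hypothetical names chosen for illustration:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: at most one pending task per key, guarded by putIfAbsent.
class PerKeyTaskGuard {
    private final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();

    /** Returns true only for the single caller that is allowed to create the task for this key. */
    boolean tryAcquire(String connectId) {
        // putIfAbsent returns null when no entry existed, i.e. this caller won the race
        return locks.putIfAbsent(connectId, Boolean.TRUE) == null;
    }

    /** Called when the task starts running, so that a follow-up task can be created. */
    void release(String connectId) {
        locks.remove(connectId);
    }
}
```

Because the check and the insert happen in one atomic call, two threads renewing the same connectId concurrently cannot both create a task.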
renew is called from the following handlers, all of which extend AbstractServerHandler.

    public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest>

    public class DatumSnapshotHandler extends AbstractServerHandler<DatumSnapshotRequest>

    public class RenewDatumHandler extends AbstractServerHandler<RenewDatumRequest> implements AfterWorkingProcess

    public class UnPublishDataHandler extends AbstractServerHandler<UnPublishDataRequest>
On each renewal, DatumLeaseManager records the latest timestamp and then tries to start an evict task through scheduleEvictTask.

    public void renew(String connectId) {
        // record the renew timestamp
        connectIdRenewTimestampMap.put(connectId, System.currentTimeMillis());
        // try to trigger evict task
        scheduleEvictTask(connectId, 0);
    }
The code of scheduleEvictTask is as follows:

    /**
     * trigger evict task: if connectId expired, create ClientDisconnectEvent to cleanup datums bind to the connectId
     * PS: every connectId allows only one task to be created
     */
    private void scheduleEvictTask(String connectId, long delaySec) {
        delaySec = (delaySec <= 0) ? dataServerConfig.getDatumTimeToLiveSec() : delaySec;

        // lock for connectId: every connectId allows only one task to be created
        Boolean ifAbsent = locksForConnectId.putIfAbsent(connectId, true);
        if (ifAbsent != null) {
            return;
        }

        datumAsyncHashedWheelTimer.newTimeout(_timeout -> {
            boolean continued = true;
            long nextDelaySec = 0;
            try {
                // release lock
                locksForConnectId.remove(connectId);

                // get lastRenewTime of this connectId
                Long lastRenewTime = connectIdRenewTimestampMap.get(connectId);
                if (lastRenewTime == null) {
                    // connectId is already clientOff
                    return;
                }

                /*
                 * 1. lastRenewTime expires, then:
                 *   - build ClientOffEvent and hand it to DataChangeEventCenter.
                 *   - It will not be scheduled next time, so terminated.
                 * 2. lastRenewTime not expires, then:
                 *   - trigger the next schedule
                 */
                boolean isExpired = System.currentTimeMillis() - lastRenewTime
                                    > dataServerConfig.getDatumTimeToLiveSec() * 1000L;
                if (!isRenewEnable()) {
                    nextDelaySec = dataServerConfig.getDatumTimeToLiveSec();
                } else if (isExpired) {
                    int ownPubSize = getOwnPubSize(connectId);
                    if (ownPubSize > 0) {
                        evict(connectId);
                    }
                    connectIdRenewTimestampMap.remove(connectId, lastRenewTime);
                    continued = false;
                } else {
                    nextDelaySec = dataServerConfig.getDatumTimeToLiveSec()
                                   - (System.currentTimeMillis() - lastRenewTime) / 1000L;
                    nextDelaySec = nextDelaySec <= 0 ? 1 : nextDelaySec;
                }
            } catch (Throwable e) {
                // error handling is not shown in this excerpt; continued stays true,
                // so the task is rescheduled below
            }
            if (continued) {
                scheduleEvictTask(connectId, nextDelaySec);
            }
        }, delaySec, TimeUnit.SECONDS);
    }
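The expiry check and the computation of the next delay are plain arithmetic and can be isolated as two pure functions. The sketch below mirrors the expressions in scheduleEvictTask; the class name `EvictDelay` and the explicit `nowMillis` parameter are assumptions made for illustration.

```java
// Hypothetical helper that isolates the arithmetic used by the evict task.
class EvictDelay {

    /** A lease is expired once more than ttlSec seconds have passed since the last renew. */
    static boolean isExpired(long nowMillis, long lastRenewMillis, long ttlSec) {
        return nowMillis - lastRenewMillis > ttlSec * 1000L;
    }

    /** Seconds until the lease could expire; never less than 1 so the task is not re-run immediately. */
    static long nextDelaySec(long nowMillis, long lastRenewMillis, long ttlSec) {
        long remaining = ttlSec - (nowMillis - lastRenewMillis) / 1000L; // integer division truncates
        return remaining <= 0 ? 1 : remaining;
    }
}
```

For a TTL of 30 s and a last renew 12.5 s ago, the lease is not expired and the next check is scheduled 18 s later (12.5 s truncates to 12 s). Exactly 30 s after the last renew the lease is still not expired, the remaining time is 0, and the delay falls back to 1 s.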
This is shown in the following diagram:

    +------------------+          +-------------------------------------------+
    |PublishDataHandler|          | DatumLeaseManager                         |
    +--------+---------+          |                                           |
             |                    |                   newTimeout              |
             |                    |              +---------------------->     |
         doHandle                 |              ^                      +     |
             |                    |              |                      |     |
             |      renew         |  +-----------+--------------+       |     |
             | +-------------->   |  |  AsyncHashedWheelTimer   |       |     |
             |                    |  +-----+-----+--------------+       |     |
             |                    |        |     ^                      |     |
             |                    |        |     | scheduleEvictTask    |     |
             |                    | evict  |     +                      v     |
             |                    |        |     <----------------------+     |
             |                    +-------------------------------------------+
             |                             |
             v                             v
Or, as a sequence diagram:

    +------------------+  +-------------------+                   +------------------------+
    |PublishDataHandler|  | DatumLeaseManager |                   | AsyncHashedWheelTimer  |
    +--------+---------+  +--------+----------+                   +-----------+------------+
             |                     |                  new                     |
         doHandle                  +------------------------>                 |
             |      renew          |                                          |
             +-------------------> |                                          |
             |                     |                                          |
             |             scheduleEvictTask                                  |
             |                     |                                          |
             |                     |              newTimeout                  |
             |      +----------->  +------------------------>                 |
             |      |              |                                          |
             |      |              |                                          |
             |      |              |     No                                   +
             |      |              | <---------------+ if (ownPubSize > 0)
             |      |              |                                          +
             |      |              v                                          |
             |      +--+ scheduleEvictTask                                    | Yes
             |                     +                                          v
             |                     |                                        evict
             |                     |                                          |
             v                     v                                          v
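To see the renew, schedule, check and reschedule loop in isolation, here is a deterministic sketch. It is not the SOFARegistry implementation: a plain queue drained by `tick` stands in for AsyncHashedWheelTimer, individual delays are ignored, the clock is advanced by hand, and all names (`EvictLoopSketch`, `tick`, `evicted`) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the self-rescheduling evict task with a hand-driven clock.
class EvictLoopSketch {
    final Map<String, Long> lastRenew = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();
    final Deque<Runnable> pending = new ArrayDeque<>(); // stand-in for the timer
    final List<String> evicted = new ArrayList<>();
    final long ttlMillis;
    long now; // manually advanced clock, in milliseconds

    EvictLoopSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void renew(String connectId) {
        lastRenew.put(connectId, now);
        schedule(connectId);
    }

    private void schedule(String connectId) {
        if (locks.putIfAbsent(connectId, true) != null) {
            return; // a task for this connectId is already pending
        }
        pending.add(() -> {
            locks.remove(connectId); // release the lock before deciding
            Long last = lastRenew.get(connectId);
            if (last == null) {
                return; // already cleaned up
            }
            if (now - last > ttlMillis) {
                evicted.add(connectId);
                lastRenew.remove(connectId, last); // expired: do not reschedule
            } else {
                schedule(connectId); // still alive: check again later
            }
        });
    }

    /** Advance the clock and fire every task that was pending when tick was called. */
    void tick(long millis) {
        now += millis;
        int n = pending.size();
        for (int i = 0; i < n; i++) {
            pending.poll().run();
        }
    }
}
```

Renewing the same connectId twice leaves a single pending task. As long as renewals keep arriving within the TTL, each tick reschedules the task; once a tick finds the last renewal older than the TTL, the connectId is evicted and no further task is created.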
Within DatumLeaseManager, the following data structures drive the eviction of connections that send no heartbeat.

    private ScheduledThreadPoolExecutor executorForHeartbeatLess;

    private ScheduledFuture<?> futureForHeartbeatLess;
There are two call paths, so that whenever the data changes the manager checks whether anything can be evicted:
In the LocalDataServerChangeEventHandler class, doHandle calls datumLeaseManager.reset(), which eventually leads to evict.

    @Override
    public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
        isChanged.set(true);

        // Better change to Listener pattern
        localDataServerCleanHandler.reset();
        datumLeaseManager.reset();

        events.offer(localDataServerChangeEvent);
    }
DatumLeaseManager.reset cancels the current schedule, if any, and calls scheduleEvictTaskForHeartbeatLess to start the eviction thread again.

    public synchronized void reset() {
        if (futureForHeartbeatLess != null) {
            futureForHeartbeatLess.cancel(false);
        }
        scheduleEvictTaskForHeartbeatLess();
    }
At startup, init also starts the eviction thread.

    @PostConstruct
    public void init() {
        ......
        executorForHeartbeatLess = new ScheduledThreadPoolExecutor(1, threadFactoryBuilder
            .setNameFormat("Registry-DatumLeaseManager-ExecutorForHeartbeatLess").build());
        scheduleEvictTaskForHeartbeatLess();
    }
The eviction itself is carried out by a periodically scheduled task, EvictTaskForHeartbeatLess. Both the initial delay and the period equal the datum time-to-live.

    private void scheduleEvictTaskForHeartbeatLess() {
        futureForHeartbeatLess = executorForHeartbeatLess.scheduleWithFixedDelay(
            new EvictTaskForHeartbeatLess(), dataServerConfig.getDatumTimeToLiveSec(),
            dataServerConfig.getDatumTimeToLiveSec(), TimeUnit.SECONDS);
    }
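The cancel-and-reschedule pattern used by reset can be tried out on its own with the standard `ScheduledThreadPoolExecutor`. In this sketch the class `ResettableSweep` and its methods are hypothetical; only the `java.util.concurrent` calls are real API.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a periodic sweep whose countdown can be restarted.
class ResettableSweep {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final Runnable sweep;
    private final long periodSec;
    private ScheduledFuture<?> future;

    ResettableSweep(Runnable sweep, long periodSec) {
        this.sweep = sweep;
        this.periodSec = periodSec;
        schedule();
    }

    private void schedule() {
        // initial delay and period are the same, as in scheduleEvictTaskForHeartbeatLess
        future = executor.scheduleWithFixedDelay(sweep, periodSec, periodSec, TimeUnit.SECONDS);
    }

    /** Cancel the current schedule without interrupting a running sweep, then start a fresh one. */
    synchronized ScheduledFuture<?> reset() {
        if (future != null) {
            future.cancel(false);
        }
        schedule();
        return future;
    }

    synchronized ScheduledFuture<?> current() {
        return future;
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Calling `reset` restarts the countdown, so after a reset the first sweep again runs one full period later. Passing `false` to `cancel` lets a sweep that is already running finish.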
When the period elapses, the task fetches all current connectIds from datumCache and iterates over them. Any connectId that has no recorded renew timestamp (that is, no heartbeat) but still owns publishers is evicted.

    /**
     * evict own connectIds with heartbeat less
     */
    private class EvictTaskForHeartbeatLess implements Runnable {

        @Override
        public void run() {
            // If in a non-working state, cannot clean up because the renew request cannot be received at this time.
            if (!isRenewEnable()) {
                return;
            }

            Set<String> allConnectIds = datumCache.getAllConnectIds();
            for (String connectId : allConnectIds) {
                Long timestamp = connectIdRenewTimestampMap.get(connectId);
                // no heartbeat
                if (timestamp == null) {
                    int ownPubSize = getOwnPubSize(connectId);
                    if (ownPubSize > 0) {
                        evict(connectId);
                    }
                }
            }
        }
    }
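The selection rule of the sweep can be written as a pure function over plain collections. This is only a sketch: `HeartbeatLessSweep.findEvictable` is a hypothetical name, and the `ownPubSize` map stands in for the getOwnPubSize lookup, whose implementation is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: pick the connectIds that have publishers but never sent a heartbeat.
class HeartbeatLessSweep {

    static List<String> findEvictable(Set<String> allConnectIds,
                                      Map<String, Long> renewTimestamps,
                                      Map<String, Integer> ownPubSize) {
        List<String> result = new ArrayList<>();
        for (String connectId : allConnectIds) {
            boolean noHeartbeat = !renewTimestamps.containsKey(connectId);
            // only connections that still own publishers need an eviction event
            if (noHeartbeat && ownPubSize.getOrDefault(connectId, 0) > 0) {
                result.add(connectId);
            }
        }
        return result;
    }
}
```

A connectId with a recorded timestamp is left to its own per-connection evict task, so this sweep only covers connections that the timer-based path would never see.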
The evict method called here hands a ClientDisconnectEvent to DisconnectEventHandler:

    private void evict(String connectId) {
        disconnectEventHandler.receive(new ClientDisconnectEvent(connectId, System
            .currentTimeMillis(), 0));
    }
This is shown in the following diagram:

    +--------------------------------------------------+
    | DatumLeaseManager                                |
    |                                                  |
    |   EvictTaskForHeartbeatLess.run                  |
    | +----------------------------------------------+ |
    | | allConnectIds = datumCache.getAllConnectIds()| |
    | |     |                                        | |
    | |     |  for (allConnectIds)                   | |
    | |     v                                        | |
    | | connectIdRenewTimestampMap                   | |
    | |     |                                        | |
    | |     |  no heartbeat                          | |
    | |     v                                        | |
    | |   evict                                      | |
    | +----------------------------------------------+ |
    +--------------------------------------------------+
The eviction message has to be forwarded, which is the job of DisconnectEventHandler.receive. In the working state this amounts to EVENT_QUEUE.add(event); otherwise the event is parked in noWorkQueue.

    public class DisconnectEventHandler implements InitializingBean, AfterWorkingProcess {

        /**
         * a DelayQueue that contains client disconnect events
         */
        private final DelayQueue<DisconnectEvent> EVENT_QUEUE = new DelayQueue<>();

        @Autowired
        private SessionServerConnectionFactory sessionServerConnectionFactory;

        @Autowired
        private DataChangeEventCenter dataChangeEventCenter;

        @Autowired
        private DataNodeStatus dataNodeStatus;

        private static final BlockingQueue<DisconnectEvent> noWorkQueue = new LinkedBlockingQueue<>();

        public void receive(DisconnectEvent event) {
            if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
                noWorkQueue.add(event);
                return;
            }
            EVENT_QUEUE.add(event);
        }
    }
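A `DelayQueue` only releases an element once its delay has elapsed, which is why the events placed in it must implement `Delayed`. The following sketch shows that contract with a hypothetical event class; `DelayedDisconnect` and `DisconnectQueueSketch` are illustrative names, not SOFARegistry's DisconnectEvent.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical event: becomes available delayMillis after it was created.
class DelayedDisconnect implements Delayed {
    final String connectId;
    private final long fireAtMillis;

    DelayedDisconnect(String connectId, long delayMillis) {
        this.connectId = connectId;
        this.fireAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // remaining delay; zero or negative means the event is due
        return unit.convert(fireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

class DisconnectQueueSketch {
    private final DelayQueue<DelayedDisconnect> queue = new DelayQueue<>();

    void receive(DelayedDisconnect event) {
        queue.add(event);
    }

    /** Non-blocking: returns the next due event, or null if nothing is due yet. */
    DelayedDisconnect pollDue() {
        return queue.poll();
    }
}
```

An event created with a delay of 0 is due immediately, whereas an event with a long delay stays invisible to `poll` and `take` until the delay has passed. This gives a disconnected client a grace period to come back before its data is removed.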
afterPropertiesSet starts a thread that loops forever, taking events from EVENT_QUEUE and processing them. For a session server disconnect it unpublishes every connectId that belonged to the session server process; for a client disconnect it unpublishes that single connectId. The code is as follows:

    @Override
    public void afterPropertiesSet() {
        Executor executor = ExecutorFactory
            .newSingleThreadExecutor(DisconnectEventHandler.class.getSimpleName());
        executor.execute(() -> {
            while (true) {
                try {
                    DisconnectEvent disconnectEvent = EVENT_QUEUE.take();
                    if (disconnectEvent.getType() == DisconnectTypeEnum.SESSION_SERVER) {
                        SessionServerDisconnectEvent event = (SessionServerDisconnectEvent) disconnectEvent;
                        String processId = event.getProcessId();
                        //check processId confirm remove,and not be registered again when delay time
                        String sessionServerHost = event.getSessionServerHost();
                        if (sessionServerConnectionFactory
                            .removeProcessIfMatch(processId, sessionServerHost)) {
                            Set<String> connectIds = sessionServerConnectionFactory
                                .removeConnectIds(processId);
                            if (connectIds != null && !connectIds.isEmpty()) {
                                for (String connectId : connectIds) {
                                    unPub(connectId, event.getRegisterTimestamp());
                                }
                            }
                        }
                    } else {
                        ClientDisconnectEvent event = (ClientDisconnectEvent) disconnectEvent;
                        unPub(event.getConnectId(), event.getRegisterTimestamp());
                    }
                } catch (Throwable e) {
                    // error handling is not shown in this excerpt; the loop keeps running
                }
            }
        });
    }

    /**
     *
     * @param connectId
     * @param registerTimestamp
     */
    private void unPub(String connectId, long registerTimestamp) {
        dataChangeEventCenter.onChange(new ClientChangeEvent(connectId, dataServerConfig
            .getLocalDataCenter(), registerTimestamp));
    }
This is shown in the following diagram:

    +--------------------------------------------------+
    | DatumLeaseManager                                |
    |                                                  |
    |   EvictTaskForHeartbeatLess.run                  |
    | +----------------------------------------------+ |
    | | allConnectIds = datumCache.getAllConnectIds()| |
    | |     |                                        | |
    | |     |  for (allConnectIds)                   | |         +------------------------+
    | |     v                                        | |         |                        |
    | |                                              | |         | DisconnectEventHandler |
    | | connectIdRenewTimestampMap                   | |         |                        |
    | |                                              | |         |  +-------------+       |
    | |     |                                        | |         |  | noWorkQueue |       |
    | |     |  no heartbeat                          | |         |  +-------------+       |
    | |     v                                        | | receive |                        |
    | |                                              | |         |  +--------------+      |
    | |   evict +-------------------------------------------------> | EVENT_QUEUE  |      |
    | |                                              | |         |  +--------------+      |
    | +----------------------------------------------+ |         +------------------------+
    +--------------------------------------------------+
The flow then reaches DataChangeEventCenter, which again acts as a forwarder. The excerpt below shows the onChange overload that takes a Publisher; unPub above passes a ClientChangeEvent, and the overload for that type is not reproduced here.

    public class DataChangeEventCenter {

        /**
         * queues of DataChangeEvent
         */
        private DataChangeEventQueue[] dataChangeEventQueues;

        /**
         * receive changed publisher, then wrap it into the DataChangeEvent and put it into dataChangeEventQueue
         *
         * @param publisher
         * @param dataCenter
         */
        public void onChange(Publisher publisher, String dataCenter) {
            int idx = hash(publisher.getDataInfoId());
            Datum datum = new Datum(publisher, dataCenter);
            if (publisher instanceof UnPublisher) {
                datum.setContainsUnPub(true);
            }
            if (publisher.getPublishType() != PublishType.TEMPORARY) {
                dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                    DataSourceTypeEnum.PUB, datum));
            } else {
                dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                    DataSourceTypeEnum.PUB_TEMP, datum));
            }
        }
    }
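Routing by `hash(dataInfoId)` means every event for the same dataInfoId lands in the same queue, which keeps events for one key in order while spreading load across queues. The hash function itself is not shown in the excerpt, so the sketch below assumes a simple `hashCode` modulo the queue count; `ShardedEventQueues` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: events are sharded over N queues by the hash of their key.
class ShardedEventQueues {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    ShardedEventQueues(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    /** Assumed hash: floorMod keeps the index non-negative even for negative hash codes. */
    int indexFor(String dataInfoId) {
        return Math.floorMod(dataInfoId.hashCode(), queues.size());
    }

    void onChange(String dataInfoId, String event) {
        queues.get(indexFor(dataInfoId)).add(event);
    }

    int sizeOf(int idx) {
        return queues.get(idx).size();
    }
}
```

Two events for the same dataInfoId always map to the same index, so a single consumer per queue sees them in the order they were added.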
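The actual work is done by DataChangeEventQueue, which calls addTempChangeData and handleDatum to process the affected data, in this case the data that is to be evicted.
After an event is taken from the queue, it is handled differently depending on its scope, for example DataChangeScopeEnum.DATUM.
For details, see the earlier article in this series on asynchronous processing with the message bus in SOFARegistry ([從源碼學設計]螞蟻金服SOFARegistry之消息總線異步處理).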
    +--------------------------------------------------+
    | DatumLeaseManager                                |
    |                                                  |
    |   EvictTaskForHeartbeatLess.run                  |
    | +----------------------------------------------+ |
    | | allConnectIds = datumCache.getAllConnectIds()| |
    | |     |                                        | |
    | |     |  for (allConnectIds)                   | |         +------------------------+
    | |     v                                        | |         |                        |
    | |                                              | |         | DisconnectEventHandler |
    | | connectIdRenewTimestampMap                   | |         |                        |
    | |                                              | |         |  +-------------+       |
    | |     |                                        | |         |  | noWorkQueue |       |
    | |     |  no heartbeat                          | |         |  +-------------+       |
    | |     v                                        | | receive |                        |
    | |                                              | |         |  +--------------+      |
    | |   evict +-------------------------------------------------> | EVENT_QUEUE  |      |
    | |                                              | |         |  +--------------+      |
    | +----------------------------------------------+ |         +------------------------+
    +--------------------------------------------------+                  |
                                                                          |
    +----------------------+                                              | onChange
    | DataChangeEventQueue |                                              v
    |                      |                                     +--------+------------------+
    |                      |                                     |   DataChangeEventCenter   |
    |  +------------+      |                                     |                           |
    |  | eventQueue |      |      add DataChangeEvent            |                           |
    |  +------------+      |                                     | +-----------------------+ |
    |                      | <-----------------------------+     | | dataChangeEventQueues | |
    |  addTempChangeData   |                                     | +-----------------------+ |
    |                      |                                     +---------------------------+
    |  handleDatum         |
    |                      |
    +----------------------+