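SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture. Across several articles, it works backward from the code and looks from multiple angles at the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba approaches such designs.
This is the seventeenth article in the series. It covers deferred operations in SOFARegistry.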
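Why is AfterWorkingProcess needed?
AfterWorkingProcess exists to defer operations. My guess is that in certain situations, for example while the DataServer is not yet in the WORKING state, some business logic cannot run immediately and has to be made up for at a later point.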
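A similar statement on the official blog supports this guess:
Until data synchronization completes, all read operations on the new node are forwarded to the data node that owns the data shard.
Until data synchronization completes, write operations on the new node are forbidden, to prevent new inconsistencies from arising while data is being synchronized.
The rest of this article looks at how such business-level deferred operations are implemented.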
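The interface is defined as follows:

```java
public interface AfterWorkingProcess {

    // the deferred action to run once the node is ready
    void afterWorkingProcess();

    // execution priority; processors are run in ascending order
    int getOrder();
}
```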
The afterWorkProcessors bean is injected as a member variable of AfterWorkingProcessHandler. It is used to perform follow-up actions after certain business logic has finished.

```java
@Bean(name = "afterWorkProcessors")
public List<AfterWorkingProcess> afterWorkingProcessors() {
    List<AfterWorkingProcess> list = new ArrayList<>();
    list.add(renewDatumHandler());
    list.add(datumLeaseManager());
    list.add(disconnectEventHandler());
    list.add(notifyDataSyncHandler());
    return list;
}

@Bean
public AfterWorkingProcessHandler afterWorkingProcessHandler() {
    return new AfterWorkingProcessHandler();
}
```
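This usage is somewhat unusual: AfterWorkingProcessHandler is itself an implementation of AfterWorkingProcess, acting as a composite.
Its afterWorkingProcess method calls the afterWorkingProcess business method of each implementation registered in the afterWorkProcessors bean, one by one.
getOrder sets the execution priority, which is a common pattern. The processors are sorted by getOrder in ascending order, so a smaller value runs earlier.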
```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;

public class AfterWorkingProcessHandler implements AfterWorkingProcess {

    // the list built by the "afterWorkProcessors" bean
    @Resource(name = "afterWorkProcessors")
    private List<AfterWorkingProcess> afterWorkingProcessors;

    @Override
    public void afterWorkingProcess() {
        if (afterWorkingProcessors != null) {
            // sort by getOrder() ascending, then run each processor in turn
            List<AfterWorkingProcess> list = afterWorkingProcessors.stream()
                    .sorted(Comparator.comparing(AfterWorkingProcess::getOrder))
                    .collect(Collectors.toList());
            list.forEach(AfterWorkingProcess::afterWorkingProcess);
        }
    }

    @Override
    public int getOrder() {
        return 0;
    }
}
```
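The fan-out in AfterWorkingProcessHandler is a composite: one AfterWorkingProcess sorts the others and invokes them in turn. The self-contained sketch below reproduces only that mechanism, without Spring. RecordingProcess, AfterWorkingProcessComposite, demo() and the order values 1 to 3 are illustrative assumptions. They are not the order values SOFARegistry actually assigns.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Same shape as the interface in the article.
interface AfterWorkingProcess {
    void afterWorkingProcess();
    int getOrder();
}

// Hypothetical processor that records its name so the run order is visible.
class RecordingProcess implements AfterWorkingProcess {
    private final String name;
    private final int order;
    private final List<String> log;

    RecordingProcess(String name, int order, List<String> log) {
        this.name = name;
        this.order = order;
        this.log = log;
    }

    @Override
    public void afterWorkingProcess() {
        log.add(name);
    }

    @Override
    public int getOrder() {
        return order;
    }
}

// Hypothetical composite: itself an AfterWorkingProcess that runs the registered
// processors sorted by ascending getOrder(), like AfterWorkingProcessHandler.
class AfterWorkingProcessComposite implements AfterWorkingProcess {
    private final List<AfterWorkingProcess> processors;

    AfterWorkingProcessComposite(List<AfterWorkingProcess> processors) {
        this.processors = processors;
    }

    @Override
    public void afterWorkingProcess() {
        if (processors == null) {
            return;
        }
        List<AfterWorkingProcess> sorted = processors.stream()
                .sorted(Comparator.comparingInt(AfterWorkingProcess::getOrder))
                .collect(Collectors.toList());
        sorted.forEach(AfterWorkingProcess::afterWorkingProcess);
    }

    @Override
    public int getOrder() {
        return 0;
    }

    // Usage example: registration order differs from execution order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<AfterWorkingProcess> list = new ArrayList<>();
        list.add(new RecordingProcess("notifyDataSync", 3, log));
        list.add(new RecordingProcess("renewDatum", 1, log));
        list.add(new RecordingProcess("disconnectEvent", 2, log));
        new AfterWorkingProcessComposite(list).afterWorkingProcess();
        return log;
    }
}
```

Stream.sorted is stable for ordered streams. Processors with equal getOrder values therefore keep the order in which they were added to the list.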
It is called from only one place, DataServerCache#updateDataServerStatus:

```java
afterWorkingProcessHandler.afterWorkingProcess();
```
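In DataServerCache, the following methods all end up calling updateDataServerStatus: synced, notifiedAll, checkAndUpdateStatus and addNotWorkingServer.
As shown in the diagram: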
```
DataServerCache
  synced ----------------+
  notifiedAll -----------+
  checkAndUpdateStatus --+---> updateDataServerStatus
  addNotWorkingServer ---+               |
                                         v
              AfterWorkingProcessHandler.afterWorkingProcess
                                         |  (sorted by getOrder)
                                         +--> renewDatumHandler.afterWorkingProcess
                                         +--> datumLeaseManager.afterWorkingProcess
                                         +--> disconnectEventHandler.afterWorkingProcess
                                         +--> notifyDataSyncHandler.afterWorkingProcess
```
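The diagram shows only that several DataServerCache methods funnel into updateDataServerStatus. Below is a minimal sketch of such a trigger point. It assumes the hooks should fire once, when the local status first switches to WORKING. DataServerStatusHolder, LocalStatus and the compareAndSet guard are all hypothetical. The excerpt does not show whether the real method guards against repeated calls.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for LocalServerStatusEnum.
enum LocalStatus { INITIAL, WORKING }

// Hypothetical model of the updateDataServerStatus trigger point.
class DataServerStatusHolder {
    private final AtomicReference<LocalStatus> status = new AtomicReference<>(LocalStatus.INITIAL);
    // e.g. afterWorkingProcessHandler::afterWorkingProcess
    private final Runnable afterWorkingHook;

    DataServerStatusHolder(Runnable afterWorkingHook) {
        this.afterWorkingHook = afterWorkingHook;
    }

    LocalStatus getStatus() {
        return status.get();
    }

    // Several callers may reach this method. Only the first INITIAL -> WORKING switch
    // runs the hook (assumed guard), and the status is switched before the hook runs.
    void updateDataServerStatus() {
        if (status.compareAndSet(LocalStatus.INITIAL, LocalStatus.WORKING)) {
            afterWorkingHook.run();
        }
    }

    // Usage example: two callers converge, and the hook runs once.
    static int demo() {
        AtomicInteger runs = new AtomicInteger();
        DataServerStatusHolder holder = new DataServerStatusHolder(runs::incrementAndGet);
        holder.updateDataServerStatus(); // e.g. reached via synced
        holder.updateDataServerStatus(); // e.g. reached via notifiedAll
        return runs.get();
    }
}
```

In this sketch, the status reads WORKING before the hooks run. Any hook that re-checks the status while replaying deferred work therefore takes the normal path.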
These actions are driven by business events, so no timer, asynchronous dispatch or similar mechanism is needed: the handler simply calls them. Take DisconnectEventHandler first. Its key fields are:

```java
public class DisconnectEventHandler implements InitializingBean, AfterWorkingProcess {

    /**
     * a DelayQueue that contains client disconnect events
     */
    private final DelayQueue<DisconnectEvent> EVENT_QUEUE = new DelayQueue<>();

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Autowired
    private DataNodeStatus dataNodeStatus;

    // how long (ms) afterWorkingProcess waits before draining noWorkQueue
    private static final int BLOCK_FOR_ALL_SYNC = 5000;

    // events received while the node is not yet WORKING
    private static final BlockingQueue<DisconnectEvent> noWorkQueue = new LinkedBlockingQueue<>();
}
```
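In the normal business path of receive, if the node's own status is not WORKING, the event is put into the BlockingQueue noWorkQueue.

```java
public void receive(DisconnectEvent event) {
    if (event.getType() == DisconnectTypeEnum.SESSION_SERVER) {
        SessionServerDisconnectEvent sessionServerDisconnectEvent = (SessionServerDisconnectEvent) event;
        // statement truncated in the excerpt; it used sessionServerDisconnectEvent.getProcessId()
    } else if (event.getType() == DisconnectTypeEnum.CLIENT) {
        ClientDisconnectEvent clientDisconnectEvent = (ClientDisconnectEvent) event;
        // details omitted in the excerpt
    }
    // not WORKING yet: park the event and replay it later in afterWorkingProcess()
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
        noWorkQueue.add(event);
        return;
    }
    // normal path: hand the event to the delay queue
    EVENT_QUEUE.add(event);
}
```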
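When the right moment arrives, the system calls afterWorkingProcess. It first sleeps for BLOCK_FOR_ALL_SYNC (5000 ms). It then drains noWorkQueue: while the queue is not empty, it polls it, waiting at most 1 second per attempt, and feeds each event back into receive.

```java
@Override
public void afterWorkingProcess() {
    try {
        /*
         * After the snapshot data is synchronized during startup, it is queued and then placed asynchronously into
         * DatumCache. When the notification becomes WORKING, there may be data in the queue that is not executed
         * to DatumCache. So it need to sleep for a while.
         */
        TimeUnit.MILLISECONDS.sleep(BLOCK_FOR_ALL_SYNC);
        // replay parked events; each poll waits at most 1 s
        while (!noWorkQueue.isEmpty()) {
            DisconnectEvent event = noWorkQueue.poll(1, TimeUnit.SECONDS);
            if (event != null) {
                receive(event);
            }
        }
    } catch (InterruptedException e) {
        // the excerpt omitted the catch clause; restoring the interrupt flag is one option
        Thread.currentThread().interrupt();
    }
}
```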
As shown in the diagram:
```
DisconnectEventHandler

receive(event)
  dataNodeStatus.getStatus()
    +-- WORKING ------> EVENT_QUEUE.add(event)
    +-- NOT WORKING --> noWorkQueue.add(event)
                              |
afterWorkingProcess()         |  poll, while noWorkQueue is not empty
  receive(event) <------------+
```
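The park-and-replay pattern can be sketched without Spring or the DisconnectEvent types. In the sketch below, events are plain strings and an internal list stands in for EVENT_QUEUE. markWorking() stands in for whatever flips dataNodeStatus. The 5-second BLOCK_FOR_ALL_SYNC sleep is omitted so the example runs instantly. ParkingHandler and all of its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, hypothetical model of DisconnectEventHandler's park-and-replay logic.
class ParkingHandler {
    private final AtomicBoolean working = new AtomicBoolean(false);
    private final BlockingQueue<String> noWorkQueue = new LinkedBlockingQueue<>();
    // Stands in for EVENT_QUEUE; a plain ArrayList because the demo is single-threaded.
    private final List<String> processed = new ArrayList<>();

    void receive(String event) {
        if (!working.get()) {
            noWorkQueue.add(event); // not WORKING yet: park the event
            return;
        }
        processed.add(event);       // WORKING: normal path
    }

    // Stands in for the status switch performed elsewhere.
    void markWorking() {
        working.set(true);
    }

    // Replays parked events in FIFO order; assumes markWorking() was already called.
    void afterWorkingProcess() {
        try {
            while (!noWorkQueue.isEmpty()) {
                String event = noWorkQueue.poll(1, TimeUnit.SECONDS);
                if (event != null) {
                    receive(event); // now WORKING, so the event takes the normal path
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> processedEvents() {
        return new ArrayList<>(processed);
    }

    // Usage example: two events parked before WORKING, one arriving after.
    static List<String> demo() {
        ParkingHandler handler = new ParkingHandler();
        handler.receive("e1");
        handler.receive("e2");
        handler.markWorking();
        handler.afterWorkingProcess();
        handler.receive("e3");
        return handler.processedEvents();
    }
}
```

LinkedBlockingQueue is FIFO, so parked events are replayed in their arrival order, ahead of anything that arrives after the switch.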
NotifyDataSyncHandler is implemented in a similar way to DisconnectEventHandler.
It relies on a LinkedBlockingQueue as its buffer queue.

```java
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements AfterWorkingProcess {

    // requests received while the node is not yet WORKING
    private static final BlockingQueue<SyncDataRequestForWorking> noWorkQueue = new LinkedBlockingQueue<>();
}
```
In the normal business path of doHandle, if the node's own status is not WORKING, the handler wraps the connection and the request into a SyncDataRequestForWorking message. It puts that message into the LinkedBlockingQueue and immediately returns a success response.

```java
@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
    final Connection connection = ((BoltChannel) channel).getConnection();
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
        // not WORKING yet: park the request and acknowledge it right away
        noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
        return CommonResponse.buildSuccessResponse();
    }
    executorRequest(connection, request);
    return CommonResponse.buildSuccessResponse();
}
```
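When the right moment arrives, the system calls afterWorkingProcess. It drains noWorkQueue: while the queue is not empty, it polls it, waiting at most 1 second per attempt, and passes each parked request to executorRequest.

```java
@Override
public void afterWorkingProcess() {
    try {
        // replay parked requests; each poll waits at most 1 s
        while (!noWorkQueue.isEmpty()) {
            SyncDataRequestForWorking event = noWorkQueue.poll(1, TimeUnit.SECONDS);
            if (event != null) {
                executorRequest(event.getConnection(), event.getRequest());
            }
        }
    } catch (InterruptedException e) {
        // poll can throw InterruptedException; the excerpt omitted this handler
        Thread.currentThread().interrupt();
    }
}
```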
As shown in the diagram:
```
NotifyDataSyncHandler

doHandle(channel, request)
  dataNodeStatus.getStatus()
    +-- WORKING ------> executorRequest(connection, request)
    +-- NOT WORKING --> noWorkQueue.add(new SyncDataRequestForWorking(connection, request))
                              |
afterWorkingProcess()         |  poll, while noWorkQueue is not empty
  executorRequest(...) <------+
```
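The replay loop pairs isEmpty() with poll(1, TimeUnit.SECONDS) rather than take(). If another consumer empties the queue between the check and the poll, poll returns null after one second and the loop re-checks. take() would block indefinitely in that case. The sketch below isolates that drain loop. DeferredRequest is loosely modeled on SyncDataRequestForWorking. DeferredRequest, QueueDrainer and the string payloads are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical holder: keeps everything needed to execute the request later.
final class DeferredRequest {
    final String connection;
    final String request;

    DeferredRequest(String connection, String request) {
        this.connection = connection;
        this.request = request;
    }
}

final class QueueDrainer {
    // Drains until the queue is observed empty. A timed poll never hangs:
    // it returns null after 1 s if nothing is available, and the loop re-checks.
    static <T> int drain(BlockingQueue<T> queue, Consumer<T> action) {
        int handled = 0;
        try {
            while (!queue.isEmpty()) {
                T item = queue.poll(1, TimeUnit.SECONDS);
                if (item != null) {
                    action.accept(item);
                    handled++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
        return handled;
    }

    // Usage example: two parked requests replayed in FIFO order.
    static List<String> demo() {
        BlockingQueue<DeferredRequest> noWorkQueue = new LinkedBlockingQueue<>();
        noWorkQueue.add(new DeferredRequest("conn-1", "sync-A"));
        noWorkQueue.add(new DeferredRequest("conn-2", "sync-B"));
        List<String> executed = new ArrayList<>();
        // The lambda stands in for executorRequest(connection, request).
        drain(noWorkQueue, r -> executed.add(r.connection + ":" + r.request));
        return executed;
    }
}
```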
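RenewDatumHandler and DatumLeaseManager are very similar. Neither uses a queue. Each simply hands an asynchronous task to a thread pool.
Their purpose is stated clearly in the code comment: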
```java
/*
 * After the snapshot data is synchronized during startup, it is queued and then placed asynchronously into
 * DatumCache. When the notification becomes WORKING, there may be data in the queue that is not executed
 * to DatumCache. So it need to sleep for a while.
 */
```
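The details differ, however. Both classes were written by the same author, and I suspect he was experimenting with two different implementation approaches to compare them.
RenewDatumHandler is built on ThreadPoolExecutorDataServer.

```java
public class RenewDatumHandler extends AbstractServerHandler<RenewDatumRequest> implements AfterWorkingProcess {

    @Autowired
    private ThreadPoolExecutor renewDatumProcessorExecutor;
}
```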
renewDatumProcessorExecutor is a bean, defined as shown below. Its work queue is an ArrayBlockingQueue: a bounded blocking queue backed by an array that orders elements FIFO.

```java
@Bean(name = "renewDatumProcessorExecutor")
public ThreadPoolExecutor renewDatumProcessorExecutor(DataServerConfig dataServerConfig) {
    return new ThreadPoolExecutorDataServer("RenewDatumProcessorExecutor",
            dataServerConfig.getRenewDatumExecutorMinPoolSize(),
            dataServerConfig.getRenewDatumExecutorMaxPoolSize(),
            300, TimeUnit.SECONDS,
            // bounded FIFO work queue
            new ArrayBlockingQueue<>(dataServerConfig.getRenewDatumExecutorQueueSize()),
            new NamedThreadFactory("DataServer-RenewDatumProcessor-executor", true));
}
```
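The ArrayBlockingQueue makes the renew pool bounded. The sketch below shows what that means under standard ThreadPoolExecutor semantics. It uses deliberately tiny sizes (one thread, queue capacity 1) instead of the values from dataServerConfig. The first task occupies the thread, the second waits in the queue, and the third is rejected by the default AbortPolicy. BoundedPoolDemo is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {
    // Returns true if the third submission is rejected.
    static boolean thirdTaskRejected() {
        CountDownLatch release = new CountDownLatch(1);
        // core = max = 1 thread, queue capacity 1, same 300 s keep-alive as the bean
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 300, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await(); // hold the worker until the test is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // handed directly to the single worker thread
            pool.execute(blocker); // waits in the queue (capacity 1)
            try {
                pool.execute(blocker); // no free thread and the queue is full
            } catch (RejectedExecutionException e) {
                rejected = true; // default AbortPolicy
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }
}
```

When core size is smaller than max size, ThreadPoolExecutor only creates threads beyond the core size once the queue is full. The queue capacity therefore also decides when the pool grows from getRenewDatumExecutorMinPoolSize toward getRenewDatumExecutorMaxPoolSize.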
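The main code of ThreadPoolExecutorDataServer is shown below. It simply extends ThreadPoolExecutor. New features will presumably be added here later; for now it is just a placeholder.

```java
public class ThreadPoolExecutorDataServer extends ThreadPoolExecutor {

    // constructor omitted in the excerpt; its shape is inferred from the bean definition above
    public ThreadPoolExecutorDataServer(String executorName, int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    // currently only delegates to ThreadPoolExecutor
    @Override
    public void execute(Runnable command) {
        super.execute(command);
    }
}
```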
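Its afterWorkingProcess just submits a task to that pool. The task waits for a configured delay and then sets renewEnabled to true.

```java
@Override
public void afterWorkingProcess() {
    renewDatumProcessorExecutor.submit(() -> {
        try {
            // note: the getter is named ...Sec, while the excerpt sleeps in MILLISECONDS
            TimeUnit.MILLISECONDS.sleep(dataServerConfig.getRenewEnableDelaySec());
        } catch (InterruptedException e) {
            // sleep throws a checked exception, which a Runnable lambda must handle
            Thread.currentThread().interrupt();
        }
        renewEnabled.set(true);
    });
}
```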
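A delayed switch of a flag like renewEnabled can be written in two ways. The sketch contrasts the submit-then-sleep style from the RenewDatumHandler excerpt with a ScheduledExecutorService.schedule alternative. DelayedEnable, its methods and the 50 ms delay are illustrative. The sketch makes no claim about which style DatumLeaseManager actually uses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class DelayedEnable {
    // Style 1, as in the RenewDatumHandler excerpt: a pool task sleeps, then flips the flag.
    static void enableAfterSleep(ExecutorService executor, AtomicBoolean flag, long delayMs) {
        executor.submit(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // interrupted: leave the flag unset in this sketch
            }
            flag.set(true);
        });
    }

    // Style 2, an alternative: the scheduler owns the delay, so no pool thread sits in sleep().
    static void enableWithSchedule(ScheduledExecutorService scheduler, AtomicBoolean flag, long delayMs) {
        scheduler.schedule(() -> flag.set(true), delayMs, TimeUnit.MILLISECONDS);
    }

    // Usage example: both flags are true once the delay has elapsed and the executors finish.
    static boolean demo() {
        AtomicBoolean renewEnabledA = new AtomicBoolean(false);
        AtomicBoolean renewEnabledB = new AtomicBoolean(false);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        enableAfterSleep(pool, renewEnabledA, 50);
        enableWithSchedule(scheduler, renewEnabledB, 50);
        // shutdown() lets submitted work finish; delayed tasks still run after shutdown by default
        pool.shutdown();
        scheduler.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return renewEnabledA.get() && renewEnabledB.get();
    }
}
```

The trade-off is thread usage. Sleeping inside a bounded pool such as renewDatumProcessorExecutor ties up one of its worker threads for the whole delay. schedule() leaves the waiting to the scheduler instead.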
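References:
How the Ant Financial service registry implements smooth scale-out and scale-in of DataServer
Ant Financial service registry SOFARegistry analysis | The road to service discovery optimization
Service registry Session storage strategy | SOFARegistry analysis
A registry for massive data: an introduction to the SOFARegistry architecture
Service registry data sharding and synchronization explained | SOFARegistry analysis
Analysis of the timeout control and heartbeat mechanisms in Ant Financial's open-source communication framework SOFABolt
Analysis of the Ant Financial service registry data consistency scheme | SOFARegistry analysis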