SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。html
本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。java
本文爲第十三篇,介紹從SessionServer角度看的服務上線。node
本文以介紹業務爲主,順便整理邏輯,設計和模式。由於註冊過程牽扯模塊太多,因此本文僅僅專一在註冊過程當中Session Server的部分。算法
服務的上下線過程是指服務經過代碼調用執行常規註冊(Publisher#register) 和下線(Publisher#unregister)操做,不考慮由於服務宕機等意外狀況致使的下線場景。設計模式
一個典型的 「RPC 調用的服務尋址」 應用場景,服務的提供方經過以下兩個步驟完成服務發佈:數組
與此相對應的,服務的調用方經過以下步驟實現服務調用:session
在SOFARegistry中,全部 Client 在註冊和訂閱數據時,根據 dataInfoId 作一致性 Hash,計算出應該訪問哪一臺 DataServer,而後與該 DataServer 創建長鏈接。數據結構
因爲每一個 Client 一般都會註冊和訂閱比較多的 dataInfoId 數據,所以咱們能夠預見每一個 Client 均會與好幾臺 DataServer 創建鏈接。這個架構存在的問題是:「每臺 DataServer 承載的鏈接數會隨 Client 數量的增加而增加,每臺 Client 極端的狀況下須要與每臺 DataServer 都建連,所以經過 DataServer 的擴容並不能線性的分攤 Client 鏈接數」。架構
因此,爲數據分片層(DataServer)專門設計一個鏈接代理層是很是重要的,因此 SOFARegistry 就有了 SessionServer 這一層。隨着 Client 數量的增加,能夠經過擴容 SessionServer 就解決了單機的鏈接數瓶頸問題。併發
由於SessionServer是一箇中間層,因此看起來好像比較簡單,表面上看,就是接受,轉發。
可是實際上,在大型系統中,應該如何在邏輯上,物理上實現模塊分割,解耦都是很是有必要的。
咱們主要看看阿里方案的註冊部分。
服務的上下線過程,是指服務經過代碼調用作正常的註冊(publisher.register) 和 下線(publisher.unregister),不考慮由於服務宕機等意外狀況致使的下線。如上圖,大概呈現了「一次服務註冊過程」的服務數據在內部流轉過程。
下圖展現了 Publisher 註冊的代碼流轉過程
這個過程也是採用了 Handler - Task & Strategy - Listener 的方式來處理,任務在代碼內部的處理流程和訂閱過程基本一致。
PublisherRegistration 是Client的接口,發佈數據的關鍵代碼以下:
// 構造發佈者註冊表 PublisherRegistration registration = new PublisherRegistration("com.alipay.test.demo.service:1.0@DEFAULT"); registration.setGroup("TEST_GROUP"); registration.setAppName("TEST_APP"); // 將註冊表註冊進客戶端併發布數據 Publisher publisher = registryClient.register(registration, "10.10.1.1:12200?xx=yy"); // 如需覆蓋上次發佈的數據可使用發佈者模型從新發布數據 publisher.republish("10.10.1.1:12200?xx=zz");
發佈數據的關鍵是構造 PublisherRegistration,該類包含三個屬性:
屬性名 | 屬性類型 | 描述 |
---|---|---|
dataId | String | 數據ID,發佈訂閱時須要使用相同值,數據惟一標識由 dataId + group + instanceId 組成。 |
group | String | 數據分組,發佈訂閱時須要使用相同值,數據惟一標識由 dataId + group + instanceId 組成,默認值 DEFAULT_GROUP。 |
appName | String | 應用 appName。 |
流程來到了Session server。
首先,能夠經過Beans來入手。
@Bean(name = "serverHandlers") public Collection<AbstractServerHandler> serverHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(publisherHandler()); list.add(subscriberHandler()); list.add(watcherHandler()); list.add(clientNodeConnectionHandler()); list.add(cancelAddressRequestHandler()); list.add(syncConfigHandler()); return list; }
serverHandlers 是Bolt Server 的響應函數組合。
@Bean @ConditionalOnMissingBean(name = "sessionRegistry") public Registry sessionRegistry() { return new SessionRegistry(); }
從Bean角度看,目前的邏輯是如圖所示,這裏有了一次解耦Strategy:
Beans +-----------------------------------+ | Bolt Server(in openSessionServer) | +---------------------------------+ | | +-> | DefaultPublisherHandlerStrategy | | +----------------------+ | | +---------+-----------------------+ | | serverHandlers | | | | | | | | | | | | +------------------+ | | | | | | | PublisherHandle+----------------+ v | | | | | | +-------+-------+ | | | watcherHandler | | | |SessionRegistry| | | | | | | +---------------+ | | | ...... | | | | | +------------------+ | | | +----------------------+ | +-----------------------------------+
服務發佈者和Session Server通常都應該處於一個Data Center之中,這就是阿里等實踐的單體概念.
PublisherHandler 是 Session Server對Client的接口,是Bolt Server 的響應函數。
public class PublisherHandler extends AbstractServerHandler { @Autowired private ExecutorManager executorManager; @Autowired private PublisherHandlerStrategy publisherHandlerStrategy; @Override public Object reply(Channel channel, Object message) throws RemotingException { RegisterResponse result = new RegisterResponse(); PublisherRegister publisherRegister = (PublisherRegister) message; publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result); return result; }
邏輯以下圖所示:
Publisher + Session Server Scope Scope | | +-----------------------------------+ | | Bolt Server(in openSessionServer) | | | | | | +----------------------+ | + | | serverHandlers | | | | | | +--------+ PublisherRegister | | +------------------+ | | | Client +---------------------------> PublisherHandler | | | +--------+ 1 | | | | | | + | | | ...... | | | | | | +------------------+ | | | | +----------------------+ | | +-----------------------------------+ |
總體上,這裏是採用 Handler - Task & Strategy - Listener 的方式來處理。
什麼是策略模式(Strategy Pattern)
在軟件開發過程當中經常遇到這樣的狀況,實現某一個功能有不少種算法或實現策略,咱們能夠根據環境或者條件的不一樣選擇不一樣的算法或者策略來完成該功能。若是將這些算法或者策略抽象出來,提供一個統一的接口,不一樣的算法或者策略有不一樣的實現類,這樣在程序客戶端就能夠經過注入不一樣的實現對象來實現算法或者策略的動態替換,這種模式的可擴展性和可維護性也更高,這就是策略模式。
策略模式的定義(Strategy Pattern)
策略模式: 定義了算法族,分別封裝起來,讓它們之間能夠相互替換,此模式讓算法的變化獨立與使用算法的客戶。
簡單理解: 定義了一系列算法。每一個算法封裝起來。各個算法之間能夠互相替換。且算法的變化不會影響到使用算法的客戶。屬於行爲型模式。
在策略模式(Strategy Pattern)中,一個類的行爲或其算法能夠在運行時更改。這種類型的設計模式屬於行爲型模式。
在策略模式中,咱們建立表示各類策略的對象和一個行爲隨着策略對象改變而改變的 context 對象。策略對象改變 context 對象的執行算法。
從目錄結構看,有不少Strategy的定義和實現,應該螞蟻內部但願根據不一樣狀況制定不一樣的策略,其中有些是目前留出的接口。
com/alipay/sofa/registry/server/session/strategy . ├── DataChangeRequestHandlerStrategy.java ├── PublisherHandlerStrategy.java ├── ReceivedConfigDataPushTaskStrategy.java ├── ReceivedDataMultiPushTaskStrategy.java ├── SessionRegistryStrategy.java ├── SubscriberHandlerStrategy.java ├── SubscriberMultiFetchTaskStrategy.java ├── SubscriberRegisterFetchTaskStrategy.java ├── SyncConfigHandlerStrategy.java ├── TaskMergeProcessorStrategy.java ├── WatcherHandlerStrategy.java └── impl ├── DefaultDataChangeRequestHandlerStrategy.java ├── DefaultPublisherHandlerStrategy.java ├── DefaultPushTaskMergeProcessor.java ├── DefaultReceivedConfigDataPushTaskStrategy.java ├── DefaultReceivedDataMultiPushTaskStrategy.java ├── DefaultSessionRegistryStrategy.java ├── DefaultSubscriberHandlerStrategy.java ├── DefaultSubscriberMultiFetchTaskStrategy.java ├── DefaultSubscriberRegisterFetchTaskStrategy.java ├── DefaultSyncConfigHandlerStrategy.java └── DefaultWatcherHandlerStrategy.java
從目前代碼看,只是設置,分類,轉發。即設置Publisher的缺省信息,而且根據 event type 不一樣執行register或者unRegister。
public class DefaultPublisherHandlerStrategy implements PublisherHandlerStrategy { @Autowired private Registry sessionRegistry; @Override public void handlePublisherRegister(Channel channel, PublisherRegister publisherRegister, RegisterResponse registerResponse) { try { String ip = channel.getRemoteAddress().getAddress().getHostAddress(); int port = channel.getRemoteAddress().getPort(); publisherRegister.setIp(ip); publisherRegister.setPort(port); if (StringUtils.isBlank(publisherRegister.getZone())) { publisherRegister.setZone(ValueConstants.DEFAULT_ZONE); } if (StringUtils.isBlank(publisherRegister.getInstanceId())) { publisherRegister.setInstanceId(DEFAULT_INSTANCE_ID); } Publisher publisher = PublisherConverter.convert(publisherRegister); publisher.setProcessId(ip + ":" + port); publisher.setSourceAddress(new URL(channel.getRemoteAddress())); if (EventTypeConstants.REGISTER.equals(publisherRegister.getEventType())) { sessionRegistry.register(publisher); } else if (EventTypeConstants.UNREGISTER.equals(publisherRegister.getEventType())) { sessionRegistry.unRegister(publisher); } registerResponse.setSuccess(true); registerResponse.setVersion(publisher.getVersion()); registerResponse.setRegistId(publisherRegister.getRegistId()); registerResponse.setMessage("Publisher register success!"); } } }
邏輯以下圖所示
Publisher + Session Server Scope Scope | | +-----------------------------------+ | | Bolt Server(in openSessionServer) | | | | | | +----------------------+ | + | | serverHandlers | | +-------------------------------+ | | | | |DefaultPublisherHandlerStrategy| +--------+ PublisherRegister | | +------------------+ | | handlePublisherRegister | | | Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER | +--------+ 1 | | | | | | | | + | | | watcherHandler | | | +-------------------------------+ | | | | | | | | | | | ...... | | | | | | +------------------+ | | | | +----------------------+ | +-----------------------------------+
手機如圖
前面代碼中,策略會調用到 sessionRegistry.register(publisher),即註冊功能。
從SessionRegistry的內部成員變量就可以看出來,這是 Session Server 核心邏輯所在。
主要提供了以下功能:
register(StoreData
cancel(List
remove(List
unRegister(StoreData
.....
具體成員變量以下:
public class SessionRegistry implements Registry { /** * store subscribers */ @Autowired private Interests sessionInterests; /** * store watchers */ @Autowired private Watchers sessionWatchers; /** * store publishers */ @Autowired private DataStore sessionDataStore; /** * transfer data to DataNode */ @Autowired private DataNodeService dataNodeService; /** * trigger task com.alipay.sofa.registry.server.meta.listener process */ @Autowired private TaskListenerManager taskListenerManager; /** * calculate data node url */ @Autowired private NodeManager dataNodeManager; @Autowired private SessionServerConfig sessionServerConfig; @Autowired private Exchange boltExchange; @Autowired private SessionRegistryStrategy sessionRegistryStrategy; @Autowired private WrapperInterceptorManager wrapperInterceptorManager; @Autowired private DataIdMatchStrategy dataIdMatchStrategy; @Autowired private RenewService renewService; @Autowired private WriteDataAcceptor writeDataAcceptor; private volatile boolean enableDataRenewSnapshot = true; }
register函數生成一個WriteDataRequest,而後調用了 writeDataAcceptor.accept 完成處理。
@Override public void register(StoreData storeData) { WrapperInvocation<StoreData, Boolean> wrapperInvocation = new WrapperInvocation( new Wrapper<StoreData, Boolean>() { @Override public Boolean call() { switch (storeData.getDataType()) { case PUBLISHER: Publisher publisher = (Publisher) storeData; sessionDataStore.add(publisher); // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot) // are handed over to WriteDataAcceptor writeDataAcceptor.accept(new WriteDataRequest() { @Override public Object getRequestBody() { return publisher; } @Override public WriteDataRequestType getRequestType() { return WriteDataRequestType.PUBLISHER; } @Override public String getConnectId() { return publisher.getSourceAddress().getAddressString(); } @Override public String getDataServerIP() { Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId()); return dataNode.getNodeUrl().getIpAddress(); } }); sessionRegistryStrategy.afterPublisherRegister(publisher); break; case SUBSCRIBER: Subscriber subscriber = (Subscriber) storeData; sessionInterests.add(subscriber); sessionRegistryStrategy.afterSubscriberRegister(subscriber); break; case WATCHER: Watcher watcher = (Watcher) storeData; sessionWatchers.add(watcher); sessionRegistryStrategy.afterWatcherRegister(watcher); break; default: break; } return null; } @Override public Supplier<StoreData> getParameterSupplier() { return () -> storeData; } }, wrapperInterceptorManager); try { wrapperInvocation.proceed(); } catch (Exception e) { throw new RuntimeException("Proceed register error!", e); } }
目前邏輯以下圖所示:
Publisher + Session Server Scope Scope | | +-----------------------------------+ | | Bolt Server(in openSessionServer) | | | | | | +----------------------+ | + | | serverHandlers | | +-------------------------------+ | | | | |DefaultPublisherHandlerStrategy| +--------+ PublisherRegister | | +------------------+ | | handlePublisherRegister | | | Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER | +--------+ 1 | | | | | | 2 | | + | | | watcherHandler | | | +------------+------------------+ | | | | | | | | | | | | ...... | | | | | | | +------------------+ | | 3 | register | | +----------------------+ | | +-----------------------------------+ | v +-------------------+-------------------+ | SessionRegistry | | | | | | storeData.getDataType() == PUBLISHER | +---------------------------------------+
手機以下:
這裏又出現一個策略,目前也只有一個實現,應該也是想要將來作成替換,目前功能只是簡單的留下了接口爲空。
咱們能夠看出阿里到處想解耦的思路。
public class DefaultSessionRegistryStrategy implements SessionRegistryStrategy { @Override public void afterPublisherRegister(Publisher publisher) { } }
前文在註冊過程當中有:
sessionDataStore.add(publisher);
這裏就是Session的 數據存儲模塊,也是系統的核心。
public class SessionDataStore implements DataStore { /** * publisher store */ private Map<String/*dataInfoId*/, Map<String/*registerId*/, Publisher>> registry = new ConcurrentHashMap<>(); /*** index */ private Map<String/*connectId*/, Map<String/*registerId*/, Publisher>> connectIndex = new ConcurrentHashMap<>(); }
這裏記錄了兩種存儲方式,分別是按照 dataInfoId 和 connectId 來存儲。
存儲時候,會從版本號和時間戳兩個維度來比較。
@Override public void add(Publisher publisher) { Publisher.internPublisher(publisher); write.lock(); try { Map<String, Publisher> publishers = registry.get(publisher.getDataInfoId()); if (publishers == null) { ConcurrentHashMap<String, Publisher> newmap = new ConcurrentHashMap<>(); publishers = registry.putIfAbsent(publisher.getDataInfoId(), newmap); if (publishers == null) { publishers = newmap; } } Publisher existingPublisher = publishers.get(publisher.getRegisterId()); if (existingPublisher != null) { if (existingPublisher.getVersion() != null) { long oldVersion = existingPublisher.getVersion(); Long newVersion = publisher.getVersion(); if (newVersion == null) { return; } else if (oldVersion > newVersion) { return; } else if (oldVersion == newVersion) { Long newTime = publisher.getRegisterTimestamp(); long oldTime = existingPublisher.getRegisterTimestamp(); if (newTime == null) { return; } if (oldTime > newTime) { return; } } } } publishers.put(publisher.getRegisterId(), publisher); addToConnectIndex(publisher); } finally { write.unlock(); } }
在SessionServer自己存儲完成以後,接下來就是通知Data Server了。
WriteDataAcceptorImpl 負責處理具體Publisher的寫入。首先須要把寫入請求統一塊兒來。
使用 private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap()
; 來統一存儲全部的寫入請求。
這裏根據不一樣的Connection來處理不一樣鏈接的寫入請求。
具體以下:
public class WriteDataAcceptorImpl implements WriteDataAcceptor { @Autowired private TaskListenerManager taskListenerManager; @Autowired private SessionServerConfig sessionServerConfig; @Autowired private RenewService renewService; /** * acceptor for all write data request * key:connectId * value:writeRequest processor * */ private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap(); public void accept(WriteDataRequest request) { String connectId = request.getConnectId(); WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId, key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService)); writeDataProcessor.process(request); } public void remove(String connectId) { writeDataProcessors.remove(connectId); } }
目前邏輯以下圖所示
Publisher + Session Server Scope Scope | | +-----------------------------------+ | | Bolt Server(in openSessionServer) | | | | | | +----------------------+ | + | | serverHandlers | | +-------------------------------+ | | | | |DefaultPublisherHandlerStrategy| +--------+ PublisherRegister | | +------------------+ | | handlePublisherRegister | | | Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER | +--------+ 1 | | | | | | 2 | | + | | | watcherHandler | | | +------------+------------------+ | | | | | | | | | | | | ...... | | | | | | | +------------------+ | | register | 3 | | +----------------------+ | | | +-----------------------------------+ | | v | +-----------------------------------------------------+ +-------------+-------------------------+ | | WriteDataAcceptorImpl | WriteDataRequest | SessionRegistry | | | | <------------------+ | | | | | | | | Map<String, WriteDataProcessor> writeDataProcessors | | storeData.getDataType() == PUBLISHER | | | | +---------------------------------------+ + +-----------------------------------------------------+
手機如圖
前面已經把全部請求統一塊兒來,如今就須要針對每個鏈接的寫入繼續處理。
這裏關鍵是以下數據結構,就是每個鏈接的寫入請求 放到了queue中。
ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue
針對每一個請求不一樣,作不一樣處理。
對於咱們的例子,處理以下:
case PUBLISHER: { doPublishAsync(request); }
而最終是向taskListenerManager發送給請求TaskType.PUBLISH_DATA_TASK,該請求將被PublishDataTaskListener調用publishDataTask來處理。
這裏有一個listener解耦,咱們接下來說解。
private void doPublishAsync(WriteDataRequest request) { sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK); } private void sendEvent(Object eventObj, TaskType taskType) { TaskEvent taskEvent = new TaskEvent(eventObj, taskType); taskListenerManager.sendTaskEvent(taskEvent); }
具體代碼以下:
public class WriteDataProcessor { private final TaskListenerManager taskListenerManager; private final SessionServerConfig sessionServerConfig; private final RenewService renewService; private final String connectId; private Map<String, AtomicLong> lastUpdateTimestampMap = new ConcurrentHashMap<>(); private AtomicBoolean writeDataLock = new AtomicBoolean( false); private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue = new ConcurrentLinkedQueue(); private AtomicInteger acceptorQueueSize = new AtomicInteger(0); public void process(WriteDataRequest request) { // record the last update time by pub/unpub if (isWriteRequest(request)) { refreshUpdateTime(request.getDataServerIP()); } if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) { // snapshot has high priority, so handle directly doHandle(request); } else { // If locked, insert the queue; // otherwise, try emptying the queue (to avoid residue) before processing the request. if (writeDataLock.get()) { addQueue(request); } else { flushQueue(); doHandle(request); } } } private void doHandle(WriteDataRequest request) { switch (request.getRequestType()) { case PUBLISHER: { doPublishAsync(request); } break; case UN_PUBLISHER: { doUnPublishAsync(request); } break; case CLIENT_OFF: { doClientOffAsync(request); } break; case RENEW_DATUM: { if (renewAndSnapshotInSilence(request.getDataServerIP())) { return; } doRenewAsync(request); } break; case DATUM_SNAPSHOT: { if (renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) { return; } halt(); try { doSnapshotAsync(request); } finally { resume(); } } break; } private void doPublishAsync(WriteDataRequest request) { sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK); } private void sendEvent(Object eventObj, TaskType taskType) { TaskEvent taskEvent = new TaskEvent(eventObj, taskType); taskListenerManager.sendTaskEvent(taskEvent); } }
以下圖所示
Publisher + Session Server Scope Scope | | +-----------------------------------+ | | Bolt Server(in openSessionServer) | | | | | | +----------------------+ | + | | serverHandlers | | +-------------------------------+ | | | | |DefaultPublisherHandlerStrategy| +--------+ PublisherRegister | | +------------------+ | | handlePublisherRegister | | | Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER | +--------+ 1 | | | | | | 2 | | + | | | watcherHandler | | | +------------+------------------+ | | | | | | | | | | | | ...... | | | | | | | +------------------+ | | register | 3 | | +----------------------+ | | | +-----------------------------------+ | | v | +---------------------------------------------------------+ +---------+-----------------------------+ | | WriteDataAcceptorImpl | WriteDataRequest | SessionRegistry | | | | <------------------+ | | | | 4 | sessionDataStore.add(publisher) | | | Map<connectId , WriteDataProcessor> writeDataProcessors | | | | | | | storeData.getDataType() == PUBLISHER | | +----------------------+----------------------------------+ | | | process | 5 +---------------------------------------+ | v | +-------------------+---------------------+ +--------------------------+ | | WriteDataProcessor | | PublishDataTaskListener | | | | PUBLISH_DATA_TASK | | | | ConcurrentLinkedQueue<WriteDataRequest> +-------------------> | PublishDataTask | | | | 6 +--------------------------+ + +-----------------------------------------+
手機如圖 :
前面在邏輯上都是一體化的,在這裏,進行了一次解耦。
DefaultTaskListenerManager 是解耦的機制,能夠看到,其中添加了listener,當用戶調用sendTaskEvent時候,將遍歷全部的listeners,調用對應的listener。
public class DefaultTaskListenerManager implements TaskListenerManager { private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create(); @Override public Multimap<TaskType, TaskListener> getTaskListeners() { return taskListeners; } @Override public void addTaskListener(TaskListener taskListener) { taskListeners.put(taskListener.support(), taskListener); } @Override public void sendTaskEvent(TaskEvent taskEvent) { Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType()); for (TaskListener taskListener : taskListeners) { taskListener.handleEvent(taskEvent); } } }
PublishDataTaskListener是對應的處理函數,在其support函數中,聲明瞭支持PUBLISH_DATA_TASK。這樣就完成了解耦。
public class PublishDataTaskListener implements TaskListener { @Autowired private DataNodeService dataNodeService; @Autowired private TaskProcessor dataNodeSingleTaskProcessor; @Autowired private ExecutorManager executorManager; @Override public TaskType support() { return TaskType.PUBLISH_DATA_TASK; } @Override public void handleEvent(TaskEvent event) { SessionTask publishDataTask = new PublishDataTask(dataNodeService); publishDataTask.setTaskEvent(event); executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask)); } }
上面找到了Listener,Listener中經過以下代碼啓動了執行業務的task來處理。可是這背後的機制須要探究。
executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
ExecutorManager 之中,對於線程池作了統一的啓動,關閉。publishDataExecutor就是其中之一。
ExecutorManager相關代碼摘取以下:
public class ExecutorManager { private final ScheduledThreadPoolExecutor scheduler; private final ThreadPoolExecutor publishDataExecutor; private static final String PUBLISH_DATA_EXECUTOR = "PublishDataExecutor"; public ExecutorManager(SessionServerConfig sessionServerConfig) { publishDataExecutor = reportExecutors.computeIfAbsent(PUBLISH_DATA_EXECUTOR, k -> new SessionThreadPoolExecutor(PUBLISH_DATA_EXECUTOR, sessionServerConfig.getPublishDataExecutorMinPoolSize(), sessionServerConfig.getPublishDataExecutorMaxPoolSize(), sessionServerConfig.getPublishDataExecutorKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue<>(sessionServerConfig.getPublishDataExecutorQueueSize()), new NamedThreadFactory("PublishData-executor", true))); } public ThreadPoolExecutor getPublishDataExecutor() { return publishDataExecutor; } }
其中ExecutorManager的bean以下:
@Bean public ExecutorManager executorManager(SessionServerConfig sessionServerConfig) { return new ExecutorManager(sessionServerConfig); }
Processor是任務定義,內部封裝了task。
public class DataNodeSingleTaskProcessor implements TaskProcessor<SessionTask> { @Override public ProcessingResult process(SessionTask task) { try { task.execute(); return ProcessingResult.Success; } catch (Throwable throwable) { if (task instanceof Retryable) { Retryable retryAbleTask = (Retryable) task; if (retryAbleTask.checkRetryTimes()) { return ProcessingResult.TransientError; } } return ProcessingResult.PermanentError; } } @Override public ProcessingResult process(List<SessionTask> tasks) { return null; } }
PublishDataTask的execute 之中 ,調用dataNodeService.register(publisher)進行註冊。
public class PublishDataTask extends AbstractSessionTask { private final DataNodeService dataNodeService; private Publisher publisher; public PublishDataTask(DataNodeService dataNodeService) { this.dataNodeService = dataNodeService; } @Override public void execute() { dataNodeService.register(publisher); } @Override public void setTaskEvent(TaskEvent taskEvent) { //taskId create from event if (taskEvent.getTaskId() != null) { setTaskId(taskEvent.getTaskId()); } Object obj = taskEvent.getEventObj(); if (obj instanceof Publisher) { this.publisher = (Publisher) obj; } } }
具體以下
+-------------------------------------------------+ | DefaultTaskListenerManager | | | | | | Multimap<TaskType, TaskListener> taskListeners | | | +----------------------+--------------------------+ | | PUBLISH_DATA_TASK | | v +------------+--------------+ | PublishDataTaskListener | +------------+--------------+ | setTaskEvent | | v +--------+--------+ | PublishDataTask | +-----------------+
通過listener解耦以後,PublishDataTask就調用了dataNodeService.register(publisher)
,因而接下來就是轉發服務信息給Data Server。
此處就是調用DataNodeServiceImpl的register函數來把請求轉發給Data Server。
public class DataNodeServiceImpl implements DataNodeService { @Autowired private NodeExchanger dataNodeExchanger; @Autowired private NodeManager dataNodeManager; @Autowired private SessionServerConfig sessionServerConfig; private AsyncHashedWheelTimer asyncHashedWheelTimer; }
能夠看到,創建了PublishDataRequest,而後經過Bolt Client,發送給Data Server。
@Override public void register(final Publisher publisher) { String bizName = "PublishData"; Request<PublishDataRequest> request = buildPublishDataRequest(publisher); try { sendRequest(bizName, request); } catch (RequestException e) { doRetryAsync(bizName, request, e, sessionServerConfig.getPublishDataTaskRetryTimes(), sessionServerConfig.getPublishDataTaskRetryFirstDelay(), sessionServerConfig.getPublishDataTaskRetryIncrementDelay()); } } private CommonResponse sendRequest(String bizName, Request request) throws RequestException { Response response = dataNodeExchanger.request(request); Object result = response.getResult(); CommonResponse commonResponse = (CommonResponse) result; return commonResponse; }
以下:
+-------------------------------------------------+ | DefaultTaskListenerManager | | | | | | Multimap<TaskType, TaskListener> taskListeners | | | +----------------------+--------------------------+ | PUBLISH_DATA_TASK | v +------------+--------------+ | PublishDataTaskListener | +------------+--------------+ | setTaskEvent | v +--------+--------+ | PublishDataTask | +--------+--------+ register | | +----------v----------+ | DataNodeServiceImpl | +----------+----------+ PublishDataRequest | v +----------+----------+ Client.sendSync +------------+ | DataNodeExchanger +------------------> | Data Server| +---------------------+ PublishDataRequest +------------+
如何知道發給哪個Data Sever?DataNodeExchanger 中有:
@Override public Response request(Request request) throws RequestException { Response response; URL url = request.getRequestUrl(); try { Client sessionClient = getClient(url); final Object result = sessionClient .sendSync(url, request.getRequestBody(), request.getTimeout() != null ? request.getTimeout() : sessionServerConfig.getDataNodeExchangeTimeOut()); response = () -> result; } return response; }
因而去DataNodeServiceImpl尋找
private Request<PublishDataRequest> buildPublishDataRequest(Publisher publisher) { return new Request<PublishDataRequest>() { private AtomicInteger retryTimes = new AtomicInteger(); @Override public PublishDataRequest getRequestBody() { PublishDataRequest publishDataRequest = new PublishDataRequest(); publishDataRequest.setPublisher(publisher); publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator .getSessionProcessId()); return publishDataRequest; } @Override public URL getRequestUrl() { return getUrl(publisher.getDataInfoId()); } @Override public AtomicInteger getRetryTimes() { return retryTimes; } }; } private URL getUrl(String dataInfoId) { Node dataNode = dataNodeManager.getNode(dataInfoId); //meta push data node has not port String dataIp = dataNode.getNodeUrl().getIpAddress(); return new URL(dataIp, sessionServerConfig.getDataServerPort()); }
在 DataNodeManager中有:
@Override public DataNode getNode(String dataInfoId) { DataNode dataNode = consistentHash.getNodeFor(dataInfoId); return dataNode; }
可見是經過dataInfoId計算出hash,而後 從DataNodeManager之中獲取對應的DataNode,獲得其url。
因而,上圖拓展爲:
+-------------------------------------------------+ | DefaultTaskListenerManager | | | | Multimap<TaskType, TaskListener> taskListeners | | | +----------------------+--------------------------+ | PUBLISH_DATA_TASK | 1 v +------------+--------------+ | PublishDataTaskListener | +------------+--------------+ | setTaskEvent | 2 v +--------+--------+ 4 +---------------+ | PublishDataTask | +------> |DataNodeManager| +--------+--------+ | +---------------+ register | 3 | consistentHash| | | | 5 +----------v----------+---+ v | DataNodeServiceImpl | 6 +-----+----+ +----------+----------+ <------------+ DataNode | PublishDataRequest | 7 url +----------+ v +----------+----------+ | DataNodeExchanger | +----------+----------+ | Client.sendSync | PublishDataRequest | v 8 +-----+------+ | Data Server| +------------+
螞蟻金服服務註冊中心如何實現 DataServer 平滑擴縮容
螞蟻金服服務註冊中心 SOFARegistry 解析 | 服務發現優化之路
服務註冊中心 Session 存儲策略 | SOFARegistry 解析
海量數據下的註冊中心 - SOFARegistry 架構介紹
服務註冊中心數據分片和同步方案詳解 | SOFARegistry 解析
螞蟻金服開源通訊框架SOFABolt解析之超時控制機制及心跳機制
螞蟻金服服務註冊中心數據一致性方案分析 | SOFARegistry 解析