[從源碼學設計]螞蟻金服SOFARegistry之服務上線

[從源碼學設計]螞蟻金服SOFARegistry之服務上線

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。html

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。java

本文爲第十三篇,介紹從SessionServer角度看的服務上線。node

本文以介紹業務爲主,順便整理邏輯,設計和模式。由於註冊過程牽扯模塊太多,因此本文僅僅專一在註冊過程當中Session Server的部分。算法

0x01 業務領域

1.1 應用場景

服務的上下線過程是指服務經過代碼調用執行常規註冊(Publisher#register) 和下線(Publisher#unregister)操做,不考慮由於服務宕機等意外狀況致使的下線場景。設計模式

1.1.1 服務發佈

一個典型的 「RPC 調用的服務尋址」 應用場景,服務的提供方經過以下兩個步驟完成服務發佈:數組

  1. 註冊,將本身以 Publisher 的角色註冊到 SOFARegistry;
  2. 發佈,將須要發佈的數據 (一般是IP 地址、端口、調用方式等) 發佈到 SOFARegistry;

與此相對應的,服務的調用方經過以下步驟實現服務調用:session

  1. 註冊,將本身以 Subscriber 的角色註冊到 SOFARegistry;
  2. 訂閱,收到 SOFARegistry 推送的服務數據;

1.1.2 SessionServer的必要性

在SOFARegistry中,全部 Client 在註冊和訂閱數據時,根據 dataInfoId 作一致性 Hash,計算出應該訪問哪一臺 DataServer,而後與該 DataServer 創建長鏈接。數據結構

因爲每一個 Client 一般都會註冊和訂閱比較多的 dataInfoId 數據,所以咱們能夠預見每一個 Client 均會與好幾臺 DataServer 創建鏈接。這個架構存在的問題是:「每臺 DataServer 承載的鏈接數會隨 Client 數量的增加而增加,每臺 Client 極端的狀況下須要與每臺 DataServer 都建連,所以經過 DataServer 的擴容並不能線性的分攤 Client 鏈接數」。架構

因此,爲數據分片層(DataServer)專門設計一個鏈接代理層是很是重要的,因此 SOFARegistry 就有了 SessionServer 這一層。隨着 Client 數量的增加,能夠經過擴容 SessionServer 就解決了單機的鏈接數瓶頸問題。併發

1.2 問題點

由於SessionServer是一箇中間層,因此看起來好像比較簡單,表面上看,就是接受,轉發。

可是實際上,在大型系統中,應該如何在邏輯上,物理上實現模塊分割,解耦都是很是有必要的。

1.3 阿里方案

咱們主要看看阿里方案的註冊部分。

1.3.1 註冊過程

一次服務的上線(註冊)過程

服務的上下線過程,是指服務經過代碼調用作正常的註冊(publisher.register) 和 下線(publisher.unregister),不考慮由於服務宕機等意外狀況致使的下線。如上圖,大概呈現了「一次服務註冊過程」的服務數據在內部流轉過程。

  1. Client 調用 publisher.register 向 SessionServer 註冊服務。
  2. SessionServer 收到服務數據 (PublisherRegister) 後,將其寫入內存 (SessionServer 會存儲 Client 的數據到內存,用於後續能夠跟 DataServer 作按期檢查),再根據 dataInfoId 的一致性 Hash 尋找對應的 DataServer,將 PublisherRegister 發給 DataServer。
  3. DataServer 接收到 PublisherRegister 數據,首先也是將數據寫入內存 ,DataServer 會以 dataInfoId 的維度彙總全部 PublisherRegister。同時,DataServer 將該 dataInfoId 的變動事件通知給全部 SessionServer,變動事件的內容是 dataInfoId 和版本號信息 version。
  4. 同時,異步地,DataServer 以 dataInfoId 維度增量地同步數據給其餘副本。由於 DataServer 在一致性 Hash 分片的基礎上,對每一個分片保存了多個副本(默認是3個副本)。
  5. SessionServer 接收到變動事件通知後,對比 SessionServer 內存中存儲的 dataInfoId 的 version,若發現比 DataServer 發過來的小,則主動向 DataServer 獲取 dataInfoId 的完整數據,即包含了全部該 dataInfoId 具體的 PublisherRegister 列表。
  6. 最後,SessionServer 將數據推送給相應的 Client,Client 就接收到這一次服務註冊以後的最新的服務列表數據。

1.3.2 圖示

下圖展現了 Publisher 註冊的代碼流轉過程

這個過程也是採用了 Handler - Task & Strategy - Listener 的方式來處理,任務在代碼內部的處理流程和訂閱過程基本一致。

圖 - 代碼流轉:Publisher 註冊

0x02 Client SDK

PublisherRegistration 是Client的接口,發佈數據的關鍵代碼以下:

// 構造發佈者註冊表
PublisherRegistration registration = new PublisherRegistration("com.alipay.test.demo.service:1.0@DEFAULT");
registration.setGroup("TEST_GROUP");
registration.setAppName("TEST_APP");

// 將註冊表註冊進客戶端併發布數據
Publisher publisher = registryClient.register(registration, "10.10.1.1:12200?xx=yy");

// 如需覆蓋上次發佈的數據可使用發佈者模型從新發布數據
publisher.republish("10.10.1.1:12200?xx=zz");

發佈數據的關鍵是構造 PublisherRegistration,該類包含三個屬性:

屬性名 屬性類型 描述
dataId String 數據ID,發佈訂閱時須要使用相同值,數據惟一標識由 dataId + group + instanceId 組成。
group String 數據分組,發佈訂閱時須要使用相同值,數據惟一標識由 dataId + group + instanceId 組成,默認值 DEFAULT_GROUP。
appName String 應用 appName。

0x03 Session server

流程來到了Session server。

3.1 Bean

首先,能夠經過Beans來入手。

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(publisherHandler());
    list.add(subscriberHandler());
    list.add(watcherHandler());
    list.add(clientNodeConnectionHandler());
    list.add(cancelAddressRequestHandler());
    list.add(syncConfigHandler());
    return list;
}

serverHandlers 是Bolt Server 的響應函數組合。

@Bean
@ConditionalOnMissingBean(name = "sessionRegistry")
public Registry sessionRegistry() {
    return new SessionRegistry();
}

從Bean角度看,目前的邏輯是如圖所示,這裏有了一次解耦Strategy:

Beans


+-----------------------------------+
| Bolt Server(in openSessionServer) |        +---------------------------------+
|                                   |    +-> | DefaultPublisherHandlerStrategy |
|    +----------------------+       |    |   +---------+-----------------------+
|    |    serverHandlers    |       |    |             |
|    |                      |       |    |             |
|    | +------------------+ |       |    |             |
|    | | PublisherHandle+----------------+             v
|    | |                  | |       |          +-------+-------+
|    | | watcherHandler   | |       |          |SessionRegistry|
|    | |                  | |       |          +---------------+
|    | |     ......       | |       |
|    | +------------------+ |       |
|    +----------------------+       |
+-----------------------------------+

服務發佈者和Session Server通常都應該處於一個Data Center之中,這就是阿里等實踐的單體概念.

3.2 入口

PublisherHandler 是 Session Server對Client的接口,是Bolt Server 的響應函數。

public class PublisherHandler extends AbstractServerHandler {
    @Autowired
    private ExecutorManager          executorManager;

    @Autowired
    private PublisherHandlerStrategy publisherHandlerStrategy;

    @Override
    public Object reply(Channel channel, Object message) throws RemotingException {

        RegisterResponse result = new RegisterResponse();
        PublisherRegister publisherRegister = (PublisherRegister) message;
        publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result);
        return result;
    }

邏輯以下圖所示:

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |
                              |    |                      |       |
+--------+  PublisherRegister |    | +------------------+ |       |
| Client +---------------------------> PublisherHandler | |       |
+--------+          1         |    | |                  | |       |
               +              |    | |     ......       | |       |
               |              |    | +------------------+ |       |
               |              |    +----------------------+       |
               |              +-----------------------------------+
               |

3.3 策略

總體上,這裏是採用 Handler - Task & Strategy - Listener 的方式來處理。

什麼是策略模式(Strategy Pattern)

在軟件開發過程當中經常遇到這樣的狀況,實現某一個功能有不少種算法或實現策略,咱們能夠根據環境或者條件的不一樣選擇不一樣的算法或者策略來完成該功能。若是將這些算法或者策略抽象出來,提供一個統一的接口,不一樣的算法或者策略有不一樣的實現類,這樣在程序客戶端就能夠經過注入不一樣的實現對象來實現算法或者策略的動態替換,這種模式的可擴展性和可維護性也更高,這就是策略模式。

策略模式的定義(Strategy Pattern)

  • 策略模式: 定義了算法族,分別封裝起來,讓它們之間能夠相互替換,此模式讓算法的變化獨立與使用算法的客戶。

  • 簡單理解: 定義了一系列算法。每一個算法封裝起來。各個算法之間能夠互相替換。且算法的變化不會影響到使用算法的客戶。屬於行爲型模式。

在策略模式(Strategy Pattern)中,一個類的行爲或其算法能夠在運行時更改。這種類型的設計模式屬於行爲型模式

在策略模式中,咱們建立表示各類策略的對象和一個行爲隨着策略對象改變而改變的 context 對象。策略對象改變 context 對象的執行算法。

3.3.1 目錄結構

從目錄結構看,有不少Strategy的定義和實現,應該螞蟻內部但願根據不一樣狀況制定不一樣的策略,其中有些是目前留出的接口

com/alipay/sofa/registry/server/session/strategy

.
├── DataChangeRequestHandlerStrategy.java
├── PublisherHandlerStrategy.java
├── ReceivedConfigDataPushTaskStrategy.java
├── ReceivedDataMultiPushTaskStrategy.java
├── SessionRegistryStrategy.java
├── SubscriberHandlerStrategy.java
├── SubscriberMultiFetchTaskStrategy.java
├── SubscriberRegisterFetchTaskStrategy.java
├── SyncConfigHandlerStrategy.java
├── TaskMergeProcessorStrategy.java
├── WatcherHandlerStrategy.java
└── impl
    ├── DefaultDataChangeRequestHandlerStrategy.java
    ├── DefaultPublisherHandlerStrategy.java
    ├── DefaultPushTaskMergeProcessor.java
    ├── DefaultReceivedConfigDataPushTaskStrategy.java
    ├── DefaultReceivedDataMultiPushTaskStrategy.java
    ├── DefaultSessionRegistryStrategy.java
    ├── DefaultSubscriberHandlerStrategy.java
    ├── DefaultSubscriberMultiFetchTaskStrategy.java
    ├── DefaultSubscriberRegisterFetchTaskStrategy.java
    ├── DefaultSyncConfigHandlerStrategy.java
    └── DefaultWatcherHandlerStrategy.java

3.3.2 DefaultPublisherHandlerStrategy

從目前代碼看,只是設置,分類,轉發。即設置Publisher的缺省信息,而且根據 event type 不一樣執行register或者unRegister。

public class DefaultPublisherHandlerStrategy implements PublisherHandlerStrategy {
    @Autowired
    private Registry            sessionRegistry;

    @Override
    public void handlePublisherRegister(Channel channel, PublisherRegister publisherRegister, RegisterResponse registerResponse) {
        try {
            String ip = channel.getRemoteAddress().getAddress().getHostAddress();
            int port = channel.getRemoteAddress().getPort();
            publisherRegister.setIp(ip);
            publisherRegister.setPort(port);

            if (StringUtils.isBlank(publisherRegister.getZone())) {
                publisherRegister.setZone(ValueConstants.DEFAULT_ZONE);
            }

            if (StringUtils.isBlank(publisherRegister.getInstanceId())) {
                publisherRegister.setInstanceId(DEFAULT_INSTANCE_ID);
            }

            Publisher publisher = PublisherConverter.convert(publisherRegister);
            publisher.setProcessId(ip + ":" + port);
            publisher.setSourceAddress(new URL(channel.getRemoteAddress()));
            if (EventTypeConstants.REGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.register(publisher);
            } else if (EventTypeConstants.UNREGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.unRegister(publisher);
            }
            registerResponse.setSuccess(true);
            registerResponse.setVersion(publisher.getVersion());
            registerResponse.setRegistId(publisherRegister.getRegistId());
            registerResponse.setMessage("Publisher register success!");
        } 
    }
}

邏輯以下圖所示

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |                          |                               |
               +              |    | |  watcherHandler  | |       |                          +-------------------------------+
               |              |    | |                  | |       |
               |              |    | |     ......       | |       |
               |              |    | +------------------+ |       |
               |              |    +----------------------+       |
                              +-----------------------------------+

手機如圖

3.4 核心邏輯組件

前面代碼中,策略會調用到 sessionRegistry.register(publisher),即註冊功能。

從SessionRegistry的內部成員變量就可以看出來,這是 Session Server 核心邏輯所在。

主要提供了以下功能:

  • register(StoreData data) :註冊新publisher或者subscriber data

  • cancel(List connectIds) :取消publisher或者subscriber data

  • remove(List connectIds) :移除publisher或者subscriber data

  • unRegister(StoreData data) :註銷publisher或者subscriber data

  • .....

具體成員變量以下:

public class SessionRegistry implements Registry {

    /**
     * store subscribers
     */
    @Autowired
    private Interests                 sessionInterests;

    /**
     * store watchers
     */
    @Autowired
    private Watchers                  sessionWatchers;

    /**
     * store publishers
     */
    @Autowired
    private DataStore                 sessionDataStore;

    /**
     * transfer data to DataNode
     */
    @Autowired
    private DataNodeService           dataNodeService;

    /**
     * trigger task com.alipay.sofa.registry.server.meta.listener process
     */
    @Autowired
    private TaskListenerManager       taskListenerManager;

    /**
     * calculate data node url
     */
    @Autowired
    private NodeManager               dataNodeManager;

    @Autowired
    private SessionServerConfig       sessionServerConfig;

    @Autowired
    private Exchange                  boltExchange;

    @Autowired
    private SessionRegistryStrategy   sessionRegistryStrategy;

    @Autowired
    private WrapperInterceptorManager wrapperInterceptorManager;

    @Autowired
    private DataIdMatchStrategy       dataIdMatchStrategy;

    @Autowired
    private RenewService              renewService;

    @Autowired
    private WriteDataAcceptor         writeDataAcceptor;

    private volatile boolean          enableDataRenewSnapshot = true;
}

register函數生成一個WriteDataRequest,而後調用了 writeDataAcceptor.accept 完成處理。

@Override
public void register(StoreData storeData) {

    WrapperInvocation<StoreData, Boolean> wrapperInvocation = new WrapperInvocation(
            new Wrapper<StoreData, Boolean>() {
                @Override
                public Boolean call() {

                    switch (storeData.getDataType()) {
                        case PUBLISHER:
                            Publisher publisher = (Publisher) storeData;

                            sessionDataStore.add(publisher);

                            // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
                            // are handed over to WriteDataAcceptor
                            writeDataAcceptor.accept(new WriteDataRequest() {
                                @Override
                                public Object getRequestBody() {
                                    return publisher;
                                }

                                @Override
                                public WriteDataRequestType getRequestType() {
                                    return WriteDataRequestType.PUBLISHER;
                                }

                                @Override
                                public String getConnectId() {
                                    return publisher.getSourceAddress().getAddressString();
                                }

                                @Override
                                public String getDataServerIP() {
                                    Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId());
                                    return dataNode.getNodeUrl().getIpAddress();
                                }
                            });

                            sessionRegistryStrategy.afterPublisherRegister(publisher);
                            break;
                        case SUBSCRIBER:
                            Subscriber subscriber = (Subscriber) storeData;

                            sessionInterests.add(subscriber);

                            sessionRegistryStrategy.afterSubscriberRegister(subscriber);
                            break;
                        case WATCHER:
                            Watcher watcher = (Watcher) storeData;

                            sessionWatchers.add(watcher);

                            sessionRegistryStrategy.afterWatcherRegister(watcher);
                            break;
                        default:
                            break;
                    }
                    return null;
                }

                @Override
                public Supplier<StoreData> getParameterSupplier() {
                    return () -> storeData;
                }

            }, wrapperInterceptorManager);

    try {
        wrapperInvocation.proceed();
    } catch (Exception e) {
        throw new RuntimeException("Proceed register error!", e);
    }

}

目前邏輯以下圖所示:

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                                    3  | register
               |              |    +----------------------+       |                                       |
                              +-----------------------------------+                                       |
                                                                                                          v
                                                                                      +-------------------+-------------------+
                                                                                      |           SessionRegistry             |
                                                                                      |                                       |
                                                                                      |                                       |
                                                                                      |  storeData.getDataType() == PUBLISHER |
                                                                                      +---------------------------------------+

手機以下:

3.4.1 SessionRegistryStrategy

這裏又出現一個策略,目前也只有一個實現,應該也是想要將來作成替換,目前功能只是簡單的留下了接口爲空。

咱們能夠看出阿里到處想解耦的思路

public class DefaultSessionRegistryStrategy implements SessionRegistryStrategy {
    @Override
    public void afterPublisherRegister(Publisher publisher) {

    }
}

3.4.2 存儲模塊

前文在註冊過程當中有:

sessionDataStore.add(publisher);

這裏就是Session的 數據存儲模塊,也是系統的核心

public class SessionDataStore implements DataStore {
    /**
     * publisher store
     */
    private Map<String/*dataInfoId*/, Map<String/*registerId*/, Publisher>> registry      = new ConcurrentHashMap<>();

    /*** index */
    private Map<String/*connectId*/, Map<String/*registerId*/, Publisher>>  connectIndex  = new ConcurrentHashMap<>();
}

這裏記錄了兩種存儲方式,分別是按照 dataInfoId 和 connectId 來存儲。

存儲時候,會從版本號和時間戳兩個維度來比較

@Override
public void add(Publisher publisher) {
    Publisher.internPublisher(publisher);

    write.lock();
    try {
        Map<String, Publisher> publishers = registry.get(publisher.getDataInfoId());

        if (publishers == null) {
            ConcurrentHashMap<String, Publisher> newmap = new ConcurrentHashMap<>();
            publishers = registry.putIfAbsent(publisher.getDataInfoId(), newmap);
            if (publishers == null) {
                publishers = newmap;
            }
        }

        Publisher existingPublisher = publishers.get(publisher.getRegisterId());

        if (existingPublisher != null) {

            if (existingPublisher.getVersion() != null) {
                long oldVersion = existingPublisher.getVersion();
                Long newVersion = publisher.getVersion();
                if (newVersion == null) {
                    return;
                } else if (oldVersion > newVersion) {
                    return;
                } else if (oldVersion == newVersion) {
                    Long newTime = publisher.getRegisterTimestamp();
                    long oldTime = existingPublisher.getRegisterTimestamp();
                    if (newTime == null) {
                        return;
                    }
                    if (oldTime > newTime) {
                        return;
                    }
                }
            }
        }
        publishers.put(publisher.getRegisterId(), publisher);
        addToConnectIndex(publisher);

    } finally {
        write.unlock();
    }
}

3.5 Acceptor模塊

在SessionServer自己存儲完成以後,接下來就是通知Data Server了。

3.5.1 整體Acceptor

WriteDataAcceptorImpl 負責處理具體Publisher的寫入。首先須要把寫入請求統一塊兒來

使用 private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap(); 來統一存儲全部的寫入請求

這裏根據不一樣的Connection來處理不一樣鏈接的寫入請求

具體以下:

public class WriteDataAcceptorImpl implements WriteDataAcceptor {

    @Autowired
    private TaskListenerManager             taskListenerManager;

    @Autowired
    private SessionServerConfig             sessionServerConfig;

    @Autowired
    private RenewService                    renewService;

    /**
     * acceptor for all write data request
     * key:connectId
     * value:writeRequest processor
     *
     */
    private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap();

    public void accept(WriteDataRequest request) {
        String connectId = request.getConnectId();
        WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId,
                key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService));

        writeDataProcessor.process(request);
    }
  
    public void remove(String connectId) {
        writeDataProcessors.remove(connectId);
    }
}

目前邏輯以下圖所示

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +-----------------------------------------------------+                    +-------------+-------------------------+
               | |           WriteDataAcceptorImpl                     |  WriteDataRequest  |           SessionRegistry             |
               | |                                                     | <------------------+                                       |
               | |                                                     |                    |                                       |
               | | Map<String, WriteDataProcessor> writeDataProcessors |                    |  storeData.getDataType() == PUBLISHER |
               | |                                                     |                    +---------------------------------------+
               + +-----------------------------------------------------+

手機如圖

3.5.2 具體處理

前面已經把全部請求統一塊兒來,如今就須要針對每個鏈接的寫入繼續處理

這裏關鍵是以下數據結構,就是每個鏈接的寫入請求 放到了queue中。

ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue

針對每一個請求不一樣,作不一樣處理。

對於咱們的例子,處理以下:

case PUBLISHER: {
		doPublishAsync(request);
}

而最終是向taskListenerManager發送給請求TaskType.PUBLISH_DATA_TASK,該請求將被PublishDataTaskListener調用publishDataTask來處理。

這裏有一個listener解耦,咱們接下來說解。

private void doPublishAsync(WriteDataRequest request) {
    sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
}

private void sendEvent(Object eventObj, TaskType taskType) {
		TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
		taskListenerManager.sendTaskEvent(taskEvent);
}

具體代碼以下:

public class WriteDataProcessor {
    private final TaskListenerManager               taskListenerManager;

    private final SessionServerConfig               sessionServerConfig;

    private final RenewService                      renewService;

    private final String                            connectId;

    private Map<String, AtomicLong>                 lastUpdateTimestampMap = new ConcurrentHashMap<>();

    private AtomicBoolean                           writeDataLock          = new AtomicBoolean(
                                                                               false);

    private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue          = new ConcurrentLinkedQueue();

    private AtomicInteger                           acceptorQueueSize      = new AtomicInteger(0);

    public void process(WriteDataRequest request) {
        // record the last update time by pub/unpub
        if (isWriteRequest(request)) {
            refreshUpdateTime(request.getDataServerIP());
        }

        if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
            // snapshot has high priority, so handle directly
            doHandle(request);
        } else {
            // If locked, insert the queue;
            // otherwise, try emptying the queue (to avoid residue) before processing the request.
            if (writeDataLock.get()) {
                addQueue(request);
            } else {
                flushQueue();
                doHandle(request);
            }
        }

    }

    private void doHandle(WriteDataRequest request) {
        switch (request.getRequestType()) {
            case PUBLISHER: {
                doPublishAsync(request);
            }
                break;
            case UN_PUBLISHER: {
                doUnPublishAsync(request);
            }
                break;
            case CLIENT_OFF: {
                doClientOffAsync(request);
            }
                break;
            case RENEW_DATUM: {
                if (renewAndSnapshotInSilence(request.getDataServerIP())) {
                    return;
                }
                doRenewAsync(request);
            }
                break;
            case DATUM_SNAPSHOT: {
                if (renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) {
                    return;
                }
                halt();
                try {
                    doSnapshotAsync(request);
                } finally {
                    resume();
                }
            }
                break;
    }
      
    private void doPublishAsync(WriteDataRequest request) {
        sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
    }
      
    private void sendEvent(Object eventObj, TaskType taskType) {
        TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}

以下圖所示

Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +---------------------------------------------------------+                    +---------+-----------------------------+
               | |           WriteDataAcceptorImpl                         |  WriteDataRequest  |           SessionRegistry             |
               | |                                                         | <------------------+                                       |
               | |                                                         |       4            |   sessionDataStore.add(publisher)     |
               | | Map<connectId , WriteDataProcessor> writeDataProcessors |                    |                                       |
               | |                                                         |                    |  storeData.getDataType() == PUBLISHER |
               | +----------------------+----------------------------------+                    |                                       |
               |                process | 5                                                     +---------------------------------------+
               |                        v
               |    +-------------------+---------------------+                     +--------------------------+
               |    |          WriteDataProcessor             |                     |  PublishDataTaskListener |
               |    |                                         |  PUBLISH_DATA_TASK  |                          |
               |    | ConcurrentLinkedQueue<WriteDataRequest> +-------------------> |      PublishDataTask     |
               |    |                                         |      6              +--------------------------+
               +    +-----------------------------------------+

手機如圖 :

3.6 Listener 解耦

前面在邏輯上都是一體化的,在這裏,進行了一次解耦。

3.6.1 解耦引擎

DefaultTaskListenerManager 是解耦的機制,能夠看到,其中添加了listener,當用戶調用sendTaskEvent時候,將遍歷全部的listeners,調用對應的listener。

public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}

3.6.2 Listener

PublishDataTaskListener是對應的處理函數,在其support函數中,聲明瞭支持PUBLISH_DATA_TASK。這樣就完成了解耦。

public class PublishDataTaskListener implements TaskListener {

    @Autowired
    private DataNodeService dataNodeService;

    @Autowired
    private TaskProcessor   dataNodeSingleTaskProcessor;

    @Autowired
    private ExecutorManager executorManager;

    @Override
    public TaskType support() {
        return TaskType.PUBLISH_DATA_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {

        SessionTask publishDataTask = new PublishDataTask(dataNodeService);

        publishDataTask.setTaskEvent(event);

        executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
    }
}

3.7 Task調度

上面找到了Listener,Listener中經過以下代碼啓動了執行業務的task來處理。可是這背後的機制須要探究。

executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));

3.7.1 ExecutorManager

ExecutorManager 之中,對於線程池作了統一的啓動,關閉。publishDataExecutor就是其中之一。

ExecutorManager相關代碼摘取以下:

public class ExecutorManager {

    private final ScheduledThreadPoolExecutor scheduler;

    private final ThreadPoolExecutor          publishDataExecutor;

    private static final String               PUBLISH_DATA_EXECUTOR                      = "PublishDataExecutor";

    public ExecutorManager(SessionServerConfig sessionServerConfig) {
      
        publishDataExecutor = reportExecutors.computeIfAbsent(PUBLISH_DATA_EXECUTOR,
                k -> new SessionThreadPoolExecutor(PUBLISH_DATA_EXECUTOR,
                        sessionServerConfig.getPublishDataExecutorMinPoolSize(),
                        sessionServerConfig.getPublishDataExecutorMaxPoolSize(),
                        sessionServerConfig.getPublishDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(sessionServerConfig.getPublishDataExecutorQueueSize()),
                        new NamedThreadFactory("PublishData-executor", true)));
    }
  
		public ThreadPoolExecutor getPublishDataExecutor() {
        return publishDataExecutor;
    }
}

其中ExecutorManager的bean以下:

@Bean
public ExecutorManager executorManager(SessionServerConfig sessionServerConfig) {
    return new ExecutorManager(sessionServerConfig);
}

3.7.2 Processor

Processor是任務定義,內部封裝了task。

public class DataNodeSingleTaskProcessor implements TaskProcessor<SessionTask> {

    @Override
    public ProcessingResult process(SessionTask task) {
        try {
            task.execute();
            return ProcessingResult.Success;
        } catch (Throwable throwable) {
            if (task instanceof Retryable) {
                Retryable retryAbleTask = (Retryable) task;
                if (retryAbleTask.checkRetryTimes()) {
                    return ProcessingResult.TransientError;
                }
            }
            return ProcessingResult.PermanentError;
        }
    }

    @Override
    public ProcessingResult process(List<SessionTask> tasks) {
        return null;
    }
}

3.7.3 業務Task

PublishDataTask的execute 之中 ,調用dataNodeService.register(publisher)進行註冊。

public class PublishDataTask extends AbstractSessionTask {

    private final DataNodeService dataNodeService;

    private Publisher             publisher;

    public PublishDataTask(DataNodeService dataNodeService) {
        this.dataNodeService = dataNodeService;
    }

    @Override
    public void execute() {
        dataNodeService.register(publisher);
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        //taskId create from event
        if (taskEvent.getTaskId() != null) {
            setTaskId(taskEvent.getTaskId());
        }

        Object obj = taskEvent.getEventObj();
        if (obj instanceof Publisher) {
            this.publisher = (Publisher) obj;
        } 
    }
}

具體以下

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
                       |
     PUBLISH_DATA_TASK |
                       |
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |
                       |
                       v
              +--------+--------+
              | PublishDataTask |
              +-----------------+

3.8 轉發服務信息

通過listener解耦以後,PublishDataTask就調用了dataNodeService.register(publisher),因而接下來就是轉發服務信息給Data Server

此處就是調用DataNodeServiceImpl的register函數來把請求轉發給Data Server。

public class DataNodeServiceImpl implements DataNodeService {
    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private NodeManager           dataNodeManager;

    @Autowired
    private SessionServerConfig   sessionServerConfig;

    private AsyncHashedWheelTimer asyncHashedWheelTimer;
}

能夠看到,創建了PublishDataRequest,而後經過Bolt Client,發送給Data Server。

@Override
public void register(final Publisher publisher) {
    String bizName = "PublishData";
    Request<PublishDataRequest> request = buildPublishDataRequest(publisher);
    try {
        sendRequest(bizName, request);
    } catch (RequestException e) {
        doRetryAsync(bizName, request, e, sessionServerConfig.getPublishDataTaskRetryTimes(),
            sessionServerConfig.getPublishDataTaskRetryFirstDelay(),
            sessionServerConfig.getPublishDataTaskRetryIncrementDelay());
    }
}

private CommonResponse sendRequest(String bizName, Request request) throws RequestException {
        Response response = dataNodeExchanger.request(request);
        Object result = response.getResult();
        CommonResponse commonResponse = (CommonResponse) result;
        return commonResponse;
}

以下:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
     PUBLISH_DATA_TASK |
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |
                       v
              +--------+--------+
              | PublishDataTask |
              +--------+--------+
              register |
                       |
            +----------v----------+
            | DataNodeServiceImpl |
            +----------+----------+
    PublishDataRequest |
                       v
            +----------+----------+  Client.sendSync   +------------+
            |  DataNodeExchanger  +------------------> | Data Server|
            +---------------------+ PublishDataRequest +------------+

如何知道發給哪個Data Sever?DataNodeExchanger 中有:

@Override
public Response request(Request request) throws RequestException {

    Response response;
    URL url = request.getRequestUrl();
    try {
        Client sessionClient = getClient(url);

        final Object result = sessionClient
                .sendSync(url, request.getRequestBody(), request.getTimeout() != null ? request.getTimeout() : sessionServerConfig.getDataNodeExchangeTimeOut());

        response = () -> result;
    } 

    return response;
}

因而去DataNodeServiceImpl尋找

private Request<PublishDataRequest> buildPublishDataRequest(Publisher publisher) {
    return new Request<PublishDataRequest>() {
        private AtomicInteger retryTimes = new AtomicInteger();

        @Override
        public PublishDataRequest getRequestBody() {
            PublishDataRequest publishDataRequest = new PublishDataRequest();
            publishDataRequest.setPublisher(publisher);
            publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator
                .getSessionProcessId());
            return publishDataRequest;
        }

        @Override
        public URL getRequestUrl() {
            return getUrl(publisher.getDataInfoId());
        }

        @Override
        public AtomicInteger getRetryTimes() {
            return retryTimes;
        }
    };
}

private URL getUrl(String dataInfoId) {
        Node dataNode = dataNodeManager.getNode(dataInfoId);
        //meta push data node has not port
        String dataIp = dataNode.getNodeUrl().getIpAddress();
        return new URL(dataIp, sessionServerConfig.getDataServerPort());
}

在 DataNodeManager中有:

@Override
public DataNode getNode(String dataInfoId) {
    DataNode dataNode = consistentHash.getNodeFor(dataInfoId);
    return dataNode;
}

可見是經過dataInfoId計算出hash,而後 從DataNodeManager之中獲取對應的DataNode,獲得其url

因而,上圖拓展爲:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
     PUBLISH_DATA_TASK |  1
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |  2
                       v
              +--------+--------+        4     +---------------+
              | PublishDataTask |     +------> |DataNodeManager|
              +--------+--------+     |        +---------------+
              register |  3           |  consistentHash|
                       |              |                | 5
            +----------v----------+---+                v
            | DataNodeServiceImpl |       6      +-----+----+
            +----------+----------+ <------------+ DataNode |
    PublishDataRequest | 7              url      +----------+
                       v
            +----------+----------+
            |  DataNodeExchanger  |
            +----------+----------+
                       |
       Client.sendSync | PublishDataRequest
                       |
                       v 8
                 +-----+------+
                 | Data Server|
                 +------------+

0xFF 參考

螞蟻金服服務註冊中心如何實現 DataServer 平滑擴縮容

螞蟻金服服務註冊中心 SOFARegistry 解析 | 服務發現優化之路

服務註冊中心 Session 存儲策略 | SOFARegistry 解析

海量數據下的註冊中心 - SOFARegistry 架構介紹

服務註冊中心數據分片和同步方案詳解 | SOFARegistry 解析

螞蟻金服開源通訊框架SOFABolt解析之鏈接管理剖析

螞蟻金服開源通訊框架SOFABolt解析之超時控制機制及心跳機制

螞蟻金服開源通訊框架 SOFABolt 協議框架解析

螞蟻金服服務註冊中心數據一致性方案分析 | SOFARegistry 解析

螞蟻通訊框架實踐

sofa-bolt 遠程調用

sofa-bolt學習

SOFABolt 設計總結 - 優雅簡潔的設計之道

SofaBolt源碼分析-服務啓動到消息處理

SOFABolt 源碼分析

SOFABolt 源碼分析9 - UserProcessor 自定義處理器的設計

SOFARegistry 介紹

SOFABolt 源碼分析13 - Connection 事件處理機制的設計

相關文章
相關標籤/搜索