[從源碼學設計]螞蟻金服SOFARegistry之網絡封裝和操做

[從源碼學設計]螞蟻金服SOFARegistry之網絡封裝和操做

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。html

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。java

本文爲第二篇,介紹SOFARegistry的網絡封裝和操做。node

0x01 業務領域

1.1 SOFARegistry 整體架構

由於有的兄弟可能沒有讀過前面MetaServer的文章,因此這裏回憶下SOFARegistry 整體架構。git

  • Client 層

應用服務器集羣。Client 層是應用層,每一個應用系統經過依賴註冊中心相關的客戶端 jar 包,經過編程方式來使用服務註冊中心的服務發佈和服務訂閱能力。github

  • Session 層

Session 服務器集羣。顧名思義,Session 層是會話層,經過長鏈接和 Client 層的應用服務器保持通信,負責接收 Client 的服務發佈和服務訂閱請求。該層只在內存中保存各個服務的發佈訂閱關係,對於具體的服務信息,只在 Client 層和 Data 層之間透傳轉發。Session 層是無狀態的,能夠隨着 Client 層應用規模的增加而擴容。編程

  • Data 層

數據服務器集羣。Data 層經過分片存儲的方式保存着所用應用的服務註冊數據。數據按照 dataInfoId(每一份服務數據的惟一標識)進行一致性 Hash 分片,多副本備份,保證數據的高可用。下文的重點也在於隨着數據規模的增加,Data 層如何在不影響業務的前提下實現平滑的擴縮容。bootstrap

  • Meta 層

元數據服務器集羣。這個集羣管轄的範圍是 Session 服務器集羣和 Data 服務器集羣的服務器信息,其角色就至關於 SOFARegistry 架構內部的服務註冊中心,只不過 SOFARegistry 做爲服務註冊中心是服務於廣大應用服務層,而 Meta 集羣是服務於 SOFARegistry 內部的 Session 集羣和 Data 集羣,Meta 層可以感知到 Session 節點和 Data 節點的變化,並通知集羣的其它節點。api

1.2 應用場景

DataServer,SessionServer,MetaServer 本質上都是網絡應用程序。這就決定了網絡封裝和操做是本系統的基礎模塊及功能,下面咱們講講其應用場景。緩存

1.2.1 單元化狀態

SOFARegistry 的應用場景是單元化狀態下。服務器

在單元化狀態下,一個單元,是一個五臟俱全的縮小版整站,它是全能的,由於部署了全部應用;但它不是全量的,由於只能操做一部分數據。可以單元化的系統,很容易在多機房中部署,由於能夠輕易的把幾個單元部署在一個機房,而把另外幾個部署在其餘機房。藉由在業務入口處設置一個流量調配器,能夠調整業務流量在單元之間的比例。

1.2.2 內網通信

因此 SOFARegistry 考慮的就是在 IDC 私網環境中如何進行節點間通訊。高吞吐、高併發的通訊,數量衆多的鏈接管理(C10K 問題),便捷的升級機制,兼容性保障,靈活的線程池模型運用,細緻的異常處理與日誌埋點等,這些功能都須要在通訊協議和實現框架上作文章。

1.2.3 Http協議

服務器也有若干配置需求,這用簡單的http協議便可。

1.3 問題點

在這種內網單元化場景下,可以想到的問題點以下:

  • 如何制定私有協議。對於私網環境,若是全部應用的節點間,所有經過標準協議來通訊,會有不少問題:好比研發效率方面的影響,升級兼容性,無用字段的傳輸,功能定製也可能不那麼靈活。
  • 如何進行連接管理(無鎖建連、定時斷連、自動重連);
  • 如何進行精細的線程模型的設計;
  • 如何進行超時控制;
  • 如何進行批量解包和批量提交處理;
  • 如何進行心跳控制;
  • 如何支持通訊模型(oneway、sync、callback、future);
  • 如何實現長鏈接;
  • 如何實現推拉模式;
  • 如何進行節點判活;

1.4 解決方案

對於這種高性能,高併發的場景,在Java體系下,必然選擇非阻塞IO複用,那麼天然選擇基於Netty進行開發。

1.5 阿里方案

阿里就是藉助 SOFABolt 通訊框架,實現基於TCP長鏈接的節點判活與推模式的變動推送,服務上下線通知時效性在秒級之內。

sofa-bolt是螞蟻開源的一款基於Netty的網絡通訊框架。在Netty的基礎上對網絡編程常見問題進行了一層簡單封裝,讓中間件開發者更關注於中間件產品自己。

大致功能爲:

  1. 鏈接管理
  2. 請求處理

SOFABolt能夠理解爲Netty的最佳實踐,並額外進行了一些優化工做。

  • 基於Netty的高效的網絡IO於線程模型的應用
  • 連接管理(無鎖建連、定時斷連、自動重連)
  • 通訊模型(oneway、sync、callback、future)
  • 超時控制
  • 批量解包和批量提交處理
  • 心跳於IDLE機制

SOFABolt框架咱們後續可能會專門有系列進行分析,目前認爲基於SOFABolt此能夠知足咱們需求,因此咱們會簡單介紹SOFABolt,重點在於如何使用以及業務實現

1.6 實現問題

在肯定了採用SOFABolt以後,以前提到的問題點就基本被SOFABolt解決了,因此咱們暫時能想到的其餘問題大體以下:

  • 對於網絡中的各類功能模塊,是否須要封裝,若是封裝,到什麼程度比較恰當;
  • 具體封裝,縱向橫向分別須要細化到什麼程度;
  • Data Server 既要對外提供服務,也會做爲客戶端向其餘模塊發送請求,這兩個功能是否須要抽象出來;
  • 是否須要按照具體業務功能進行分類封裝;

1.7 總述

咱們提早劇透,即從邏輯上看,阿里提供了兩個層級的封裝

從鏈接角度看,阿里實現了基於 netty.channel.Channel 的封裝,從下往上看是:

  • 由於SOFABolt基於Netty,因此封裝的核心是netty.channel.Channel。
  • 在此基礎上, SOFABolt封裝了com.alipay.remoting.Connection
  • 而後 SOFARegistry 基於SOFABolt 封裝了 BoltChannel

從應用角度看,阿里實現了Server,Client層次的封裝,從下往上看是:

  • SOFABolt構建了 RpcServer,RpcClient。
  • SOFARegisty 基於 RpcServer,RpcClient 構建了BoltClient 和 BoltServer。
  • 而後 SOFARegistry 基於此構建了 BoltExchange。做爲 Client / Server 鏈接的抽象,負責節點之間的鏈接。
  • 最後構建了 XXXNodeExchanger,在 BoltExchange 基礎上,把全部 Data Server 相關的 「Server,Client概念 強相關」 的網絡操做統一集中在這裏,用戶能夠直接使用。以DataServer業務模塊爲例,其內部按照業務不一樣,實現了 DataNodeExchanger 和 MetaNodeExchanger。用以讓:
    • DataServer 內部直接使用 DataNodeExchanger 與其餘 DataServer 交互,
    • DataServer 內部直接使用 MetaNodeExchanger 與 MetaServer 交互。

具體邏輯大體以下:

+---------------------+                                 +---------------------+
|                     |                                 |                     |
|  DataNodeExchanger  |                                 |  MetaNodeExchanger  |
|                     |                                 |                     |
+----------------+----+                                 +--------+------------+
                 |                                               |
                 +-----------+                     +-------------+
                             |                     |
                             v                     v
                   +---------+---------------------+--------+
                   |  BoltExchange                          |
                   | +------------------------------------+ |
                   | |                                    | |
                   | |       Map<String, Client>          | |
                   | |                                    | |
                   | | ConcurrentHashMap<Integer, Server> | |
                   | |                                    | |
                   | +------------------------------------+ |
                   +----------------------------------------+
                          |                           |
                          |                           |
                          |                           |
                          v                           v
  +-----------------------+----------+      +---------+---------------------------------+
  | BoltClient                       |      | BoltServer                                |
  |                                  |      |                                           |
  | +------------------------------+ |      | +---------------------------------------+ |
  | |     remoting.rpc.RpcClient   | |      | |     remoting.rpc.RpcServer            | |
  | |  +-------------------------+ | |      | |                                       | |
  | |  | ConnectionEventHandler  | | |      | |   Map<String, Channel> channels       | |
  | |  |                         | | |      | |                                       | |
  | |  |  ConnectionFactory      | | |      | | List<ChannelHandler> channelHandlers  | |
  | |  +-------------------------+ | |      | |                                       | |
  | +------------------------------+ |      | +---------------------------------------+ |
  +----------------------------------+      +-------------------------------------------+
        |       |           |                 |                        |
        |       |           +---------------------------+              |
        |       v                             |         |              v
        |   +---+------------+ <--------------+   +-----v--------------+--------------+
        |   | ChannelHandler |                    |   BoltChannel                     |
        |   +----------------+                    |                                   |
        |                                         |  +------------------------------+ |
        |                                         |  |com.alipay.remoting.Connection| |
        |                                         |  +------------------------------+ |
        |                                         +-----------------------------------+
        |                                                       |
        v                                                       v
    +---+-------------+                               +---------+------------+
    | CallbackHandler |                               | netty.channel.Channel|
    +-----------------+                               +----------------------+

0x02 基礎封裝

SOFARegistry 對網絡基礎功能作了封裝,也對外提供了API。如下是封裝模塊以及對外接口 registry-remoting-api。

├── CallbackHandler.java
├── Channel.java
├── ChannelHandler.java
├── Client.java
├── Endpoint.java
├── RemotingException.java
├── Server.java
└── exchange
    ├── Exchange.java
    ├── NodeExchanger.java
    ├── RequestException.java
    └── message
        ├── Request.java
        └── Response.java

其中比較關鍵的是四個接口:Server,Client,Exchange,Channel,所以這些就是網絡封裝的最基本概念。

2.1 Channel

Channel 這個概念比較廣泛,表明了IO源與目標打開的鏈接。咱們先以Java的Channel爲例來進行說明。

2.1.1 Java Channel

Java 的Channel 由java.nio.channels包定義的,Channel表示IO源與目標打開的鏈接,Channel相似於傳統的「流」,只不過Channel自己不能直接訪問數據,Channel只能與Buffer進行交互。

Channel用於在字節緩衝區和位於通道另外一側的實體(一般是一個文件或套接字)之間有效地傳輸數據。通道是訪問IO服務的導管,經過通道,咱們能夠以最小的開銷來訪問操做系統的I/O服務;順便說下,緩衝區是通道內部發送數據和接收數據的端點。

由java.nio.channels包定義的,Channel表示IO源與目標打開的鏈接,Channel相似於傳統的「流」,只不過Channel自己不能直接訪問數據,Channel只能與Buffer進行交互。通道主要用於傳輸數據,從緩衝區的一側傳到另外一側的實體(如文件、套接字...),反之亦然;通道是訪問IO服務的導管,經過通道,咱們能夠以最小的開銷來訪問操做系統的I/O服務;順便說下,緩衝區是通道內部發送數據和接收數據的端點。

2.1.2 SOFA Channel

從SOFARegistry的Channel定義能夠看出其基本功能主要是屬性相關功能。

public interface Channel {
    InetSocketAddress getRemoteAddress();
    InetSocketAddress getLocalAddress();
    boolean isConnected();
    Object getAttribute(String key);
    void setAttribute(String key, Object value);
    WebTarget getWebTarget();
    void close();
}

2.2 Server

Server是服務器對應的封裝,其基本功能由定義可知,主要是基於Channel發送功能。

public interface Server extends Endpoint {
    boolean isOpen();
    Collection<Channel> getChannels();
    Channel getChannel(InetSocketAddress remoteAddress);
    Channel getChannel(URL url);
    void close(Channel channel);
    int getChannelCount();

    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);

    void sendCallback(final Channel channel, final Object message, CallbackHandler callbackHandler,final int timeoutMillis);
}

2.3 Client

Client是客戶端對應的封裝,其基本功能也是基於Channel進行交互。

public interface Client extends Endpoint {

    Channel getChannel(URL url);

    Channel connect(URL url);

    Object sendSync(final URL url, final Object message, final int timeoutMillis);

    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);

    void sendCallback(final URL url, final Object message, CallbackHandler callbackHandler,
                      final int timeoutMillis);
}

2.4 Exchange

Exchange 做爲 Client / Server 鏈接的進一步抽象,負責同類型server之間的鏈接

public interface Exchange<T> {

    String DATA_SERVER_TYPE = "dataServer";
    String META_SERVER_TYPE = "metaServer";

    /**
     * connect same type server,one server ip one connection
     * such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers,
     * so we must connect by serverType,and get Client instance by serverType
     * @param serverType
     * @param serverUrl
     * @param channelHandlers
     */
    Client connect(String serverType, URL serverUrl, T... channelHandlers);

    /**
     * connect same type server,one server ip one connection
     * such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers,
     * so we must connect by serverType,and get Client instance by serverType
     * @param serverType
     * @param connNum connection number per serverUrl
     * @param serverUrl
     * @param channelHandlers
     */
    Client connect(String serverType, int connNum, URL serverUrl, T... channelHandlers);

    /**
     * bind server by server port in url parameter,one port must by same server type
     * @param url
     * @param channelHandlers
     */
    Server open(URL url, T... channelHandlers);

    Client getClient(String serverType);

    Server getServer(Integer port);
}

2.5 ChannelHandler

在創建鏈接中,能夠設置一系列應對不一樣任務的 handler (稱之爲 ChannelHandler)。

這些 ChannelHandler 有的做爲 Listener 用來處理鏈接事件,有的做爲 Processor 用來處理各類指定的事件,好比服務信息數據變化、Subscriber 註冊等事件。

public interface ChannelHandler<T> {

    enum HandlerType {
        LISENTER,
        PROCESSER
    }

    enum InvokeType {
        SYNC,
        ASYNC
    }

    /**
     * on channel connected.
     * @param channel
     */
    void connected(Channel channel) throws RemotingException;

    /**
     * on channel disconnected.
     *
     * @param channel channel.
     */
    void disconnected(Channel channel) throws RemotingException;

    /**
     * on message received.
     * @param channel channel.
     * @param message message.
     */
    void received(Channel channel, T message) throws RemotingException;

    /**
     * on message reply.
     *
     * @param channel
     * @param message
     */
    Object reply(Channel channel, T message) throws RemotingException;

    /**
     * on exception caught.
     * @param channel channel.
     * @param message message.
     * @param exception exception.
     * @throws RemotingException
     */
    void caught(Channel channel, T message, Throwable exception) throws RemotingException;

    HandlerType getType();

    /**
     * return processor request class name
     */
    Class interest();

    /**
     * Select Sync process by reply or Async process by received
     */
    default InvokeType getInvokeType() {
        return InvokeType.SYNC;
    }

    /**
     * specify executor for processor handler
     */
    default Executor getExecutor() {
        return null;
    }
}

所以,網絡基本對外接口以下:

+-------------------------------------------------------------------------+
|[registry+remoting+api]                                                  |
|                                                                         |
|              +----------+                      +-------------+          |
|              | Exchange |                      |NodeExchanger|          |
|              ++-----+--++                      +----+--------+          |
|               |     |  |                            |                   |
|               |     |  +----------------------+     |                   |
|               |     |                         |     |                   |
|        +------+     v                         v     v                   |
|        |         +--+-----+                 +-+-----++                  |
|        |         | Server |                 | Client |                  |
|        |         +-----+--+                 +-+----+-+                  |
|        |               |                      |    |                    |
|        |               +--------+     +-------+    |                    |
|        |                        |     |            |                    |
|        v                        v     v            v                    |
|   +----+-----------+            +-----+-+         ++--------------+     |
|   | ChannelHandler |            |Channel|         |CallbackHandler|     |
|   +----------------+            +-------+         +---------------+     |
+-------------------------------------------------------------------------+

0x03 SOFABolt

由於SOFARegistry主要是基於SOFABolt,無法繞開,因此咱們須要首先簡單介紹SOFABolt。

Bolt是基於Netty,因此要先說明Netty Channel。

3.1 Netty Channel

在Netty框架中,Channel是其中核心概念之一,是Netty網絡通訊的主體,由它負責同對端進行網絡通訊、註冊和數據操做等功能。

Netty對Jdk原生的ServerSocketChannel進行了封裝和加強封裝成了NioXXXChannel, 相對於原生的JdkChannel, Netty的Channel增長了以下的組件。

  • id 標識惟一身份信息
  • 可能存在的parent Channel
  • 管道 Pipeline
  • 用於數據讀寫的unsafe內部類
  • 關聯上相伴終生的NioEventLoop

根據服務端和客戶端,Channel能夠分紅兩類:

  • 服務端: NioServerSocketChannel
  • 客戶端: NioSocketChannel

其實inbound和outbound分別用於標識 Context 所對應的handler的類型, 在Netty中事件能夠分爲Inbound和Outbound事件,在ChannelPipeline的類註釋中,有以下圖示:

* 
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+

3.2 Connection

Connection其刪減版定義以下,能夠看到其主要成員就是 Netty channel 實例

public class Connection {

    private Channel                                                               channel;

    private final ConcurrentHashMap<Integer, InvokeFuture>                        invokeFutureMap  = new ConcurrentHashMap<Integer, InvokeFuture>(4);

    /** Attribute key for connection */
    public static final AttributeKey<Connection>                                  CONNECTION       = AttributeKey.valueOf("connection");
  
    /** Attribute key for heartbeat count */
    public static final AttributeKey<Integer>                                     HEARTBEAT_COUNT  = AttributeKey.valueOf("heartbeatCount");

    /** Attribute key for heartbeat switch for each connection */
    public static final AttributeKey<Boolean>                                     HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch");

    /** Attribute key for protocol */
    public static final AttributeKey<ProtocolCode>                                PROTOCOL         = AttributeKey.valueOf("protocol");

    /** Attribute key for version */
    public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version");

    private Url url;

    private final ConcurrentHashMap<Integer/* id */, String/* poolKey */>       id2PoolKey       = new ConcurrentHashMap<Integer, String>(256);

    private Set<String>                                                           poolKeys         = new ConcurrentHashSet<String>();

    private final ConcurrentHashMap<String/* attr key*/, Object /*attr value*/> attributes       = new ConcurrentHashMap<String, Object>();
}

Connection的輔助類不少,摘錄以下:

  • ConnectionFactory 鏈接工廠:建立鏈接、檢測鏈接等
  • ConnectionPool 鏈接池:存儲 { uniqueKey, List } ,uniqueKey 默認爲 ip:port;包含 ConnectionSelectStrategy,從 pool 中選擇 Connection
  • ConnectionEventHandler 和 ConnectionEventListener:事件處理器和監聽器
  • ConnectionManager 鏈接管理器:是對外的門面,包含全部與 Connection 相關的對外的接口操做
  • Scanner 掃描器:Bolt 提供的一個統一的掃描器,用於執行一些後臺任務

另外,須要注意的點以下:

  • 服務端建立 Connection 只有一個時機:netty 鏈接剛剛創建時
  • 客戶端真正建立鏈接的時候,是在發起第一次調用的時候。

3.3 Connection消息處理

不管是服務端仍是客戶端,其實本質都在作一件事情:建立 ConnectionEventHandler 實例並添加到 Netty 的 pipeline 中,基本原理是:

  • ConnectionEventListener:Connection 事件監聽器,存儲處理對應 ConnectionEventType 的 ConnectionEventProcessor 列表;
  • ConnectionEventProcessor:真正的 Connection 事件處理器接口;能夠繼承 ConnectionEventProcessor,編寫自定義的事件處理類
  • 將自定義的事件處理類添加到 ConnectionEventListener 中;

ConnectionEventHandler處理兩類事件

  • Netty 定義的事件:例如 connect,channelActive 等;
  • SOFABolt 定義的事件:事件類型 ConnectionEventType;

以後當有 ConnectionEvent 觸發時(不管是 Netty 定義的事件被觸發,仍是 SOFABolt 定義的事件被觸發),ConnectionEventHandler 會經過異步線程執行器通知 ConnectionEventListener,ConnectionEventListener 將消息派發給具體的 ConnectionEventProcessor 實現類。

3.4 RpcServer

RpcServer實現了一個Server所必須的基本機制,能夠直接使用,好比:

  • 編解碼,協議版本,地址處理;
  • workerGroup(static類變量,實現多個 RpcServer 實例共享 workerGroup)與 bossGroup;
  • 請求消息處理器;
  • 響應消息處理器;
  • 心跳消息處理器;
  • 用戶處理器 UserProcessor;
  • 鏈接Manager;
  • RpcServerRemoting (發起底層調用實現類) 實例,由於SOFABolt 能夠進行雙向調用,server 端也能夠調用 client 端,因此此處構建了 RpcServerRemoting 實例;
  • ServerBootstrap 實例用以設置一系列 netty 服務端配置;

其中,須要說明的是:

  • 請求鏈 RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
  • 響應鏈 RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
  • 心跳鏈 RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor

具體定義以下:

public class RpcServer extends AbstractRemotingServer {

    /** server bootstrap */
    private ServerBootstrap                             bootstrap;

    /** channelFuture */
    private ChannelFuture                               channelFuture;

    /** connection event handler */
    private ConnectionEventHandler                      connectionEventHandler;

    /** connection event listener */
    private ConnectionEventListener                     connectionEventListener = new ConnectionEventListener();

    /** user processors of rpc server */
    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors          = new ConcurrentHashMap<String, UserProcessor<?>>(
                                                                                    4);

    /** boss event loop group, boss group should not be daemon, need shutdown manually*/
    private final EventLoopGroup                        bossGroup               = NettyEventLoopUtil.newEventLoopGroup(NamedThreadFactory("Rpc-netty-server-boss",false));
  
    /** worker event loop group. Reuse I/O worker threads between rpc servers. */
    private static final EventLoopGroup                 workerGroup             = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2,new NamedThreadFactory("Rpc-netty-server-worker",true));

    /** address parser to get custom args */
    private RemotingAddressParser                       addressParser;

    /** connection manager */
    private DefaultConnectionManager                    connectionManager;

    /** rpc remoting */
    protected RpcRemoting                               rpcRemoting;

    /** rpc codec */
    private Codec                                       codec                   = new RpcCodec();
}

3.5 RpcClient

RpcClient主要機制以下:

  • 用戶處理器 UserProcessor ;RpcServer 能夠主動向 RpcClient 發起請求,因此RpcClient 也須要建立 UserProcessor 來處理這些請求;
  • RpcConnectionFactory 工廠;
  • workerGroup(static類變量,實現多個 RpcClient 實例共享 workerGroup);
  • Codec 的實現類 RpcCodec 實例,用於建立 netty 的編解碼器,實質上是一個工廠類;
  • HeartbeatHandler 心跳處理器;
  • RpcHandler 實例做爲 netty 的業務邏輯處理器;
  • ConnectionSelectStrategy 鏈接選擇器;
  • DefaultConnectionManager 鏈接管理器(是整個 Connection 設計的核心);
  • Bootstrap 實例用以設置一系列 netty 客戶端配置;
  • Remote 層請求和響應封裝實體的建立工廠 RpcCommandFactory 實例;
  • RpcClientRemoting (發起底層調用實現類) 實例;這是向 RpcServer 發起調用的工具類;

具體代碼以下:

public class RpcClient extends AbstractConfigurableInstance {

    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors           = new ConcurrentHashMap<String, UserProcessor<?>>();
  
    /** connection factory */
    private ConnectionFactory                           connectionFactory        = new RpcConnectionFactory(userProcessors, this);

    /** connection event handler */
    private ConnectionEventHandler                      connectionEventHandler   = new RpcConnectionEventHandler(switches());

    /** reconnect manager */
    private ReconnectManager                            reconnectManager;

    /** connection event listener */
    private ConnectionEventListener                     connectionEventListener  = new ConnectionEventListener();

    /** address parser to get custom args */
    private RemotingAddressParser                       addressParser;

    /** connection select strategy */
    private ConnectionSelectStrategy                    connectionSelectStrategy = new RandomSelectStrategy(
                                                                                     switches());

    /** connection manager */
    private DefaultConnectionManager                    connectionManager        = new DefaultConnectionManager(connectionSelectStrategy,connectionFactory,connectionEventHandler,connectionEventListener,switches());

    /** rpc remoting */
    protected RpcRemoting                               rpcRemoting;

    /** task scanner */
    private RpcTaskScanner                              taskScanner              = new RpcTaskScanner();

    /** connection monitor */
    private DefaultConnectionMonitor                    connectionMonitor;

    /** connection monitor strategy */
    private ConnectionMonitorStrategy                   monitorStrategy;
}

0x04 Bolt封裝

針對上述提到的基礎封裝,系統針對Bolt和Http都進行了實現。如下是SOFABolt的對應封裝 registry-remoting-bolt。

├── AsyncUserProcessorAdapter.java
├── BoltChannel.java
├── BoltChannelUtil.java
├── BoltClient.java
├── BoltServer.java
├── ConnectionEventAdapter.java
├── SyncUserProcessorAdapter.java
└── exchange
    └── BoltExchange.java

4.1 BoltChannel

BoltChannel 主要是封裝了com.alipay.remoting.Connection,而com.alipay.remoting.Connection又封裝了io.netty.channel.Channel。

感受Channel封裝的不夠完全,仍是把Connection暴露出來了,以此獲得本地IP,port,遠端IP,port等。

public class BoltChannel implements Channel {
    private Connection                connection;
    private AsyncContext              asyncContext;
    private BizContext                bizContext;
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
}

4.2 BoltServer

BoltServer 封裝了 com.alipay.remoting.rpc.RpcServer。

在初始化的時候,調用 addConnectionEventProcessor,registerUserProcessor等把 handler 註冊到 RpcServer。

用 ConcurrentHashMap 記錄了全部鏈接到本Server 的Channel,key是IP:port。

public class BoltServer implements Server {
    /**
     * accoding server port
     * can not be null
     */
    private final URL                  url;
    private final List<ChannelHandler> channelHandlers;
    /**
     * bolt server
     */
    private RpcServer                  boltServer;
    /**
     * started status
     */
    private AtomicBoolean              isStarted   = new AtomicBoolean(false);
    private Map<String, Channel>       channels    = new ConcurrentHashMap<>();

    private AtomicBoolean              initHandler = new AtomicBoolean(false);
}

其主要功能以下,基本就是調用Bolt的功能:

@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        Url boltUrl = null;
        try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
                .getRemoteAddress().getPort());
            return boltServer.invokeSync(boltUrl, message, timeoutMillis);
        } 
}

@Override
public void sendCallback(Channel channel, Object message, CallbackHandler callbackHandler,
                         int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        Url boltUrl = null;
        try {
            boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
                .getRemoteAddress().getPort());
            boltServer.invokeWithCallback(boltUrl, message, new InvokeCallback() {
                @Override
                public void onResponse(Object result) {
                    callbackHandler.onCallback(channel, result);
                }

                @Override
                public void onException(Throwable e) {
                    callbackHandler.onException(channel, e);
                }

                @Override
                public Executor getExecutor() {
                    return callbackHandler.getExecutor();
                }
            }, timeoutMillis);
            return;
        } 
}

4.3 BoltClient

主要就是封裝了 com.alipay.remoting.rpc.RpcClient。

在初始化的時候,調用 addConnectionEventProcessor,registerUserProcessor 等把 handler 註冊到 RpcClient。

public class BoltClient implements Client {
    private RpcClient           rpcClient;
    private AtomicBoolean       closed         = new AtomicBoolean(false);
    private int                 connectTimeout = 2000;
    private final int           connNum;
}

主要函數以下:

@Override
    public Channel connect(URL url) {
        try {
            Connection connection = getBoltConnection(rpcClient, url);
            BoltChannel channel = new BoltChannel();
            channel.setConnection(connection);
            return channel;
        } 
    }

    protected Connection getBoltConnection(RpcClient rpcClient, URL url) throws RemotingException {
        Url boltUrl = createBoltUrl(url);
        try {
            Connection connection = rpcClient.getConnection(boltUrl, connectTimeout);
            if (connection == null || !connection.isFine()) {
                if (connection != null) {
                    connection.close();
                }
            }
            return connection;
        } 
    }

    @Override
    public Object sendSync(URL url, Object message, int timeoutMillis) {
            return rpcClient.invokeSync(createBoltUrl(url), message, timeoutMillis);
    }

    @Override
    public Object sendSync(Channel channel, Object message, int timeoutMillis) {
        if (channel != null && channel.isConnected()) {
            BoltChannel boltChannel = (BoltChannel) channel;
            return rpcClient.invokeSync(boltChannel.getConnection(), message, timeoutMillis); 
        }
    }

    @Override
    public void sendCallback(URL url, Object message, CallbackHandler callbackHandler,
                             int timeoutMillis) {
        try {
            Connection connection = getBoltConnection(rpcClient, url);
            BoltChannel channel = new BoltChannel();
            channel.setConnection(connection);
            rpcClient.invokeWithCallback(connection, message, new InvokeCallback() {

                @Override
                public void onResponse(Object result) {
                    callbackHandler.onCallback(channel, result);
                }

                @Override
                public void onException(Throwable e) {
                    callbackHandler.onException(channel, e);
                }

                @Override
                public Executor getExecutor() {
                    return callbackHandler.getExecutor();
                }
            }, timeoutMillis);
            return;
        } 
    }

4.4 BoltExchange

BoltExchange 的主要做用是維護了Client和Server兩個ConcurrentHashMap。就是全部的Clients和Servers

這裏進行了第一層鏈接維護。

Map<String, Client> clients 是依據String對Client作了區分,String包括以下:

String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";

就是說,假如 Data Server 使用了BoltExchange,則其內部只有兩個BoltClient,這兩個Client分別被 同 dataServer 和 metaServer 的交互 所複用。

ConcurrentHashMap<Integer, Server> serverMap 是依據自己端口對 自己啓動的Server 作了區分

public class BoltExchange implements Exchange<ChannelHandler> {

    private Map<String, Client>                clients   = new ConcurrentHashMap<>();

    private ConcurrentHashMap<Integer, Server> serverMap = new ConcurrentHashMap<>();

    @Override
    public Client connect(String serverType, URL serverUrl, ChannelHandler... channelHandlers) {
        return this.connect(serverType, 1, serverUrl, channelHandlers);
    }

    @Override
    public Client connect(String serverType, int connNum, URL serverUrl, ChannelHandler... channelHandlers) {
        Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
        client.connect(serverUrl);
        return client;
    }

    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }

    @Override
    public Client getClient(String serverType) {
        return clients.get(serverType);
    }

    @Override
    public Server getServer(Integer port) {
        return serverMap.get(port);
    }

    /**
     * add server into serverMap
     * @param server
     * @param url
     */
    public void setServer(Server server, URL url) {
        serverMap.putIfAbsent(url.getPort(), server);
    }

    private BoltClient newBoltClient(int connNum, ChannelHandler[] channelHandlers) {
        BoltClient boltClient = createBoltClient(connNum);
        boltClient.initHandlers(Arrays.asList(channelHandlers));
        return boltClient;
    }

    protected BoltClient createBoltClient(int connNum) {
        return new BoltClient(connNum);
    }

    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}

4.5 BoltExchange 獲取Bolt Client

內部會根據不一樣的server type從 boltExchange 取出對應Bolt Client。

String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";

用以下方法put Client。

Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));

Bolt用以下辦法獲取Client

Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);

Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);

獲得對應的Client以後,而後分別根據參數的url創建Channel,或者發送請求。

Channel channel = client.getChannel(url);

client.sendCallback(request.getRequestUrl()....;

此時大體邏輯以下:

+----------------------------------------+
                 |  BoltExchange                          |
                 | +------------------------------------+ |
                 | |                                    | |
                 | |       Map<String, Client>          | |
                 | |                                    | |
                 | | ConcurrentHashMap<Integer, Server> | |
                 | |                                    | |
                 | +------------------------------------+ |
                 +----------------------------------------+
                        |                           |
                        |                           |
                        |                           |
                        v                           v
+-----------------------+----------+      +---------+---------------------------------+
| BoltClient                       |      | BoltServer                                |
|                                  |      |                                           |
| +------------------------------+ |      | +---------------------------------------+ |
| |     remoting.rpc.RpcClient   | |      | |     remoting.rpc.RpcServer            | |
| |  +-------------------------+ | |      | |                                       | |
| |  | ConnectionEventHandler  | | |      | |   Map<String, Channel> channels       | |
| |  |                         | | |      | |                                       | |
| |  |  ConnectionFactory      | | |      | | List<ChannelHandler> channelHandlers  | |
| |  +-------------------------+ | |      | |                                       | |
| +------------------------------+ |      | +---------------------------------------+ |
+----------------------------------+      +-------------------------------------------+
      |       |           |                 |                        |
      |       |           +---------------------------+              |
      |       v                             |         |              v
      |   +---+------------+ <--------------+   +-----v--------------+--------------+
      |   | ChannelHandler |                    |   BoltChannel                     |
      |   +----------------+                    |                                   |
      |                                         |  +------------------------------+ |
      |                                         |  |com.alipay.remoting.Connection| |
      |                                         |  +------------------------------+ |
      |                                         +-----------------------------------+
      |                                                       |
      v                                                       v
  +---+-------------+                               +---------+------------+
  | CallbackHandler |                               | netty.channel.Channel|
  +-----------------+                               +----------------------+

0x04 Http封裝

如下是基於Jetty封裝的Http模塊 registry-remoting-http。

├── JerseyChannel.java
├── JerseyClient.java
├── JerseyJettyServer.java
├── exchange
│   └── JerseyExchange.java
└── jetty
    └── server
        ├── HttpChannelOverHttpCustom.java
        ├── HttpConnectionCustom.java
        └── HttpConnectionCustomFactory.java

由於 Http 服務不是SOFTRegistry主要功能,因此此處略去。

0x05 功能模塊

咱們從目錄結構能夠大體看出功能模塊劃分。

├── remoting
│   ├── DataNodeExchanger.java
│   ├── MetaNodeExchanger.java
│   ├── dataserver
│   │   ├── DataServerConnectionFactory.java
│   │   ├── DataServerNodeFactory.java
│   │   ├── GetSyncDataHandler.java
│   │   ├── SyncDataCallback.java
│   │   ├── handler
│   │   │   ├── DataSyncServerConnectionHandler.java
│   │   │   ├── FetchDataHandler.java
│   │   │   ├── NotifyDataSyncHandler.java
│   │   │   ├── NotifyFetchDatumHandler.java
│   │   │   ├── NotifyOnlineHandler.java
│   │   │   └── SyncDataHandler.java
│   │   └── task
│   │       ├── AbstractTask.java
│   │       ├── ConnectionRefreshTask.java
│   │       └── RenewNodeTask.java
│   ├── handler
│   │   ├── AbstractClientHandler.java
│   │   └── AbstractServerHandler.java
│   ├── metaserver
│   │   ├── handler
│   │   ├── provideData
│   │   └── task
│   └── sessionserver
│       ├── disconnect
│       ├── forward
│       └── handler

由於每一個大功能模塊大同小異,因此咱們下面主要以 dataserver 目錄下爲主,兼顧 metaserver 和 sessionserver目錄下的特殊部分。

Data Server 比較複雜,便是服務器也是客戶端,因此分別作了不一樣的組件來抽象這兩個概念

5.1 Server組件

DataServerBootstrap#start 方法,用於啓動一系列的初始化服務。在此函數中,啓動了若干網絡服務,用來提供 對外接口。

public void start() {
    try {
        openDataServer();
        openDataSyncServer();
        openHttpServer();
        startRaftClient();
    } 
}

各 Handler 具體做用如圖所示:

圖 3 各 Handler 做用

5.2 Bolt Server

DataServer 和 DataSyncServer 是 Bolt Server,是節點間的 bolt 通訊組件,其中:

  • boltExchange。bolt組件通信組件,用來給server和dataSyncServer提供通信服務;
  • DataServer。dataServer 則負責數據相關服務,好比數據服務,獲取數據的推送,服務上下線通知等;
  • dataSyncServer。dataSyncServer 主要是處理一些數據同步相關的服務;

具體代碼以下:

private void openDataServer() {
    try {
        if (serverForSessionStarted.compareAndSet(false, true)) {
            server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
                dataServerConfig.getPort()), serverHandlers
                .toArray(new ChannelHandler[serverHandlers.size()]));
        }
    } 
}

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } 
}

這兩個server的handlers有部分重複,懷疑開發者在作功能遷移。

@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());
    return list;
}

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(clientOffHandler());
    list.add(getDataVersionsHandler());
    list.add(publishDataProcessor());
    list.add(sessionServerRegisterHandler());
    list.add(unPublishDataHandler());
    list.add(dataServerConnectionHandler());
    list.add(renewDatumHandler());
    list.add(datumSnapshotHandler());
    return list;
}

5.2.1 DataSyncServer

這裏用DataSyncServer作具體說明。

啓動 DataSyncServer 時,註冊了以下幾個 handler 用於處理 bolt 請求 :

圖5 DayaSyncServer 註冊的 Handler

DayaSyncServer 註冊的 Handler 以下:

  • getDataHandler

該 Handler 主要用於數據的獲取,當一個請求過來時,會經過請求中的 DataCenter 和 DataInfoId 獲取當前 DataServer 節點存儲的相應數據。

  • publishDataProcessor \ unPublishDataHandler

當有數據發佈者 publisher 上下線時,會分別觸發 publishDataProcessor 或 unPublishDataHandler ,Handler 會往 dataChangeEventCenter 中添加一個數據變動事件,用於異步地通知事件變動中心數據的變動。事件變動中心收到該事件以後,會往隊列中加入事件。此時 dataChangeEventCenter 會根據不一樣的事件類型異步地對上下線數據進行相應的處理。

與此同時,DataChangeHandler 會把這個事件變動信息經過 ChangeNotifier 對外發布,通知其餘節點進行數據同步。

  • notifyFetchDatumHandler

這是一個數據拉取請求,當該 Handler 被觸發時,通知當前 DataServer 節點進行版本號對比,若請求中數據的版本號高於當前節點緩存中的版本號,則會進行數據同步操做,保證數據是最新的。

  • notifyOnlineHandler

這是一個 DataServer 上線通知請求 Handler,當其餘節點上線時,會觸發該 Handler,從而當前節點在緩存中存儲新增的節點信息。用於管理節點狀態,到底是 INITIAL 仍是 WORKING 。

  • syncDataHandler

節點間數據同步 Handler,該 Handler 被觸發時,會經過版本號進行比對,若當前 DataServer 所存儲數據版本號含有當前請求版本號,則會返回全部大於當前請求數據版本號的全部數據,便於節點間進行數據同步。

  • dataSyncServerConnectionHandler

鏈接管理 Handler,當其餘 DataServer 節點與當前 DataServer 節點鏈接時,會觸發 connect 方法,從而在本地緩存中註冊鏈接信息,而當其餘 DataServer 節點與當前節點斷連時,則會觸發 disconnect 方法,從而刪除緩存信息,進而保證當前 DataServer 節點存儲有全部與之鏈接的 DataServer 節點。

5.2.2 調用鏈

dataSyncServer 調用鏈以下:

在 DataServerBootstrap 中有

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    }
}

而後有

public class BoltExchange implements Exchange<ChannelHandler> {
    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }
  
    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}

BoltServer啓動以下,而且用戶自定義了UserProcessor。

public class BoltServer implements Server {
    public BoltServer(URL url, List<ChannelHandler> channelHandlers) {
        this.channelHandlers = channelHandlers;
        this.url = url;
    }
  
    public void startServer() {
        if (isStarted.compareAndSet(false, true)) {
                boltServer = new RpcServer(url.getPort(), true);
                initHandler();
                boltServer.start();
        }
    }  
  
    private void initHandler() {
        if (initHandler.compareAndSet(false, true)) {
            boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
                new ConnectionEventAdapter(ConnectionEventType.CONNECT,
                    getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
                new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),
                    this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
                new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
                    getConnectionEventHandler(), this));

            registerUserProcessorHandler();
        }
    }  
  
    //這裏分了同步和異步
    private void registerUserProcessorHandler() {
        if (channelHandlers != null) {
            for (ChannelHandler channelHandler : channelHandlers) {
                if (HandlerType.PROCESSER.equals(channelHandler.getType())) {
                    if (InvokeType.SYNC.equals(channelHandler.getInvokeType())) {
                        boltServer.registerUserProcessor(new SyncUserProcessorAdapter(
                            channelHandler));
                    } else {
                        boltServer.registerUserProcessor(new AsyncUserProcessorAdapter(
                            channelHandler));
                    }
                }
            }
        }
    }
  
}

5.3 HttpServer

HttpServer 是用於控制的Http 通訊組件以及其配置,提供一系列 REST 接口,用於 dashboard 管理、數據查詢等;

  • jerseyExchange。jersey組件通信組件,提供服務;
  • httpServer 主要提供一系列 http 接口,用於 dashboard 管理、數據查詢等;
private void openHttpServer() {
    try {
        if (httpServerStarted.compareAndSet(false, true)) {
            bindResourceConfig();
            httpServer = jerseyExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
                    .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
        }
    } 
}

5.4 RaftClient

RaftClient 是基於Raft協議的客戶端,用來基於raft協議獲取meta server leader信息。

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

具體DefaultMetaServiceImpl中實現代碼以下:

@Override
public void startRaftClient() {
    try {
        if (clientStart.compareAndSet(false, true)) {
            String serverConf = getServerConfig();
            raftClient = new RaftClient(getGroup(), serverConf);
            raftClient.start();
        }
    }
}

功能模塊最後邏輯大體以下:

+->  getDataHandler
                                                  |
                                                  +->  publishDataProcessor
                                                  |
                                                  +--> unPublishDataHandler
                                                  |
                               +---------------+  +--> notifyFetchDatumHandler
                        +----> | DataSyncServer+->+
                        |      +---------------+  +--> notifyOnlineHandler
                        |                         |
                        |                         +--> syncDataHandler
                        |       +-------------+   |
+-------------------+   +---->  | HttpServer  |   +->  dataSyncServerConnectionHandler
|DataServerBootstrap+-->+       +-------------+
+-------------------+   |
                        |
                        |       +------------+    +->  getDataHandler
                        +-----> | RaftClient |    |
                        |       +------------+    +--> clientOffHandler
                        |                         |
                        |       +-------------+   +--> getDataVersionsHandler
                        +-----> |  DataServer +-->+
                                +-------------+   +--> publishDataProcessor
                                                  |
                                                  +--> sessionServerRegisterHandler
                                                  |
                                                  +--> unPublishDataHandler
                                                  |
                                                  +--> dataServerConnectionHandler
                                                  |
                                                  +--> renewDatumHandler
                                                  |
                                                  +->  datumSnapshotHandler

0x06 鏈接抽象 Exchange

Exchange 做爲 Client / Server 鏈接的抽象,負責節點之間的鏈接。

Data Server 這裏主要是 DataNodeExchanger 和 MetaNodeExchanger,用來:

  • 封裝 BoltExchange

  • 把 Bolt Client 和 Bolt Channel 進行抽象。

  • 提供能夠直接使用的網絡API,好比ForwardServiceImpl,GetSyncDataHandler這些散落的Bean能夠直接使用DataNodeExchanger來作網絡交互。

從對外接口中能夠看出來,

  • connect 函數用來建立鏈接,返回Channel,這個設置了handler,用來處理服務器推送;
  • request 函數用來發起請求;

這裏有兩個問題:

  • 爲何沒有 SessionNodeExchanger?
  • 一樣是網絡資源的管理,這裏和 ConnectionFactory有什麼區別?

多是由於Session Server可能會不少,不必保存Bolt client和Server,可是Session對應的有ConnectionFactory。ConectionFactory 是較低層次的封裝,下文會講解。

6.1 DataNodeExchanger

咱們以 DataNodeExchanger 爲例。

把全部 Data Server 相關的 「Server,Client概念 直接強相關」 的網絡操做統一集中在這裏。

6.1.1 具體實現

能夠看到,主要是對 boltExchange 進行了更高層次的封裝。

具體代碼以下:

import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.Client;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.remoting.exchange.NodeExchanger;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.remoting.exchange.message.Response;

public class DataNodeExchanger implements NodeExchanger {

    @Autowired
    private Exchange                          boltExchange;

    @Autowired
    private DataServerConfig                  dataServerConfig;

    @Resource(name = "dataClientHandlers")
    private Collection<AbstractClientHandler> dataClientHandlers;

    @Override
    public Response request(Request request) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);

        if (null != request.getCallBackHandler()) {
            client.sendCallback(request.getRequestUrl(), request.getRequestBody(),
                    request.getCallBackHandler(),
                    request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
            return () -> Response.ResultStatus.SUCCESSFUL;
        } else {
            final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
                    dataServerConfig.getRpcTimeout());
            return () -> result;
        }
    }

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.DATA_SERVER_TYPE, url,
                        dataClientHandlers.toArray(new ChannelHandler[dataClientHandlers.size()]));
                }
            }
        }

        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }

        return channel;
    }
}

6.1.2 推送Handler相關Bean

上面代碼中使用了 dataClientHandlers,其是BoltClient所使用的,Server會對此進行推送,這兩個Handler會處理。

@Bean(name = "dataClientHandlers")
public Collection<AbstractClientHandler> dataClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(notifyDataSyncHandler());
    list.add(fetchDataHandler());
    return list;
}

此時具體網絡概念以下:

+-------------------+
| ForwardServiceImpl| +-----------------+
+-------------------+                   |
                                        |
+--------------------+                  |        +-------------------+
| GetSyncDataHandler | +-----------------------> | DataNodeExchanger |
+--------------------+                  |        +-------+-----------+
                                        |                |
+----------------------------------+    |                |
| LocalDataServerChangeEventHandler| +--+                |
+----------------------------------+    |                |
                                        |                |
+------------------------------+        |                v
| DataServerChangeEventHandler +--------+          +-----+--------+
+------------------------------+                   | BoltExchange |
                                                   +-----+--------+
                                                         |
        +-----------------+                              |
        |  JerseyExchange |                              |
        +-------+---------+                      +-------+-------+
                |                                |               |
                |                                |               |
                v                                v               v
       +--------+----------+              +------+-----+   +-----+------+
       | JerseyJettyServer |              | BoltServer |   | BoltServer |
       +-------------------+              +------------+   +------------+
             httpServer                       server       dataSyncServer

0x07 Handler

AbstractServerHandler 和 AbstractClientHandler 對 com.alipay.sofa.registry.remoting.ChannelHandler 進行了實現。

這裏須要結合SOFABolt來說解。

7.1 SOFABolt

以 RpcServer 爲例,SOFABolt在這裏的使用是兩種處理器:

  • 用戶請求處理器 (UserProcessor) :SOFABolt 提供了兩種用戶請求處理器,SyncUserProcessor 與 AsyncUserProcessor。 兩者的區別在於,前者須要在當前處理線程以return返回值的形式返回處理結果;然後者,有一個 AsyncContext 存根,能夠在當前線程,也能夠在異步線程,調用 sendResponse 方法返回處理結果;
  • 鏈接事件處理器 (ConnectionEventProcessor) :SOFABolt 提供了兩種事件監聽,建連事件(ConnectionEventType.CONNECT)與斷連事件(ConnectionEventType.CLOSE),用戶能夠建立本身的事件處理器,並註冊到客戶端或者服務端。客戶端與服務端,均可以監聽到各自的建連與斷連事件;

7.2 定義

Handler主要代碼分別以下:

public abstract class AbstractServerHandler<T> implements ChannelHandler<T> {
    protected NodeType getConnectNodeType() {
        return NodeType.DATA;
    }

    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            return buildFailedResponse(e.getMessage());
        }
    }
}

public abstract class AbstractClientHandler<T> implements ChannelHandler<T> {
    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            return buildFailedResponse(e.getMessage());
        }
    }
}

系統能夠據此實現各類派生類。

這裏須要注意的是:ChannelHandler之中分紅兩類,分別以下,分別對應了RpcServer中的listener和userProcessor。

enum HandlerType {
    LISENTER,
    PROCESSER
}

以serverSyncHandlers爲例,只有dataSyncServerConnectionHandler是Listener,其他都是Processor。

這也符合常理,由於消息響應函數就是應該只有一個

@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());  只有這個是Listener,其他都是Processor。
    return list;
}

總結以下:

+->  getDataHandler
                         |
                         +->  publishDataProcessor
                         |
                         +--> unPublishDataHandler
+-------------------+    |
| serverSyncHandlers+-------> notifyFetchDatumHandler
+-------------------+    |
                         +--> notifyOnlineHandler
                         |
                         +--> syncDataHandler
                         |
                         +->  dataSyncServerConnectionHandler(Listener)

7.3 使用

在啓動時,會使用serverSyncHandlers完成BoltServer的啓動。

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } 
}

BoltExchange中有:

@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
    BoltServer server = createBoltServer(url, channelHandlers);
    setServer(server, url);
    server.startServer();
    return server;
}

protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
		return new BoltServer(url, Arrays.asList(channelHandlers));
}

在BoltServer中有以下代碼,主要是設置Handler。

public void startServer() {
    if (isStarted.compareAndSet(false, true)) {
        try {
            boltServer = new RpcServer(url.getPort(), true);
            initHandler();
            boltServer.start();
        } 
    }
}

private void initHandler() {
        if (initHandler.compareAndSet(false, true)) {
            boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
                new ConnectionEventAdapter(ConnectionEventType.CONNECT,
                    getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
                new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),
                    this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
                new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
                    getConnectionEventHandler(), this));

            registerUserProcessorHandler();
        }
}

最終則是調用到RpcServer之中,註冊了鏈接響應函數和用戶定義函數。

/**
 * Add processor to process connection event.
 */
public void addConnectionEventProcessor(ConnectionEventType type,
                                        ConnectionEventProcessor processor) {
    this.connectionEventListener.addConnectionEventProcessor(type, processor);
}

/**
 * Use UserProcessorRegisterHelper{@link UserProcessorRegisterHelper} to help register user processor for server side.
 */
@Override
public void registerUserProcessor(UserProcessor<?> processor) {
    UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors);
}

此時與SOFABolt邏輯以下:

+----------------------------+
| [RpcServer]                |
|                            |
|                            |  +--> EXCEPTION +--> DataSyncServerConnectionHandler
|                            |  |
|    connectionEventListener--->--->-CONNECT +----> DataSyncServerConnectionHandler
|                            |  |
|                            |  +--> CLOSE  +-----> DataSyncServerConnectionHandler
|                            |
|                            |
|                            |  +--> EXCEPTION +--> DataSyncServerConnectionHandler
|                            |  |
|    connectionEventHandler +-->---> CONNECT +----> DataSyncServerConnectionHandler
|                            |  |
|                            |  +--> CLOSE  +-----> DataSyncServerConnectionHandler
|                            |
|                            |
|                            |  +---->  GetDataRequest  +-------->  getDataHandler
|                            |  |
|                            |  +---->  PublishDataRequest +----->  publishDataProcessor
|                            |  |
|    userProcessors +---------------->  UnPublishDataRequest +----> unPublishDataHandler
|                            |  |
|                            |  +---->  NotifyOnlineRequest-----> notifyFetchDatumHandler
|                            |  |
|                            |  +---->  NotifyOnlineRequest +-----> notifyOnlineHandler
+----------------------------+  |
                                +---->  SyncDataRequest   +-------> syncDataHandler

0x08 總結

至此,咱們把SOFARegistry網絡封裝和操做大體梳理了下。

從邏輯上看,阿里提供了兩個層級的封裝:

從鏈接角度看,阿里實現了基於 netty.channel.Channel 的封裝,從下往上看是:

  • 由於SOFABolt基於Netty,因此封裝的核心是netty.channel.Channel。
  • 在此基礎上, SOFABolt封裝了com.alipay.remoting.Connection
  • 而後 SOFARegistry 基於SOFABolt 封裝了 BoltChannel

從應用角度看,阿里實現了Server,Client層次的封裝,從下往上看是:

  • SOFABolt構建了 RpcServer,RpcClient。
  • SOFARegisty 基於 RpcServer,RpcClient 構建了BoltClient 和 BoltServer。
  • 而後 SOFARegistry 基於此構建了 BoltExchange。做爲 Client / Server 鏈接的抽象,負責節點之間的鏈接。
  • 最後構建了 XXXNodeExchanger,在 BoltExchange 基礎上,把全部 Data Server 內部的 「Server,Client概念 強相關」 的網絡操做統一集中在這裏,用戶能夠直接使用。以DataServer業務模塊爲例,其內部按照業務不一樣,實現了 DataNodeExchanger 和 MetaNodeExchanger。用以讓:
    • DataServer 內部直接使用 DataNodeExchanger 與其餘 DataServer 交互,
    • DataServer 內部直接使用 MetaNodeExchanger 與 MetaServer 交互。

具體邏輯大體以下:

+---------------------+                                 +---------------------+
|                     |                                 |                     |
|  DataNodeExchanger  |                                 |  MetaNodeExchanger  |
|                     |                                 |                     |
+----------------+----+                                 +--------+------------+
                 |                                               |
                 +-----------+                     +-------------+
                             |                     |
                             v                     v
                   +---------+---------------------+--------+
                   |  BoltExchange                          |
                   | +------------------------------------+ |
                   | |                                    | |
                   | |       Map<String, Client>          | |
                   | |                                    | |
                   | | ConcurrentHashMap<Integer, Server> | |
                   | |                                    | |
                   | +------------------------------------+ |
                   +----------------------------------------+
                          |                           |
                          |                           |
                          |                           |
                          v                           v
  +-----------------------+----------+      +---------+---------------------------------+
  | BoltClient                       |      | BoltServer                                |
  |                                  |      |                                           |
  | +------------------------------+ |      | +---------------------------------------+ |
  | |     remoting.rpc.RpcClient   | |      | |     remoting.rpc.RpcServer            | |
  | |  +-------------------------+ | |      | |                                       | |
  | |  | ConnectionEventHandler  | | |      | |   Map<String, Channel> channels       | |
  | |  |                         | | |      | |                                       | |
  | |  |  ConnectionFactory      | | |      | | List<ChannelHandler> channelHandlers  | |
  | |  +-------------------------+ | |      | |                                       | |
  | +------------------------------+ |      | +---------------------------------------+ |
  +----------------------------------+      +-------------------------------------------+
        |       |           |                 |                        |
        |       |           +---------------------------+              |
        |       v                             |         |              v
        |   +---+------------+ <--------------+   +-----v--------------+--------------+
        |   | ChannelHandler |                    |   BoltChannel                     |
        |   +----------------+                    |                                   |
        |                                         |  +------------------------------+ |
        |                                         |  |com.alipay.remoting.Connection| |
        |                                         |  +------------------------------+ |
        |                                         +-----------------------------------+
        |                                                       |
        v                                                       v
    +---+-------------+                               +---------+------------+
    | CallbackHandler |                               | netty.channel.Channel|
    +-----------------+                               +----------------------+

阿里這裏封裝的很是細緻。由於SOFARegistry是比較繁雜的系統,因此把網絡概念,功能作封裝是至關有必要的。你們在平常開發中可能不用這麼細緻的封裝,能夠參考阿里的思路,本身作選擇和裁剪便可。

0xFF 參考

https://timyang.net/architecture/cell-distributed-system/

SOFABolt 源碼分析12 - Connection 鏈接管理設計

SOFABolt 源碼分析2 - RpcServer 服務端啓動的設計

SOFABolt 源碼分析3 - RpcClient 客戶端啓動的設計

螞蟻通訊框架實踐

sofa-bolt 遠程調用

sofa-bolt學習

SOFABolt 設計總結 - 優雅簡潔的設計之道

SofaBolt源碼分析-服務啓動到消息處理

SOFABolt 源碼分析

SOFABolt 源碼分析9 - UserProcessor 自定義處理器的設計

SOFARegistry 介紹

SOFABolt 源碼分析13 - Connection 事件處理機制的設計

相關文章
相關標籤/搜索