螞蟻通信框架SOFABolt之私有通信協議設計

 

前言

SOFABolt 是螞蟻金融服務集團開發的一套基於 Netty 實現的網絡通訊框架。html

  • 爲了讓 Java 程序員能將更多的精力放在基於網絡通訊的業務邏輯實現上,而不是過多的糾結於網絡底層 NIO 的實現以及處理難以調試的網絡問題,Netty 應運而生。java

  • 爲了讓中間件開發者能將更多的精力放在產品功能特性實現上,而不是重複地一遍遍製造通訊框架的輪子,SOFABolt 應運而生。git

Bolt 名字取自迪士尼動畫-閃電狗,是一個基於 Netty 最佳實踐的輕量、易用、高性能、易擴展的通訊框架。 這些年螞蟻金融服務集團在微服務與消息中間件在網絡通訊上解決過不少問題,積累了不少經驗,並持續的進行着優化和完善,但願能把總結出的解決方案沉澱到 SOFABolt 這個基礎組件裏,讓更多的使用網絡通訊的場景可以統一受益。 目前該產品已經運用在了螞蟻中間件的微服務 (SOFARPC)、消息中心、分佈式事務、分佈式開關、以及配置中心等衆多產品上。程序員

 

調試環境搭建

依賴工具

  • Mavengithub

  • Git算法

  • JDKjson

  • IntelliJ IDEA數組

源碼拉取

從官方倉庫https://github.com/alipay/sofa-bolt Fork 出屬於本身的倉庫,爲何要Fork ? 既然開始閱讀、調試源碼,咱們可能會寫一些註釋,有了本身的倉庫,能夠進行自由的提交。😈安全

使用 IntelliJ IDEAFork 出來的倉庫拉取代碼。服務器

example 模塊

在test模塊裏,官網提供了多個Bolt的使用示例。

咱們提供了一個 RpcClient 與 RpcServer,通過簡單的必要功能初始化,或者功能開關,便可使用。

RpcServer

執行 com.alipay.remoting.demo.RpcServerDemoByMain#main(args) 方法,啓動服務端。輸出日誌以下:

Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j2 ]
server start ok!

RpcClient

執行 com.alipay.remoting.demo.RpcClientDemoByMain#main(args) 方法,啓動服務端。輸出日誌以下:

Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j2 ]
invoke sync result = [HELLO WORLD! I'm server return]

如此,咱們就能夠愉快的進行 Netty 調試啦。讀源碼,必定要多多調試源碼。很是重要!!!👿

 

私有通信協議設計

 

圖1 - 私有協議與必要功能模塊

Protocol

字段名 字節範圍 備註
proto 1字節 協議的魔數
ver1 1字節 協議版本
type 1字節 (1)request (2)response (3) request oneway
cmdcode 2字節 遠程命令代碼
ver2 1字節 遠程命令版本
requestId 4字節 請求ID
codec 1字節 序列化代碼
switch 1字節 協議功能開關
timeout或者respstatus 4字節或者2字節 請求超時或者回復狀態
classLen 2字節 請求或響應類名稱的長度
headerLen 2字節 協議頭長度
contentLen 4字節 協議內容長度
content N字節 內容
CRC32(optional) 4字節 幀的CRC32(當ver1> 1時存在)

在Bolt通信框架中,有2個協議規範。由於設計偏差,其中RpcProtocol這個協議版本被廢棄,如下的解讀爲RpcProtocolV2版本。

  1. 首先,第一個字段是魔數,一般狀況下爲固定的幾個字節(咱們這邊規定爲1個字節)。 爲何須要這個字段,並且仍是一個固定的數?假設咱們在服務器上開了一個端口,好比 80 端口,若是沒有這個魔數,任何數據包傳遞到服務器,服務器都會根據自定義協議來進行處理。 例如,咱們直接經過 http://服務器ip 來訪問服務器(默認爲 80 端口), 服務端收到的是一個標準的 HTTP 協議數據包,可是它仍然會按照事先約定好的協議來處理 HTTP 協議,顯然,這是會解析出錯的。而有了這個魔數以後,服務端首先取出前面四個字節進行比對,可以在第一時間識別出這個數據包並不是是遵循自定義協議的,也就是無效數據包,爲了安全考慮能夠直接關閉鏈接以節省資源。 在 Java 的字節碼的二進制文件中,開頭的 1 個字節爲(byte)2 用來標識這是個字節碼文件,亦是殊途同歸之妙。

  2. 接下來一個字節爲版本號,一般狀況下是預留字段,用於協議升級的時候用到,有點相似 TCP 協議中的一個字段標識是 IPV4 協議仍是 IPV6 協議,其中第一個版本爲(byte) 1,第二個版本爲(byte) 2。

  3. 第三部分,type表示Rpc類型是請求命令仍是回覆命令。其中請求命令分爲request_oneway和request,其中request_oneway表明單工,即只請求,不用回覆。而request就是常規的請求回覆模型。

  4. 第四部分是遠程命令代碼,遠程命令代碼表明一種特定的遠程命令,每種命令有本身的編號。其中在Bolt,(short) 0被心跳所佔用,不能被其餘命令所使用。

  5. 第五部分是遠程命令代碼版本,其做用和協議版本做用相同,爲預留字段,用於遠程命令版本升級的時候用到。

  6. 第六部分爲請求編號,

  7. 第七部分爲序列化代碼,雖然字段標示是codec,可是實際的意思爲Serializer,兩者是不一樣的意思。Serializer主要用於將字節反序列化爲對象,或將對象序列化爲字節。咱們可使用hessian,json,protocol buff等。默認序列化爲Hessian2。

  8. 第八部分爲功能開關,這個能夠對通信協議部分功能的開啓仍是關閉來決定是否編解碼此位置,例如經過判斷協議crc功能是否開啓,判斷是否對內容進行循環冗餘校驗。

  9. 第九部分爲timeout或respstatus,在Rpc請求命令中此位置爲timout(超時時間),在Rpc回覆命令中此位置爲respstatus(回覆狀態)。回覆狀態有SUCCESS,ERROR,SERVER_EXCEPTION,TIMEOUT等。

  10. 第十部分爲classLen,headerLen,contentLen。這些字段表示在負載內容中,類,頭部以及內容所佔的長度。

  11. CRC32(optional),最後這個字段是可選擇的,經過協議開關來決定是否對內容進行循環冗餘校驗。

Encoder 與 Decoder

協議相關的編解碼方式: 私有協議須要有核心的encode與decode過程,而且針對業務負載能支持不一樣的序列化與反序列化機制。這部分,不一樣的私有協議,因爲字段的差別,核心encode和decode過程是不同的,所以須要分開考慮。

Encoder

首先咱們來看編碼實現,源代碼路徑 com.alipay.remoting.rpc.protocol.RpcCommandEncoderV2, 代碼以下:

  1 /**
  2  * Encode remoting command into ByteBuf v2.
  3  * 編碼遠程命令成ByteBuf 第二版本
  4  *
  5  * @author jiangping
  6  * @version $Id: RpcCommandEncoderV2.java, v 0.1 2017-05-27 PM8:11:27 tao Exp $
  7  */
  8 public class RpcCommandEncoderV2 implements CommandEncoder {
  9     /** logger  日誌 */
 10     private static final Logger logger = LoggerFactory.getLogger("RpcRemoting");
 11  12     /**
 13      * @see CommandEncoder#encode(ChannelHandlerContext, Serializable, ByteBuf)
 14      */
 15     @Override
 16     public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
 17         try {
 18             if (msg instanceof RpcCommand) {
 19                 /*
 20                  * proto: magic code for protocol 協議的魔數
 21                  * ver: version for protocol 協議版本
 22                  * type: request/response/request oneway Rpc命令類型
 23                  * cmdcode: code for remoting command 遠程命令代碼
 24                  * ver2:version for remoting command 遠程命令版本
 25                  * requestId: id of request 請求編號
 26                  * codec: code for codec 序列化代碼
 27                  * switch: function switch 協議功能開關
 28                  * (req)timeout: request timeout. 當命令類型是請求時,此位置爲超時時間,4個字節
 29                  * (resp)respStatus: response status 當命令類型是回覆時,此位置爲回覆狀態,2個字節
 30                  * classLen: length of request or response class name 請求類和回覆類的長度
 31                  * headerLen: length of header 頭部長度
 32                  * cotentLen: length of content 內容長度
 33                  * className 類名
 34                  * header 協議
 35                  * content 內容
 36                  * crc (optional) 幀的CRC32(當ver1 > 1時存在)
 37                  */
 38                 int index = out.writerIndex(); //寫指針
 39                 RpcCommand cmd = (RpcCommand) msg;
 40                 //寫入版本魔數 (byte) 2
 41                 out.writeByte(RpcProtocolV2.PROTOCOL_CODE);
 42                 //從鏈接屬性中獲取協議版本
 43                 Attribute<Byte> version = ctx.channel().attr(Connection.VERSION);
 44                 byte ver = RpcProtocolV2.PROTOCOL_VERSION_1;
 45                 if (version != null && version.get() != null) {
 46                     ver = version.get();
 47                 }
 48                 //寫入協議版本
 49                 out.writeByte(ver);
 50                 //寫入RPC類型代碼
 51                 out.writeByte(cmd.getType());
 52                 //寫入RPC遠程命令代碼值
 53                 out.writeShort(((RpcCommand) msg).getCmdCode().value());
 54                 //寫入遠程命令版本
 55                 out.writeByte(cmd.getVersion());
 56                 //寫入Rpc編號
 57                 out.writeInt(cmd.getId());
 58                 //寫入協議序列化值
 59                 out.writeByte(cmd.getSerializer());
 60                 //寫入協議功能開關
 61                 out.writeByte(cmd.getProtocolSwitch().toByte());
 62                 // 判斷命令是RequestCommand仍是ResponseCommand來寫入超時仍是回覆狀態值
 63                 if (cmd instanceof RequestCommand) {
 64                     //timeout
 65                     out.writeInt(((RequestCommand) cmd).getTimeout());
 66                 }
 67                 if (cmd instanceof ResponseCommand) {
 68                     //response status
 69                     ResponseCommand response = (ResponseCommand) cmd;
 70                     out.writeShort(response.getResponseStatus().getValue());
 71                 }
 72                 //寫入類長度
 73                 out.writeShort(cmd.getClazzLength());
 74                 //寫入頭部長度
 75                 out.writeShort(cmd.getHeaderLength());
 76                 //寫入內容長度
 77                 out.writeInt(cmd.getContentLength());
 78                 //寫入類
 79                 if (cmd.getClazzLength() > 0) {
 80                     out.writeBytes(cmd.getClazz());
 81                 }
 82                 //寫入頭部
 83                 if (cmd.getHeaderLength() > 0) {
 84                     out.writeBytes(cmd.getHeader());
 85                 }
 86                 //寫入內容
 87                 if (cmd.getContentLength() > 0) {
 88                     out.writeBytes(cmd.getContent());
 89                 }
 90                 //經過判斷協議是v2且crc功能是開啓的,對內容進行循環冗餘校驗
 91                 if (ver == RpcProtocolV2.PROTOCOL_VERSION_2
 92                     && cmd.getProtocolSwitch().isOn(ProtocolSwitch.CRC_SWITCH_INDEX)) {
 93                     // compute the crc32 and write to out
 94                     byte[] frame = new byte[out.readableBytes()];
 95                     out.getBytes(index, frame);
 96                     out.writeInt(CrcUtil.crc32(frame));
 97                 }
 98             } else {
 99                 // 拋出異常
100                 String warnMsg = "msg type [" + msg.getClass() + "] is not subclass of RpcCommand";
101                 logger.warn(warnMsg);
102             }
103         } catch (Exception e) {
104             logger.error("Exception caught!", e);
105             throw e;
106         }
107     }
108 }

 

從代碼中,咱們能夠看到Netty裏面的數據讀寫是以ByteBuf爲單位進行交互的,咱們就來簡要了解一下ByteBuf。

ByteBuf結構

 

以上就是一個 ByteBuf 的結構圖,從上面這幅圖能夠看到

  1. ByteBuf 是一個字節容器,容器裏面的的數據分爲三個部分,第一個部分是已經丟棄的字節,這部分數據是無效的;第二部分是可讀字節,這部分數據是 ByteBuf 的主體數據, 從 ByteBuf 裏面讀取的數據都來自這一部分;最後一部分的數據是可寫字節,全部寫到 ByteBuf 的數據都會寫到這一段。最後一部分虛線表示的是該 ByteBuf 最多還能擴容多少容量

  2. 以上三段內容是被兩個指針給劃分出來的,從左到右,依次是讀指針(readerIndex)、寫指針(writerIndex),而後還有一個變量 capacity,表示 ByteBuf 底層內存的總容量

  3. 從 ByteBuf 中每讀取一個字節,readerIndex 自增1,ByteBuf 裏面總共有 writerIndex-readerIndex 個字節可讀, 由此能夠推論出當 readerIndex 與 writerIndex 相等的時候,ByteBuf 不可讀

  4. 寫數據是從 writerIndex 指向的部分開始寫,每寫一個字節,writerIndex 自增1,直到增到 capacity,這個時候,表示 ByteBuf 已經不可寫了

  5. ByteBuf 裏面其實還有一個參數 maxCapacity,當向 ByteBuf 寫數據的時候,若是容量不足,那麼這個時候能夠進行擴容,直到 capacity 擴容到 maxCapacity,超過 maxCapacity 就會報錯

Netty 使用 ByteBuf 這個數據結構能夠有效地區分可讀數據和可寫數據,讀寫之間相互沒有衝突,固然,ByteBuf 只是對二進制數據的抽象,具體底層的實現咱們在下面的小節會講到,在這一小節,咱們 只須要知道 Netty 關於數據讀寫只認 ByteBuf。

 

容量 API

capacity()

表示 ByteBuf 底層佔用了多少字節的內存(包括丟棄的字節、可讀字節、可寫字節),不一樣的底層實現機制有不一樣的計算方式,後面咱們講 ByteBuf 的分類的時候會講到

maxCapacity()

表示 ByteBuf 底層最大可以佔用多少字節的內存,當向 ByteBuf 中寫數據的時候,若是發現容量不足,則進行擴容,直到擴容到 maxCapacity,超過這個數,就拋異常

readableBytes() 與 isReadable()

readableBytes() 表示 ByteBuf 當前可讀的字節數,它的值等於 writerIndex-readerIndex,若是二者相等,則不可讀,isReadable() 方法返回 false

writableBytes()、 isWritable() 與 maxWritableBytes()

writableBytes() 表示 ByteBuf 當前可寫的字節數,它的值等於 capacity-writerIndex,若是二者相等,則表示不可寫,isWritable() 返回 false,可是這個時候,並不表明不能往 ByteBuf 中寫數據了, 若是發現往 ByteBuf 中寫數據寫不進去的話,Netty 會自動擴容 ByteBuf,直到擴容到底層的內存大小爲 maxCapacity,而 maxWritableBytes() 就表示可寫的最大字節數,它的值等於 maxCapacity-writerIndex

 

讀寫API

本質上,關於 ByteBuf 的讀寫均可以看做從指針開始的地方開始讀寫數據

writeBytes(byte[] src) 與 buffer.readBytes(byte[] dst)

writeBytes() 表示把字節數組 src 裏面的數據所有寫到 ByteBuf,而 readBytes() 指的是把 ByteBuf 裏面的數據所有讀取到 dst,這裏 dst 字節數組的大小一般等於 readableBytes(),而 src 字節數組大小的長度一般小於等於 writableBytes()

writeByte(byte b) 與 buffer.readByte()

writeByte() 表示往 ByteBuf 中寫一個字節,而 buffer.readByte() 表示從 ByteBuf 中讀取一個字節,相似的 API 還有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 與 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 這裏就不一一贅述了,相信讀者應該很容易理解這些 API

與讀寫 API 相似的 API 還有 getBytes、getByte() 與 setBytes()、setByte() 系列,惟一的區別就是 get/set 不會改變讀寫指針,而 read/write 會改變讀寫指針。

 

Decoder

接下來咱們來看解碼實現過程,源代碼路徑 com.alipay.remoting.rpc.protocol.RpcCommandDecoderV2

首先須要可讀數據進行長度判斷,是否大於請求報文頭部和回覆報文頭部的最小長度。以及對ByteBuf進行魔數的驗證,當不是可識別的協議,即拋出異常。

代碼以下:

 private int                 lessLen;
​
    {
        lessLen = RpcProtocolV2.getResponseHeaderLength() < RpcProtocolV2.getRequestHeaderLength() ? RpcProtocolV2
            .getResponseHeaderLength() : RpcProtocolV2.getRequestHeaderLength();
    }
    
     // 請求報文頭部和回覆報文頭部的最小長度
        // the less length between response header and request header
        if (in.readableBytes() >= lessLen) {
            //保存當前的讀指針
            in.markReaderIndex();
            //讀取協議魔數
            byte protocol = in.readByte();
            //恢復讀指針到原來的位置,即 in.mark..位置
            in.resetReaderIndex();
            if (protocol == RpcProtocolV2.PROTOCOL_CODE) {
               ......
            } else {
                //發現魔數異常,拋出不知道的協議錯誤!
                String emsg = "Unknown protocol: " + protocol;
                logger.error(emsg);
                throw new RuntimeException(emsg);
            }
​
        }

 

 

讀寫指針相關的 API

readerIndex() 與 readerIndex(int)

前者表示返回當前的讀指針 readerIndex, 後者表示設置讀指針

writeIndex() 與 writeIndex(int)

前者表示返回當前的寫指針 writerIndex, 後者表示設置寫指針

markReaderIndex() 與 resetReaderIndex()

前者表示把當前的讀指針保存起來,後者表示把當前的讀指針恢復到以前保存的值,下面兩段代碼是等價的

// 代碼片斷1
int readerIndex = buffer.readerIndex();
// .. 其餘操做
buffer.readerIndex(readerIndex);


// 代碼片斷二
buffer.markReaderIndex();
// .. 其餘操做
buffer.resetReaderIndex();

但願你們多多使用代碼片斷二這種方式,不須要本身定義變量,不管 buffer 看成參數傳遞到哪裏,調用 resetReaderIndex() 均可以恢復到以前的狀態,在解析自定義協議的數據包的時候很是常見,推薦你們使用這一對 API

markWriterIndex() 與 resetWriterIndex()

 

RPC請求命令解碼和回覆命令解碼是類似的,如下我以請求解碼爲例進行解讀:

 1 if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) {
 2                         //decode request 因已經讀取三個byte了,因此須要減3
 3                         if (in.readableBytes() >= RpcProtocolV2.getRequestHeaderLength() - 3) {
 4                             short cmdCode = in.readShort();
 5                             byte ver2 = in.readByte();
 6                             int requestId = in.readInt();
 7                             byte serializer = in.readByte();
 8                             byte protocolSwitchValue = in.readByte();
 9                             int timeout = in.readInt();
10                             short classLen = in.readShort();
11                             short headerLen = in.readShort();
12                             int contentLen = in.readInt();
13                             byte[] clazz = null;
14                             byte[] header = null;
15                             byte[] content = null;
16 17                             // decide the at-least bytes length for each version
18                             int lengthAtLeastForV1 = classLen + headerLen + contentLen;
19                             //判斷協議是否開啓CRC,若有,最小bytes長度加4
20                             boolean crcSwitchOn = ProtocolSwitch.isOn(
21                                 ProtocolSwitch.CRC_SWITCH_INDEX, protocolSwitchValue);
22                             int lengthAtLeastForV2 = classLen + headerLen + contentLen;
23                             if (crcSwitchOn) {
24                                 lengthAtLeastForV2 += 4;// crc int
25                             }
26 27                             // 若是知足V1協議且長度大於最小V1協議長度 或 知足V2協議且長度大於最小V2協議長度,則繼續讀取
28                             // continue read
29                             if ((version == RpcProtocolV2.PROTOCOL_VERSION_1 && in.readableBytes() >= lengthAtLeastForV1)
30                                 || (version == RpcProtocolV2.PROTOCOL_VERSION_2 && in
31                                     .readableBytes() >= lengthAtLeastForV2)) {
32                                 // 讀取類
33                                 if (classLen > 0) {
34                                     clazz = new byte[classLen];
35                                     in.readBytes(clazz);
36                                 }
37                                 // 讀取頭部
38                                 if (headerLen > 0) {
39                                     header = new byte[headerLen];
40                                     in.readBytes(header);
41                                 }
42                                 // 讀取內容
43                                 if (contentLen > 0) {
44                                     content = new byte[contentLen];
45                                     in.readBytes(content);
46                                 }
47                                 if (version == RpcProtocolV2.PROTOCOL_VERSION_2 && crcSwitchOn) {
48                                     //校驗內容
49                                     checkCRC(in, startIndex);
50                                 }
51                             } else {// not enough data 不足夠的數據,重置讀指針
52                                 in.resetReaderIndex();
53                                 return;
54                             }
55 56                             RequestCommand command;
57                             //判斷是心跳命令仍是請求命令
58                             if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
59                                 command = new HeartbeatCommand();
60                             } else {
61                                 command = createRequestCommand(cmdCode);
62                             }
63                             //封裝實體
64                             command.setType(type);
65                             command.setVersion(ver2);
66                             command.setId(requestId);
67                             command.setSerializer(serializer);
68                             command.setProtocolSwitch(ProtocolSwitch.create(protocolSwitchValue));
69                             command.setTimeout(timeout);
70                             command.setClazz(clazz);
71                             command.setHeader(header);
72                             command.setContent(content);
73 74                             out.add(command);
75                         } else {
76                             in.resetReaderIndex();
77                         }
78  

 

Heartbeat

協議相關的心跳觸發與處理:不一樣的協議對心跳的需求,處理邏輯也多是不一樣的。所以心跳的觸發邏輯,心跳的處理邏輯,也都須要單獨考慮。源代碼路徑爲:com.alipay.remoting.rpc.protocol.RpcHeartbeatTrigger

 

/** max trigger times 最大觸發次數,默認爲3次 */
    public static final Integer maxCount               = ConfigManager.tcp_idle_maxtimes();
​
    private static final long   heartbeatTimeoutMillis = 1000;
     @Override
    public void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception {
        //得到鏈接心跳次數
        Integer heartbeatTimes = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
        final Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
        //若是心跳次數觸發大於3次,則關閉鏈接
        if (heartbeatTimes >= maxCount) {
            try {
                conn.close();
                //拋出異常
                logger.error(
                    "Heartbeat failed for {} times, close the connection from client side: {} ",
                    heartbeatTimes, RemotingUtil.parseRemoteAddress(ctx.channel()));
            } catch (Exception e) {
                logger.warn("Exception caught when closing connection in SharableHandler.", e);
            }
        } else {
            boolean heartbeatSwitch = ctx.channel().attr(Connection.HEARTBEAT_SWITCH).get();
            if (!heartbeatSwitch) {
                return;
            }
            final HeartbeatCommand heartbeat = new HeartbeatCommand();
​
            final InvokeFuture future = new DefaultInvokeFuture(heartbeat.getId(),
                new InvokeCallbackListener() {
                    @Override
                    public void onResponse(InvokeFuture future) {
                        ResponseCommand response;
                        ......
                        // 觸發次數加一
                            Integer times = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
                            ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(times + 1);
                        }
                    }
​
                    @Override
                    public String getRemoteAddress() {
                        return ctx.channel().remoteAddress().toString();
                    }
                }, null, heartbeat.getProtocolCode().getFirstByte(), this.commandFactory);
            final int heartbeatId = heartbeat.getId();
            conn.addInvokeFuture(future);
            if (logger.isDebugEnabled()) {
                logger.debug("Send heartbeat, successive count={}, Id={}, to remoteAddr={}",
                    heartbeatTimes, heartbeatId, RemotingUtil.parseRemoteAddress(ctx.channel()));
            }
            //異步回調結果
            ctx.writeAndFlush(heartbeat).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    ......
                }
            });
            //TimerHolder爲Netty工具類時間輪算法實現 
            TimerHolder.getTimer().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    InvokeFuture future = conn.removeInvokeFuture(heartbeatId);
                    if (future != null) {
                        future.putResponse(commandFactory.createTimeoutResponse(conn
                            .getRemoteAddress()));
                        future.tryAsyncExecuteInvokeCallbackAbnormally();
                    }
                }
            }, heartbeatTimeoutMillis, TimeUnit.MILLISECONDS);
        }
​
    }

 

對HashedWheelTimer感興趣的人,能夠了解一下如下文章。

 

Command 與 Command Handler

  • 可擴展的命令與命令處理器管理

     

    圖2 - 通訊命令設計舉例

  • 負載命令:通常傳輸的業務的具體數據,好比帶着請求參數,響應結果的命令;

  • 控制命令:一些功能管理命令,心跳命令等,它們一般完成一些複雜的分佈式跨節點的協調功能,以此來保證負載命令通訊過程的穩定,是必不可少的一部分。

  • 協議的通訊過程,會有各類命令定義,邏輯上,咱們把傳輸業務具體負載的請求對象,叫作負載命令(Payload Command),另外一種叫作控制命令(Control Command),好比一些功能管理命令,或者心跳命令。

  • 定義了通訊命令,咱們還須要定義命令處理器,用來編寫各個命令對應的業務處理邏輯。同時,咱們須要保存命令與命令處理器的映射關係,以便在處理階段,走到正確的處理器。

     

    commandFactory與其RpcCommandFactory

    這2個類的主要做用爲命令工廠的做用,用請求實體生成請求命令,以及生成一些帶着請求參數,響應結果的命令。回覆狀態有SUCCESS,ERROR,SERVER_EXCEPTION,TIMEOUT等。

     

    RpcCommandHandler和CommandHandler

     1 /**
     2  * Command handler.
     3  *  命令處理類
     4  * @author jiangping
     5  * @version $Id: CommandHandler.java, v 0.1 2015-12-14 PM4:03:55 tao Exp $
     6  */
     7 public interface CommandHandler {
     8     /**
     9      * Handle the command.
    10      *  處理命令
    11      * @param ctx
    12      * @param msg
    13      * @throws Exception
    14      */
    15     void handleCommand(RemotingContext ctx, Object msg) throws Exception;
    16 17     /**
    18      * Register processor for command with specified code.
    19      * 註冊命令特定代碼的處理器
    20      * @param cmd
    21      * @param processor
    22      */
    23     void registerProcessor(CommandCode cmd, RemotingProcessor<?> processor);
    24 25     /**
    26      * Register default executor for the handler.
    27      *  註冊處理類的默認執行者
    28      * @param executor
    29      */
    30     void registerDefaultExecutor(ExecutorService executor);
    31 32     /**
    33      * Get default executor for the handler.
    34      * 獲得處理類的默認執行者
    35      */
    36     ExecutorService getDefaultExecutor();
    37 38 }

    經過建立ExecutorService線程池,將命令的處理提交給線程池來實現。若是沒有爲此處理類設置線程池,Bolt默認建立一個如下參數的線程池:

    • corePoolSize(線程池的基本大小) : 20

    • maximumPoolSize(線程池最大大小) :400

    • keepAliveTime(線程活動保持時間): 60s

    • runnableTaskQueue(任務隊列): ArrayBlockingQueue,隊列大小爲6000

    • ThreadFactory: 一個建立前綴爲"Bolt-default-executor"的命名工廠。

       

    如若對線程池瞭解很少的選手,能夠閱讀如下文章,認知一下。

    理解線程池到走進dubbo源碼

 

其餘

關於螞蟻通信框架SOFABolt之私有通信協議設計詳解到這裏就結束了。固然以上全部註釋,我已在個人github上上傳了個人Bolt註釋庫。

連接:https://github.com/sanshengshui/bolt

原創不易,若是感受不錯,但願給個推薦!您的支持是我寫做的最大動力!

版權聲明:

做者:穆書偉

github出處:https://github.com/sanshengshui    

我的博客出處:https://sanshengshui.github.io/

相關文章
相關標籤/搜索