最近在看Netty
相關的資料,恰好SOFA-BOLT
是一個比較成熟的Netty
自定義協議棧實現,因而決定研讀SOFA-BOLT
的源碼,詳細分析其協議的組成,簡單分析其客戶端和服務端的源碼實現。java
SOFA-BOLT
的代碼縮進和FastJson
相似,變量名稱強制對齊,對於通常開發者來講看着源碼會有不適感當前閱讀的源碼是2021-08
左右的SOFA-BOLT
倉庫的master
分支源碼。mysql
SOFA-BOLT
是螞蟻金融服務集團開發的一套基於Netty
實現的網絡通訊框架,本質是一套Netty
私有協議棧封裝,目的是爲了讓開發者能將更多的精力放在基於網絡通訊的業務邏輯實現上,而不是過多的糾結於網絡底層NIO
的實現以及處理難以調試的網絡問題和Netty
二次開發問題。SOFA-BOLT
的架構設計和功能以下:git
上圖來源於SOFA-BOLT官網https://www.sofastack.tech/projects/sofa-bolt/overviewgithub
因爲SOFA-BOLT
協議是基於Netty
實現的自定義協議棧,協議自己的實現能夠快速地在Encoder
和Decoder
的實現中找到,進一步定位到com.alipay.remoting.rpc
包中。從源碼得知,SOFA-BOLT
協議目前有兩個版本,協議在RpcProtocol
和RpcProtocolV2
的類頂部註釋中有比較詳細的介紹,基於這些介紹能夠簡單整理出兩個版本協議的基本構成。spring
V1
版本的協議請求Frame
基本構成:V1
版本的協議響應Frame
基本構成:針對V1
版本的協議,各個屬性展開以下:sql
Frame
和響應Frame
的公共屬性:屬性Code | 屬性含義 | Java類型 | 大小(byte) | 備註 |
---|---|---|---|---|
proto | 協議編碼 | byte | 1 | V1 版本下,proto = 1 ,V2 版本下,proto = 2 |
type | 類型 | byte | 1 | 0 => RESPONSE ,1 => REQUEST ,2 => REQUEST_ONEWAY |
cmdcode | 命令編碼 | short | 2 | 1 => rpc request ,2 => rpc response |
ver2 | 命令版本 | byte | 1 | 從源碼得知目前固定爲1 |
requestId | 請求ID | int | 4 | 某個請求CMD 的全局惟一標識 |
codec | 編碼解碼器 | byte | 1 | - |
上表中,codec從字面上理解是編碼解碼器,其實是序列化和反序列實現的標記,V1和V2目前都是固定codec = 1,經過源碼跟蹤到SerializerManager的配置值爲Hessian2 = 1,也就是默認使用Hessian2進行序列化和反序列化,詳細見源碼中的HessianSerializershell
Frame
特有的屬性:屬性Code | 屬性含義 | Java類型 | 大小(byte) | 備註 |
---|---|---|---|---|
timeout | 請求超時時間 | int | 4 | |
classLen | 請求對象(參數)類型的名稱長度 | short | 2 | 值>=0 |
headerLen | 請求頭長度 | short | 2 | 值>=0 |
contentLen | 請求內容長度 | int | 4 | 值>=0 |
className bytes | 請求對象(參數)類型的名稱 | byte[] |
- | |
header bytes | 請求頭 | byte[] |
- | |
content bytes | 請求內容 | byte[] |
- |
Frame
特有的屬性:屬性Code | 屬性含義 | Java類型 | 大小(byte) | 備註 |
---|---|---|---|---|
respstatus | 響應狀態值 | short | 2 | 在ResponseStatus 中定義,目前內置13 種狀態,例如0 => SUCCESS |
classLen | 響應對象(參數)類型的名稱長度 | short | 2 | 值>=0 |
headerLen | 響應頭長度 | short | 2 | 值>=0 |
contentLen | 響應內容長度 | int | 4 | 值>=0 |
className bytes | 響應對象(參數)類型的名稱 | byte[] |
- | |
header bytes | 響應頭 | byte[] |
- | |
content bytes | 響應內容 | byte[] |
- |
這裏能夠看出V1
版本中的請求Frame
和響應Frame
只有細微的差異,(請求Frame
中獨立存在timeout
屬性,而響應Frame
獨立存在respstatus
屬性),絕大部分的屬性都是複用的,而且三個長度和三個字節數組是相互制約的:數據庫
classLen <=> className bytes
headerLen <=> header bytes
contentLen <=> content bytes
V2
版本的協議請求Frame
基本構成:V2
版本的協議響應Frame
基本構成:V2
版本的協議相比V1
版本多了2
個必傳公共屬性和1
個可選公共屬性:編程
屬性Code | 屬性含義 | Java類型 | 大小(byte) | 備註 |
---|---|---|---|---|
ver1 | 協議版本 | byte | 1 | 是爲了在V2 版本協議中兼容V1 版本的協議 |
switch | 協議開關 | byte | 1 | 基於BitSet 實現的開關,最多8 個 |
CRC32 | 循環冗餘校驗值 | int | 4 | 可選的,由開關ProtocolSwitch.CRC_SWITCH_INDEX 決定是否啓用,啓用的時候會基於整個Frame 進行計算 |
這幾個新增屬性中,switch
表明ProtocolSwitch
實現中的BitSet
轉換出來的byte
字段,因爲byte
只有8
位,所以協議在傳輸過程當中最多隻能傳遞8
個開關的狀態,這些開關的下標爲[0,7]
。CRC32
是基於整個Frame
轉換出來的byte
數組進行計算,JDK
中有原生從API
,能夠簡單構建一個工具類以下進行計算:json
public enum Crc32Utils { /** * 單例 */ X; /** * 進行CRC32結果計算 * * @param content 內容 * @return crc32 result */ public long crc32(byte[] content) { CRC32 crc32 = new CRC32(); crc32.update(content, 0, content.length); long r = crc32.getValue(); // crc32.reset(); return r; } }
V2
版本協議把CRC32
的計算結果強制轉換爲int
類型,能夠思考一下這裏爲何不會溢出。
考慮到若是分析源碼,文章篇幅會比較長,而且若是有開發過Netty
自定義協議棧的經驗,SOFA-BOLT
的源碼並不複雜,這裏僅僅分析SOFA-BOLT
的架構和核心組件功能。協議由接口Protocol
定義:
public interface Protocol { // 命令編碼器 CommandEncoder getEncoder(); // 命令解碼器 CommandDecoder getDecoder(); // 心跳觸發器 HeartbeatTrigger getHeartbeatTrigger(); // 命令處理器 CommandHandler getCommandHandler(); // 命令工廠 CommandFactory getCommandFactory(); }
由V2
版本協議實現RpcProtocolV2
能夠得知:
另外,全部須要發送或者接收的Frame
都被封裝爲Command
,而Command
的類族以下:
也就是:
RequestCommand
定義了請求命令須要的全部屬性,最終由RpcCommandEncoderV2
進行編碼ResponseCommand
定義了響應命令須要的全部屬性,最終由RpcCommandDecoderV2
進行解碼梳理完上面的組件就能夠畫出下面的一個基於SOFA-BOLT
協議進行的Client => Server
的交互圖:
因爲sofa-bolt
已經封裝好了完整的RpcClient
和RpcServer
,使用此協議只須要引用依賴,而後初始化客戶端和服務端,編寫對應的UserProcessor
實現便可。引入相關依賴:
<dependency> <groupId>com.alipay.sofa</groupId> <artifactId>bolt</artifactId> <version>1.6.3</version> </dependency> <dependency> <groupId>com.caucho</groupId> <artifactId>hessian</artifactId> <version>4.0.65</version> </dependency>
新建請求實體類RequestMessage
、響應實體類ResponseMessage
和對應的處理器RequestMessageProcessor
:
@Data public class RequestMessage implements Serializable { private Long id; private String content; } @Data public class ResponseMessage implements Serializable { private Long id; private String content; private Long status; } public class RequestMessageProcessor extends SyncUserProcessor<RequestMessage> { @Override public Object handleRequest(BizContext bizContext, RequestMessage requestMessage) throws Exception { ResponseMessage message = new ResponseMessage(); message.setContent(requestMessage.getContent()); message.setId(requestMessage.getId()); message.setStatus(10087L); return message; } @Override public String interest() { return RequestMessage.class.getName(); } }
其中處理器須要同步處理須要繼承超類SyncUserProcessor
,選用異步處理的時候須要繼承超類AsyncUserProcessor
,做爲參數的全部實體類必須實現Serializable
接口(若是有嵌套對象,每一個嵌套對象所在類也必須實現Serializable
接口),不然會出現序列化相關的異常。最後編寫客戶端和服務端的代碼:
@Slf4j public class BlotApp { private static final int PORT = 8081; private static final String ADDRESS = "127.0.0.1:" + PORT; public static void main(String[] args) throws Exception { RequestMessageProcessor processor = new RequestMessageProcessor(); RpcServer server = new RpcServer(8081, true); server.startup(); server.registerUserProcessor(processor); RpcClient client = new RpcClient(); client.startup(); RequestMessage request = new RequestMessage(); request.setId(99L); request.setContent("hello bolt"); ResponseMessage response = (ResponseMessage) client.invokeSync(ADDRESS, request, 2000); log.info("響應結果:{}", response); } }
運行輸出結果:
響應結果:ResponseMessage(id=99, content=hello bolt, status=10087)
本地測試MySQL
服務構建客戶表以下:
CREATE DATABASE test; USE test; CREATE TABLE t_customer ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, customer_name VARCHAR(32) NOT NULL );
爲了簡化JDBC
操做,引入spring-boot-starter-jdbc
(這裏只借用JdbcTemplate
的輕度封裝)相關依賴:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.20</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> <version>2.3.0.RELEASE</version> </dependency>
編寫核心同步處理器:
// 建立 @Data public class CreateCustomerReq implements Serializable { private String customerName; } @Data public class CreateCustomerResp implements Serializable { private Long code; private Long customerId; } public class CreateCustomerProcessor extends SyncUserProcessor<CreateCustomerReq> { private final JdbcTemplate jdbcTemplate; public CreateCustomerProcessor(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public Object handleRequest(BizContext bizContext, CreateCustomerReq req) throws Exception { KeyHolder keyHolder = new GeneratedKeyHolder(); jdbcTemplate.update(connection -> { PreparedStatement ps = connection.prepareStatement("insert into t_customer(customer_name) VALUES (?)", Statement.RETURN_GENERATED_KEYS); ps.setString(1, req.getCustomerName()); return ps; }, keyHolder); CreateCustomerResp resp = new CreateCustomerResp(); resp.setCustomerId(Objects.requireNonNull(keyHolder.getKey()).longValue()); resp.setCode(RespCode.SUCCESS); return resp; } @Override public String interest() { return CreateCustomerReq.class.getName(); } } // 更新 @Data public class UpdateCustomerReq implements Serializable { private Long customerId; private String customerName; } public class UpdateCustomerProcessor extends SyncUserProcessor<UpdateCustomerReq> { private final JdbcTemplate jdbcTemplate; public UpdateCustomerProcessor(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public Object handleRequest(BizContext bizContext, UpdateCustomerReq req) throws Exception { UpdateCustomerResp resp = new UpdateCustomerResp(); int updateCount = jdbcTemplate.update("UPDATE t_customer SET customer_name = ? WHERE id = ?", ps -> { ps.setString(1, req.getCustomerName()); ps.setLong(2, req.getCustomerId()); }); if (updateCount > 0) { resp.setCode(RespCode.SUCCESS); } return resp; } @Override public String interest() { return UpdateCustomerReq.class.getName(); } } // 刪除 @Data public class DeleteCustomerReq implements Serializable { private Long customerId; } @Data public class DeleteCustomerResp implements Serializable { private Long code; } public class DeleteCustomerProcessor extends SyncUserProcessor<DeleteCustomerReq> { private final JdbcTemplate jdbcTemplate; public DeleteCustomerProcessor(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public Object handleRequest(BizContext bizContext, DeleteCustomerReq req) throws Exception { DeleteCustomerResp resp = new DeleteCustomerResp(); int updateCount = jdbcTemplate.update("DELETE FROM t_customer WHERE id = ?", ps -> ps.setLong(1,req.getCustomerId())); if (updateCount > 0){ resp.setCode(RespCode.SUCCESS); } return resp; } @Override public String interest() { return DeleteCustomerReq.class.getName(); } } // 查詢 @Data public class SelectCustomerReq implements Serializable { private Long customerId; } @Data public class SelectCustomerResp implements Serializable { private Long code; private Long customerId; private String customerName; } public class SelectCustomerProcessor extends SyncUserProcessor<SelectCustomerReq> { private final JdbcTemplate jdbcTemplate; public SelectCustomerProcessor(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public Object handleRequest(BizContext bizContext, SelectCustomerReq req) throws Exception { SelectCustomerResp resp = new SelectCustomerResp(); Customer result = jdbcTemplate.query("SELECT * FROM t_customer WHERE id = ?", ps -> ps.setLong(1, req.getCustomerId()), rs -> { Customer customer = null; if (rs.next()) { customer = new Customer(); customer.setId(rs.getLong("id")); customer.setCustomerName(rs.getString("customer_name")); } return customer; }); if (Objects.nonNull(result)) { resp.setCustomerId(result.getId()); resp.setCustomerName(result.getCustomerName()); resp.setCode(RespCode.SUCCESS); } return resp; } @Override public String interest() { return SelectCustomerReq.class.getName(); } @Data public static class Customer { private Long id; private String customerName; } }
編寫數據源、客戶端和服務端代碼:
public class CurdApp { private static final int PORT = 8081; private static final String ADDRESS = "127.0.0.1:" + PORT; public static void main(String[] args) throws Exception { HikariConfig config = new HikariConfig(); config.setJdbcUrl("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"); config.setDriverClassName(Driver.class.getName()); config.setUsername("root"); config.setPassword("root"); HikariDataSource dataSource = new HikariDataSource(config); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); CreateCustomerProcessor createCustomerProcessor = new CreateCustomerProcessor(jdbcTemplate); UpdateCustomerProcessor updateCustomerProcessor = new UpdateCustomerProcessor(jdbcTemplate); DeleteCustomerProcessor deleteCustomerProcessor = new DeleteCustomerProcessor(jdbcTemplate); SelectCustomerProcessor selectCustomerProcessor = new SelectCustomerProcessor(jdbcTemplate); RpcServer server = new RpcServer(PORT, true); server.registerUserProcessor(createCustomerProcessor); server.registerUserProcessor(updateCustomerProcessor); server.registerUserProcessor(deleteCustomerProcessor); server.registerUserProcessor(selectCustomerProcessor); server.startup(); RpcClient client = new RpcClient(); client.startup(); CreateCustomerReq createCustomerReq = new CreateCustomerReq(); createCustomerReq.setCustomerName("throwable.club"); CreateCustomerResp createCustomerResp = (CreateCustomerResp) client.invokeSync(ADDRESS, createCustomerReq, 5000); System.out.println("建立用戶[throwable.club]結果:" + createCustomerResp); SelectCustomerReq selectCustomerReq = new SelectCustomerReq(); selectCustomerReq.setCustomerId(createCustomerResp.getCustomerId()); SelectCustomerResp selectCustomerResp = (SelectCustomerResp) client.invokeSync(ADDRESS, selectCustomerReq, 5000); System.out.println(String.format("查詢用戶[id=%d]結果:%s", selectCustomerReq.getCustomerId(), selectCustomerResp)); UpdateCustomerReq updateCustomerReq = new UpdateCustomerReq(); updateCustomerReq.setCustomerId(selectCustomerReq.getCustomerId()); updateCustomerReq.setCustomerName("throwx.cn"); UpdateCustomerResp updateCustomerResp = (UpdateCustomerResp) client.invokeSync(ADDRESS, updateCustomerReq, 5000); System.out.println(String.format("更新用戶[id=%d]結果:%s", updateCustomerReq.getCustomerId(), updateCustomerResp)); selectCustomerReq.setCustomerId(updateCustomerReq.getCustomerId()); selectCustomerResp = (SelectCustomerResp) client.invokeSync(ADDRESS, selectCustomerReq, 5000); System.out.println(String.format("查詢更新後的用戶[id=%d]結果:%s", selectCustomerReq.getCustomerId(), selectCustomerResp)); DeleteCustomerReq deleteCustomerReq = new DeleteCustomerReq(); deleteCustomerReq.setCustomerId(selectCustomerResp.getCustomerId()); DeleteCustomerResp deleteCustomerResp = (DeleteCustomerResp) client.invokeSync(ADDRESS, deleteCustomerReq, 5000); System.out.println(String.format("刪除用戶[id=%d]結果:%s", deleteCustomerReq.getCustomerId(), deleteCustomerResp)); } }
執行結果以下:
建立用戶[throwable.club]結果:CreateCustomerResp(code=0, customerId=1) 查詢用戶[id=1]結果:SelectCustomerResp(code=0, customerId=1, customerName=throwable.club) 更新用戶[id=1]結果:UpdateCustomerResp(code=0) 查詢更新後的用戶[id=1]結果:SelectCustomerResp(code=0, customerId=1, customerName=throwx.cn) 更新用戶[id=1]結果:DeleteCustomerResp(code=0)
確認最後刪除操做結束後驗證數據庫表,確認t_customer
表爲空。
這裏嘗試使用GO
語言編寫一個SOFA-BOLT
協議客戶端,考慮到實現一個完整版本會比較複雜,這裏簡化爲只實現Encode
和命令調用部分,暫時不處理響應和Decode
。編寫結構體RequestCommand
以下:
// RequestCommand sofa-bolt v2 req cmd type RequestCommand struct { ProtocolCode uint8 ProtocolVersion uint8 Type uint8 CommandCode uint16 CommandVersion uint8 RequestId uint32 Codec uint8 Switch uint8 Timeout uint32 ClassLength uint16 HeaderLength uint16 ContentLength uint32 ClassName []byte Header []byte Content []byte }
這裏注意一點,全部的整數類型必須使用具體的類型,例如uint
必須用uint32
,不然會出現Buffer
寫入異常的問題。接着編寫一個編碼方法:
// encode req => slice func encode(cmd *RequestCommand) []byte { container := make([]byte, 0) buf := bytes.NewBuffer(container) buf.WriteByte(cmd.ProtocolCode) buf.WriteByte(cmd.ProtocolVersion) buf.WriteByte(cmd.Type) binary.Write(buf, binary.BigEndian, cmd.CommandCode) buf.WriteByte(cmd.CommandVersion) binary.Write(buf, binary.BigEndian, cmd.RequestId) buf.WriteByte(cmd.Codec) buf.WriteByte(cmd.Switch) binary.Write(buf, binary.BigEndian, cmd.Timeout) binary.Write(buf, binary.BigEndian, cmd.ClassLength) binary.Write(buf, binary.BigEndian, cmd.HeaderLength) binary.Write(buf, binary.BigEndian, cmd.ContentLength) buf.Write(cmd.ClassName) buf.Write(cmd.Header) buf.Write(cmd.Content) return buf.Bytes() }
最後編寫TCP
客戶端:
type Req struct { Id int64 `json:"id"` Name string `json:"name"` } package main import ( "bytes" "encoding/binary" "encoding/json" "fmt" "net" ) func main() { con, err := net.Dial("tcp", "127.0.0.1:9999") if err != nil { fmt.Println("err:", err) return } defer con.Close() req := &Req{ Id: 8080, Name: "throwx.cn", } content, err := json.Marshal(req) if err != nil { fmt.Println("err:", err) return } var header []byte className := []byte("com.alipay.remoting.Req") cmd := &RequestCommand{ ProtocolCode: 2, ProtocolVersion: 2, Type: 1, CommandCode: 1, CommandVersion: 1, RequestId: 10087, Codec: 1, Switch: 0, Timeout: 5000, ClassLength: uint16(len(className)), HeaderLength: 0, ContentLength: uint32(len(content)), ClassName: className, Header: header, Content: content, } pkg := encode(cmd) _, err = con.Write(pkg) if err != nil { fmt.Println("err:", err) return } }
協議的V2版本Crc32屬性是可選的,這裏爲了簡化處理也暫時忽略了
這裏看到Content
屬性爲了簡化處理使用了JSON
作序列化,所以須要稍微改動SOFA-BOLT
的源碼,引入FastJson
和FastJsonSerializer
,改動見下圖:
先啓動BoltApp
(SOFA-BOLT
服務端),再執行GO
編寫的客戶端,結果以下:
SOFA-BOLT
是一個高性能成熟可擴展的Netty
私有協議封裝,比起原生Netty
編程,提供了便捷的同步、異步調用,提供基礎心跳支持和重連等特性。引入SyncUserProcessor
和AsyncUserProcessor
的功能,對於業務開發更加友好。SOFA-BOLT
協議本質也是一個緊湊、高性能的RPC
協議。在考慮引入Netty
進行底層通信的場景,能夠優先考慮使用SOFA-BOLT
或者考慮把SOFA-BOLT
做爲候選方案之一,只因SOFA-BOLT
是輕量級的,學習曲線平緩,基本沒有其餘中間件依賴。
Demo
所在倉庫:
(本文完 c-5-d e-a-20210806)