透視RPC協議:SOFA-BOLT協議源碼分析

前提

最近在看Netty相關的資料,恰好SOFA-BOLT是一個比較成熟的Netty自定義協議棧實現,因而決定研讀SOFA-BOLT的源碼,詳細分析其協議的組成,簡單分析其客戶端和服務端的源碼實現。java

  • 吐槽一下:SOFA-BOLT的代碼縮進和FastJson相似,變量名稱強制對齊,對於通常開發者來講看着源碼會有不適感

當前閱讀的源碼是2021-08左右的SOFA-BOLT倉庫的master分支源碼。mysql

SOFA-BOLT簡單介紹

SOFA-BOLT是螞蟻金融服務集團開發的一套基於Netty實現的網絡通訊框架,本質是一套Netty私有協議棧封裝,目的是爲了讓開發者能將更多的精力放在基於網絡通訊的業務邏輯實現上,而不是過多的糾結於網絡底層NIO的實現以及處理難以調試的網絡問題和Netty二次開發問題。SOFA-BOLT的架構設計和功能以下:git

上圖來源於SOFA-BOLT官網https://www.sofastack.tech/projects/sofa-bolt/overviewgithub

SOFA-BOLT協議透視

因爲SOFA-BOLT協議是基於Netty實現的自定義協議棧,協議自己的實現能夠快速地在EncoderDecoder的實現中找到,進一步定位到com.alipay.remoting.rpc包中。從源碼得知,SOFA-BOLT協議目前有兩個版本,協議在RpcProtocolRpcProtocolV2的類頂部註釋中有比較詳細的介紹,基於這些介紹能夠簡單整理出兩個版本協議的基本構成。spring

V1版本協議的基本構成

  • V1版本的協議請求Frame基本構成:

  • V1版本的協議響應Frame基本構成:

針對V1版本的協議,各個屬性展開以下:sql

  • 請求Frame和響應Frame的公共屬性:
屬性Code 屬性含義 Java類型 大小(byte) 備註
proto 協議編碼 byte 1 V1版本下,proto = 1V2版本下,proto = 2
type 類型 byte 1 0 => RESPONSE1 => REQUEST2 => REQUEST_ONEWAY
cmdcode 命令編碼 short 2 1 => rpc request2 => rpc response
ver2 命令版本 byte 1 從源碼得知目前固定爲1
requestId 請求ID int 4 某個請求CMD的全局惟一標識
codec 編碼解碼器 byte 1 -

上表中,codec從字面上理解是編碼解碼器,其實是序列化和反序列實現的標記,V1和V2目前都是固定codec = 1,經過源碼跟蹤到SerializerManager的配置值爲Hessian2 = 1,也就是默認使用Hessian2進行序列化和反序列化,詳細見源碼中的HessianSerializershell

  • 請求Frame特有的屬性:
屬性Code 屬性含義 Java類型 大小(byte) 備註
timeout 請求超時時間 int 4
classLen 請求對象(參數)類型的名稱長度 short 2 >=0
headerLen 請求頭長度 short 2 >=0
contentLen 請求內容長度 int 4 >=0
className bytes 請求對象(參數)類型的名稱 byte[] -
header bytes 請求頭 byte[] -
content bytes 請求內容 byte[] -
  • 響應Frame特有的屬性:
屬性Code 屬性含義 Java類型 大小(byte) 備註
respstatus 響應狀態值 short 2 ResponseStatus中定義,目前內置13種狀態,例如0 => SUCCESS
classLen 響應對象(參數)類型的名稱長度 short 2 >=0
headerLen 響應頭長度 short 2 >=0
contentLen 響應內容長度 int 4 >=0
className bytes 響應對象(參數)類型的名稱 byte[] -
header bytes 響應頭 byte[] -
content bytes 響應內容 byte[] -

這裏能夠看出V1版本中的請求Frame和響應Frame只有細微的差異,(請求Frame中獨立存在timeout屬性,而響應Frame獨立存在respstatus屬性),絕大部分的屬性都是複用的,而且三個長度和三個字節數組是相互制約的:數據庫

  • classLen <=> className bytes
  • headerLen <=> header bytes
  • contentLen <=> content bytes

V2版本協議的基本構成

  • V2版本的協議請求Frame基本構成:

  • V2版本的協議響應Frame基本構成:

V2版本的協議相比V1版本多了2個必傳公共屬性和1個可選公共屬性:編程

屬性Code 屬性含義 Java類型 大小(byte) 備註
ver1 協議版本 byte 1 是爲了在V2版本協議中兼容V1版本的協議
switch 協議開關 byte 1 基於BitSet實現的開關,最多8
CRC32 循環冗餘校驗值 int 4 可選的,由開關ProtocolSwitch.CRC_SWITCH_INDEX決定是否啓用,啓用的時候會基於整個Frame進行計算

這幾個新增屬性中,switch表明ProtocolSwitch實現中的BitSet轉換出來的byte字段,因爲byte只有8位,所以協議在傳輸過程當中最多隻能傳遞8個開關的狀態,這些開關的下標爲[0,7]CRC32是基於整個Frame轉換出來的byte數組進行計算,JDK中有原生從API,能夠簡單構建一個工具類以下進行計算:json

public enum Crc32Utils {

    /**
     * 單例
     */
    X;

    /**
     * 進行CRC32結果計算
     *
     * @param content 內容
     * @return crc32 result
     */
    public long crc32(byte[] content) {
        CRC32 crc32 = new CRC32();
        crc32.update(content, 0, content.length);
        long r = crc32.getValue();
        // crc32.reset();
        return r;
    }
}

V2版本協議把CRC32的計算結果強制轉換爲int類型,能夠思考一下這裏爲何不會溢出。

SOFA-BOLT架構

考慮到若是分析源碼,文章篇幅會比較長,而且若是有開發過Netty自定義協議棧的經驗,SOFA-BOLT的源碼並不複雜,這裏僅僅分析SOFA-BOLT的架構和核心組件功能。協議由接口Protocol定義:

public interface Protocol {
    
    // 命令編碼器
    CommandEncoder getEncoder();

    // 命令解碼器
    CommandDecoder getDecoder();

    // 心跳觸發器
    HeartbeatTrigger getHeartbeatTrigger();

    // 命令處理器
    CommandHandler getCommandHandler();

    // 命令工廠
    CommandFactory getCommandFactory();
}

V2版本協議實現RpcProtocolV2能夠得知:

另外,全部須要發送或者接收的Frame都被封裝爲Command,而Command的類族以下:

也就是:

  • RequestCommand定義了請求命令須要的全部屬性,最終由RpcCommandEncoderV2進行編碼
  • ResponseCommand定義了響應命令須要的全部屬性,最終由RpcCommandDecoderV2進行解碼

梳理完上面的組件就能夠畫出下面的一個基於SOFA-BOLT協議進行的Client => Server的交互圖:

SOFA-BOLT使用

因爲sofa-bolt已經封裝好了完整的RpcClientRpcServer,使用此協議只須要引用依賴,而後初始化客戶端和服務端,編寫對應的UserProcessor實現便可。引入相關依賴:

<dependency>
    <groupId>com.alipay.sofa</groupId>
    <artifactId>bolt</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>4.0.65</version>
</dependency>

新建請求實體類RequestMessage、響應實體類ResponseMessage和對應的處理器RequestMessageProcessor

@Data
public class RequestMessage implements Serializable {

    private Long id;

    private String content;
}

@Data
public class ResponseMessage implements Serializable {

    private Long id;

    private String content;

    private Long status;
}

public class RequestMessageProcessor extends SyncUserProcessor<RequestMessage> {

    @Override
    public Object handleRequest(BizContext bizContext, RequestMessage requestMessage) throws Exception {
        ResponseMessage message = new ResponseMessage();
        message.setContent(requestMessage.getContent());
        message.setId(requestMessage.getId());
        message.setStatus(10087L);
        return message;
    }

    @Override
    public String interest() {
        return RequestMessage.class.getName();
    }
}

其中處理器須要同步處理須要繼承超類SyncUserProcessor,選用異步處理的時候須要繼承超類AsyncUserProcessor,做爲參數的全部實體類必須實現Serializable接口(若是有嵌套對象,每一個嵌套對象所在類也必須實現Serializable接口),不然會出現序列化相關的異常。最後編寫客戶端和服務端的代碼:

@Slf4j
public class BlotApp {

    private static final int PORT = 8081;
    private static final String ADDRESS = "127.0.0.1:" + PORT;

    public static void main(String[] args) throws Exception {
        RequestMessageProcessor processor = new RequestMessageProcessor();
        RpcServer server = new RpcServer(8081, true);
        server.startup();
        server.registerUserProcessor(processor);
        RpcClient client = new RpcClient();
        client.startup();
        RequestMessage request = new RequestMessage();
        request.setId(99L);
        request.setContent("hello bolt");
        ResponseMessage response = (ResponseMessage) client.invokeSync(ADDRESS, request, 2000);
        log.info("響應結果:{}", response);
    }
}

運行輸出結果:

響應結果:ResponseMessage(id=99, content=hello bolt, status=10087)

基於SOFA-BOLT協議編寫簡單CURD項目

本地測試MySQL服務構建客戶表以下:

CREATE DATABASE test;

USE test;

CREATE TABLE t_customer
(
    id            BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
    customer_name VARCHAR(32)     NOT NULL
);

爲了簡化JDBC操做,引入spring-boot-starter-jdbc(這裏只借用JdbcTemplate的輕度封裝)相關依賴:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.20</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.3.0.RELEASE</version>
</dependency>

編寫核心同步處理器:

// 建立
@Data
public class CreateCustomerReq implements Serializable {

    private String customerName;
}

@Data
public class CreateCustomerResp implements Serializable {

    private Long code;

    private Long customerId;
}

public class CreateCustomerProcessor extends SyncUserProcessor<CreateCustomerReq> {

    private final JdbcTemplate jdbcTemplate;

    public CreateCustomerProcessor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Object handleRequest(BizContext bizContext, CreateCustomerReq req) throws Exception {
        KeyHolder keyHolder = new GeneratedKeyHolder();
        jdbcTemplate.update(connection -> {
            PreparedStatement ps = connection.prepareStatement("insert into t_customer(customer_name) VALUES (?)",
                    Statement.RETURN_GENERATED_KEYS);
            ps.setString(1, req.getCustomerName());
            return ps;
        }, keyHolder);
        CreateCustomerResp resp = new CreateCustomerResp();
        resp.setCustomerId(Objects.requireNonNull(keyHolder.getKey()).longValue());
        resp.setCode(RespCode.SUCCESS);
        return resp;
    }

    @Override
    public String interest() {
        return CreateCustomerReq.class.getName();
    }
}

// 更新
@Data
public class UpdateCustomerReq implements Serializable {

    private Long customerId;

    private String customerName;
}


public class UpdateCustomerProcessor extends SyncUserProcessor<UpdateCustomerReq> {

    private final JdbcTemplate jdbcTemplate;

    public UpdateCustomerProcessor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Object handleRequest(BizContext bizContext, UpdateCustomerReq req) throws Exception {
        UpdateCustomerResp resp = new UpdateCustomerResp();
        int updateCount = jdbcTemplate.update("UPDATE t_customer SET customer_name = ? WHERE id = ?", ps -> {
            ps.setString(1, req.getCustomerName());
            ps.setLong(2, req.getCustomerId());
        });
        if (updateCount > 0) {
            resp.setCode(RespCode.SUCCESS);
        }
        return resp;
    }

    @Override
    public String interest() {
        return UpdateCustomerReq.class.getName();
    }
}

// 刪除
@Data
public class DeleteCustomerReq implements Serializable {

    private Long customerId;
}

@Data
public class DeleteCustomerResp implements Serializable {

    private Long code;
}

public class DeleteCustomerProcessor extends SyncUserProcessor<DeleteCustomerReq> {

    private final JdbcTemplate jdbcTemplate;

    public DeleteCustomerProcessor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Object handleRequest(BizContext bizContext, DeleteCustomerReq req) throws Exception {
        DeleteCustomerResp resp = new DeleteCustomerResp();
        int updateCount = jdbcTemplate.update("DELETE FROM t_customer WHERE id = ?", ps -> ps.setLong(1,req.getCustomerId()));
        if (updateCount > 0){
            resp.setCode(RespCode.SUCCESS);
        }
        return resp;
    }

    @Override
    public String interest() {
        return DeleteCustomerReq.class.getName();
    }
}

// 查詢
@Data
public class SelectCustomerReq implements Serializable {

    private Long customerId;
}

@Data
public class SelectCustomerResp implements Serializable {

    private Long code;

    private Long customerId;

    private String customerName;
}

public class SelectCustomerProcessor extends SyncUserProcessor<SelectCustomerReq> {

    private final JdbcTemplate jdbcTemplate;

    public SelectCustomerProcessor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Object handleRequest(BizContext bizContext, SelectCustomerReq req) throws Exception {
        SelectCustomerResp resp = new SelectCustomerResp();
        Customer result = jdbcTemplate.query("SELECT * FROM t_customer WHERE id = ?", ps -> ps.setLong(1, req.getCustomerId()), rs -> {
            Customer customer = null;
            if (rs.next()) {
                customer = new Customer();
                customer.setId(rs.getLong("id"));
                customer.setCustomerName(rs.getString("customer_name"));
            }
            return customer;
        });
        if (Objects.nonNull(result)) {
            resp.setCustomerId(result.getId());
            resp.setCustomerName(result.getCustomerName());
            resp.setCode(RespCode.SUCCESS);
        }
        return resp;
    }

    @Override
    public String interest() {
        return SelectCustomerReq.class.getName();
    }

    @Data
    public static class Customer {

        private Long id;
        private String customerName;
    }
}

編寫數據源、客戶端和服務端代碼:

public class CurdApp {

    private static final int PORT = 8081;
    private static final String ADDRESS = "127.0.0.1:" + PORT;

    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai");
        config.setDriverClassName(Driver.class.getName());
        config.setUsername("root");
        config.setPassword("root");
        HikariDataSource dataSource = new HikariDataSource(config);
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        CreateCustomerProcessor createCustomerProcessor = new CreateCustomerProcessor(jdbcTemplate);
        UpdateCustomerProcessor updateCustomerProcessor = new UpdateCustomerProcessor(jdbcTemplate);
        DeleteCustomerProcessor deleteCustomerProcessor = new DeleteCustomerProcessor(jdbcTemplate);
        SelectCustomerProcessor selectCustomerProcessor = new SelectCustomerProcessor(jdbcTemplate);
        RpcServer server = new RpcServer(PORT, true);
        server.registerUserProcessor(createCustomerProcessor);
        server.registerUserProcessor(updateCustomerProcessor);
        server.registerUserProcessor(deleteCustomerProcessor);
        server.registerUserProcessor(selectCustomerProcessor);
        server.startup();
        RpcClient client = new RpcClient();
        client.startup();
        CreateCustomerReq createCustomerReq = new CreateCustomerReq();
        createCustomerReq.setCustomerName("throwable.club");
        CreateCustomerResp createCustomerResp = (CreateCustomerResp)
                client.invokeSync(ADDRESS, createCustomerReq, 5000);
        System.out.println("建立用戶[throwable.club]結果:" + createCustomerResp);
        SelectCustomerReq selectCustomerReq = new SelectCustomerReq();
        selectCustomerReq.setCustomerId(createCustomerResp.getCustomerId());
        SelectCustomerResp selectCustomerResp = (SelectCustomerResp)
                client.invokeSync(ADDRESS, selectCustomerReq, 5000);
        System.out.println(String.format("查詢用戶[id=%d]結果:%s", selectCustomerReq.getCustomerId(),
                selectCustomerResp));
        UpdateCustomerReq updateCustomerReq = new UpdateCustomerReq();
        updateCustomerReq.setCustomerId(selectCustomerReq.getCustomerId());
        updateCustomerReq.setCustomerName("throwx.cn");
        UpdateCustomerResp updateCustomerResp = (UpdateCustomerResp)
                client.invokeSync(ADDRESS, updateCustomerReq, 5000);
        System.out.println(String.format("更新用戶[id=%d]結果:%s", updateCustomerReq.getCustomerId(),
                updateCustomerResp));
        selectCustomerReq.setCustomerId(updateCustomerReq.getCustomerId());
        selectCustomerResp = (SelectCustomerResp)
                client.invokeSync(ADDRESS, selectCustomerReq, 5000);
        System.out.println(String.format("查詢更新後的用戶[id=%d]結果:%s", selectCustomerReq.getCustomerId(),
                selectCustomerResp));
        DeleteCustomerReq deleteCustomerReq = new DeleteCustomerReq();
        deleteCustomerReq.setCustomerId(selectCustomerResp.getCustomerId());
        DeleteCustomerResp deleteCustomerResp = (DeleteCustomerResp)
                client.invokeSync(ADDRESS, deleteCustomerReq, 5000);
        System.out.println(String.format("刪除用戶[id=%d]結果:%s", deleteCustomerReq.getCustomerId(),
                deleteCustomerResp));
    }
}

執行結果以下:

建立用戶[throwable.club]結果:CreateCustomerResp(code=0, customerId=1)
查詢用戶[id=1]結果:SelectCustomerResp(code=0, customerId=1, customerName=throwable.club)
更新用戶[id=1]結果:UpdateCustomerResp(code=0)
查詢更新後的用戶[id=1]結果:SelectCustomerResp(code=0, customerId=1, customerName=throwx.cn)
更新用戶[id=1]結果:DeleteCustomerResp(code=0)

確認最後刪除操做結束後驗證數據庫表,確認t_customer表爲空。

基於GO語言編寫SOFA-BOLT協議客戶端

這裏嘗試使用GO語言編寫一個SOFA-BOLT協議客戶端,考慮到實現一個完整版本會比較複雜,這裏簡化爲只實現Encode和命令調用部分,暫時不處理響應和Decode。編寫結構體RequestCommand以下:

// RequestCommand sofa-bolt v2 req cmd
type RequestCommand struct {
	ProtocolCode    uint8
	ProtocolVersion uint8
	Type            uint8
	CommandCode     uint16
	CommandVersion  uint8
	RequestId       uint32
	Codec           uint8
	Switch          uint8
	Timeout         uint32
	ClassLength     uint16
	HeaderLength    uint16
	ContentLength   uint32
	ClassName       []byte
	Header          []byte
	Content         []byte
}

這裏注意一點,全部的整數類型必須使用具體的類型,例如uint必須用uint32,不然會出現Buffer寫入異常的問題。接着編寫一個編碼方法:

// encode req => slice
func encode(cmd *RequestCommand) []byte {
	container := make([]byte, 0)
	buf := bytes.NewBuffer(container)
	buf.WriteByte(cmd.ProtocolCode)
	buf.WriteByte(cmd.ProtocolVersion)
	buf.WriteByte(cmd.Type)
	binary.Write(buf, binary.BigEndian, cmd.CommandCode)
	buf.WriteByte(cmd.CommandVersion)
	binary.Write(buf, binary.BigEndian, cmd.RequestId)
	buf.WriteByte(cmd.Codec)
	buf.WriteByte(cmd.Switch)
	binary.Write(buf, binary.BigEndian, cmd.Timeout)
	binary.Write(buf, binary.BigEndian, cmd.ClassLength)
	binary.Write(buf, binary.BigEndian, cmd.HeaderLength)
	binary.Write(buf, binary.BigEndian, cmd.ContentLength)
	buf.Write(cmd.ClassName)
	buf.Write(cmd.Header)
	buf.Write(cmd.Content)
	return buf.Bytes()
}

最後編寫TCP客戶端:

type Req struct {
	Id   int64  `json:"id"`
	Name string `json:"name"`
}

package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"net"
)

func main() {
	con, err := net.Dial("tcp", "127.0.0.1:9999")
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	defer con.Close()
	req := &Req{
		Id:   8080,
		Name: "throwx.cn",
	}
	content, err := json.Marshal(req)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	var header []byte
	className := []byte("com.alipay.remoting.Req")
	cmd := &RequestCommand{
		ProtocolCode:    2,
		ProtocolVersion: 2,
		Type:            1,
		CommandCode:     1,
		CommandVersion:  1,
		RequestId:       10087,
		Codec:           1,
		Switch:          0,
		Timeout:         5000,
		ClassLength:     uint16(len(className)),
		HeaderLength:    0,
		ContentLength:   uint32(len(content)),
		ClassName:       className,
		Header:          header,
		Content:         content,
	}
	pkg := encode(cmd)
	_, err = con.Write(pkg)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
}

協議的V2版本Crc32屬性是可選的,這裏爲了簡化處理也暫時忽略了

這裏看到Content屬性爲了簡化處理使用了JSON作序列化,所以須要稍微改動SOFA-BOLT的源碼,引入FastJsonFastJsonSerializer,改動見下圖:

先啓動BoltAppSOFA-BOLT服務端),再執行GO編寫的客戶端,結果以下:

小結

SOFA-BOLT是一個高性能成熟可擴展的Netty私有協議封裝,比起原生Netty編程,提供了便捷的同步、異步調用,提供基礎心跳支持和重連等特性。引入SyncUserProcessorAsyncUserProcessor的功能,對於業務開發更加友好。SOFA-BOLT協議本質也是一個緊湊、高性能的RPC協議。在考慮引入Netty進行底層通信的場景,能夠優先考慮使用SOFA-BOLT或者考慮把SOFA-BOLT做爲候選方案之一,只因SOFA-BOLT是輕量級的,學習曲線平緩,基本沒有其餘中間件依賴。

Demo所在倉庫:

(本文完 c-5-d e-a-20210806)

相關文章
相關標籤/搜索