Netty(三) 什麼是 TCP 拆、粘包?如何解決?

前言

記得前段時間咱們生產上的一個網關出現了故障。java

這個網關邏輯很是簡單,就是接收客戶端的請求而後解析報文最後發送短信。git

但這個請求並非常見的 HTTP ,而是利用 Netty 自定義的協議。程序員

有個前提是:網關是須要讀取一段完整的報文才能進行後面的邏輯。

問題是有天忽然發現網關解析報文出錯,查看了客戶端的發送日誌也沒發現問題,最後經過日誌發現收到了許多不完整的報文,有些還多了。github

因而想會不會是 TCP 拆、粘包帶來的問題,最後利用 Netty 自帶的拆包工具解決了該問題。shell

這便有了此文。api

TCP 協議

問題雖然解決了,但仍是得想一想緣由,爲啥會這樣?打破砂鍋問到底纔是一個靠譜的程序員。緩存

這就得從 TCP 這個協議提及了。網絡

TCP 是一個面向字節流的協議,它是性質是流式的,因此它並無分段。就像水流同樣,你無法知道何時開始,何時結束。app

因此他會根據當前的套接字緩衝區的狀況進行拆包或是粘包。框架

下圖展現了一個 TCP 協議傳輸的過程:

發送端的字節流都會先傳入緩衝區,再經過網絡傳入到接收端的緩衝區中,最終由接收端獲取。

當咱們發送兩個完整包到接收端的時候:

正常狀況會接收到兩個完整的報文。


但也有如下的狀況:

接收到的是一個報文,它是由發送的兩個報文組成的,這樣對於應用程序來講就很難處理了(這樣稱爲粘包)。


還有可能出現上面這樣的雖然收到了兩個包,可是裏面的內容倒是互相包含,對於應用來講依然沒法解析(拆包)。

對於這樣的問題只能經過上層的應用來解決,常見的方式有:

  • 在報文末尾增長換行符代表一條完整的消息,這樣在接收端能夠根據這個換行符來判斷消息是否完整。
  • 將消息分爲消息頭、消息體。能夠在消息頭中聲明消息的長度,根據這個長度來獲取報文(好比 808 協議)。
  • 規定好報文長度,不足的空位補齊,取的時候按照長度截取便可。

以上的這些方式咱們在 Netty 的 pipline 中里加入對應的解碼器均可以手動實現。

但其實 Netty 已經幫咱們作好了,徹底能夠開箱即用。

好比:

  • LineBasedFrameDecoder 能夠基於換行符解決。
  • DelimiterBasedFrameDecoder 可基於分隔符解決。
  • FixedLengthFrameDecoder 可指定長度解決。

字符串拆、粘包

下面來模擬一下最簡單的字符串傳輸。

仍是在以前的

https://github.com/crossoverJie/netty-action

進行演示。

在 Netty 客戶端中加了一個入口能夠循環發送 100 條字符串報文到接收端:

/**
     * 向服務端發消息 字符串
     * @param stringReqVO
     * @return
     */
    @ApiOperation("客戶端發送消息,字符串")
    @RequestMapping(value = "sendStringMsg", method = RequestMethod.POST)
    @ResponseBody
    public BaseResponse<NULLBody> sendStringMsg(@RequestBody StringReqVO stringReqVO){
        BaseResponse<NULLBody> res = new BaseResponse();

        for (int i = 0; i < 100; i++) {
            heartbeatClient.sendStringMsg(stringReqVO.getMsg()) ;
        }

        // 利用 actuator 來自增
        counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);

        SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
        sendMsgResVO.setMsg("OK") ;
        res.setCode(StatusEnum.SUCCESS.getCode()) ;
        res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
        return res ;
    }
    
    
    
    /**
     * 發送消息字符串
     *
     * @param msg
     */
    public void sendStringMsg(String msg) {
        ByteBuf message = Unpooled.buffer(msg.getBytes().length) ;
        message.writeBytes(msg.getBytes()) ;
        ChannelFuture future = channel.writeAndFlush(message);
        future.addListener((ChannelFutureListener) channelFuture ->
                LOGGER.info("客戶端手動發消息成功={}", msg));

    }

服務端直接打印便可:

@Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        LOGGER.info("收到msg={}", msg);

    }

順便提一下,這裏加的有一個字符串的解碼器:.addLast(new StringDecoder()) 其實就是把消息解析爲字符串。

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        out.add(msg.toString(charset));
    }

在 Swagger 中調用了客戶端的接口用於給服務端發送了 100 次消息:

正常狀況下接收端應該打印 100 次 hello 纔對,可是查看日誌會發現:

收到的內容有完整的、多的、少的、拼接的;這也就對應了上面提到的拆包、粘包。

該怎麼解決呢?這即可採用以前提到的 LineBasedFrameDecoder 利用換行符解決。

利用 LineBasedFrameDecoder 解決問題

LineBasedFrameDecoder 解碼器使用很是簡單,只須要在 pipline 鏈條上添加便可。

//字符串解析,換行防拆包
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())

構造函數中傳入了 1024 是指報的長度最大不超過這個值,具體能夠看下文的源碼分析。

而後咱們再進行一次測試看看結果:

注意,因爲 LineBasedFrameDecoder 解碼器是經過換行符來判斷的,因此在發送時,一條完整的消息須要加上 \n

最終的結果:

仔細觀察日誌,發現確實沒有一條被拆、粘包。

LineBasedFrameDecoder 的原理

目的達到了,來看看它的實現原理:

  1. 第一步主要就是 findEndOfLine 方法去找到當前報文中是否存在分隔符,存在就會返回分隔符所在的位置。
  2. 判斷是否須要丟棄,默認爲 false ,第一次走這個邏輯(下文會判斷是否須要改成 true)。
  3. 若是報文中存在換行符,就會將數據截取到那個位置。
  4. 若是不存在換行符(有多是拆包、粘包),就看當前報文的長度是否大於預設的長度。大於則須要緩存這個報文長度,並將 discarding 設爲 true。
  5. 若是是須要丟棄時,判斷是否找到了換行符,存在則須要丟棄掉以前記錄的長度而後截取數據。
  6. 若是沒有找到換行符,則將以前緩存的報文長度進行累加,用於下次拋棄。

從這個邏輯中能夠看出就是尋找報文中是否包含換行符,並進行相應的截取。

因爲是經過緩衝區讀取的,因此即便此次沒有換行符的數據,只要下一次的報文存在換行符,上一輪的數據也不會丟。

高效的編碼方式 Google Protocol

上面提到的其實就是在解碼中進行操做,咱們也能夠自定義本身的拆、粘包工具。

編解碼的主要目的就是爲了能夠編碼成字節流用於在網絡中傳輸、持久化存儲。

Java 中也能夠實現 Serializable 接口來實現序列化,但因爲它性能等緣由在一些 RPC 調用中用的不多。

Google Protocol 則是一個高效的序列化框架,下面來演示在 Netty 中如何使用。

安裝

首先第一步天然是安裝:

官網下載對應的包。

本地配置環境變量:

當執行 protoc --version 出現如下結果代表安裝成功:

定義本身的協議格式

接着是須要按照官方要求的語法定義本身的協議格式。

好比我這裏須要定義一個輸入輸出的報文格式:

BaseRequestProto.proto:

syntax = "proto2";

package protocol;

option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseRequestProto";

message RequestProtocol {
  required int32 requestId = 2;
  required string reqMsg = 1;
  

}

BaseResponseProto.proto:

syntax = "proto2";

package protocol;

option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseResponseProto";

message ResponseProtocol {
  required int32 responseId = 2;
  required string resMsg = 1;
  

}

再經過

protoc --java_out=/dev BaseRequestProto.proto BaseResponseProto.proto

protoc 命令將剛纔定義的協議格式轉換爲 Java 代碼,並生成在 /dev 目錄。

只須要將生成的代碼拷貝到咱們的項目中,同時引入依賴:

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.4.0</version>
</dependency>

利用 Protocol 的編解碼也很是簡單:

public class ProtocolUtil {

    public static void main(String[] args) throws InvalidProtocolBufferException {
        BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder()
                .setRequestId(123)
                .setReqMsg("你好啊")
                .build();

        byte[] encode = encode(protocol);

        BaseRequestProto.RequestProtocol parseFrom = decode(encode);

        System.out.println(protocol.toString());
        System.out.println(protocol.toString().equals(parseFrom.toString()));
    }

    /**
     * 編碼
     * @param protocol
     * @return
     */
    public static byte[] encode(BaseRequestProto.RequestProtocol protocol){
        return protocol.toByteArray() ;
    }

    /**
     * 解碼
     * @param bytes
     * @return
     * @throws InvalidProtocolBufferException
     */
    public static BaseRequestProto.RequestProtocol decode(byte[] bytes) throws InvalidProtocolBufferException {
        return BaseRequestProto.RequestProtocol.parseFrom(bytes);
    }
}

利用 BaseRequestProto 來作一個演示,先編碼再解碼最後比較最終的結果是否相同。答案確定是一致的。

利用 protoc 命令生成的 Java 文件裏已經幫咱們把編解碼所有都封裝好了,只須要簡單調用就好了。

能夠看出 Protocol 建立對象使用的是構建者模式,對使用者來講清晰易讀,更多關於構建器的內容能夠參考這裏

更多關於 Google Protocol 內容請查看官方開發文檔

結合 Netty

Netty 已經自帶了對 Google protobuf 的編解碼器,也是隻須要在 pipline 中添加便可。

server 端:

// google Protobuf 編解碼
.addLast(new ProtobufDecoder(BaseRequestProto.RequestProtocol.getDefaultInstance()))
.addLast(new ProtobufEncoder())

客戶端:

// google Protobuf 編解碼

.addLast(new ProtobufDecoder(BaseResponseProto.ResponseProtocol.getDefaultInstance()))

.addLast(new ProtobufEncoder())
稍微注意的是,在構建 ProtobufDecoder 時須要顯式指定解碼器須要解碼成什麼類型。

我這裏服務端接收的是 BaseRequestProto,客戶端收到的是服務端響應的 BaseResponseProto 因此就設置了對應的實例。

一樣的提供了一個接口向服務端發送消息,當服務端收到了一個特殊指令時也會向客戶端返回內容:

@Override
    protected void channelRead0(ChannelHandlerContext ctx, BaseRequestProto.RequestProtocol msg) throws Exception {
        LOGGER.info("收到msg={}", msg.getReqMsg());

        if (999 == msg.getRequestId()){
            BaseResponseProto.ResponseProtocol responseProtocol = BaseResponseProto.ResponseProtocol.newBuilder()
                    .setResponseId(1000)
                    .setResMsg("服務端響應")
                    .build();
            ctx.writeAndFlush(responseProtocol) ;
        }

    }

在 swagger 中調用相關接口:

在日誌能夠看到服務端收到了消息,同時客戶端也收到了返回:

雖然說 Netty 封裝了 Google Protobuf 相關的編解碼工具,其實查看它的編碼工具就會發現也是利用上文提到的 api 實現的。

Protocol 拆、粘包

Google Protocol 的使用確實很是簡單,但仍是有值的注意的地方,好比它依然會有拆、粘包問題。

不妨模擬一下:

連續發送 100 次消息看服務端收到的怎麼樣:

會發現服務端在解碼的時候報錯,其實就是被拆、粘包了。

這點 Netty 天然也考慮到了,因此已經提供了相關的工具。

//拆包解碼
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufVarint32LengthFieldPrepender())

只須要在服務端和客戶端加上這兩個編解碼工具便可,再來發送一百次試試。

查看日誌發現沒有出現一次異常,100 條信息所有都接收到了。

這個編解碼工具能夠簡單理解爲是在消息體中加了一個 32 位長度的整形字段,用於代表當前消息長度。

總結

網絡這塊一樣是計算機的基礎,因爲近期在作相關的工做因此接觸的比較多,也算是給大學補課了。

後面會接着更新 Netty 相關的內容,最後會產出一個高性能的 HTTP 以及 RPC 框架,敬請期待。

上文相關的代碼:

https://github.com/crossoverJie/netty-action

號外

最近在總結一些 Java 相關的知識點,感興趣的朋友能夠一塊兒維護。

地址: https://github.com/crossoverJie/Java-Interview

歡迎關注公衆號一塊兒交流:

相關文章
相關標籤/搜索