tcp粘包和拆包的處理方案

隨着智能硬件愈來愈流行,不少後端開發人員都有可能接觸到socket編程。而不少狀況下,服務器與端上須要保證數據的有序,穩定到達,天然而然就會選擇基於tcp/ip協議的socekt開發。開發過程當中,常常會遇到tcp粘包,拆包的問題,本文將從產生緣由,和解決方案以及workerman是如何處理粘包拆包問題的,這幾個層面來講明這個問題。php

什麼是粘包拆包編程

對於什麼是粘包、拆包問題,我想先舉兩個簡單的應用場景:json

  1. 客戶端和服務器創建一個鏈接,客戶端發送一條消息,客戶端關閉與服務端的鏈接。後端

  2. 客戶端和服務器簡歷一個鏈接,客戶端連續發送兩條消息,客戶端關閉與服務端的鏈接。數組

對於第一種狀況,服務端的處理流程能夠是這樣的:當客戶端與服務端的鏈接創建成功以後,服務端不斷讀取客戶端發送過來的數據,當客戶端與服務端鏈接斷開以後,服務端知道已經讀完了一條消息,而後進行解碼和後續處理...。對於第二種狀況,若是按照上面相同的處理邏輯來處理,那就有問題了,咱們來看看第二種狀況下客戶端發送的兩條消息遞交到服務端有可能出現的狀況:服務器

第一種狀況:網絡

服務端一共讀到兩個數據包,第一個包包含客戶端發出的第一條消息的完整信息,第二個包包含客戶端發出的第二條消息,那這種狀況比較好處理,服務器只須要簡單的從網絡緩衝區去讀就行了,第一次讀到第一條消息的完整信息,消費完再從網絡緩衝區將第二條完整消息讀出來消費。app

沒有發生粘包、拆包示意圖框架

第二種狀況:socket

服務端一共就讀到一個數據包,這個數據包包含客戶端發出的兩條消息的完整信息,這個時候基於以前邏輯實現的服務端就蒙了,由於服務端不知道第一條消息從哪兒結束和第二條消息從哪兒開始,這種狀況實際上是發生了TCP粘包。

TCP粘包示意圖

第三種狀況:

服務端一共收到了兩個數據包,第一個數據包只包含了第一條消息的一部分,第一條消息的後半部分和第二條消息都在第二個數據包中,或者是第一個數據包包含了第一條消息的完整信息和第二條消息的一部分信息,第二個數據包包含了第二條消息的剩下部分,這種狀況實際上是發送了TCP拆,由於發生了一條消息被拆分在兩個包裏面發送了,一樣上面的服務器邏輯對於這種狀況是很差處理的。

TCP拆包示意圖

產生tcp粘包和拆包的緣由

咱們知道tcp是以流動的方式傳輸數據,傳輸的最小單位爲一個報文段(segment)。tcp Header中有個Options標識位,常見的標識爲mss(Maximum Segment Size)指的是,鏈接層每次傳輸的數據有個最大限制MTU(Maximum Transmission Unit),通常是1500比特,超過這個量要分紅多個報文段,mss則是這個最大限制減去TCP的header,光是要傳輸的數據的大小,通常爲1460比特。換算成字節,也就是180多字節。

tcp爲提升性能,發送端會將須要發送的數據發送到緩衝區,等待緩衝區滿了以後,再將緩衝中的數據發送到接收方。同理,接收方也有緩衝區這樣的機制,來接收數據。

發生TCP粘包、拆包主要是因爲下面一些緣由:

  1. 應用程序寫入的數據大於套接字緩衝區大小,這將會發生拆包。

  2. 應用程序寫入數據小於套接字緩衝區大小,網卡將應用屢次寫入的數據發送到網絡上,這將會發生粘包。

  3. 進行mss(最大報文長度)大小的TCP分段,當TCP報文長度-TCP頭部長度>mss的時候將發生拆包。

  4. 接收方法不及時讀取套接字緩衝區數據,這將發生粘包。

  5. ……

如何解決拆包粘包

既然知道了tcp是無界的數據流,且協議自己沒法避免粘包,拆包的發生,那咱們只能在應用層數據協議上,加以控制。一般在制定傳輸數據時,可使用以下方法:

  1. 使用帶消息頭的協議、消息頭存儲消息開始標識及消息長度信息,服務端獲取消息頭的時候解析出消息長度,而後向後讀取該長度的內容。

  2. 設置定長消息,服務端每次讀取既定長度的內容做爲一條完整消息。

  3. 設置消息邊界,服務端從網絡流中按消息編輯分離出消息內容。

a)先基於第三種方法,假設區分數據邊界的標識爲換行符"\n"(注意請求數據自己內部不能包含換行符),數據格式爲Json,例以下面是一個符合這個規則的請求包。

{"type":"message","content":"hello"}\n

注意上面的請求數據末尾有一個換行字符(在PHP中用雙引號字符串"\n"表示),表明一個請求的結束。

b)基於第一種方法,能夠制定,首部固定10個字節長度用來保存整個數據包長度,位數不夠補0的數據協議

0000000036{"type":"message","content":"hello"}

c)基於第一種方法,能夠制定,首部4字節網絡字節序unsigned int,標記整個包的長度

****{"type":"message","content":"hello all"}

其中首部四字節*號表明一個網絡字節序的unsigned int數據,爲不可見字符,緊接着是Json的數據格式的包體數據。

基於workerman的解決方案

制定了數據協議,那咱們下面來經過代碼具體分析一下,php中workerman,是如何解決上述問題的。爲了便於理解,能夠看下下面的流程圖

 

workerman是基於策略模式來設計處理tcp粘包,拆包問題的。具體數據協議的制定在應用目錄Applications/YourApp/Protocols目錄下,實現則是在框架目錄Workerman/Connection/TcpConnection.php中。這樣的好處就是用戶能夠隨意定製本身的數據協議格式,而框架代碼都能處理。

咱們如今Applications/YourApp/Protocols目錄下,建一個jsonNL.php,來實現本身制定本身定義的數據協議。

JsonNL.php的實現

namespace Protocols;
class JsonNL
{
    /**
     * 檢查包的完整性
     * 若是可以獲得包長,則返回包的在buffer中的長度,不然返回0繼續等待數據
     * 若是協議有問題,則能夠返回false,當前客戶端鏈接會所以斷開
     * @param string $buffer
     * @return int
     */
    public static function input($buffer)
    {
        // 得到換行字符"\n"位置
        $pos = strpos($buffer, "\n");
        // 沒有換行符,沒法得知包長,返回0繼續等待數據
        if($pos === false)
        {
            return 0;
        }
        // 有換行符,返回當前包長(包含換行符)
        return $pos+1;
    }

    /**
     * 打包,當向客戶端發送數據的時候會自動調用
     * @param string $buffer
     * @return string
     */
    public static function encode($buffer)
    {
        // json序列化,並加上換行符做爲請求結束的標記
        return json_encode($buffer)."\n";
    }

    /**
     * 解包,當接收到的數據字節數等於input返回的值(大於0的值)自動調用
     * 並傳遞給onMessage回調函數的$data參數
     * @param string $buffer
     * @return string
     */
    public static function decode($buffer)
    {
        // 去掉換行,還原成數組
        return json_decode(trim($buffer), true);
    }
}

再看下TcpConnection.php中,接收數據時,如何處理。

public function baseRead($socket, $check_eof = true)
    {
        $buffer = fread($socket, self::READ_BUFFER_SIZE);

        // Check connection closed.
        if ($buffer === '' || $buffer === false) {
            if ($check_eof && (feof($socket) || !is_resource($socket) || $buffer === false)) {
                $this->destroy();
                return;
            }
        } else {
            $this->_recvBuffer .= $buffer;
        }

        // If the application layer protocol has been set up.
        if ($this->protocol) {
            $parser = $this->protocol;
            while ($this->_recvBuffer !== '' && !$this->_isPaused) {
                // The current packet length is known.
                if ($this->_currentPackageLength) {
                    // Data is not enough for a package.
                    if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
                        break;
                    }
                } else {
                    // Get current package length.
                    $this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
                    // The packet length is unknown.
                    if ($this->_currentPackageLength === 0) {
                        break;
                    } elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize) {
                        // Data is not enough for a package.
                        if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
                            break;
                        }
                    } // Wrong package.
                    else {
                        echo 'error package. package_length=' . var_export($this->_currentPackageLength, true);
                        $this->destroy();
                        return;
                    }
                }

                // The data is enough for a packet.
                self::$statistics['total_request']++;
                // The current packet length is equal to the length of the buffer.
                if (strlen($this->_recvBuffer) === $this->_currentPackageLength) {
                    $one_request_buffer = $this->_recvBuffer;
                    $this->_recvBuffer  = '';
                } else {
                    // Get a full package from the buffer.
                    $one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
                    // Remove the current package from the receive buffer.
                    $this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
                }
                // Reset the current packet length to 0.
                $this->_currentPackageLength = 0;
                if (!$this->onMessage) {
                    continue;
                }
                try {
                    // Decode request buffer before Emitting onMessage callback.
                    call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
                } catch (\Exception $e) {
                    Worker::log($e);
                    exit(250);
                } catch (\Error $e) {
                    Worker::log($e);
                    exit(250);
                }
            }
            return;
        }

        if ($this->_recvBuffer === '' || $this->_isPaused) {
            return;
        }

        // Applications protocol is not set.
        self::$statistics['total_request']++;
        if (!$this->onMessage) {
            $this->_recvBuffer = '';
            return;
        }
        try {
            call_user_func($this->onMessage, $this, $this->_recvBuffer);
        } catch (\Exception $e) {
            Worker::log($e);
            exit(250);
        } catch (\Error $e) {
            Worker::log($e);
            exit(250);
        }
        // Clean receive buffer.
        $this->_recvBuffer = '';
    }

上面的代碼比較多,不須要細讀,幾個關鍵的地方能夠看出處理的思路,先把接收的數據包追加到_recvBuffer變量中,而後調用用戶本身定義的數據協議中的input方法。input方法則會判斷數據中是否包含邊界符,若是不包含則返回0,包含則返回當前數據包的大小。框架中接收到input的返回值後,若是接收值爲0,則跳出循環不作處理,若是接收值不爲0,則將截取的數據包賦值給one_request_buffer,而且重置_recvBuffer

// Get a full package from the buffer.
$one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
// Remove the current package from the receive buffer.
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);

 

最後:tcp雖然是個強大的協議,能保證數據的穩定性,一致性,但在實際開發中,咱們還須要根據實際的數據協議,來控制每次獲取的包是客戶端發過來的一個完整的能夠解析的包。

相關文章
相關標籤/搜索