用RUST寫流媒體服務器實戰——rtmp chunk 深刻解析

用RUST寫流媒體服務器實戰——rtmp chunk 深刻解析

最近幾個月斷更了,把精力放在了新的開源項目上,一個用rust寫的流媒體服務xiu
實現過程當中踩了很多坑,今天說下rtmp中的chunk。git

RTMP協議確實複雜,在作這個項目以前,看過不少帖子,看過官方文檔,但老是感受不能完全的理解清楚,在實現過一遍此協議以後,感受清楚了很多。github

目前作的測試還不夠多,卻是發現了一些問題。chunk這個東西看了好久可能不少人仍是不明白,說明一下,RTMP 協議除了3次握手數據,其它的,包括信令和媒體數據(音視頻相關的數據),都會被封裝成chunk塊。服務器

handshake的殘留數據

TCP發送數據不是按照協議信令,一次只發送一個信令,有時候會發送多個,rtmp握手階段從TCP流中讀一次數據,握手結束後,會留下一部分數據,這部分要填到chunk解析緩衝數據中。網絡

chunk size

初始化的chunk size要設置成128。tcp

個人測試和排查過程記錄以下:
我一開始的chunk size設置成了4096,用ffplay播放流,發送connect信令的時候,老是會多出一個byte,致使amf解析失敗,用wireshark抓包,這個byte是沒有的,一開始認爲wireshark是不會出錯的,覺得tokio網絡庫,因而換成了tcp基礎庫,這個byte仍是存在,想了個笨方法,找到一個開源的rtmp服務器,也打印出此信令,剛收到tcp數據的時候,這個byte也有,可是amf解析卻成功了,接下來就是把每一步的數據都打印出來,從解析chunk到解析amf. 看看這個byte到底是在哪一個步驟消失的,最後發現,這個byte是chunk的第一個byte,fmt+csid,初始化的chunk size不對。。測試

狀態保留

解釋狀態保留以前說一下chunk的各部分組成,按照官方的文檔,chunk由四部分組成:this

  • basic header
  • message header
  • extended timestamp
  • payload

前三部分是均可以壓縮的。code

basic header

/******************************************************************
 * 5.3.1.1. Chunk Basic Header
 * The Chunk Basic Header encodes the chunk stream ID and the chunk
 * type(represented by fmt field in the figure below). Chunk type
 * determines the format of the encoded message header. Chunk Basic
 * Header field may be 1, 2, or 3 bytes, depending on the chunk stream
 * ID.
 *
 * The bits 0-5 (least significant) in the chunk basic header represent
 * the chunk stream ID.
 *
 * Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
 * field.
 *    0 1 2 3 4 5 6 7
 *   +-+-+-+-+-+-+-+-+
 *   |fmt|   cs id   |
 *   +-+-+-+-+-+-+-+-+
 *   Figure 6 Chunk basic header 1
 *
 * Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
 * field. ID is computed as (the second byte + 64).
 *   0                   1
 *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   |fmt|    0      | cs id - 64    |
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   Figure 7 Chunk basic header 2
 *
 * Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
 * this field. ID is computed as ((the third byte)*256 + the second byte
 * + 64).
 *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   |fmt|     1     |         cs id - 64            |
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   Figure 8 Chunk basic header 3
 *
 * cs id: 6 bits
 * fmt: 2 bits
 * cs id - 64: 8 or 16 bits
 *
 * Chunk stream IDs with values 64-319 could be represented by both 2-
 * byte version and 3-byte version of this field.
 ***********************************************************************/

第一個byte的前兩個bit是format,有0,1,2,3四個值,這個四個值的做用是壓縮message header,詳細的會在下面說,後6個bit是chunk stream ID, 簡稱csid(關於這個字段有坑,下面會解釋),6個bit的取值範圍爲[0,63] ,0和1有特殊用途,2到63表示真正的csid,關於特殊值0和1:orm

  • 0 表示csid用 6+ 8個bit表示
  • 1 表示csid用 6 + 16個bit表示

解析代碼以下:視頻

let mut csid = (byte & 0b00111111) as u32;
      match csid {
       0 => {
           if self.reader.len() < 1 {
               return Ok(UnpackResult::NotEnoughBytes);
           }
           csid = 64;
           csid += self.reader.read_u8()? as u32;
       }
       1 => {
           if self.reader.len() < 1 {
               return Ok(UnpackResult::NotEnoughBytes);
           }
           csid = 64;
           csid += self.reader.read_u8()? as u32;
           csid += self.reader.read_u8()? as u32 * 256;
       }
       _ => {}
   }

message header

下面說下message header, 這部分比較複雜,有四種類型,對應着basic header裏面的format字段的0~3。

type 0

/*****************************************************************/
/*      5.3.1.2.1. Type 0                                        */
/*****************************************************************
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                timestamp(3bytes)              |message length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (cont)(3bytes) |message type id| msg stream id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|       message stream id (cont) (4bytes)       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*****************************************************************/

任何字段都不省略。

type 1

/*****************************************************************/
/*      5.3.1.2.2. Type 1                                        */
/*****************************************************************
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                timestamp(3bytes)              |message length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (cont)(3bytes) |message type id|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*****************************************************************/

省略了message stream id,使用上一個chunk的數據。

type 2

/************************************************/
 /*      5.3.1.2.3. Type 2                       */
 /************************************************
  0                   1                   2
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                timestamp(3bytes)              |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 ***************************************************/

更絕了,省略了message stream id、message length和 message type id,這個也從前邊的chunk讀。

type 3

3 啥都沒有,全從前邊拿。

extended timestamp

這個字段是可選的,佔用4個byte,若是message header裏面的timestamp字段大於0xFFFFFF,則讀取這個字段。

payload

最後是payload,payload的長度由 message header裏面的message length決定。

chunk塊的整個讀取流程以下,一開始個人實現流程是這樣的(有問題)

  1. 讀取一個chunk的第一個byte,解析 format和chunk stream ID。
  2. 根據format解析message header:
    • 若是是0 則每一個字段都要從TCP流裏面解析出來。
    • 若是是1 則使用上一個chunk塊的message stream ID。
    • 若是是2 則使用上一個chunk塊的message stream id、message length和 message type id。
    • 若是是3 則使用上一個chunk塊的message stream id、message length、message type id以及timestamp。
  3. 根據timestamp值來決定是否讀取4個bytes的extendtimestamp。
  4. 根據message length讀取payload值,這裏有種狀況比較特殊,有可能一塊payload數據被分紅了2個或者多個chunk塊,在這一步裏面就須要將這些分割的payload 數據合成一個完整的chunk數據再返回。也就是說若是讀完payload數據後發現message length 不等於payload的長度,要回到步驟1從下一個chunk塊裏面繼續讀剩餘的payload數據,直到讀完爲止。

好了,整個流程基本上介紹清楚了。大標題裏面的狀態保留我這裏有兩個意思,第一個意思是要說明一下我上面表述的問題。我說的是『從上一個chunk塊』拿省略的字段,這裏是不對的,由於有下面這種狀況存在:

+--------+---------+-----+------------+------- ---+------------+
    |        | Chunk   |Chunk|Header Data |No.of Bytes|Total No.of |
    |        |Stream ID|Type |            | After     |Bytes in the|
    |        |         |     |            |Header     |Chunk       |
    +--------+---------+-----+------------+-----------+------------+
    |Chunk#1 | 	3      | 0   | delta: 1000| 32        | 44         |
    |        | 	       |     | length: 32,|           |            |
    |        |         |     | type: 8,   |           |            |
    |        |         |     | stream ID: |           |            |
    |        |         |     | 12345 (11  |           |            |
    |        |         |     | bytes)     |           |            |
    +--------+---------+-----+------------+-----------+------------+
    |Chunk#2 | 3       | 2   | 20 (3      | 32        | 36         |
    |        |         |     | bytes)     |           |            |
    +--------+---------+-----+----+-------+-----------+------------+
    |Chunk#3 | 4       | 3   | none (0    | 32        | 33         |
    |        |         |     | bytes)     |           |            |
    +--------+---------+-----+------------+-----------+------------+
    |Chunk#4 | 3       | 3   | none (0    | 32        | 33         |
    |        |         |     | bytes)     |           |            |
    +--------+---------+-----+------------+-----------+------------+

注意:message header裏面的字段複用是針對chunk stream ID的。

所以上面的狀況,chunk2 能夠複用 chunk1的message header,可是chunk 4不能複用chunk 3的,因此,在代碼裏面要特殊處理,每一個csid的message header都須要保存一份,每解析一個chunk,讀完basic header以後,須要把這個csid的上一個message header先恢復出來。

第二種狀況也是我寫代碼時未曾想到的:

tcp數據包能夠在任何地方拆分。

也就是說,可能一個chunk還沒讀完,此次的tcp數據就用完了,須要等下一次的數據,這種狀況就要保留讀取各個字段的狀態了。每個讀取操做就應該設置一個標記,所以寫了下面的四個大狀態,message header裏面有4個小的狀態。

#[derive(Copy, Clone)]
enum ChunkReadState {
    ReadBasicHeader = 1,
    ReadMessageHeader = 2,
    ReadExtendedTimestamp = 3,
    ReadMessagePayload = 4,
    Finish = 5,
}

#[derive(Copy, Clone)]
enum MessageHeaderReadState {x'x
    ReadTimeStamp = 1,
    ReadMsgLength = 2,
    ReadMsgTypeID = 3,
    ReadMsgStreamID = 4,
}

例如: ReadExtendedTimestamp佔用4個bytes,可是讀到這裏的時候就還剩下2個bytes,就要保留這個狀態,下次從TCP裏面讀出新數據的時候從這個狀態開始。

最後rtmp chunk解析的rust完整實如今這裏

最後,歡迎star。

相關文章
相關標籤/搜索