用RUST写流媒体服务器实战——rtmp chunk 深入解析

最近几个月断更了,把精力放在了新的开源项目上,一个用rust写的流媒体服务xiu
实现过程中踩了不少坑,今天说下rtmp中的chunk。

RTMP协议确实复杂,在做这个项目之前,看过很多帖子,看过官方文档,但总是感觉不能彻底的理解清楚,在实现过一遍此协议之后,感觉清楚了不少。

目前做的测试还不够多,倒是发现了一些问题。chunk这个东西看了很久可能很多人还是不明白,说明一下,RTMP 协议除了3次握手数据,其它的,包括信令和媒体数据(音视频相关的数据),都会被封装成chunk块。

handshake的残留数据

TCP发送数据不是按照协议信令,一次只发送一个信令,有时候会发送多个,rtmp握手阶段从TCP流中读一次数据,握手结束后,会留下一部分数据,这部分要填到chunk解析缓冲数据中。

chunk size

初始化的chunk size要设置成128。

我的测试和排查过程记录如下:
我一开始的chunk size设置成了4096,用ffplay播放流,发送connect信令的时候,总是会多出一个byte,导致amf解析失败,用wireshark抓包,这个byte是没有的,一开始认为wireshark是不会出错的,以为tokio网络库,于是换成了tcp基础库,这个byte还是存在,想了个笨方法,找到一个开源的rtmp服务器,也打印出此信令,刚收到tcp数据的时候,这个byte也有,但是amf解析却成功了,接下来就是把每一步的数据都打印出来,从解析chunk到解析amf. 看看这个byte究竟是在哪个步骤消失的,最后发现,这个byte是chunk的第一个byte,fmt+csid,初始化的chunk size不对。。

状态保留

解释状态保留之前说一下chunk的各部分组成,按照官方的文档,chunk由四部分组成:

  • basic header
  • message header
  • extended timestamp
  • payload

前三部分是都可以压缩的。

basic header

 /******************************************************************
 * 5.3.1.1. Chunk Basic Header
 * The Chunk Basic Header encodes the chunk stream ID and the chunk
 * type(represented by fmt field in the figure below). Chunk type
 * determines the format of the encoded message header. Chunk Basic
 * Header field may be 1, 2, or 3 bytes, depending on the chunk stream
 * ID.
 *
 * The bits 0-5 (least significant) in the chunk basic header represent
 * the chunk stream ID.
 *
 * Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
 * field.
 *    0 1 2 3 4 5 6 7
 *   +-+-+-+-+-+-+-+-+
 *   |fmt|   cs id   |
 *   +-+-+-+-+-+-+-+-+
 *   Figure 6 Chunk basic header 1
 *
 * Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
 * field. ID is computed as (the second byte + 64).
 *   0                   1
 *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   |fmt|    0      | cs id - 64    |
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   Figure 7 Chunk basic header 2
 *
 * Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
 * this field. ID is computed as ((the third byte)*256 + the second byte
 * + 64).
 *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   |fmt|     1     |         cs id - 64            |
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   Figure 8 Chunk basic header 3
 *
 * cs id: 6 bits
 * fmt: 2 bits
 * cs id - 64: 8 or 16 bits
 *
 * Chunk stream IDs with values 64-319 could be represented by both 2-
 * byte version and 3-byte version of this field.
 ***********************************************************************/

第一个byte的前两个bit是format,有0,1,2,3四个值,这个四个值的作用是压缩message header,详细的会在下面说,后6个bit是chunk stream ID, 简称csid(关于这个字段有坑,下面会解释),6个bit的取值范围为[0,63] ,0和1有特殊用途,2到63表示真正的csid,关于特殊值0和1:

  • 0 表示csid用 6+ 8个bit表示
  • 1 表示csid用 6 + 16个bit表示

解析代码如下:

      let mut csid = (byte & 0b00111111) as u32;
      match csid {
       0 => {
           if self.reader.len() < 1 {
               return Ok(UnpackResult::NotEnoughBytes);
           }
           csid = 64;
           csid += self.reader.read_u8()? as u32;
       }
       1 => {
           if self.reader.len() < 1 {
               return Ok(UnpackResult::NotEnoughBytes);
           }
           csid = 64;
           csid += self.reader.read_u8()? as u32;
           csid += self.reader.read_u8()? as u32 * 256;
       }
       _ => {}
   }

message header

下面说下message header, 这部分比较复杂,有四种类型,对应着basic header里面的format字段的0~3。

type 0

/*****************************************************************/
/*      5.3.1.2.1. Type 0                                        */
/*****************************************************************
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                timestamp(3bytes)              |message length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (cont)(3bytes) |message type id| msg stream id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|       message stream id (cont) (4bytes)       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*****************************************************************/

任何字段都不省略。

type 1

/*****************************************************************/
/*      5.3.1.2.2. Type 1                                        */
/*****************************************************************
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                timestamp(3bytes)              |message length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (cont)(3bytes) |message type id|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*****************************************************************/

省略了message stream id,使用上一个chunk的数据。

type 2

 /************************************************/
 /*      5.3.1.2.3. Type 2                       */
 /************************************************
  0                   1                   2
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                timestamp(3bytes)              |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 ***************************************************/

更绝了,省略了message stream id、message length和 message type id,这个也从前边的chunk读。

type 3

3 啥都没有,全从前边拿。

extended timestamp

这个字段是可选的,占用4个byte,如果message header里面的timestamp字段大于0xFFFFFF,则读取这个字段。

payload

最后是payload,payload的长度由 message header里面的message length决定。

chunk块的整个读取流程如下,一开始我的实现流程是这样的(有问题)

  1. 读取一个chunk的第一个byte,解析 format和chunk stream ID。
  2. 根据format解析message header:
    • 如果是0 则每个字段都要从TCP流里面解析出来。
    • 如果是1 则使用上一个chunk块的message stream ID。
    • 如果是2 则使用上一个chunk块的message stream id、message length和 message type id。
    • 如果是3 则使用上一个chunk块的message stream id、message length、message type id以及timestamp。
  3. 根据timestamp值来决定是否读取4个bytes的extendtimestamp。
  4. 根据message length读取payload值,这里有种情况比较特殊,有可能一块payload数据被分成了2个或者多个chunk块,在这一步里面就需要将这些分割的payload 数据合成一个完整的chunk数据再返回。也就是说如果读完payload数据后发现message length 不等于payload的长度,要回到步骤1从下一个chunk块里面继续读剩余的payload数据,直到读完为止。

好了,整个流程基本上介绍清楚了。大标题里面的状态保留我这里有两个意思,第一个意思是要说明一下我上面表述的问题。我说的是『从上一个chunk块』拿省略的字段,这里是不对的,因为有下面这种情况存在:

    +--------+---------+-----+------------+------- ---+------------+
    |        | Chunk   |Chunk|Header Data |No.of Bytes|Total No.of |
    |        |Stream ID|Type |            | After     |Bytes in the|
    |        |         |     |            |Header     |Chunk       |
    +--------+---------+-----+------------+-----------+------------+
    |Chunk#1 | 	3      | 0   | delta: 1000| 32        | 44         |
    |        | 	       |     | length: 32,|           |            |
    |        |         |     | type: 8,   |           |            |
    |        |         |     | stream ID: |           |            |
    |        |         |     | 12345 (11  |           |            |
    |        |         |     | bytes)     |           |            |
    +--------+---------+-----+------------+-----------+------------+
    |Chunk#2 | 3       | 2   | 20 (3      | 32        | 36         |
    |        |         |     | bytes)     |           |            |
    +--------+---------+-----+----+-------+-----------+------------+
    |Chunk#3 | 4       | 3   | none (0    | 32        | 33         |
    |        |         |     | bytes)     |           |            |
    +--------+---------+-----+------------+-----------+------------+
    |Chunk#4 | 3       | 3   | none (0    | 32        | 33         |
    |        |         |     | bytes)     |           |            |
    +--------+---------+-----+------------+-----------+------------+

注意:message header里面的字段复用是针对chunk stream ID的。

因此上面的情况,chunk2 可以复用 chunk1的message header,但是chunk 4不能复用chunk 3的,所以,在代码里面要特殊处理,每个csid的message header都需要保存一份,每解析一个chunk,读完basic header之后,需要把这个csid的上一个message header先恢复出来。

第二种情况也是我写代码时不曾想到的:

tcp数据包可以在任何地方拆分。

也就是说,可能一个chunk还没读完,这次的tcp数据就用完了,需要等下一次的数据,这种情况就要保留读取各个字段的状态了。每一个读取操作就应该设置一个标记,因此写了下面的四个大状态,message header里面有4个小的状态。

#[derive(Copy, Clone)]
enum ChunkReadState {
    ReadBasicHeader = 1,
    ReadMessageHeader = 2,
    ReadExtendedTimestamp = 3,
    ReadMessagePayload = 4,
    Finish = 5,
}

#[derive(Copy, Clone)]
enum MessageHeaderReadState {x'x
    ReadTimeStamp = 1,
    ReadMsgLength = 2,
    ReadMsgTypeID = 3,
    ReadMsgStreamID = 4,
}

例如: ReadExtendedTimestamp占用4个bytes,但是读到这里的时候就还剩下2个bytes,就要保留这个状态,下次从TCP里面读出新数据的时候从这个状态开始。

最后rtmp chunk解析的rust完整实现在这里

最后,欢迎star。

04-13 08:02