在握手結束後,即進入該函數中作進一步處理。nginx
void ngx_rtmp_cycle(ngx_rtmp_session_t *s) { ngx_connection_t *c; c = s->connection; /* 從新設置讀/寫事件的回調函數 */ c->read->handler = ngx_rtmp_recv; c->write->handler = ngx_rtmp_send; /* 初始化該會話的 ping_evt 事件,當網絡出現問題時, * 就會嘗試調用該事件的回調函數 ngx_rtmp_ping */ s->ping_evt.data = c; s->ping_evt.log = c->log; s->ping_evt.handler = ngx_rtmp_ping; /* 將 ping_evt 添加到定時器中 */ ngx_rtmp_reset_ping(s); /* 開始接收客戶端發來的rtmp數據,若接收不到客戶端的數據時,則將讀事件添加 * 到epoll中,並設置回調函數爲ngx_rtmp_recv方法,當再次監聽到客戶端發來 * 數據時會再次調用該方法進行處理 */ ngx_rtmp_recv(c->read); }
void ngx_rtmp_reset_ping(ngx_rtmp_session_t *s) { ngx_rtmp_core_srv_conf_t *cscf; /* 獲取該 server{} 下 ngx_rtmp_core_module 模塊的 srv 級別的配置結構體 */ cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); if (cscf->ping == 0) { return; } s->ping_active = 0; s->ping_reset = 0; /* 將 ping_evt 添加到定時器中,定時時間爲 cscf->ping, * 若配置文件中沒有配置有,則默認時間爲 60000 ms */ ngx_add_timer(&s->ping_evt, cscf->ping); ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "ping: wait %Mms", cscf->ping); }
static void ngx_rtmp_recv(ngx_event_t *rev) { ngx_int_t n; ngx_connection_t *c; ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; ngx_rtmp_header_t *h; ngx_rtmp_stream_t *st, *st0; ngx_chain_t *in, *head; ngx_buf_t *b; u_char *p, *pp, *old_pos; size_t size, fsize, old_size; uint8_t fmt, ext; uint32_t csid, timestamp; c = rev->data; s = c->data; b = NULL; old_pos = NULL; old_size = 0; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); /* 判斷該鏈接是否已被銷燬 */ if (c->destroyed) { return; } for( ;; ) { st = &s->in_streams[s->in_csid]; /* allocate new buffer */ if (st->in == NULL) { /* 爲類型爲 ngx_chain_t 的結構體指針 st->in 分配內存 */ st->in = ngx_rtmp_alloc_in_buf(s); if (st->in == NULL) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "in buf alloc failed"); ngx_rtmp_finalize_session(s); return; } } h = &st->hdr; in = st->in; b = in->buf; if (old_size) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "reusing formerly read data: %d", old_size); /* 對上一次接收到的數據按一個 chunk size 進行切分以後, * 餘下的多餘的數據再次循環時進行從新使用 */ b->pos = b->start; b->last = ngx_movemem(b->pos, old_pos, old_size); if (s->in_chunk_size_changing) { ngx_rtmp_finalize_set_chunk_size(s); } } else { if (old_pos) { b->pos = b->last = b->start; } /* 這裏調用的回調函數爲 ngx_unix_recv 方法接收數據 */ n = c->recv(c, b->last, b->end - b->last); if (n == NGX_ERROR || n == 0) { ngx_rtmp_finalize_session(s); return; } if (n == NGX_AGAIN) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_rtmp_finalize_session(s); } return; } s->ping_reset = 1; ngx_rtmp_update_bandwidth(&ngx_rtmp_bw_in, n); /* 更新緩存區的數據指針 */ b->last += n; s->in_bytes += n; /* 當緩存區中數據滿溢時 */ if (s->in_bytes >= 0xf0000000) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "resetting byte counter"); s->in_bytes = 0; s->in_last_ack = 0; } if (s->ack_size && s->in_bytes - s->in_last_ack >= s->ack_size) { s->in_last_ack = s->in_bytes; ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "sending RTMP ACK(%uD)", s->in_bytes); if (ngx_rtmp_send_ack(s, s->in_bytes)) { ngx_rtmp_finalize_session(s); return; } } } old_pos = NULL; old_size = 0; /* parse headers */ if (b->pos == b->start) { /* 最初的時候: b->pos == b->start == b->last */ /** * Chunk format: * +----------------+------------------+--------------------+-------------+ * | Basic Header | Message Header | Extended Timestamp | Chunk Data | * +----------------+------------------+--------------------+-------------+ */ p = b->pos; /* Basic Header:基本頭信息 * * 包含chunk stream ID(流通道id)和chunk type(即fmt),chunk stream id通常被 * 簡寫爲CSID,用來惟一標識一個特定的流通道,chunk type決定了後面Message Header的格式。 * Basic Header的長度多是1,2,或3個字節,其中chunk type的長度是固定的(佔2位,單位 * 是bit),Basic Header的長度取決於CSID的大小,在足夠存儲這兩個字段的前提下最好用盡可能 * 少的字節從而減小因爲引入Header增長的數據量。 * * RTMP協議支持用戶自定義[3,65599]之間的CSID,0,1,2由協議保留表示特殊信息。0表明Basic * Header總共要佔用2個字節,CSID在[64,319]之間; 1表明佔用3個字節,CSID在[64,65599]之 * 間; 2 表明該 chunk 是控制信息和一些命令信息。 * * chunk type的長度是固定2 bit,所以CSID的長度是(6=8-2)、(14=16-2)、(22=24-2)中的一 * 個。當Basic Header爲1個字節時,CSID佔6bit,6bit最多能夠表示64個數,所以在這種狀況 * 下CSID在[0,63]之間,其中用戶可自定義的範圍爲[3,63]。 * * Basic Header:1 byte * 0 1 2 3 4 5 6 7 * +-+-+-+-+-+-+-+-+ * |fmt| cs id | * +-+-+-+-+-+-+-+-+ * * Basic Header: 2 byte , csid == 0 * CSID佔14bit,此時協議將於chunk type所在字節的其餘bit都置爲0, * 剩下的一個字節表示CSID - 64,這樣共有8個bit來存儲CSID,8bit能夠表示[0,255]個數,所以 * 這種狀況下CSID在[64,319],其中319=255+64。 * 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * |fmt| 0 | cs id - 64 | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * * Basic Header: 3 bytes , csid == 1 * CSID佔22bit,此時協議將第一個字節的[2,8]bit置1,餘下的16個bit表示CSID - 64,這樣共有 * 16個bit來存儲CSID,16bit能夠表示[0,65535]共 65536 個數,所以這種狀況下 CSID 在 * [64,65599],其中65599=65535+64,須要注意的是,Basic Header是採用小端存儲的方式,越往 * 後的字節數量級越高,所以經過3個字節的每個bit的值來計算CSID時,應該是: * <第三個字節的值> * 256 + <第二個字節的值> + 64. * 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * |fmt| 1 | cs id - 64 | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ */ /* chunk basic header */ fmt = (*p >> 6) & 0x03; csid = *p++ & 0x3f; /* csid 爲 0 表示 Basic Header 佔 2 bytes */ if (csid == 0) { if (b->last - p < 1) continue; csid = 64; csid += *(uint8_t*)p++; } /* csid 爲 1 表示 Basic Header 佔 3 bytes */ else if (csid == 1) { if (b->last - p < 2) continue; csid = 64; csid += *(uint8_t*)p++; csid += (uint32_t)256 * (*(uint8_t*)p++); } ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, "RTMP bheader fmt=%d csid=%D", (int)fmt, csid); if (csid >= (uint32_t)cscf->max_streams) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "RTMP in chunk stream too big: %D >= %D", csid, cscf->max_streams); ngx_rtmp_finalize_session(s); return; } /* link orphan */ if (s->in_csid == 0) { /* unlink from stream #0 */ st->in = st->in->next; /* link to new stream */ s->in_csid = csid; st = &s->in_streams[csid]; if (st->in == NULL) { in->next = in; } else { in->next = st->in->next; st->in->next = in; } st->in = in; h = &st->hdr; h->csid = csid; } /* Message Header: * * 包含了要發送的實際信息(多是完整的,也多是一部分)的描述信息。Message Header的格式 * 和長度取決於Basic Header的chunk type,即fmt,共有四種不一樣的格式。其中第一種格式能夠表 * 示其餘三種表示的全部數據,但因爲其餘三種格式是基於對以前chunk的差量化的表示,所以能夠 * 更簡潔地表示相同的數據,實際使用的時候仍是應該採用儘可能少的字節表示相贊成義的數據。下面 * 按字節從多到少的順序分別介紹這四種格式的 Message Header。 * * 下面是 Message Header 四種消息頭格式。 * * 1、Chunk Type(fmt) = 0:11 bytes * 0 1 2 3 * 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | timestamp |message length | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | message length (coutinue) |message type id| msg stream id | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | msg stream id | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * type=0時Message Header佔用11個字節,其餘三種能表示的數據它都能表示,但在chunk stream * 的開始第一個chunk和頭信息中的時間戳後退(即值與上一個chunk相比減少,一般在回退播放的 * 時候會出現這種狀況)的時候必須採用這種格式。 * - timestamp(時間戳):佔用3個字節,所以它最多能表示到16777215=0xFFFFFF=2^24-1,當它 * 的值超過這個最大值時,這三個字節都置爲1,這樣實際的timestamp會轉存到 Extended * Timestamp 字段中,接收端在判斷timestamp字段24個位都爲1時就會去Extended Timestamp * 中解析實際的時間戳。 * - message length(消息數據長度):佔用3個字節,表示實際發送的消息的數據如音頻幀、視頻 * 幀等數據的長度,單位是字節。注意這裏是Message的長度,也就是chunk屬於的Message的總長 * 度,而不是chunk自己data的長度。 * - message type id(消息的類型id):1個字節,表示實際發送的數據的類型,如8表明音頻數據, * 9表明視頻數據。 * - message stream id(消息的流id):4個字節,表示該chunk所在的流的ID,和Basic Header * 的CSID同樣,它採用小端存儲方式。 * * 2、Chunk Type(fmt) = 1:7 bytes * 0 1 2 3 * 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | timestamp delta |message length | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | message length (coutinue) |message type id| * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * type爲1時佔用7個字節,省去了表示message stream id的4個字節,表示此chunk和上一次發的 * chunk所在的流相同,若是在發送端和對端有一個流連接的時候能夠儘可能採起這種格式。 * - timestamp delta:3 bytes,這裏和type=0時不一樣,存儲的是和上一個chunk的時間差。相似 * 上面提到的timestamp,當它的值超過3個字節所能表示的最大值時,三個字節都置爲1,實際 * 的時間戳差值就會轉存到Extended Timestamp字段中,接收端在判斷timestamp delta字段24 * 個bit都爲1時就會去Extended Timestamp 中解析實際的與上次時間戳的差值。 * - 其餘字段與上面的解釋相同. * * 3、Chunk Type(fmt) = 2:3 bytes * 0 1 2 * 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | timestamp delta | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * type爲2時佔用3個字節,相對於type=1格式又省去了表示消息長度的3個字節和表示消息 * 類型的1個字節,表示此chunk和上一次發送的chunk所在的流、消息的長度和消息的類型都相同。 * 餘下的這三個字節表示timestamp delta,使用同type=1。 * * * 4、Chunk Type(fmt) = 3: 0 byte * type=3時,爲0字節,表示這個chunk的Message Header和上一個是徹底相同的。當它跟在type=0 * 的chunk後面時,表示和前一個chunk的時間戳都是相同。何時連時間戳都是相同呢?就是 * 一個Message拆分紅多個chunk,這個chunk和上一個chunk同屬於一個Message。而當它跟在 * type = 1或 type = 2 的chunk後面時的chunk後面時,表示和前一個chunk的時間戳的差是相同 * 的。好比第一個 chunk 的 type = 0,timestamp = 100,第二個 chunk 的 type = 2, * timestamp delta = 20,表示時間戳爲 100 + 20 = 120,第三個 chunk 的 type = 3, * 表示 timestamp delta = 20 時間戳爲 120 + 20 = 140。 */ ext = st->ext; timestamp = st->dtime; if (fmt <= 2 ) { if (b->last - p < 3) continue; /* timestamp: * big-endian 3b -> little-endian 4b */ pp = (u_char*)×tamp; pp[2] = *p++; pp[1] = *p++; pp[0] = *p++; pp[3] = 0; ext = (timestamp == 0x00ffffff); if (fmt <= 1) { if (b->last - p < 4) continue; /* size: * big-endian 3b -> little-endian 4b * type: * 1b -> 1b*/ pp = (u_char*)&h->mlen; pp[2] = *p++; pp[1] = *p++; pp[0] = *p++; pp[3] = 0; /* message type id */ h->type = *(uint8_t*)p++; if (fmt == 0) { if (b->last - p < 4) continue; /* stream: * little-endian 4b -> little-endian 4b */ pp = (u_char*)&h->msid; pp[0] = *p++; pp[1] = *p++; pp[2] = *p++; pp[3] = *p++; } } } /* * Extended Timestamp(擴展時間戳): * 在 chunk 中會有時間戳 timestamp 和時間戳差 timestamp delta, * 而且它們不會同時存在,只有這二者之一大於3字節能表示的最大數值 * 0xFFFFFF=16777215時,纔會用這個字段來表示真正的時間戳,不然 * 這個字段爲 0。擴展時間戳佔 4 個字節,能表示的最大數值就是 * 0xFFFFFFFF=4294967295。當擴展時間戳啓用時,timestamp字段或者 * timestamp delta要全置爲1,而不是減去時間戳或者時間戳差的值。 */ /* extended header */ if (ext) { if (b->last - p < 4) continue; pp = (u_char*)×tamp; pp[3] = *p++; pp[2] = *p++; pp[1] = *p++; pp[0] = *p++; } if (st->len == 0) { /* Messages with type=3 should * never have ext timestamp field * according to standard. * However that's not always the case * in real life */ st->ext = (ext && cscf->publish_time_fix); if (fmt) { st->dtime = timestamp; } else { h->timestamp = timestamp; st->dtime = 0; } } ngx_log_debug8(NGX_LOG_DEBUG_RTMP, c->log, 0, "RTMP mheader fmt=%d %s (%d) " "time=%uD+%uD mlen=%D len=%D msid=%D", (int)fmt, ngx_rtmp_message_type(h->type), (int)h->type, h->timestamp, st->dtime, h->mlen, st->len, h->msid); /* header done */ /* 更新緩存區pos指針的位置,更新後pos應指向rtmp trunk的實際數據 */ b->pos = p; if (h->mlen > cscf->max_message) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "too big message: %uz", cscf->max_message); ngx_rtmp_finalize_session(s); return; } } /* b->last:指向本次recv到的數據的尾部 * b->pos: 指向rtmp trunk的實際data起始 * 所以,size爲本次接收到的實際數據的大小 */ size = b->last - b->pos; /* h->mlen:該RTMP message的長度 * st->len:該流的長度 */ fsize = h->mlen - st->len; if (size < ngx_min(fsize, s->in_chunk_size)) continue; /* buffer is ready */ /* 本次所要接收的 rtmp message 的大小大於 s->in_shunk_size 塊的大小 * 所以要進行切分 */ if (fsize > s->in_chunk_size) { /* collect fragmented chunks */ st->len += s->in_chunk_size; b->last = b->pos + s->in_chunk_size; old_pos = b->last; /* 切分一個塊後餘下的數據大小 */ old_size = size - s->in_chunk_size; } else { /* 完整接收一個 rtmp message 後,進行處理 */ /* handle! */ head = st->in->next; st->in->next = NULL; b->last = b->pos + fsize; old_pos = b->last; old_size = size - fsize; st->len = 0; h->timestamp += st->dtime; /* 根據該 rtmp 消息的類型調用相應的回調函數進行處理 */ if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) { ngx_rtmp_finalize_session(s); return; } if (s->in_chunk_size_changing) { /* copy old data to a new buffer */ if (!old_size) { ngx_rtmp_finalize_set_chunk_size(s); } } else { /* add used bufs to stream #0 */ st0 = &s->in_streams[0]; st->in->next = st0->in; st0->in = head; st->in = NULL; } } s->in_csid = 0; } }
ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { ngx_rtmp_core_main_conf_t *cmcf; ngx_array_t *evhs; size_t n; ngx_rtmp_handler_pt *evh; cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); #ifdef NGX_DEBUG { int nbufs; ngx_chain_t *ch; for(nbufs = 1, ch = in; ch->next; ch = ch->next, ++nbufs); ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "RTMP recv %s (%d) csid=%D timestamp=%D " "mlen=%D msid=%D nbufs=%d", ngx_rtmp_message_type(h->type), (int)h->type, h->csid, h->timestamp, h->mlen, h->msid, nbufs); } #endif if (h->type > NGX_RTMP_MSG_MAX) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "unexpected RTMP message type: %d", (int)h->type); return NGX_OK; } /* 根據 message type 取出事件 */ evhs = &cmcf->events[h->type]; evh = evhs->elts; ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "nhandlers: %d", evhs->nelts); for(n = 0; n < evhs->nelts; ++n, ++evh) { if (!evh) { continue; } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "calling handler %d", n); /* 若接收到的是 amf 類型的事件,則對應的處理函數爲 ngx_rtmp_amf_message_handler; * 若接收到的是 standard protocol 類型事件,則對應的處理函數爲 * ngx_rtmp_protocol_message_handler; * 若接收到的是 user protocol 事件,則對應的處理函數爲 ngx_rtmp_user_message_handler; */ switch ((*evh)(s, h, in)) { case NGX_ERROR: ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "handler %d failed", n); return NGX_ERROR; case NGX_DONE: return NGX_OK; } } return NGX_OK; }
如上圖,接收到客戶端發來的第一個 RTMP message 的類型爲 20,即 NGX_RTMP_MSG_AMF_CMD,所以將會調用以前由
ngx_rtmp_init_event_handlers() 方法對該類型設置的回調方法 ngx_rtmp_amf_message_handler。而且,由上圖可
知,客戶端發送的是 connect 鏈接,所以,在 ngx_rtmp_amf_message_handler 主要作的就是:數組
ngx_int_t ngx_rtmp_amf_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { ngx_rtmp_amf_ctx_t act; ngx_rtmp_core_main_conf_t *cmcf; ngx_array_t *ch; ngx_rtmp_handler_pt *ph; size_t len, n; static u_char func[128]; static ngx_rtmp_amf_elt_t elts[] = { { NGX_RTMP_AMF_STRING, ngx_null_string, func, sizeof(func) }, }; /* AMF command names come with string type, but shared object names * come without type */ if (h->type == NGX_RTMP_MSG_AMF_SHARED || h->type == NGX_RTMP_MSG_AMF3_SHARED) { elts[0].type |= NGX_RTMP_AMF_TYPELESS; } else { elts[0].type &= ~NGX_RTMP_AMF_TYPELESS; } if ((h->type == NGX_RTMP_MSG_AMF3_SHARED || h->type == NGX_RTMP_MSG_AMF3_META || h->type == NGX_RTMP_MSG_AMF3_CMD) && in->buf->last > in->buf->pos) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "AMF3 prefix: %ui", (ngx_int_t)*in->buf->pos); ++in->buf->pos; } cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); /* read AMF func name & transaction id */ ngx_memzero(&act, sizeof(act)); act.link = in; act.log = s->connection->log; memset(func, 0, sizeof(func)); /* 讀取接收到的 amf 數據,獲取 elts 中指定類型的 amf 數據,並保存在 elts->data 中 */ if (ngx_rtmp_amf_read(&act, elts, sizeof(elts) / sizeof(elts[0])) != NGX_OK) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "AMF cmd failed"); return NGX_ERROR; } /* skip name */ in = act.link; /* 跳過已經讀取的數據 */ in->buf->pos += act.offset; len = ngx_strlen(func); /* 根據讀取到的 func,在 amf_hash 指向的 hash 表中查找,若能找到, * 則返回對應的 ngx_array_t * 指針數組,該數組中保存着該 func 中 * 保存的內容的相關處理函數,這些處理函數是各 RTMP 模塊在 * postconfiguration 方法中保存到 cmcf->amf (ngx_array_t)數組中, * 而後在 ngx_rtmp_init_event_handlers 方法中存到 amf_hash 哈希表中, * 以便快速查找 */ ch = ngx_hash_find(&cmcf->amf_hash, ngx_hash_strlow(func, func, len), func, len); /* 若 ch 不爲 NULL 且 ch->nelts 大於 0,代表有該類 ch 的處理函數 */ if (ch && ch->nelts) { ph = ch->elts; for (n = 0; n < ch->nelts; ++n, ++ph) { ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "AMF func '%s' passed to handler %d/%d", func, n, ch->nelts); switch ((*ph)(s, h, in)) { case NGX_ERROR: return NGX_ERROR; case NGX_DONE: return NGX_OK; } } } else { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "AMF cmd '%s' no handler", func); } return NGX_OK; }
在 ngx_rtmp_cmd_module.c 文件,即 ngx_rtmp_cmd_module 中,能夠找到以下:緩存
static ngx_rtmp_amf_handler_t ngx_rtmp_cmd_map[] = { { ngx_string("connect"), ngx_rtmp_cmd_connect_init }, { ngx_string("createStream"), ngx_rtmp_cmd_create_stream_init }, { ngx_string("closeStream"), ngx_rtmp_cmd_close_stream_init }, { ngx_string("deleteStream"), ngx_rtmp_cmd_delete_stream_init }, { ngx_string("publish"), ngx_rtmp_cmd_publish_init }, { ngx_string("play"), ngx_rtmp_cmd_play_init }, { ngx_string("play2"), ngx_rtmp_cmd_play2_init }, { ngx_string("seek"), ngx_rtmp_cmd_seek_init }, { ngx_string("pause"), ngx_rtmp_cmd_pause_init }, { ngx_string("pauseraw"), ngx_rtmp_cmd_pause_init }, };
上面即 ngx_rtmp_cmd_module 模塊在接收到左邊的命令,如 connect 時,就會調用右邊的處理函數。所以,下面將
會調用 ngx_rtmp_cmd_connect_init 方法。服務器
static ngx_int_t ngx_rtmp_cmd_connect_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { size_t len; static ngx_rtmp_connect_t v; /* 從這裏能夠看出,對比圖1,這裏是指定要獲取的 amf 類型的值 */ static ngx_rtmp_amf_elt_t in_cmd[] = { { NGX_RTMP_AMF_STRING, ngx_string("app"), v.app, sizeof(v.app) }, { NGX_RTMP_AMF_STRING, ngx_string("flashVer"), v.flashver, sizeof(v.flashver) }, { NGX_RTMP_AMF_STRING, ngx_string("swfUrl"), v.swf_url, sizeof(v.swf_url) }, { NGX_RTMP_AMF_STRING, ngx_string("tcUrl"), v.tc_url, sizeof(v.tc_url) }, { NGX_RTMP_AMF_NUMBER, ngx_string("audioCodecs"), &v.acodecs, sizeof(v.acodecs) }, { NGX_RTMP_AMF_NUMBER, ngx_string("videoCodecs"), &v.vcodecs, sizeof(v.vcodecs) }, { NGX_RTMP_AMF_STRING, ngx_string("pageUrl"), v.page_url, sizeof(v.page_url) }, { NGX_RTMP_AMF_NUMBER, ngx_string("objectEncoding"), &v.object_encoding, 0}, }; static ngx_rtmp_amf_elt_t in_elts[] = { { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, 0 }, { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_cmd, sizeof(in_cmd) }, }; ngx_memzero(&v, sizeof(v)); /* 從 in 中提取 in_elts 中指定的數據,並保存在 in_elts 中的相應位置 */ if (ngx_rtmp_receive_amf(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } len = ngx_strlen(v.app); if (len > 10 && !ngx_memcmp(v.app + len - 10, "/_definst_", 10)) { v.app[len - 10] = 0; } else if (len && v.app[len - 1] == '/') { v.app[len - 1] = 0; } ngx_rtmp_cmd_fill_args(v.app, v.args); ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "connect: app='%s' args='%s' flashver='%s' swf_url='%s' " "tc_url='%s' page_url='%s' acodecs=%uD vcodecs=%uD " "object_encoding=%ui", v.app, v.args, v.flashver, v.swf_url, v.tc_url, v.page_url, (uint32_t)v.acodecs, (uint32_t)v.vcodecs, (ngx_int_t)v.object_encoding); /* ngx_rtmp_connect 和 各個 RTMP 模塊的 next_connect 構成了一個單鏈表, * 該鏈表的元素都是各 RTMP 模塊想要在 connect 階段進行的操做,初始化該鏈表 * 是在 postconfiguration 方法中進行的 */ return ngx_rtmp_connect(s, &v); }
對於 connect,主要有如下的 RTMP 模塊添加了相應的操做:(下面是根據調用順序排列的)網絡
ngx_int_t ngx_rtmp_receive_amf(ngx_rtmp_session_t *s, ngx_chain_t *in, ngx_rtmp_amf_elt_t *elts, size_t nelts) { ngx_rtmp_amf_ctx_t act; /* 構建一個 amf 的上下文結構體 */ ngx_memzero(&act, sizeof(act)); act.link = in; act.log = s->connection->log; /* 從 amf數據 中提取所需的數據 */ return ngx_rtmp_amf_read(&act, elts, nelts); }
static ngx_int_t ngx_rtmp_notify_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v) { ngx_rtmp_notify_srv_conf_t *nscf; ngx_rtmp_netcall_init_t ci; ngx_url_t *url; /* 檢測是否置位了 auto_pushed 或者 relay,因爲是點播,所以該模塊至關於什麼也沒有作 */ if (s->auto_pushed || s->relay) { goto next; } /* 暫不分析 */ ... next: return next_connect(s, v); }
static ngx_int_t ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: connect"); ngx_rtmp_core_srv_conf_t *cscf; ngx_rtmp_core_app_conf_t **cacfp; ngx_uint_t n; ngx_rtmp_header_t h; u_char *p; static double trans; static double capabilities = NGX_RTMP_CAPABILITIES; static double object_encoding = 0; static ngx_rtmp_amf_elt_t out_obj[] = { { NGX_RTMP_AMF_STRING, ngx_string("fmsVer"), NGX_RTMP_FMS_VERSION, 0 }, { NGX_RTMP_AMF_NUMBER, ngx_string("capabilities"), &capabilities, 0 }, }; static ngx_rtmp_amf_elt_t out_inf[] = { { NGX_RTMP_AMF_STRING, ngx_string("level"), "status", 0 }, { NGX_RTMP_AMF_STRING, ngx_string("code"), "NetConnection.Connect.Success", 0 }, { NGX_RTMP_AMF_STRING, ngx_string("description"), "Connection succeeded.", 0 }, { NGX_RTMP_AMF_NUMBER, ngx_string("objectEncoding"), &object_encoding, 0 } }; /* 從內容能夠看出,這裏是針對客戶端發來的 connect,服務器將要發給客戶端的響應 */ static ngx_rtmp_amf_elt_t out_elts[] = { { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 }, { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 }, { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_obj, sizeof(out_obj) }, { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_inf, sizeof(out_inf) }, }; /* 檢測當前會話是否已經鏈接了,如果,則代表重複接收到了客戶端的 connect, * 所以返回 ERROR */ if (s->connected) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "connect: duplicate connection"); return NGX_ERROR; } /* 獲取該 server{} 下的配置結構體 */ cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); trans = v->trans; /* fill session parameters */ s->connected = 1; /* 初始化 RTMP 頭 */ ngx_memzero(&h, sizeof(h)); h.csid = NGX_RTMP_CSID_AMF_INI; h.type = NGX_RTMP_MSG_AMF_CMD; #define NGX_RTMP_SET_STRPAR(name) \ s->name.len = ngx_strlen(v->name); \ s->name.data = ngx_palloc(s->connection->pool, s->name.len); \ ngx_memcpy(s->name.data, v->name, s->name.len) /* 將這些參數設置到 ngx_rtmp_session_t 會話結構體 s 中 */ NGX_RTMP_SET_STRPAR(app); NGX_RTMP_SET_STRPAR(args); NGX_RTMP_SET_STRPAR(flashver); NGX_RTMP_SET_STRPAR(swf_url); NGX_RTMP_SET_STRPAR(tc_url); NGX_RTMP_SET_STRPAR(page_url); #undef NGX_RTMP_SET_STRPAR p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?'); if (p) { s->app.len = (p - s->app.data); } s->acodecs = (uint32_t) v->acodecs; s->vcodecs = (uint32_t) v->vcodecs; /* 遍歷當前 server{} 下的全部 application{},找到與客戶端發來的 connect 命令中 * 要求鏈接的 app,而後把該 app 對應的 application{} 下所屬的 ngx_rtmp_conf_ctx_t * 的 app_conf 指針數組賦給當前會話 s->app_conf */ /* find application & set app_conf */ cacfp = cscf->applications.elts; for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) { if ((*cacfp)->name.len == s->app.len && ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0) { /* found app! */ s->app_conf = (*cacfp)->app_conf; break; } } /* 如果沒有找到,則表示沒有客戶端所要請求的 application,所以返回 ERROR */ if (s->app_conf == NULL) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "connect: application not found: '%V'", &s->app); return NGX_ERROR; } object_encoding = v->object_encoding; /* 這裏依次發送 ack_size、bandwidth、chunk_size、amf 給客戶端 */ return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK || ngx_rtmp_send_bandwidth(s, cscf->ack_window, NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK || ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK || ngx_rtmp_send_amf(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK ? NGX_ERROR : NGX_OK; }
rtmp 服務端發送完上面的幾個 rtmp 包後,若客戶端沒有馬上發送 rtmp 包,則將讀事件添加到 epoll 中,
而後監聽客戶端發來的第二個 rtmp 包。session
其實,rtmp 服務器在接收到 "createStream" 以前,還接收到了客戶端一個 NGX_RTMP_MSG_ACK_SIZE(5) 的包,
接收到該包後,僅是調用 ngx_rtmp_protocol_message_handler 方法將包中的實際數據取出來,賦給
s->ack_size,以下:app
ngx_int_t ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { ngx_buf_t *b; u_char *p; uint32_t val; uint8_t limit; b = in->buf; ... p = (u_char *)&val; p[0] = b->pos[3]; p[1] = b->pos[2]; p[2] = b->pos[1]; p[3] = b->pos[0]; switch (h->type) { ... case NGX_RTMP_MSG_ACK_SIZE: /* receive window size =val */ ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "receive ack_size=%uD", val); s->ack_size = val; break; ... } return NGX_OK; }
接收到客戶端發來的該 "createStream" amf 命令後,與上面分析的 "connect" 命令相似,調用到
ngx_rtmp_amf_message_handler 方法,而後在 amf_hash 中查找是否有 "createStream" 命令的回調方法,
這裏找到的回調方法爲 ngx_rtmp_cmd_create_stream_init。ide
static ngx_int_t ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { static ngx_rtmp_create_stream_t v; static ngx_rtmp_amf_elt_t in_elts[] = { { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, sizeof(v.trans) }, }; /* 從 amf 數據中讀取 in_elts 中指定的數據 */ if (ngx_rtmp_receive_amf(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream"); /* 同理,這裏開始調用 ngx_rtmp_create_stream 構建的鏈表中的方法 */ return ngx_rtmp_create_stream(s, &v); }
在 nginx-rtmp 源碼中,可知,僅有 ngx_rtmp_cmd_module 模塊設置了該命令的方法。函數
static ngx_int_t ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v) { /* support one message stream per connection */ static double stream; static double trans; ngx_rtmp_header_t h; static ngx_rtmp_amf_elt_t out_elts[] = { { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 }, { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 }, { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 }, { NGX_RTMP_AMF_NUMBER, ngx_null_string, &stream, sizeof(stream) }, }; trans = v->trans; stream = NGX_RTMP_MSID; ngx_memzero(&h, sizeof(h)); h.csid = NGX_RTMP_CSID_AMF_INI; h.type = NGX_RTMP_MSG_AMF_CMD; /* 發送該 amf 數據 */ return ngx_rtmp_send_amf(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ? NGX_DONE : NGX_ERROR; }
而後,服務器又再次等待客戶端發來的 rtmp 包。這次,接收 client 發來的到 rtmp 包爲 類型爲 amf_cmd(20)
的 "play" 命令.post
從接收到的這個 rtmp 包能夠看出實際上是一個 rtmp message 中包含兩個 chunk,第一個 chunk 爲 amf_cmd(20),
這裏爲 "play",第二個 chunk 爲 user control message(4)。
static ngx_int_t ngx_rtmp_cmd_play_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { static ngx_rtmp_play_t v; static ngx_rtmp_amf_elt_t in_elts[] = { /* transaction is always 0 */ { NGX_RTMP_AMF_NUMBER, ngx_null_string, NULL, 0 }, { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 }, { NGX_RTMP_AMF_STRING, ngx_null_string, &v.name, sizeof(v.name) }, { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.start, 0 }, { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.duration, 0 }, { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_BOOLEAN, ngx_null_string, &v.reset, 0 } }; ngx_memzero(&v, sizeof(v)); /* 從接收到的 amf 數據中提取指定數據 */ if (ngx_rtmp_receive_amf(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } ngx_rtmp_cmd_fill_args(v.name, v.args); ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "play: name='%s' args='%s' start=%i duration=%i " "reset=%i silent=%i", v.name, v.args, (ngx_int_t) v.start, (ngx_int_t) v.duration, (ngx_int_t) v.reset, (ngx_int_t) v.silent); return ngx_rtmp_play(s, &v); }
ngx_rtmp_play 構成的鏈表主要有如下幾個 RTMP 模塊添加有回調方法:(調用順序排序)
從 nginx-rtmp 的源碼可知,主要就是調用 ngx_rtmp_play_module 模塊設置的回調方法 ngx_rtmp_play_play,
而其餘模塊的都沒有作什麼,由於沒有在配置文件中配置有相關命令來啓動它們。
在看 ngx_rtmp_play_play 方法前,先看 ngx_rtmp_play_module 模塊的配置項:
static ngx_command_t ngx_rtmp_play_commands[] = { { ngx_string("play"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, ngx_rtmp_play_url, NGX_RTMP_APP_CONF_OFFSET, 0, NULL }, { ngx_string("play_temp_path"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, ngx_conf_set_str_slot, NGX_RTMP_APP_CONF_OFFSET, offsetof(ngx_rtmp_play_app_conf_t, temp_path), NULL }, { ngx_string("play_local_path"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, ngx_conf_set_str_slot, NGX_RTMP_APP_CONF_OFFSET, offsetof(ngx_rtmp_play_app_conf_t, local_path), NULL }, ngx_null_command };
可見,第一項爲 "play",回顧點播業務的 nginx.conf 中,就配置了該命令。所以先看該模塊是如何解析 play 配置項的:
static char * ngx_rtmp_play_url(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_rtmp_play_app_conf_t *pacf = conf; ngx_rtmp_play_entry_t *pe, **ppe; ngx_str_t url; ngx_url_t *u; size_t add, n; ngx_str_t *value; /* 初始化 pacf->entries 數組 */ if (pacf->entries.nalloc == 0 && ngx_array_init(&pacf->entries, cf->pool, 1, sizeof(void *)) != NGX_OK) { return NGX_CONF_ERROR; } value = cf->args->elts; for (n = 1; n < cf->args->nelts; ++n) { ppe = ngx_array_push(&pacf->entries); if (ppe == NULL) { return NGX_CONF_ERROR; } pe = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_play_entry_t)); if (pe == NULL) { return NGX_CONF_ERROR; } *ppe = pe; /* 若是 "play" 配置項後的值,即 url 不爲 "http://" 開始的,即爲本地文件 */ if (ngx_strncasecmp(value[n].data, (u_char *) "http://", 7)) { /* local file */ pe->root = ngx_palloc(cf->pool, sizeof(ngx_str_t)); if (pe->root == NULL) { return NGX_CONF_ERROR; } *pe->root = value[n]; continue; } /* 不然,爲網絡文件 */ /* http case */ url = value[n]; add = sizeof("http://") - 1; url.data += add; url.len -= add; u = ngx_pcalloc(cf->pool, sizeof(ngx_url_t)); if (u == NULL) { return NGX_CONF_ERROR; } u->url.len = url.len; u->url.data = url.data; u->default_port = 80; u->uri_part = 1; if (ngx_parse_url(cf->pool, u) != NGX_OK) { if (u->err) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "%s in url \"%V\"", u->err, &u->url); } return NGX_CONF_ERROR; } pe->url = u; } return NGX_CONF_OK; }
該函數主要就是提取 "play" 配置項後的值,判斷是本地文件仍是網絡文件,根據相應值構造 ngx_rtmp_play_entry_t,並將該指向該
結構體的指針放入到 pacf->entries 數組中.
下面繼續看 ngx_rtmp_play_play 函數。
static ngx_int_t ngx_rtmp_play_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { ngx_rtmp_play_main_conf_t *pmcf; ngx_rtmp_play_app_conf_t *pacf; ngx_rtmp_play_ctx_t *ctx; u_char *p; ngx_rtmp_play_fmt_t *fmt, **pfmt; ngx_str_t *pfx, *sfx; ngx_str_t name; ngx_uint_t n; pmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_play_module); pacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_play_module); if (pacf == NULL || pacf->entries.nelts == 0) { goto next; } ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "play: play name='%s' timestamp=%i", v->name, (ngx_int_t) v->start); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); if (ctx && ctx->file.fd != NGX_INVALID_FILE) { ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, "play: already playing"); goto next; } /* check for double-dot in v->name; * we should not move out of play directory */ for (p = v->name; *p; ++p) { if (ngx_path_separator(p[0]) && p[1] == '.' && p[2] == '.' && ngx_path_separator(p[3])) { ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, "play: bad name '%s'", v->name); return NGX_ERROR; } } if (ctx == NULL) { /* 分配 ngx_rtmp_play_module 模塊的上下文結構體,並將其放入到全局數組 s->ctx 中 */ ctx = ngx_palloc(s->connection->pool, sizeof(ngx_rtmp_play_ctx_t)); ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_play_module); } ngx_memzero(ctx, sizeof(*ctx)); ctx->session = s; ctx->aindex = ngx_rtmp_play_parse_index('a', v->args); ctx->vindex = ngx_rtmp_play_parse_index('v', v->args); ctx->file.log = s->connection->log; ngx_memcpy(ctx->name, v->name, NGX_RTMP_MAX_NAME); name.len = ngx_strlen(v->name); name.data = v->name; /* pmcf->fmts 數組是在 postconfiguration 方法中初始化好的, * 在 nginx-rtmp 中,該數組僅有兩項,分別是 flv 和 mp4 */ pfmt = pmcf->fmts.elts; for (n = 0; n < pmcf->fmts.nelts; ++n, ++pfmt) { fmt = *pfmt; pfx = &fmt->pfx; // 封裝格式的前綴 sfx = &fmt->sfx; // 封裝格式的後綴 if (pfx->len == 0 && ctx->fmt == NULL) { ctx->fmt = fmt; } if (pfx->len && name.len >= pfx->len && ngx_strncasecmp(pfx->data, name.data, pfx->len) == 0) { ctx->pfx_size = pfx->len; ctx->fmt = fmt; break; } /* 比較客戶端請求播放的文件的後綴與服務器支持的文件後綴是否一致 */ if (name.len >= sfx->len && ngx_strncasecmp(sfx->data, name.data + name.len - sfx->len, sfx->len) == 0) { ctx->fmt = fmt; } } if (ctx->fmt == NULL) { ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, "play: fmt not found"); goto next; } ctx->file.fd = NGX_INVALID_FILE; ctx->nentry = NGX_CONF_UNSET_UINT; ctx->post_seek = NGX_CONF_UNSET_UINT; sfx = &ctx->fmt->sfx; if (name.len < sfx->len || ngx_strncasecmp(sfx->data, name.data + name.len - sfx->len, sfx->len)) { ctx->sfx = *sfx; } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: fmt=%V", &ctx->fmt->name); return ngx_rtmp_play_next_entry(s, v); next: return next_play(s, v); }
static ngx_int_t ngx_rtmp_play_next_entry(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { ngx_rtmp_play_app_conf_t *pacf; ngx_rtmp_play_ctx_t *ctx; ngx_rtmp_play_entry_t *pe; u_char *p; static u_char path[NGX_MAX_PATH + 1]; pacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_play_module); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); for ( ;; ) { /* 若文件描述符不爲 -1 ,則關閉它 */ if (ctx->file.fd != NGX_INVALID_FILE) { ngx_close_file(ctx->file.fd); ctx->file.fd = NGX_INVALID_FILE; } if (ctx->file_id) { ngx_rtmp_play_cleanup_local_file(s); } ctx->nentry = (ctx->nentry == NGX_CONF_UNSET_UINT ? 0 : ctx->nentry + 1); if (ctx->nentry >= pacf->entries.nelts) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: all entries failed"); ngx_rtmp_send_status(s, "NetStream.Play.StreamNotFound", "error", "Video on demand stream not found"); break; } /* 根據 ctx->nentry 獲取當前要播放的路徑,該路徑保存在 ngx_rtmp_play_entry_t, * 若播放的是本地文件,則路徑保存在成員 root 中,若播放的是網絡文件,則路徑 * 保存在 url 中 */ pe = ngx_rtmp_play_get_current_entry(s); ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: trying %s entry %ui/%uz '%V'", pe->url ? "remote" : "local", ctx->nentry + 1, pacf->entries.nelts, pe->url ? &pe->url->url : pe->root); /* open remote */ if (pe->url) { return ngx_rtmp_play_open_remote(s, v); } /* open local */ p = ngx_snprintf(path, NGX_MAX_PATH, "%V/%s%V", pe->root, v->name + ctx->pfx_size, &ctx->sfx); *p = 0; /* 打開要播放的本地文件 */ ctx->file.fd = ngx_open_file(path, NGX_FILE_RDONLY, NGX_FILE_OPEN, NGX_FILE_DEFAULT_ACCESS); if (ctx->file.fd == NGX_INVALID_FILE) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, ngx_errno, "play: error opening file '%s'", path); continue; } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: open local file '%s'", path); if (ngx_rtmp_play_open(s, v->start) != NGX_OK) { return NGX_ERROR; } break; } return next_play(s, v); }
static ngx_int_t ngx_rtmp_play_open(ngx_rtmp_session_t *s, double start) { ngx_rtmp_play_ctx_t *ctx; ngx_event_t *e; ngx_uint_t timestamp; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); if (ctx->file.fd == NGX_INVALID_FILE) { return NGX_ERROR; } /* 向客戶端發送 消息類型爲NGX_RTMP_MSG_USER(4) 的 stream_begin 消息, 以下圖[9] */ if (ngx_rtmp_send_stream_begin(s, NGX_RTMP_MSID) != NGX_OK) { return NGX_ERROR; } /* 而後接着發送狀態消息,以下圖[10] */ if (ngx_rtmp_send_status(s, "NetStream.Play.Start", "status", "Start video on demand") != NGX_OK) { return NGX_ERROR; } if (ngx_rtmp_play_join(s) != NGX_OK) { return NGX_ERROR; } /* 初始化該 send_evt 事件 */ e = &ctx->send_evt; e->data = s; /* 初始化發送 指定播放文件數據的 回調函數 */ e->handler = ngx_rtmp_play_send; e->log = s->connection->log; /* 發送 recored 命令,以下圖[11] */ ngx_rtmp_send_recorded(s, 1); /* 發送 sample access,以下圖[12] */ if (ngx_rtmp_send_sample_access(s) != NGX_OK) { return NGX_ERROR; } /* 調用相應封裝格式的 init 函數 */ if (ngx_rtmp_play_do_init(s) != NGX_OK) { return NGX_ERROR; } /* 計算時間戳,有下一步 seek 可知,其實就是計算髮送 媒體文件的起始位置 */ timestamp = ctx->post_seek != NGX_CONF_UNSET_UINT ? ctx->post_seek : (start < 0 ? 0 : (ngx_uint_t) start); /* 同理,調用相應封裝格式的 seek 函數,將文件指針 seek 到 timstamp 位置(相對文件起始) */ if (ngx_rtmp_play_do_seek(s, timestamp) != NGX_OK) { return NGX_ERROR; } if (ngx_rtmp_play_do_start(s) != NGX_OK) { return NGX_ERROR; } /* 置位 opend 標誌位,表示文件已經打開了 */ ctx->opened = 1; return NGX_OK; }
static ngx_int_t ngx_rtmp_play_join(ngx_rtmp_session_t *s) { ngx_rtmp_play_ctx_t *ctx, **pctx; ngx_rtmp_play_app_conf_t *pacf; ngx_uint_t h; ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: join"); pacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_play_module); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); if (ctx == NULL || ctx->joined) { return NGX_ERROR; } h = ngx_hash_key(ctx->name, ngx_strlen(ctx->name)); pctx = &pacf->ctx[h % pacf->nbuckets]; while (*pctx) { if (!ngx_strncmp((*pctx)->name, ctx->name, NGX_RTMP_MAX_NAME)) { break; } pctx = &(*pctx)->next; } ctx->next = *pctx; *pctx = ctx; ctx->joined = 1; return NGX_OK; }
該函數主要根據 ctx->name 經過 hash 的 key 方法,獲得映射槽的位置 h,即 pacf->ctx[h],而後將其插入到
ngx_rtmp_play_module 的上下文結構體 ctx 的 ctx->next 構建的鏈表中.
static ngx_int_t ngx_rtmp_play_do_init(ngx_rtmp_session_t *s) { ngx_rtmp_play_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); if (ctx == NULL) { return NGX_ERROR; } /* 根據 fmt 調用相應封裝格式的 init 函數,這裏只支持兩種封裝格式,flv 和 mp4 */ if (ctx->fmt && ctx->fmt->init && ctx->fmt->init(s, &ctx->file, ctx->aindex, ctx->vindex) != NGX_OK) { return NGX_ERROR; } return NGX_OK; }
static ngx_int_t ngx_rtmp_play_do_seek(ngx_rtmp_session_t *s, ngx_uint_t timestamp) { ngx_rtmp_play_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); if (ctx == NULL) { return NGX_ERROR; } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: seek timestamp=%ui", timestamp); /* 調用相應封裝的 seek 函數 */ if (ctx->fmt && ctx->fmt->seek && ctx->fmt->seek(s, &ctx->file, timestamp) != NGX_OK) { return NGX_ERROR; } /* 若播放標誌位已經置位,則將 ctx->send_evt 事件放入到 延遲隊列 ngx_posted_events 中, * 正常來講應該尚未置位,僅在 ngx_rtmp_play_do_stark 中將其置 1 */ if (ctx->playing) { ngx_post_event((&ctx->send_evt), &ngx_posted_events); } return NGX_OK; }
static ngx_int_t ngx_rtmp_play_do_start(ngx_rtmp_session_t *s) { ngx_rtmp_play_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); if (ctx == NULL) { return NGX_ERROR; } ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: start"); /* 調用相應封裝的 start 函數 */ if (ctx->fmt && ctx->fmt->start && ctx->fmt->start(s, &ctx->file) != NGX_OK) { return NGX_ERROR; } /* 將 ctx->send_evt 事件放入到 ngx_posted_events 延遲隊列中 */ ngx_post_event((&ctx->send_evt), &ngx_posted_events); /* 置位 playing 標誌位,表示開始發送文件 */ ctx->playing = 1; return NGX_OK; }
從 ngx_rtmp_play_open 函數返回後,會一路返回到 ngx_rtmp_recv 函數中,而後解析完接收到的下一個客戶端
發送來的 user control message(4) 消息(在開始分析 play 時說過)後,就一路返回到
ngx_process_events_and_timers 函數中,即以下:
void ngx_process_events_and_timers(ngx_cycle_t *cycle) { ... (void) ngx_process_events(cycle, timer, flags); /* 返回到這裏,而後接着往下執行 */ ... /* 將會執行到這裏,由上面的分析可知,已經將 send_evt 事件放入到了 * ngx_posted_events 延遲隊列中,所以會取出該事件,並執行 */ ngx_event_process_posted(cycle, &ngx_posted_events); }
send_evt 事件的回調方法爲 ngx_rtmp_play_send 函數。
點播的最後一個調用就是在該函數中循環的,直到所有發送文件完成後,才返回.
static void ngx_rtmp_play_send(ngx_event_t *e) { ngx_rtmp_session_t *s = e->data; ngx_rtmp_play_ctx_t *ctx; ngx_int_t rc; ngx_uint_t ts; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); if (ctx == NULL || ctx->fmt == NULL || ctx->fmt->send == NULL) { return; } ts = 0; /* 調用相應封裝的 send 函數 */ rc = ctx->fmt->send(s, &ctx->file, &ts); if (rc > 0) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: send schedule %i", rc); ngx_add_timer(e, rc); return; } if (rc == NGX_AGAIN) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: send buffer full"); ngx_post_event(e, &s->posted_dry_events); return; } if (rc == NGX_OK) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: send restart"); /* 每發送完成一次,就將其放入到 ngx_posted_events 延遲隊列中, * 而後由下一次 ngx_process_events_and_timers 函數中的循環 * 調用到它 */ ngx_post_event(e, &ngx_posted_events); return; } ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: send done"); /* 發送 stream_eof */ ngx_rtmp_send_stream_eof(s, NGX_RTMP_MSID); ngx_rtmp_send_play_status(s, "NetStream.Play.Complete", "status", ts, 0); ngx_rtmp_send_status(s, "NetStream.Play.Stop", "status", "Stopped"); }
致此,點播分析完畢。