客戶端和服務器通過三次握手後,執行函數ngx_rtmp_cycle()。nginx
void ngx_rtmp_cycle(ngx_rtmp_session_t *s) { ngx_connection_t *c; c = s->connection; c->read->handler = ngx_rtmp_recv; c->write->handler = ngx_rtmp_send; s->ping_evt.data = c; s->ping_evt.log = c->log; s->ping_evt.handler = ngx_rtmp_ping; ngx_rtmp_reset_ping(s); ngx_rtmp_recv(c->read); }
gdb跟蹤:服務器
(gdb) bt #0 ngx_rtmp_cycle (s=0x21a2220) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handler.c:93 #1 0x00000000005453c1 in ngx_rtmp_handshake_done (s=0x21a2220) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handshake.c:365 #2 0x00000000005457b2 in ngx_rtmp_handshake_recv (rev=0x7fffd79ff148) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handshake.c:455 #3 0x000000000047d730 in ngx_epoll_process_events (cycle=0x20990d0, timer=10000, flags=1) at src/event/modules/ngx_epoll_module.c:694 #4 0x000000000046fcce in ngx_process_events_and_timers (cycle=0x20990d0) at src/event/ngx_event.c:236 #5 0x000000000047a6ec in ngx_single_process_cycle (cycle=0x20990d0) at src/os/unix/ngx_process_cycle.c:317 #6 0x0000000000449f5e in main (argc=3, argv=0x7fffffffe518) at src/core/nginx.c:418
static void ngx_rtmp_recv(ngx_event_t *rev) { ngx_int_t n; ngx_connection_t *c; ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; ngx_rtmp_header_t *h; ngx_rtmp_stream_t *st, *st0; ngx_chain_t *in, *head; ngx_buf_t *b; u_char *p, *pp, *old_pos; size_t size, fsize, old_size; uint8_t fmt, ext; uint32_t csid, timestamp; c = rev->data; s = c->data; b = NULL; old_pos = NULL; old_size = 0; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); if (c->destroyed) { return; } for( ;; ) { st = &s->in_streams[s->in_csid]; /* allocate new buffer */ if (st->in == NULL) { st->in = ngx_rtmp_alloc_in_buf(s); if (st->in == NULL) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "in buf alloc failed"); ngx_rtmp_finalize_session(s); return; } } h = &st->hdr; in = st->in; b = in->buf; if (old_size) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "reusing formerly read data: %d", old_size); b->pos = b->start; b->last = ngx_movemem(b->pos, old_pos, old_size); if (s->in_chunk_size_changing) { ngx_rtmp_finalize_set_chunk_size(s); } } else { if (old_pos) { b->pos = b->last = b->start; } n = c->recv(c, b->last, b->end - b->last); if (n == NGX_ERROR || n == 0) { ngx_rtmp_finalize_session(s); return; } if (n == NGX_AGAIN) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_rtmp_finalize_session(s); } return; } s->ping_reset = 1; ngx_rtmp_update_bandwidth(&ngx_rtmp_bw_in, n); b->last += n; s->in_bytes += n; if (s->in_bytes >= 0xf0000000) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "resetting byte counter"); s->in_bytes = 0; s->in_last_ack = 0; } if (s->ack_size && s->in_bytes - s->in_last_ack >= s->ack_size) { s->in_last_ack = s->in_bytes; ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "sending RTMP ACK(%uD)", s->in_bytes); if (ngx_rtmp_send_ack(s, s->in_bytes)) { ngx_rtmp_finalize_session(s); return; } } } old_pos = NULL; old_size = 0; /* parse headers */ if (b->pos == b->start) { p = b->pos; /* chunk basic header */ fmt = (*p >> 6) & 0x03; csid = *p++ & 0x3f; if (csid == 0) { if (b->last - p < 1) continue; csid = 64; csid += *(uint8_t*)p++; } else if (csid == 1) { if (b->last - p < 2) continue; csid = 64; csid += *(uint8_t*)p++; csid += (uint32_t)256 * (*(uint8_t*)p++); } ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, "RTMP bheader fmt=%d csid=%D", (int)fmt, csid); if (csid >= (uint32_t)cscf->max_streams) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "RTMP in chunk stream too big: %D >= %D", csid, cscf->max_streams); ngx_rtmp_finalize_session(s); return; } /* link orphan */ if (s->in_csid == 0) { /* unlink from stream #0 */ st->in = st->in->next; /* link to new stream */ s->in_csid = csid; st = &s->in_streams[csid]; if (st->in == NULL) { in->next = in; } else { in->next = st->in->next; st->in->next = in; } st->in = in; h = &st->hdr; h->csid = csid; } ext = st->ext; timestamp = st->dtime; if (fmt <= 2 ) { if (b->last - p < 3) continue; /* timestamp: * big-endian 3b -> little-endian 4b */ pp = (u_char*)×tamp; pp[2] = *p++; pp[1] = *p++; pp[0] = *p++; pp[3] = 0; ext = (timestamp == 0x00ffffff); if (fmt <= 1) { if (b->last - p < 4) continue; /* size: * big-endian 3b -> little-endian 4b * type: * 1b -> 1b*/ pp = (u_char*)&h->mlen; pp[2] = *p++; pp[1] = *p++; pp[0] = *p++; pp[3] = 0; h->type = *(uint8_t*)p++; if (fmt == 0) { if (b->last - p < 4) continue; /* stream: * little-endian 4b -> little-endian 4b */ pp = (u_char*)&h->msid; pp[0] = *p++; pp[1] = *p++; pp[2] = *p++; pp[3] = *p++; } } } /* extended header */ if (ext) { if (b->last - p < 4) continue; pp = (u_char*)×tamp; pp[3] = *p++; pp[2] = *p++; pp[1] = *p++; pp[0] = *p++; } if (st->len == 0) { /* Messages with type=3 should * never have ext timestamp field * according to standard. * However that's not always the case * in real life */ st->ext = (ext && cscf->publish_time_fix); if (fmt) { st->dtime = timestamp; } else { h->timestamp = timestamp; st->dtime = 0; } } ngx_log_debug8(NGX_LOG_DEBUG_RTMP, c->log, 0, "RTMP mheader fmt=%d %s (%d) " "time=%uD+%uD mlen=%D len=%D msid=%D", (int)fmt, ngx_rtmp_message_type(h->type), (int)h->type, h->timestamp, st->dtime, h->mlen, st->len, h->msid); /* header done */ b->pos = p; if (h->mlen > cscf->max_message) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "too big message: %uz", cscf->max_message); ngx_rtmp_finalize_session(s); return; } } size = b->last - b->pos; fsize = h->mlen - st->len; if (size < ngx_min(fsize, s->in_chunk_size)) continue; /* buffer is ready */ if (fsize > s->in_chunk_size) { /* collect fragmented chunks */ st->len += s->in_chunk_size; b->last = b->pos + s->in_chunk_size; old_pos = b->last; old_size = size - s->in_chunk_size; } else { /* handle! */ head = st->in->next; st->in->next = NULL; b->last = b->pos + fsize; old_pos = b->last; old_size = size - fsize; st->len = 0; h->timestamp += st->dtime; if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) { ngx_rtmp_finalize_session(s); return; } if (s->in_chunk_size_changing) { /* copy old data to a new buffer */ if (!old_size) { ngx_rtmp_finalize_set_chunk_size(s); } } else { /* add used bufs to stream #0 */ st0 = &s->in_streams[0]; st->in->next = st0->in; st0->in = head; st->in = NULL; } } s->in_csid = 0; } }
gdb跟蹤:session
(gdb) bt #0 ngx_rtmp_recv (rev=0x7fffd79ff148) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handler.c:233 #1 0x0000000000546000 in ngx_rtmp_cycle (s=0x21a2220) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handler.c:102 #2 0x00000000005453c1 in ngx_rtmp_handshake_done (s=0x21a2220) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handshake.c:365 #3 0x00000000005457b2 in ngx_rtmp_handshake_recv (rev=0x7fffd79ff148) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handshake.c:455 #4 0x000000000047d730 in ngx_epoll_process_events (cycle=0x20990d0, timer=10000, flags=1) at src/event/modules/ngx_epoll_module.c:694 #5 0x000000000046fcce in ngx_process_events_and_timers (cycle=0x20990d0) at src/event/ngx_event.c:236 #6 0x000000000047a6ec in ngx_single_process_cycle (cycle=0x20990d0) at src/os/unix/ngx_process_cycle.c:317 #7 0x0000000000449f5e in main (argc=3, argv=0x7fffffffe518) at src/core/nginx.c:418
connect時,對應Command Message,即Type ID爲20,Chunk Stream ID爲3,Stream ID 爲0。app
結構體ngx_rtmp_header_t內容:ide
typedef struct { uint32_t csid; /* chunk stream id */ uint32_t timestamp; /* timestamp (delta) */ uint32_t mlen; /* message length */ uint8_t type; /* message type id */ uint32_t msid; /* message stream id */ } ngx_rtmp_header_t;
函數ngx_rtmp_receive_message()中,ngx_rtmp_header_t中type爲20,即函數指針evh對應函數ngx_rtmp_amf_message_handler():函數
ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { ngx_rtmp_core_main_conf_t *cmcf; ngx_array_t *evhs; size_t n; ngx_rtmp_handler_pt *evh; cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); #ifdef NGX_DEBUG { int nbufs; ngx_chain_t *ch; for(nbufs = 1, ch = in; ch->next; ch = ch->next, ++nbufs); ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "RTMP recv %s (%d) csid=%D timestamp=%D " "mlen=%D msid=%D nbufs=%d", ngx_rtmp_message_type(h->type), (int)h->type, h->csid, h->timestamp, h->mlen, h->msid, nbufs); } #endif if (h->type > NGX_RTMP_MSG_MAX) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "unexpected RTMP message type: %d", (int)h->type); return NGX_OK; } evhs = &cmcf->events[h->type]; evh = evhs->elts; ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "nhandlers: %d", evhs->nelts); for(n = 0; n < evhs->nelts; ++n, ++evh) { if (!evh) { continue; } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "calling handler %d", n); switch ((*evh)(s, h, in)) { case NGX_ERROR: ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "handler %d failed", n); return NGX_ERROR; case NGX_DONE: return NGX_OK; } } return NGX_OK; }
ngx_int_t ngx_rtmp_amf_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { ngx_rtmp_amf_ctx_t act; ngx_rtmp_core_main_conf_t *cmcf; ngx_array_t *ch; ngx_rtmp_handler_pt *ph; size_t len, n; static u_char func[128]; static ngx_rtmp_amf_elt_t elts[] = { { NGX_RTMP_AMF_STRING, ngx_null_string, func, sizeof(func) }, }; /* AMF command names come with string type, but shared object names * come without type */ if (h->type == NGX_RTMP_MSG_AMF_SHARED || h->type == NGX_RTMP_MSG_AMF3_SHARED) { elts[0].type |= NGX_RTMP_AMF_TYPELESS; } else { elts[0].type &= ~NGX_RTMP_AMF_TYPELESS; } if ((h->type == NGX_RTMP_MSG_AMF3_SHARED || h->type == NGX_RTMP_MSG_AMF3_META || h->type == NGX_RTMP_MSG_AMF3_CMD) && in->buf->last > in->buf->pos) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "AMF3 prefix: %ui", (ngx_int_t)*in->buf->pos); ++in->buf->pos; } cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); /* read AMF func name & transaction id */ ngx_memzero(&act, sizeof(act)); act.link = in; act.log = s->connection->log; memset(func, 0, sizeof(func)); if (ngx_rtmp_amf_read(&act, elts, sizeof(elts) / sizeof(elts[0])) != NGX_OK) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "AMF cmd failed"); return NGX_ERROR; } /* skip name */ in = act.link; in->buf->pos += act.offset; len = ngx_strlen(func); ch = ngx_hash_find(&cmcf->amf_hash, ngx_hash_strlow(func, func, len), func, len); if (ch && ch->nelts) { ph = ch->elts; for (n = 0; n < ch->nelts; ++n, ++ph) { ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "AMF func '%s' passed to handler %d/%d", func, n, ch->nelts); switch ((*ph)(s, h, in)) { case NGX_ERROR: return NGX_ERROR; case NGX_DONE: return NGX_OK; } } } else { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "AMF cmd '%s' no handler", func); } return NGX_OK; }
其中func表示"connect",經過hash找到對應的處理函數,即ph對應函數ngx_rtmp_cmd_connect_init()ui
static ngx_int_t ngx_rtmp_cmd_connect_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { size_t len; static ngx_rtmp_connect_t v; static ngx_rtmp_amf_elt_t in_cmd[] = { { NGX_RTMP_AMF_STRING, ngx_string("app"), v.app, sizeof(v.app) }, { NGX_RTMP_AMF_STRING, ngx_string("flashVer"), v.flashver, sizeof(v.flashver) }, { NGX_RTMP_AMF_STRING, ngx_string("swfUrl"), v.swf_url, sizeof(v.swf_url) }, { NGX_RTMP_AMF_STRING, ngx_string("tcUrl"), v.tc_url, sizeof(v.tc_url) }, { NGX_RTMP_AMF_NUMBER, ngx_string("audioCodecs"), &v.acodecs, sizeof(v.acodecs) }, { NGX_RTMP_AMF_NUMBER, ngx_string("videoCodecs"), &v.vcodecs, sizeof(v.vcodecs) }, { NGX_RTMP_AMF_STRING, ngx_string("pageUrl"), v.page_url, sizeof(v.page_url) }, { NGX_RTMP_AMF_NUMBER, ngx_string("objectEncoding"), &v.object_encoding, 0}, }; static ngx_rtmp_amf_elt_t in_elts[] = { { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, 0 }, { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_cmd, sizeof(in_cmd) }, }; ngx_memzero(&v, sizeof(v)); if (ngx_rtmp_receive_amf(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } len = ngx_strlen(v.app); if (len > 10 && !ngx_memcmp(v.app + len - 10, "/_definst_", 10)) { v.app[len - 10] = 0; } else if (len && v.app[len - 1] == '/') { v.app[len - 1] = 0; } ngx_rtmp_cmd_fill_args(v.app, v.args); ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "connect: app='%s' args='%s' flashver='%s' swf_url='%s' " "tc_url='%s' page_url='%s' acodecs=%uD vcodecs=%uD " "object_encoding=%ui", v.app, v.args, v.flashver, v.swf_url, v.tc_url, v.page_url, (uint32_t)v.acodecs, (uint32_t)v.vcodecs, (ngx_int_t)v.object_encoding); return ngx_rtmp_connect(s, &v); }
debug調試:url
(gdb) bt #0 ngx_rtmp_cmd_connect_init (s=0x21a2220, h=0x209d298, in=0x209e0f0) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_cmd_module.c:206 #1 0x000000000054db82 in ngx_rtmp_amf_message_handler (s=0x21a2220, h=0x209d298, in=0x209e0f0) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_receive.c:448 #2 0x0000000000548eee in ngx_rtmp_receive_message (s=0x21a2220, h=0x209d298, in=0x209e0f0) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handler.c:1172 #3 0x00000000005470f6 in ngx_rtmp_recv (rev=0x7fffd79ff148) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handler.c:529 #4 0x0000000000546000 in ngx_rtmp_cycle (s=0x21a2220) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handler.c:102 #5 0x00000000005453c1 in ngx_rtmp_handshake_done (s=0x21a2220) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handshake.c:365 #6 0x00000000005457b2 in ngx_rtmp_handshake_recv (rev=0x7fffd79ff148) at ../../NGINX-RTMP/nginx-rtmp-module-master/ngx_rtmp_handshake.c:455 #7 0x000000000047d730 in ngx_epoll_process_events (cycle=0x20990d0, timer=10000, flags=1) at src/event/modules/ngx_epoll_module.c:694 #8 0x000000000046fcce in ngx_process_events_and_timers (cycle=0x20990d0) at src/event/ngx_event.c:236 #9 0x000000000047a6ec in ngx_single_process_cycle (cycle=0x20990d0) at src/os/unix/ngx_process_cycle.c:317 #10 0x0000000000449f5e in main (argc=3, argv=0x7fffffffe518) at src/core/nginx.c:418
服務器對於connect的響應爲_result,而後返回到函數ngx_rtmp_recv(),繼續接收客戶的數據。spa