首先使用 obs 推流符合以下流程(參考自 "Haivision Makito X can't publish to SRS"):
FFMPEG: C/S: Handshake C: ConnectApp() tcUrl=xxx S: Ack Size 2500,000 S: Set Peer Bandwidth 2500,000 S: Set Chunk Size 60,000 C: Set Chunk Size 60,000 S: ConnectApp() _result S: onBWDone() C: releaseStream+FCPublish(s0) C: createStream() S: releaseStream _result C: _checkbw() S: FCPublish() _result S: createStream() _result C: publish(s0) S: onFCPublish() S: onStatus()
下面的分析是繼服務器發送 onBWDone 後,進入 while 循環開始執行 stream_service_cycle。
1. SrsRtmpConn::stream_service_cycle
int SrsRtmpConn::stream_service_cycle() { int ret = ERROR_SUCCESS; /* the rtmp client type: play/publish/unknown */ SrsRtmpConnType type; /* 首先鑑別客戶端請求的類型,是play/publish 或其餘,還有播放/推流的流名稱 */ if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("identify client failed. ret=%d", ret); } return ret; } req->strip(); srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f", srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration); /* 只有當配置文件中使能了 security 配置項,纔會真正進入到該 check 函數進行 * 一系列的檢測 */ /* allow all if security disabled. */ // secutity check if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) { srs_error("security check failed. ret=%d", ret); return ret; } srs_info("security check ok"); /* SRS 不容許請求的流名稱爲空 */ // Never allow the empty stream name, for HLS may write to a file with empty name. // @see https://github.com/ossrs/srs/issues/834: // SRS2 crashed for TS encoder assert failed if (req->stream.empty()) { ret = ERROR_RTMP_STREAM_NAME_EMPTY; srs_error("RTMP: Empty stream name not allowed, ret=%d", ret); return ret; } /* 設置服務器 send/recv 的超時時間 */ // client is identified, set the timeout to service timeout. rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); /* 首先根據 vhost/app/stream 構造一個 stream_url,而後根據該 stream_url 在 SrsSource::pool * 中查找是否存在一個 stream_url 對應的 SrsSource,若能找到,則直接返回該 SrsSource,不然, * 新構造一個 SrsSource,並將其按 stream_url 放到 SrsSource::pool map 容器中 */ // find a source to serve. SrsSource* source = NULL; if ((ret = SrsSorce::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) { return ret; } srs_assert(source != NULL); /* 構造統計類,將統計當前的 vhost、stream 等信息 */ // update the statistic when source disconveried. 
SrsStatistic* stat = SrsStatistic::instance(); if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) { srs_error("stat client failed. ret=%d", ret); return ret; } /* 若 vhost 中沒有配置 mode,則返回 false */ bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); /* 默認開始 gop_cache */ bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, source->source_id(), source->source_id()); /* 根據 enabled_cache 設置是否啓動 gop_cache,爲 true,則啓動 */ source->set_cache(enabled_cache); /* 根據鑑別到的客戶端的類型:play 或者 publish,開始進行相應的處理 */ /* The type of client, play or publish. */ client_type = type; switch (type) { case SrsRtmpConnPlay: { srs_verbose("start to play stream %s.", req->stream.c_str()); // response connection start play if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to play stream failed. ret=%d", ret); return ret; } if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) { srs_error("http hook on_play failed. ret=%d", ret); return ret; } srs_info("start to play stream %s success", req->stream.c_str()); ret = playing(source); http_hooks_on_stop(); return ret; } /* 由前面知,若 obs 推流的話爲該類型 */ case SrsRtmpConnFMLEPublish: { srs_verbose("FMLE start to publish stream %s.", req->stream.c_str()); /* 該函數主要是接收並響應一系列消息: * C: FCPublish * S: FCPublish response * C: createStream * S: createStream response * C: publish * S: publish response onFCPublish(NetStream.Publish.Start) * S: publish response onStatus(NetStream.Publish.Start) */ if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to publish stream failed. 
ret=%d", ret); return ret; } /* 服務器響應客戶端的publish消息後,即開始進入接收客戶端推流的 * metadata、video、audio等數據的處理 */ return publishing(source); } case SrsRtmpConnHaivisionPublish: { srs_verbose("Haivision start to publish stream %s.", req->stream.c_str()); if ((ret = rtmp->start_haivision_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to publish stream failed. ret=%d", ret); return ret; } return publishing(source); } case SrsRtmpConnFlashPublish: { srs_verbose("flash start to publish stream %s.", req->stream.c_str()); if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("flash start to publish stream failed. ret=%d", ret); return ret; } return publishing(source); } default: { ret = ERROR_SYSTEM_CLIENT_INVALID; srs_info("invalid client type=%d. ret=%d", type, ret); return ret; } } return ret; }
2. SrsRtmpServer::identify_client
該函數是對客戶端請求進行鑑定,以便作出相應的處理。
int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& stream_name, double& duration) { type = SrsRtmpConnUnknown; int ret = ERROR_SUCCESS; while (true) { SrsCommonMessage* msg = NULL; /* 接收一個完整的消息 */ if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("recv identify client message failed. ret=%d", ret); } return ret; } SrsAutoFree(SrsCommonMessage, msg); SrsMessageHeader& h = msg->header; if (h.is_ackledgement() || h.is_set_chunk_size() || h.is_windonw_ackledgenemt_size() || h.is_user_control_message()) { continue; } /* 若不爲 amf 類型的消息,則忽略該消息,繼續接收下一個消息 */ if (!h.is_amf0_commnad() && !h.is_amf3_command()) { srs_trace("identify ignore message except " "AMF0/AMF3 command message. type=%#x", h.message_type); continue; } SrsPacket* pkt = NULL; /* 對接收到的 amf 命令消息進行解碼,解碼後的數據保存在 pkt 指向的子類中 */ if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("identify decode message failed. ret=%d", ret); return ret; } SrsAutoFree(SrsPacket, pkt); /* 下面是經過 dynamic_cast 動態轉換嘗試將 pkt 轉爲指定的類型, * 若不爲 NULL,則代表接收到的消息即爲所要的消息 */ if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) { srs_info("identify client by create stream, play or flash publish."); return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, type, stream_name, duration); } /* 當接收到的是 releaseStream/FCPublish/FCUnpublish 這三個中的一個時, * 構造的類都爲 SrsFMLEStartPacket */ if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) { srs_info("identify client by releaseStream, fmle publish"); /* 這裏便可肯定 client 的類型爲 publish */ return identify_fmle_publish_client(dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name); } if (dynamic_cast<SrsPlayPacket*>(pkt)) { srs_info("level0 identify client by play."); return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt), type, stream_name, duration); } /* call msg, * support response null first, * @see https://github.com/ossrs/srs/issues/106 * TODO: FIXME: response in right 
way, or forward in edge mode. */ SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt); if (call) { SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id); res->command_object = SrsAmf0Any::null(); res->response = SrsAmf0Any::null(); if ((ret = protocol->send_and_free_packet(res, 0)) != ERROR_SUCCESS) { if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) { srs_warn("response call failed. ret=%d", ret); } return ret; } /* For encoder of Haivision, it always send a _checkbe call message. * @remark the next message is createStream, so we continue to identify it. * @see https://github.com/ossrs/srs/issues/844 */ if (call->command_name == "_checkbw") { srs_info("Havision encoder identified."); continue; } continue; } srs_trace("ignore AMF0/AMF3 command message."); } return ret; }
2.1 SrsProtocol::recv_message
/**
 * Receive one complete, non-empty RTMP message.
 *
 * Loops over recv_interlaced_message() until an entire message is
 * available, drops messages whose size or payload length is not positive,
 * and passes every accepted message through on_recv_message() before
 * handing it to the caller.
 *
 * @param pmsg out: set to NULL on entry and to the received message on
 *             success. The message is freed here on the error paths, so it
 *             is only handed over when ERROR_SUCCESS is returned.
 * @return ERROR_SUCCESS or the error code from the socket read or from
 *         on_recv_message().
 */
int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
{
    *pmsg = NULL;

    int ret = ERROR_SUCCESS;

    while (true) {
        SrsCommonMessage* msg = NULL;

        // Read one message (possibly only a part of it) from the socket.
        if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("recv interlaced message failed. ret=%d", ret);
            }
            srs_freep(msg);
            return ret;
        }
        srs_verbose("entire msg received");

        // No error but no message either: only a partial message was read.
        if (!msg) {
            srs_info("got empty message without error.");
            continue;
        }

        if (msg->size <= 0 || msg->header.payload_length <= 0) {
            srs_trace("ignore empty message(type=%d, size=%d, time=%" PRId64 ", sid=%d).",
                msg->header.message_type, msg->header.payload_length,
                msg->header.timestamp, msg->header.stream_id);
            srs_freep(msg);
            continue;
        }

        // Original note: control messages such as set chunk size update the
        // protocol context here; audio/video/AMF messages are left untouched.
        if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
            srs_error("hook the received msg failed. ret=%d", ret);
            srs_freep(msg);
            return ret;
        }

        srs_verbose("got a msg, cid=%d, type=%d, size=%d, time=%" PRId64,
            msg->header.perfer_cid, msg->header.message_type, msg->header.payload_length,
            msg->header.timestamp);
        *pmsg = msg;
        break;
    }

    return ret;
}
2.1.1 SrsProtocol::recv_interlaced_message
/**
 * Read one chunk from the socket and append it to the message being
 * assembled on its chunk stream.
 *
 * @param pmsg out: set to the assembled message only when this chunk
 *             completed it; left untouched when the message is still
 *             partial or an error occurred.
 * @return ERROR_SUCCESS (also for a partial message) or the error code of
 *         the failing read.
 */
int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
{
    int ret = ERROR_SUCCESS;

    // Chunk stream basic header.
    char fmt = 0;
    int cid = 0;
    ret = read_basic_header(fmt, cid);
    if (ret != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read basic header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid);

    // The cid must not be negative.
    srs_assert(cid >= 0);

    // A RTMP message may be split over several chunks, so the state of each
    // chunk stream is cached until an entire message has been received.
    // Original note: cs_cache is preallocated with
    // SRS_PERF_CHUNK_STREAM_CACHE entries when the protocol is constructed,
    // so small cids index it directly; larger cids go through the map.
    // @see https://github.com/ossrs/srs/issues/249
    SrsChunkStream* cs = NULL;
    bool created = false;
    if (cid < SRS_PERF_CHUNK_STREAM_CACHE) {
        // Chunk stream cache hit: already initialized, use it directly.
        srs_verbose("cs-cache hit, cid=%d", cid);
        cs = cs_cache[cid];
    } else if (chunk_streams.find(cid) == chunk_streams.end()) {
        // Cache miss and unknown cid: create and remember a new chunk stream.
        cs = chunk_streams[cid] = new SrsChunkStream(cid);
        // Set the perfer cid of chunk, which will copy to the message received.
        cs->header.perfer_cid = cid;
        created = true;
        srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
    } else {
        cs = chunk_streams[cid];
    }

    if (!created) {
        srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
            "message(type=%d, size=%d, time=%" PRId64 ", sid=%d)",
            cs->fmt, cs->cid, (cs->msg? cs->msg->size : 0),
            cs->header.message_type, cs->header.payload_length,
            cs->header.timestamp, cs->header.stream_id);
    }

    // Chunk stream message header, whose layout depends on fmt.
    ret = read_message_header(cs, fmt);
    if (ret != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read message header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read message header success. "
        "fmt=%d, ext_time=%d, size=%d, "
        "message(type=%d, size=%d, time=%" PRId64 ", sid=%d)",
        fmt, cs->extended_timestamp, (cs->msg? cs->msg->size : 0),
        cs->header.message_type, cs->header.payload_length,
        cs->header.timestamp, cs->header.stream_id);

    // Read msg payload from chunk stream.
    SrsCommonMessage* entire = NULL;
    ret = read_message_payload(cs, &entire);
    if (ret != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read message payload failed. ret=%d", ret);
        }
        return ret;
    }

    // Not got an entire RTMP message, try next chunk.
    if (entire == NULL) {
        srs_verbose("get partial message success. size=%d, "
            "message(type=%d, size=%d, time=%" PRId64 ", sid=%d)",
            (cs->msg? cs->msg->size : 0),
            cs->header.message_type, cs->header.payload_length,
            cs->header.timestamp, cs->header.stream_id);
        return ret;
    }

    *pmsg = entire;
    srs_info("get entire message success. size=%d, "
        "message(type=%d, size=%d, time=%" PRId64 ", sid=%d)",
        entire->size,
        cs->header.message_type, cs->header.payload_length,
        cs->header.timestamp, cs->header.stream_id);

    return ret;
}
2.2 SrsProtocol::decode_message
/**
 * Decode the payload of a received message into a packet object.
 *
 * @param msg     the message to decode; must be non-NULL with a non-NULL,
 *                non-empty payload (asserted).
 * @param ppacket out: set to NULL on entry and to the decoded packet only
 *                when decoding succeeded; a partially decoded packet is
 *                freed here.
 * @return ERROR_SUCCESS or the error code of stream initialization or of
 *         do_decode_message().
 */
int SrsProtocol::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
    *ppacket = NULL;

    srs_assert(msg != NULL);
    srs_assert(msg->payload != NULL);
    srs_assert(msg->size > 0);

    int ret = ERROR_SUCCESS;

    // Wrap the message payload in a SrsStream for decoding. The original
    // note says initialization is cheap and involves no memory copy.
    SrsStream stream;
    ret = stream.initialize(msg->payload, msg->size);
    if (ret != ERROR_SUCCESS) {
        srs_error("initialize stream failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("decode stream initialized success");

    // Decode the packet.
    SrsPacket* decoded = NULL;
    ret = do_decode_message(msg->header, &stream, &decoded);
    if (ret != ERROR_SUCCESS) {
        srs_freep(decoded);
        return ret;
    }

    // Set to output ppacket only when success.
    *ppacket = decoded;

    return ret;
}
2.2.1 SrsProtocol::do_decode_message
int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** packet) { int ret = ERROR_SUCCESS; SrsPacket* packet = NULL; // decode specified packet type if (header.is_amf0_command() || header.is_amf3_command() || header.is_amf0_data() || header.is_amf3_data()) { srs_verbose("start to decode AMF0/AMF3 command message."); // skip 1bytes to decode the amf3 command. if (header.is_amf3_command) && stream->require(1)) { srs_verbose("skip 1bytes to decode AMF3 command"); stream->skip(1); } // amf0 command message. // need to read the command name. std::string command; /* 讀取消息的命令名 */ if ((ret = srs_amfo_read_string(stream, command)) != ERROR_SUCCESS) { srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret); return ret; } srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str()); // result/error packet if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) { double transactionId = 0.0; if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) { srs_error("decode AMF0/AMF3 transcationId failed. ret=%d", ret); return ret; } srs_verbose("AMF0/AMF3 command id, transcationId=%.2f", transactionId); // reset stream, for header read completed. stream->skip(-1 * stream->pos()); if (header.is_amf3_command()) { stream->skip(1); } // find the call name if (requests.find(transactionId) == requests.end()) { ret = ERROR_RTMP_NO_REQUEST; srs_error("decode AMF0/AMF3 request failed. ret=%d", ret); return ret; } std::string request_name = requests[transactionId]; srs_verbose("AMF0/AMF3 request parsed. 
request_name=%s", request_name.c_str()); if (request_name == RTMP_AMF0_COMMAND_CONNECT) { srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); *ppacket = packet = new SrsConnectAppResPacket(); return packet->decode(stream); } else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) { srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); *ppacket = packet = new SrsCreateStreamResPacket(0, 0); return packet->decode(stream); } else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM || request_name == RTMP_AMF0_COMMAND_FC_PUBLISH || request_name == RTMP_AMF0_COMMAND_UNPUBLISH) { srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); *ppacket = packet = new SrsFMLEStartResPacket(0); return packet->decode(stream); } else { ret = ERROR_RTMP_NO_REQUEST; srs_error("decode AMF0/AMF3 request failed. " "request_name=%s, transactionId=%.2f, ret=%d", request_name.c_str(), transactionId, ret); return ret; } } // reset to zero(amf3 to 1) to restart decode. stream->skip(-1 * stream->pos()); if (header.is_amf3_command()) { stream->skip(1); } /* 根據消息的命令名來構造對應的類,而後進行解碼 */ // decode command object. 
if (command == RTMP_AMF0_COMMAND_CONNECT) { srs_info("decode the AMF0/AMF3 command(connect vhost/app message)."); *ppacket = packet = new SrsConnectAppPacket(); return packet->decode(stream); } else if (command == RTMP_AMF0_COMMAND_CREATE_STREAM) { srs_info("decode the AMF0/AMF3 command(createStream message)."); *ppacket = packet = new SrsCreateStreamPacket(); return packet->decode(stream); } else if (command == RTMP_AMF0_COMMAND_PLAY) { srs_info("decode the AMF0/AMF3 command(paly message)."); *ppacket = packet = new SrsPlayPacket(); return packet->decode(stream); } else if(command == RTMP_AMF0_COMMAND_PAUSE) { srs_info("decode the AMF0/AMF3 command(pause message)."); *ppacket = packet = new SrsPausePacket(); return packet->decode(stream); } else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) { srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message)."); *ppacket = packet = new SrsFMLEStartPacket(); return packet->decode(stream); } else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) { srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message)."); *ppacket = packet = new SrsFMLEStartPacket(); return packet->decode(stream); } else if(command == RTMP_AMF0_COMMAND_PUBLISH) { srs_info("decode the AMF0/AMF3 command(publish message)."); *ppacket = packet = new SrsPublishPacket(); return packet->decode(stream); } else if(command == RTMP_AMF0_COMMAND_UNPUBLISH) { srs_info("decode the AMF0/AMF3 command(unpublish message)."); *ppacket = packet = new SrsFMLEStartPacket(); return packet->decode(stream); } else if(command == SRS_CONSTS_RTMP_SET_DATAFRAME || command == SRS_CONSTS_RTMP_ON_METADATA) { srs_info("decode the AMF0/AMF3 data(onMetaData message)."); *ppacket = packet = new SrsOnMetaDataPacket(); return packet->decode(stream); } else if(command == SRS_BW_CHECK_FINISHED || command == SRS_BW_CHECK_PLAYING || command == SRS_BW_CHECK_PUBLISHING || command == SRS_BW_CHECK_STARTING_PLAY || command == SRS_BW_CHECK_STARTING_PUBLISH || command == 
SRS_BW_CHECK_START_PLAY || command == SRS_BW_CHECK_START_PUBLISH || command == SRS_BW_CHECK_STOPPED_PLAY || command == SRS_BW_CHECK_STOP_PLAY || command == SRS_BW_CHECK_STOP_PUBLISH || command == SRS_BW_CHECK_STOPPED_PUBLISH || command == SRS_BW_CHECK_FINAL) { srs_info("decode the AMF0/AMF3 band width check message."); *ppacket = packet = new SrsBandwidthPacket(); return packet->decode(stream); } else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) { srs_info("decode the AMF0/AMF3 closeStream message."); *ppacket = packet = new SrsCloseStreamPacket(); return packet->decode(stream); } else if (header.is_amf0_command() || header.is_amf3_command()) { srs_info("decode the AMF0/AMF3 call message."); *ppacket = packet = new SrsCallPacket(); return packet->decode(stream); } // default packet to drop message. srs_info("drop the AMF0/AMF3 command message, command_name=%s", command.c_str()); *ppacket = packet = new SrsPacket(); return ret; } else if (header.is_user_control_message()) { srs_verbose("start to decode user control message."); *ppacket = packet = new SrsUserControlPacket(); return packet->decode(stream); } else if(header.is_window_ackledgement_size()) { srs_verbose("start to decode set ack window size message."); *ppacket = packet = new SrsSetWindowAckSizePacket(); return packet->decode(stream); } else if(header.is_set_chunk_size()) { srs_verbose("start to decode set chunk size message."); *ppacket = packet = new SrsSetChunkSizePacket(); return packet->decode(stream); } else { if (!header.is_set_peer_bandwidth() && !header.is_ackledgement()) { srs_trace("drop unknown message, type=%d", header.message_type); } } return ret; }
由開始的流程知,服務器發送 onBWDone 後,接下來會接收到客戶端的 releaseStream 命令。對於 releaseStream/PublishStream/FCPublish/FCUnpublish 等都是使用 SrsFMLEStartPacket 類來構造的。
2.2.2 構造 SrsFMLEStartPacket 類
/**
 * FMLE start publish: ReleaseStream/PublishStream/FCPublish/FCUnpublish.
 *
 * Builds a packet with the releaseStream command name, a zero transaction
 * id and an AMF0 null command object.
 */
SrsFMLEStartPacket::SrsFMLEStartPacket()
{
    // The transaction ID to get the response.
    transaction_id = 0;

    // Command name: releaseStream.
    command_name = RTMP_AMF0_COMMAND_RELEASE_STREAM;

    // If there exists any command info this is set, else this is set to
    // null type.
    // @remark, never be NULL, an AMF0 null instance.
    command_object = SrsAmf0Any::null();
}
2.2.3 SrsFMLEStartPacket::decode
開始解析 releaseStream 消息的負載。
/**
 * Decode an FMLE start command from the stream.
 *
 * Reads, in order: the command name (must be releaseStream, FCPublish or
 * FCUnpublish), the transaction id, an AMF0 null command object and the
 * stream name.
 *
 * @param stream stream positioned at the start of the command.
 * @return ERROR_SUCCESS, the error code of the failing AMF0 read, or
 *         ERROR_RTMP_AMF0_DECODE for an empty or unexpected command name.
 */
int SrsFMLEStartPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    // Read the command name of this message.
    ret = srs_amf0_read_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret);
        return ret;
    }

    // Only the three FMLE start commands are accepted.
    bool is_fmle_start = command_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
        || command_name == RTMP_AMF0_COMMAND_FC_PUBLISH
        || command_name == RTMP_AMF0_COMMAND_UNPUBLISH;
    if (command_name.empty() || !is_fmle_start) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode FMLE start command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }

    ret = srs_amf0_read_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret);
        return ret;
    }

    ret = srs_amf0_read_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret);
        return ret;
    }

    ret = srs_amf0_read_string(stream, stream_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
        return ret;
    }

    srs_info("amf0 decode FMLE start packet success");

    return ret;
}
recv:releaseStream('live')
2.3 SrsRtmpServer::identify_fmle_publish_client
當解析接收到的消息爲 releaseStream 的時候,會調用該函數。
/**
 * Mark the client as an FMLE publisher and answer its releaseStream.
 *
 * @param req         the decoded FMLE start packet.
 * @param type        out: set to SrsRtmpConnFMLEPublish.
 * @param stream_name out: set to the stream name carried by req.
 * @return ERROR_SUCCESS or the error code of sending the response.
 */
int SrsRtmpServer::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsRtmpConnType& type, string& stream_name)
{
    // The client is a publisher; remember the name of the stream it publishes.
    type = SrsRtmpConnFMLEPublish;
    stream_name = req->stream_name;

    // releaseStream response, echoing the transaction id of the request.
    SrsFMLEStartResPacket* response = new SrsFMLEStartResPacket(req->transaction_id);
    int ret = protocol->send_and_free_packet(response, 0);
    if (ret != ERROR_SUCCESS) {
        srs_error("send releaseStream response message failed. ret=%d", ret);
        return ret;
    }
    srs_info("send releaseStream response message success.");

    return ret;
}
該函數中是對 releaseStream 的響應。 發送的包以下圖:
send: response for releaseStream
2.3.1 構造 SrsFMLEStartResPacket 類
/**
 * Response for SrsFMLEStartPacket.
 *
 * @param _transaction_id transaction id of the request being answered.
 */
SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
{
    transaction_id = _transaction_id;

    // Name of the response message: _result.
    command_name = RTMP_AMF0_COMMAND_RESULT;

    // If there exists any command info this is set, else this is set to
    // null type.
    // @remark, never be NULL, an AMF0 null instance.
    command_object = SrsAmf0Any::null();

    // The optional args, set to undefined.
    // @remark, never be NULL, an AMF0 undefined instance.
    args = SrsAmf0Any::undefined();
}
2.3.2 SrsFMLEStartResPacket::encode_packet
構建 releaseStream response 消息的負載.
/**
 * Write the payload of the FMLE start response to the stream.
 *
 * Writes, in order: the command name, the transaction id, an AMF0 null
 * (command object) and an AMF0 undefined (args).
 *
 * @param stream destination stream.
 * @return ERROR_SUCCESS or the error code of the first failing write.
 */
int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    ret = srs_amf0_write_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");

    ret = srs_amf0_write_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");

    ret = srs_amf0_write_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_object failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_object success.");

    ret = srs_amf0_write_undefined(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode args failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode args success.");

    srs_info("encode FMLE start response packet success.");

    return ret;
}
3. SrsSource::fetch_or_create
/** * create source when fetch from cache failed. * @param r the client request. * @param h the event handler for source. * @param pps the matches source, if success never be NULL. */ int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) { int ret = ERROR_SUCCESS; SrsSource* source = NULL; /* 根據 vhost/app/stream 組成的 stream_url 在 pool 中查找是否存在 * 該與之對應的 SrsSource,若不存在,則返回 NULL */ if ((source = fetch(r)) != NULL) { *pps = source; return ret; } /* vhost/app/stream 生成一個 stream_url */ string stream_url = r->get_stream_url(); string vhost = r->vhost; /* 必須肯定在 pool 中沒有 stream_url 對應的項 */ // should always not exists for create a source. srs_assert(pool.find(stream_url) == pool.end()); /* 構建一個新的 SrsSource */ source = new SrsSource(); if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) { srs_freep(source); return ret; } /* 將該新生成的 source 放入到 pool 中 */ pool[stream_url] = source; srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); /* 經過 pps 返回該新生成的 source */ *pps = source; return ret; }
3.1 SrsSource::fetch
/**
 * Get the existing source for a request, or NULL when there is none.
 * When a source exists, the auth information of its stored request is
 * updated from r before it is returned.
 *
 * @param r the client request.
 * @return the pooled source for r's stream url, or NULL.
 */
SrsSource* SrsSource::fetch(SrsRequest* r)
{
    // The stream identify: vhost/app/stream.
    string stream_url = r->get_stream_url();

    SrsSource* found = NULL;
    if (pool.find(stream_url) != pool.end()) {
        found = pool[stream_url];

        // We always update the request of resource,
        // for origin auth is on, the token in request maybe invalid,
        // and we only need to update the token of request, it's simple.
        found->_req->update_auth(r);
    }

    return found;
}
3.2 構造 SrsSource 類
構造一個直播流源。
/** * the time jitter algorithm: * 1. full, to ensure stream start at zero, and ensure stream monotonically increasing. * 2. zero, only ensure stream start at zero, ignore timestamp jitter. * 3. off, disable the time jitter algorithm, like atc. */ enum SrsRtmpJitterAlgorithm { SrsRtmpJitterAlgorithmFULL = 0x01, SrsRtmpJitterAlgorithmZERO, SrsRtmpJitterAlgorithmOFF }; SrsSource::SrsSource() { /* _req: deep copy of client request. */ _req = NULL; /* the time jitter algorithm for vhost: vhost 的時間抖動算法 */ jitter_algorithm = SrsRtmpJitterAlgorithmOFF; /* whether use interlaced/mixed algorithm to correct timestamp. * 這裏初始化禁止 */ mix_correct = false; mix_queue = new SrsMixQueue(); #ifdef SRS_AUTO_HLS /* 構造一個 hls handler */ hls = new SrsHls(); #endif #ifdef SRS_AUTO_DVR /* 構造一個 dvr handler */ dvr = new SrsDvr(); #endif #ifdef SRS_AUTO_TRANSCODE /* 構造一個 transcoding handler */ encoder = new SrsEncoder(); #endif #ifdef SRS_AUTO_HDS hds = new SrsHds(this); #endif /** * cache_sh_video: the cached video sequence header. * cache_sh_audio: the cached audio sequence header. */ cache_metadata = cache_sh_video = cache_sh_audio = NULL; /* can publish, true when is not streaming */ _can_publish = true; /** * source id, * for publish, it's the publish client id. * for edge, it's the edge ingest id. * when source id changed, for example, the edge reconnect, * invoke the on_source_id_changed() to let all clients know. * * _pre_source_id: previous source id. */ _pre_source_id = _source_id = -1; /** * last die time, when all consumeers quit and no publisher, * we will remove the source when source die. */ die_at = -1; /* edge control service */ play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); /* gop cache for client fast startup. */ gop_cache = new SrsGopCache(); /* for aggregate message */ aggregate_stream = new SrsStream(); /* whether stream is monotonically increase. 
*/ is_monotonicaly_increase = false; last_packet_time = 0; _srs_config->subscribe(this); /** * atc whether atc(use absolute time and donot adjust time), * directly use msg time and donot adjust if atc is true, * otherwise, adjust msg time to start from 0 to make flash happy. * * TODO: FIXME: to support reload atc. */ atc = false; }
3.3 SrsSource::initialize
int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) { int ret = ERROR_SUCCESS; srs_assert(h); srs_assert(!_req); handler = h; /* 深度拷貝,將 r 中的內容徹底拷貝到 _req 中 */ _req = r->copy(); /* 若 vhost 中沒有設置 atc 配置項,則返回 false, * atc 爲 false,則會調整 msg 時間從 0 開始 */ atc = _srs_config->get_atc(_req->vhost); /* 暫不分析 */ #ifdef SRS_AUTO_HLS if ((ret = hls->initialize(this)) != ERROR_SUCCESS) { return ret; } #endif #ifdef SRS_AUTO_DVR if ((ret = dvr->initialize(this, _req)) != ERROR_SUCCESS) { return ret; } #endif if ((ret = play_edge->initialize(this, _req)) != ERROR_SUCCESS) { return ret; } if ((ret = publish_edge->initialize(this, _req)) != ERROR_SUCCESS) { return ret; } /* 若 vhost 沒有設置 queue_length,則使用默認的,爲 30 */ double queue_size = _srs_config->get_queue_length(_req->vhost); publish_edge->set_queue_size(queue_size); jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost); mix_correct = _srs_config->get_mix_correct(_req->vhost); return ret; }
3.4 SrsStatistic::on_client
/** * when got a client to publish/play stream, * @param id, the client srs id. * @param req, the client request object. * @param conn, the physical abstract connection object. * @param type, the type of connection. */ int SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type) { int ret = ERROR_SUCCESS; SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); // create client if not exists SrsStatisticClient* client = NULL; if (clients.find(id) == clients.end()) { client = new SrsStatisticClient(); client->id = id; client->stream = stream; clients[id] = client; } else { client = clients[id]; } // got client client->conn = conn; client->req = req; client->type = type; stream->nb_clients++; vhost->nb_clients++; return ret; }
3.4.1 SrsStatistic::create_vhost
/**
 * Return the statistics object of the request's vhost, creating and
 * registering it on first use.
 *
 * Two indexes are kept in sync:
 *      rvhosts - key: vhost url, value: vhost object (a fast index for vhost).
 *      vhosts  - key: vhost id,  value: vhost object.
 *
 * @param req the client request; only req->vhost is read.
 * @return the statistics object for req->vhost.
 */
SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)
{
    SrsStatisticVhost* vhost = NULL;

    // Create vhost if not exists.
    if (rvhosts.find(req->vhost) == rvhosts.end()) {
        vhost = new SrsStatisticVhost();
        vhost->vhost = req->vhost;
        rvhosts[req->vhost] = vhost;
        vhosts[vhost->id] = vhost;
        return vhost;
    }

    vhost = rvhosts[req->vhost];

    return vhost;
}
3.4.2 SrsStatistic::create_stream
/**
 * Return the statistics object of the requested stream, creating and
 * registering it (in rstreams by url and in streams by id) on first use.
 *
 * @param vhost the statistics object of the stream's vhost.
 * @param req   the client request.
 * @return the statistics object for req's stream url.
 */
SrsStatisticStream* SrsStatistic::create_stream(SrsStatisticVhost* vhost, SrsRequest* req)
{
    std::string url = req->get_stream_url();

    // Already known: reuse the registered object.
    if (rstreams.find(url) != rstreams.end()) {
        return rstreams[url];
    }

    // Create stream if not exists.
    SrsStatisticStream* created = new SrsStatisticStream();
    created->vhost = vhost;
    created->stream = req->stream;
    created->app = req->app;
    created->url = url;
    rstreams[url] = created;
    streams[created->id] = created;

    return created;
}
4. SrsSource::set_cache
/**
 * Enable or disable the gop cache of this source.
 * @param enabled forwarded unchanged to the gop cache.
 */
void SrsSource::set_cache(bool enabled)
{
    // gop_cache is the SrsGopCache used for client fast startup.
    gop_cache->set(enabled);
}
4.1 SrsGopCache::set
/**
 * Enable or disable the gop cache.
 * Disabling also clears the cached packets.
 * @param enabled the new state of the gop cache.
 */
void SrsGopCache::set(bool enabled)
{
    enabled_gop_cache = enabled;

    if (enabled) {
        srs_info("enable gop cache");
        return;
    }

    // Per the original note: with the gop cache disabled, the client waits for
    // the next h264 keyframe and shows a black screen until then.
    srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size());
    clear();
}
5. SrsRtmpServer::start_fmle_publish
/** * when client type is publish, response with packets: * releaseStream response * FCPublish * FCPublish response * createStream response * onFCPublish(NetStream.Publish.Start) * onStatus(NetStream.Publish.Start) */ int SrsRtmpServer::start_fmle_publish(int stream_id) { int ret = ERROR_SUCCESS; // FCPublish double fc_publish_tid = 0; if (true) { SrsCommonMessage* msg = NULL; SrsFMLEStartPacket* pkt = NULL; /* 指定接收這幾個中的一個消息:ReleaseStream/FCPublish/FCUnpublish,若不是其中之一, * 則丟棄,直到接收到其中一個才返回 * 由開始的流程知,這裏應該是接收 FCPublish */ if ((ret = expect_message<SrsFMLEStartPacket>(&msg, &pkt)) != ERROR_SUCCESS) { srs_error("recv FCPublish message failed. ret=%d", ret); return ret; } srs_info("recv FCPublish request message success."); SrsAutoFree(SrsCommonMessage, msg); SrsAutoFree(SrsFMLEStartPacket, pkt); fc_publish_tid = pkt->transaction_id; } // FCPublish response if (true) { SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid); if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send FCPublish response message failed. ret=%d", ret); return ret; } srs_info("send FCPublish response message success."); } // createStream double create_stream_tid = 0; if (true) { SrsCommonMessage* msg = NULL; SrsCreateStreamPacket* pkt = NULL; if ((ret = expect_message<SrsCreateStreamPacket>(&msg, &pkt)) != ERROR_SUCCESS) { srs_error("recv createStream message failed. ret=%d", ret); return ret; } srs_info("recv createStream request message success."); SrsAutoFree(SrsCommonMessage, msg); SrsAutoFree(SrsCreateStreamPacket, pkt); create_stream_tid = pkt->transaction_id; } // createStream response if (true) { SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id); if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send createStream response message failed. 
ret=%d", ret); return ret; } srs_info("send createStream response message success."); } // publish if (true) { SrsCommonMessage* msg = NULL; SrsPublishPacket* pkt = NULL; if ((ret = expect_message<SrsPublishPacket>(&msg, &pkt)) != ERROR_SUCCESS) { srs_error("recv publish message failed. ret=%d", ret); return ret; } srs_info("recv publish request message success."); SrsAutoFree(SrsCommonMessage, msg); SrsAutoFree(SrsPublishPacket, pkt); } // publish response onFCPublish(NetStream.Publish.Start) if (true) { SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH; pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart)); pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream.")); if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", ret); return ret; } srs_info("send onFCPublish(NetStream.Publish.Start) message success."); } // publish response onStatus(NetStream.Publish.Start) if (true) { SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus)); pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart)); pkt->data->set(StatusDescritption, SrsAmf0Any::str("Started publishing stream.")); pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID)); if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret); return ret; } srs_info("send onStatus(NetStream.Publish.Start) message success."); } srs_info("FMLE publish success."); return ret; }
5.1 FCPublish
5.1.1 FCPublish 接收
接收 FCPublish 後的解析如下代碼所示。
SrsProtocol::do_decode_message:
/* Excerpt of the message decoder; branches unrelated to FCPublish are elided with "...". */
int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket)
{
    ...
    /* FCPublish: create an SrsFMLEStartPacket, hand it back through ppacket,
     * and return the result of decoding the stream into it. */
    else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
        srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
        *ppacket = packet = new SrsFMLEStartPacket();
        return packet->decode(stream);
    }
    ...
}
SrsFMLEStartPacket::decode
/**
 * Decode an FMLE start packet from the stream.
 * Fields are read in order: command name, transaction id, a null command
 * object, then the stream name.
 * @param stream the stream to read the AMF0 values from.
 * @return ERROR_SUCCESS; ERROR_RTMP_AMF0_DECODE when the command name is empty
 *         or not one of the accepted names; otherwise the failing reader's code.
 */
int SrsFMLEStartPacket::decode(SrsStream* stream)
{
    // Command name, e.g. "FCPublish".
    int ret = srs_amf0_read_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret);
        return ret;
    }

    // Only these three command names are accepted for this packet type.
    bool accepted = command_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
        || command_name == RTMP_AMF0_COMMAND_FC_PUBLISH
        || command_name == RTMP_AMF0_COMMAND_UNPUBLISH;
    if (command_name.empty() || !accepted) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode FMLE start command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }

    ret = srs_amf0_read_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret);
        return ret;
    }

    ret = srs_amf0_read_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret);
        return ret;
    }

    ret = srs_amf0_read_string(stream, stream_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
        return ret;
    }

    srs_info("amf0 decode FMLE start packet success");

    return ret;
}
5.1.2 FCPublish response
FCPublish 的響應用 SrsFMLEStartResPacket 類構造數據。該類的構造以下:
SrsFMLEStartResPacket 構造函數
/**
 * Build the response to an SrsFMLEStartPacket.
 * @param _transaction_id the transaction id stored in the response.
 */
SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
{
    transaction_id = _transaction_id;

    // The response is sent under the "_result" command name.
    command_name = RTMP_AMF0_COMMAND_RESULT;

    // Command object is an AMF0 null; args is an AMF0 undefined.
    command_object = SrsAmf0Any::null();
    args = SrsAmf0Any::undefined();
}
SrsFMLEStartResPacket::encode_packet
/**
 * Encode the payload of the FMLE start response into the stream.
 * Written in order: command name, transaction id, AMF0 null, AMF0 undefined.
 * @remark per the original note, never invoke the super encode_packet; it always fails.
 * @param stream the stream to write the AMF0 values to.
 * @return ERROR_SUCCESS, or the error code of the first writer that failed.
 */
int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
{
    int ret = srs_amf0_write_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");

    ret = srs_amf0_write_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");

    ret = srs_amf0_write_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_object failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_object success.");

    ret = srs_amf0_write_undefined(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode args failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode args success.");

    srs_info("encode FMLE start response packet success.");

    return ret;
}
send: FCPublish response
5.2 createStream
5.2.1 createStream 接收
createStream 消息的代表類爲 SrsCreateStreamPacket,該類的構造如下。
SrsCreateStreamPacket 構造函數
/**
 * createStream
 * Sent by the client to ask the server for a logical channel for message
 * communication; audio, video and metadata are published over the stream
 * channel created with this command.
 */
SrsCreateStreamPacket::SrsCreateStreamPacket()
{
    // Command name: "createStream".
    command_name = RTMP_AMF0_COMMAND_CREATE_STREAM;

    // Initial transaction id of the command.
    transaction_id = 2;

    // Command info, when present, goes here; otherwise it is the null type.
    // @remark never NULL: always an AMF0 null instance.
    command_object = SrsAmf0Any::null();
}
接收 createStream 後對該消息的解碼以下:
SrsCreateStreamPacket::decode
/**
 * Decode a createStream packet from the stream.
 * Fields are read in order: command name, transaction id, a null command object.
 * @param stream the stream to read the AMF0 values from.
 * @return ERROR_SUCCESS; ERROR_RTMP_AMF0_DECODE when the command name is empty
 *         or is not createStream; otherwise the failing reader's code.
 */
int SrsCreateStreamPacket::decode(SrsStream* stream)
{
    int ret = srs_amf0_read_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode createStream command_name failed. ret=%d", ret);
        return ret;
    }

    // The command name must be present and must be createStream.
    bool is_create_stream = !(command_name != RTMP_AMF0_COMMAND_CREATE_STREAM);
    if (command_name.empty() || !is_create_stream) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode createStream command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }

    ret = srs_amf0_read_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode createStream transaction_id failed. ret=%d", ret);
        return ret;
    }

    ret = srs_amf0_read_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode createStream command_object failed. ret=%d", ret);
        return ret;
    }

    srs_info("amf0 decode createStream packet success");

    return ret;
}
5.2.2 createStream response
createStream 的響應消息是通過 SrsCreateStreamResPacket 類構造的,該類的構造如下:
SrsCreateStreamResPacket 構造函數
/**
 * Build the response to a createStream command.
 * @param _transaction_id id of the command this response belongs to.
 * @param _stream_id the stream id returned to the client.
 */
SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, double _stream_id)
{
    // Identify which command is being answered, and carry the result value.
    // Per the original note, the return value is either a stream ID or an
    // error information object.
    transaction_id = _transaction_id;
    stream_id = _stream_id;

    // The response is sent under the "_result" command name.
    command_name = RTMP_AMF0_COMMAND_RESULT;

    // Command info, when present, goes here; otherwise it is the null type.
    // @remark never NULL: always an AMF0 null instance.
    command_object = SrsAmf0Any::null();
}
接着對該 createStream response 消息的負載數據進行編碼(即打包)。
SrsCreateStreamResPacket::encode_packet
/**
 * Encode the payload of the createStream response into the stream.
 * Written in order: command name, transaction id, AMF0 null, stream id.
 * @param stream the stream to write the AMF0 values to.
 * @return ERROR_SUCCESS, or the error code of the first writer that failed.
 */
int SrsCreateStreamResPacket::encode_packet(SrsStream* stream)
{
    int ret = srs_amf0_write_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");

    ret = srs_amf0_write_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");

    ret = srs_amf0_write_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_object failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_object success.");

    ret = srs_amf0_write_number(stream, stream_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode stream_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode stream_id success.");

    srs_info("encode createStream response packet success.");

    return ret;
}
send: createStream response
5.3 publish
5.3.1 publish 接收
publish 消息用 SrsPublishPacket 類表明。該類的構造以下:
SrsPublishPacket 構造函數
/**
 * FMLE/flash publish
 * The client sends the publish command to publish a named stream to the
 * server. Using this name, any client can play the stream and receive the
 * published audio, video and data messages.
 */
SrsPublishPacket::SrsPublishPacket()
{
    // Name of the command: "publish".
    command_name = RTMP_AMF0_COMMAND_PUBLISH;

    // Transaction id is 0 for publish.
    transaction_id = 0;

    // No command information object exists, so it is the null type.
    // @remark never NULL: always an AMF0 null instance.
    command_object = SrsAmf0Any::null();

    // Type of publishing: "live", "record" or "append".
    //   record: the stream is published and recorded to a new file, stored on
    //           the server in a subdirectory of the application's directory;
    //           an existing file is overwritten.
    //   append: the stream is published and appended to a file, which is
    //           created when it is not found.
    //   live:   live data is published without recording it to a file.
    // @remark SRS only supports live.
    // @remark optional, defaults to live.
    type = "live";
}
recv: publish
該 publish 消息的解析如下代碼所示。
SrsPublishPacket::decode
/**
 * Decode a publish packet from the stream.
 * Fields are read in order: command name, transaction id, a null command
 * object, the stream name, and, only when data remains, the publish type.
 * @param stream the stream to read the AMF0 values from.
 * @return ERROR_SUCCESS; ERROR_RTMP_AMF0_DECODE when the command name is empty
 *         or is not publish; otherwise the failing reader's code.
 */
int SrsPublishPacket::decode(SrsStream* stream)
{
    int ret = srs_amf0_read_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode publish command_name failed. ret=%d", ret);
        return ret;
    }

    // The command name must be present and must be publish.
    bool is_publish = !(command_name != RTMP_AMF0_COMMAND_PUBLISH);
    if (command_name.empty() || !is_publish) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode publish command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }

    ret = srs_amf0_read_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode publish transaction_id failed. ret=%d", ret);
        return ret;
    }

    ret = srs_amf0_read_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode publish command_object failed. ret=%d", ret);
        return ret;
    }

    // Name of the stream being published.
    ret = srs_amf0_read_string(stream, stream_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 decode publish stream_name failed. ret=%d", ret);
        return ret;
    }

    // The publish type is read only when the stream still has data
    // (per the original note, SRS only supports live).
    if (!stream->empty()) {
        ret = srs_amf0_read_string(stream, type);
        if (ret != ERROR_SUCCESS) {
            srs_error("amf0 decode publish type failed. ret=%d", ret);
            return ret;
        }
    }

    srs_info("amf0 decode publish packet success");

    return ret;
}
5.3.2 publish response onFCPublish(NetStream.Publish.Start)
該 publish 的響應 onFCPublish 使用 SrsOnStatusCallPacket 構造,該類的構造函數如下。
SrsOnStatusCallPacket 構造函數
/**
 * Build an onStatus call packet with an empty data object.
 * Callers may overwrite command_name and fill data before sending.
 */
SrsOnStatusCallPacket::SrsOnStatusCallPacket()
{
    // Name of the command: "onStatus".
    command_name = RTMP_AMF0_COMMAND_ON_STATUS;

    // Transaction id is 0.
    transaction_id = 0;

    // Name-value pairs describing the server's response; 'code', 'level' and
    // 'description' are among the names used.
    // @remark never NULL: always an AMF0 object instance.
    data = SrsAmf0Any::object();

    // No command information exists, so it is the null type.
    // @remark never NULL: always an AMF0 null instance.
    args = SrsAmf0Any::null();
}
注:publish 的響應消息 onFCPublish 的消息名爲 onFCPublish。該消息的抓包以下:
send: onFCPublish
該 onFCPublish 消息負載數據的編碼以下。
SrsOnStatusCallPacket::encode_packet
/**
 * Encode the payload of the onStatus call packet into the stream.
 * Written in order: command name, transaction id, AMF0 null (args), data object.
 * @param stream the stream to write the AMF0 values to.
 * @return ERROR_SUCCESS, or the error code of the first writer that failed.
 */
int SrsOnStatusCallPacket::encode_packet(SrsStream* stream)
{
    int ret = srs_amf0_write_string(stream, command_name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");

    ret = srs_amf0_write_number(stream, transaction_id);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");

    // args is written as an AMF0 null.
    ret = srs_amf0_write_null(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode args failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode args success.");

    // The data object serializes itself.
    ret = data->write(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode data failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode data success.");

    srs_info("encode onStatus(Call) packet success.");

    return ret;
}
5.3.3 publish response onStatus(NetStream.Publish.Start)
該響應消息同樣使用 SrsOnStatusCallPacket 類構造,該消息的名稱即爲 onStatus。抓包如下圖:
send: onStatus
6. SrsRtmpConn::publishing
當服務器成功響應 obs 發送的 publish 消息後,即進入 SrsRtmpConn::publishing 函數,開始處理 obs 推送的媒體數據。具體分析見 SRS之SrsRtmpConn::publishing詳解.