SRS: SrsRtmpConn::stream_service_cycle in Detail

Publishing with OBS follows the flow below, taken from the SRS issue "Haivision Makito X can't publish to SRS":

FFMPEG:

C/S: Handshake
C: ConnectApp() tcUrl=xxx
S: Ack Size 2,500,000
S: Set Peer Bandwidth 2,500,000
S: Set Chunk Size 60,000
C: Set Chunk Size 60,000
S: ConnectApp() _result
S: onBWDone()

C: releaseStream+FCPublish(s0)
C: createStream()
S: releaseStream _result
C: _checkbw()
S: FCPublish() _result
S: createStream() _result
C: publish(s0)
S: onFCPublish()
S: onStatus()

The analysis below picks up right after the server sends onBWDone, when it enters the while loop and starts executing stream_service_cycle.

1. SrsRtmpConn::stream_service_cycle

int SrsRtmpConn::stream_service_cycle()
{
    int ret = ERROR_SUCCESS;
    
    /* the rtmp client type: play/publish/unknown */
    SrsRtmpConnType type;
    /* First identify the type of the client request (play, publish, or other)
     * and the name of the stream to play or publish. */
    if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) 
        != ERROR_SUCCESS) {
        if (!srs_is_client_gracefully_close(ret)) {
            srs_error("identify client failed. ret=%d", ret);
        }
        return ret;
    }
    req->strip();
    srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f", 
        srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration);
    
    /* security->check() only runs its series of checks when the security
     * option is enabled in the configuration file. */
    /* allow all if security disabled. */
    // security check
    if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
        srs_error("security check failed. ret=%d", ret);
        return ret;
    }
    srs_info("security check ok");
    
    /* SRS does not allow an empty stream name. */
    // Never allow the empty stream name, for HLS may write to a file with empty name.
    // @see https://github.com/ossrs/srs/issues/834: 
    //      SRS2 crashed for TS encoder assert failed
    if (req->stream.empty()) {
        ret = ERROR_RTMP_STREAM_NAME_EMPTY;
        srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);
        return ret;
    }
    
    /* Set the server's send/recv timeouts. */
    // client is identified, set the timeout to service timeout.
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
    
    /* Build a stream_url from vhost/app/stream, then look it up in SrsSource::pool.
     * If a matching SrsSource exists, return it; otherwise construct a new SrsSource
     * and store it in the SrsSource::pool map under that stream_url. */
    // find a source to serve.
    SrsSource* source = NULL;
    if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) {
        return ret;
    }
    srs_assert(source != NULL);
    
    /* Get the statistics object, which records the current vhost, stream, and related information. */
    // update the statistic when source discovered.
    SrsStatistic* stat = SrsStatistic::instance();
    if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) 
    {
        srs_error("stat client failed. ret=%d", ret);
        return ret;
    }
    
    /* Returns false if the vhost has no mode configured. */
    bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
    /* gop_cache is enabled by default. */
    bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
    srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", 
        req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,
        source->source_id(), source->source_id());
    /* Enable or disable the gop cache according to enabled_cache; true enables it. */
    source->set_cache(enabled_cache);
    
    /* Handle the client according to the identified type: play or publish. */
    /* The type of client, play or publish. */
    client_type = type;
    switch (type) {
        case SrsRtmpConnPlay: {
            srs_verbose("start to play stream %s.", req->stream.c_str());
            
            // response connection start play
            if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to play stream failed. ret=%d", ret);
                return ret;
            }
            if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
                srs_error("http hook on_play failed. ret=%d", ret);
                return ret;
            }
            
            srs_info("start to play stream %s success", req->stream.c_str());
            ret = playing(source);
            http_hooks_on_stop();
            
            return ret;
        }
        /* As the flow above shows, an OBS publish arrives as this type. */
        case SrsRtmpConnFMLEPublish: {
            srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
            
            /* This function receives and responds to the following messages:
             * C: FCPublish
             * S: FCPublish response
             * C: createStream
             * S: createStream response
             * C: publish
             * S: publish response onFCPublish(NetStream.Publish.Start)
             * S: publish response onStatus(NetStream.Publish.Start) */
            if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to publish stream failed. ret=%d", ret);
                return ret;
            }
            
            /* Once the server has answered the client's publish message, it starts
             * receiving and processing the metadata, video, and audio the client pushes. */
            return publishing(source);
        }
        case SrsRtmpConnHaivisionPublish: {
            srs_verbose("Haivision start to publish stream %s.", req->stream.c_str());
            
            if ((ret = rtmp->start_haivision_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to publish stream failed. ret=%d", ret);
                return ret;
            }
            
            return publishing(source);
        }
        case SrsRtmpConnFlashPublish: {
            srs_verbose("flash start to publish stream %s.", req->stream.c_str());
            
            if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("flash start to publish stream failed. ret=%d", ret);
                return ret;
            }
            
            return publishing(source);
        }
        default: {
            ret = ERROR_SYSTEM_CLIENT_INVALID;
            srs_info("invalid client type=%d. ret=%d", type, ret);
            return ret;
        }
    }
    
    return ret;
}
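
The order of the steps in stream_service_cycle can be condensed into the small sketch below. It is an illustration only, not SRS code: `ClientType`, `Outcome`, `Request`, and `service_cycle` are hypothetical stand-ins, and the security check is reduced to a boolean that is assumed to have been computed already.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for SrsRtmpConnType and the SRS error codes.
enum class ClientType { Unknown, Play, FMLEPublish, HaivisionPublish, FlashPublish };
enum class Outcome { Played, Published, SecurityDenied, EmptyStreamName, InvalidClient };

// What the sketch assumes is known once identify_client() has returned.
struct Request {
    ClientType type;
    std::string stream;   // stream name taken from the play/publish commands
    bool allowed;         // result of the security check
};

// Same order as the function above: security check, empty-name check, dispatch.
inline Outcome service_cycle(const Request* req) {
    assert(req != nullptr);
    if (!req->allowed) return Outcome::SecurityDenied;
    if (req->stream.empty()) return Outcome::EmptyStreamName;
    switch (req->type) {
        case ClientType::Play:
            return Outcome::Played;
        case ClientType::FMLEPublish:
        case ClientType::HaivisionPublish:
        case ClientType::FlashPublish:
            return Outcome::Published;
        default:
            return Outcome::InvalidClient;
    }
}
```

In this sketch an OBS push (`ClientType::FMLEPublish` with a non-empty stream name) ends in `Outcome::Published`, while the same request with an empty stream name is rejected before any dispatch happens.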

2. SrsRtmpServer::identify_client

This function identifies the client's request so that the connection can be handled accordingly.

int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, 
    string& stream_name, double& duration)
{
    type = SrsRtmpConnUnknown;
    int ret = ERROR_SUCCESS;
    
    while (true) {
        SrsCommonMessage* msg = NULL;
        /* Receive one complete message. */
        if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("recv identify client message failed. ret=%d", ret);
            }
            return ret;
        }
        
        SrsAutoFree(SrsCommonMessage, msg);
        SrsMessageHeader& h = msg->header;
        
        if (h.is_ackledgement() || h.is_set_chunk_size() || 
            h.is_window_ackledgement_size() || h.is_user_control_message()) {
            continue;
        }
        
        /* Ignore any message that is not an AMF command and keep receiving. */
        if (!h.is_amf0_command() && !h.is_amf3_command()) {
            srs_trace("identify ignore message except "
                "AMF0/AMF3 command message. type=%#x", h.message_type);
            continue;
        }
        
        SrsPacket* pkt = NULL;
        /* Decode the received AMF command message; the decoded data is stored
         * in the subclass object that pkt points to. */
        if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("identify decode message failed. ret=%d", ret);
            return ret;
        }
        
        SrsAutoFree(SrsPacket, pkt);
        
        /* Use dynamic_cast to try converting pkt to each expected type;
         * a non-NULL result means the received message is the one wanted. */
        if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
            srs_info("identify client by create stream, play or flash publish.");
            return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt), 
                stream_id, type, stream_name, duration);
        }
        /* When the message is one of releaseStream, FCPublish, or FCUnpublish,
         * the packet constructed is an SrsFMLEStartPacket. */
        if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
            srs_info("identify client by releaseStream, fmle publish");
            /* At this point the client type is known to be publish. */
            return identify_fmle_publish_client(dynamic_cast<SrsFMLEStartPacket*>(pkt), 
                                                type, stream_name);
        }
        if (dynamic_cast<SrsPlayPacket*>(pkt)) {
            srs_info("level0 identify client by play.");
            return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt), type, 
                                        stream_name, duration);
        }
        /* call msg,
         * support response null first,
         * @see https://github.com/ossrs/srs/issues/106
         * TODO: FIXME: response in right way, or forward in edge mode. */
        SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
        if (call) {
            SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id);
            res->command_object = SrsAmf0Any::null();
            res->response = SrsAmf0Any::null();
            if ((ret = protocol->send_and_free_packet(res, 0)) != ERROR_SUCCESS) {
                if (!srs_is_system_control_error(ret) && 
                    !srs_is_client_gracefully_close(ret)) {
                    srs_warn("response call failed. ret=%d", ret);
                }
                return ret;
            }
            
            /* For encoder of Haivision, it always send a _checkbw call message.
             * @remark the next message is createStream, so we continue to identify it.
             * @see https://github.com/ossrs/srs/issues/844 */
            if (call->command_name == "_checkbw") {
                srs_info("Haivision encoder identified.");
                continue;
            }
            continue;
        }
        
        srs_trace("ignore AMF0/AMF3 command message.");
    }
    
    return ret;
}
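
The identification step relies on `dynamic_cast` returning a null pointer when the packet is not of the requested type. A minimal sketch of that pattern follows. The class names are simplified, hypothetical stand-ins for the SRS packet classes, and `identify` only reports which branch would be taken instead of sending any response.

```cpp
#include <cassert>
#include <string>

// Hypothetical, stripped-down packet hierarchy; the real SrsPacket classes carry more state.
struct Packet { virtual ~Packet() {} };
struct CreateStreamPacket : Packet {};
struct FMLEStartPacket : Packet { std::string stream_name; };
struct PlayPacket : Packet { std::string stream_name; };
struct CallPacket : Packet { std::string command_name; };

enum class Identified { CreateStream, FMLEPublish, Play, KeepWaiting };

// Tries each concrete type in turn; dynamic_cast yields nullptr on a mismatch.
inline Identified identify(Packet* pkt) {
    assert(pkt != nullptr);
    if (dynamic_cast<CreateStreamPacket*>(pkt)) return Identified::CreateStream;
    if (dynamic_cast<FMLEStartPacket*>(pkt)) return Identified::FMLEPublish;
    if (dynamic_cast<PlayPacket*>(pkt)) return Identified::Play;
    // Anything else (for example a _checkbw call) leaves the client unidentified,
    // so the caller keeps reading messages.
    return Identified::KeepWaiting;
}
```

The base class needs at least one virtual function (here the destructor) for `dynamic_cast` to work on it, which is why `Packet` declares a virtual destructor.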

2.1 SrsProtocol::recv_message

int SrsProtocol::recv_message(SrsCommonMessage** pmsg) 
{
    *pmsg = NULL;
    
    int ret = ERROR_SUCCESS;
    
    while (true) {
        SrsCommonMessage* msg = NULL;
        
        /* Read one message from the socket. */
        if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("recv interlaced message failed. ret=%d", ret);
            }
            srs_freep(msg);
            return ret;
        }
        srs_verbose("entire msg received");
        
        if (!msg) {
            srs_info("got empty message without error.");
            continue;
        }
        
        if (msg->size <= 0 || msg->header.payload_length <= 0) {
            srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).",
                msg->header.message_type, msg->header.payload_length,
                msg->header.timestamp, msg->header.stream_id);
            srs_freep(msg);
            continue;
        }
        
        /* For control messages such as set chunk size, update the protocol context;
         * other messages, such as audio/video or AMF, are left untouched. */
        if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
            srs_error("hook the received msg failed. ret=%d", ret);
            srs_freep(msg);
            return ret;
        }
        
        srs_verbose("got a msg, cid=%d, type=%d, size=%d, time=%"PRId64, 
            msg->header.perfer_cid, msg->header.message_type, msg->header.payload_length, 
            msg->header.timestamp);
        *pmsg = msg;
        break;
    }
    
    return ret;
}

2.1.1 SrsProtocol::recv_interlaced_message

int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
{
    int ret = ERROR_SUCCESS;
    
    // chunk stream basic header.
    char fmt = 0;
    int cid = 0;
    /* Read the chunk basic header. */
    if ((ret = read_basic_header(fmt, cid)) != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read basic header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid);
    
    // the cid must not negative.
    srs_assert(cid >= 0);
    
    /* Get the chunk stream for this cid. One RTMP message may span several
     * chunks, so the chunk stream accumulates their data until a complete
     * message has been received. */
    // get the cached chunk stream.
    SrsChunkStream* chunk = NULL;
    
    /* SrsChunkStream** cs_cache: 
     * The SrsProtocol constructor allocates SRS_PERF_CHUNK_STREAM_CACHE (16)
     * SrsChunkStream entries for the cs_cache array. When the chunk's cid is below
     * that size, the matching SrsChunkStream is taken directly from the array. */
    // use chunk stream cache to get the chunk info.
    // @see https://github.com/ossrs/srs/issues/249
    if (cid < SRS_PERF_CHUNK_STREAM_CACHE) {
        // chunk stream cache hit.
        srs_verbose("cs-cache hit, cid=%d", cid);
        // already init, use it directly
        chunk = cs_cache[cid];
        srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
                    "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
            chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), 
            chunk->header.message_type, chunk->header.payload_length,
            chunk->header.timestamp, chunk->header.stream_id);
    } else {
        // chunk stream cache miss, use map.
        if (chunk_streams.find(cid) == chunk_streams.end()) {
            chunk = chunk_streams[cid] = new SrsChunkStream(cid);
            // set the perfer cid of chunk,
            // which will copy to the message received.
            chunk->header.perfer_cid = cid;
            srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
        } else {
            chunk = chunk_streams[cid];
            srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
                        "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), 
                chunk->header.message_type, chunk->header.payload_length,
                chunk->header.timestamp, chunk->header.stream_id);
        }
    }
    
    /* Read the chunk message header according to fmt. */
    // chunk stream message header
    if ((ret = read_message_header(chunk, fmt)) != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read message header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read message header success. "
            "fmt=%d, ext_time=%d, size=%d, "
            "message(type=%d, size=%d, time=%"PRId64", sid=%d)", 
            fmt, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), 
            chunk->header.message_type, chunk->header.payload_length, 
            chunk->header.timestamp, chunk->header.stream_id);
    
    // read msg payload from chunk stream.
    SrsCommonMessage* msg = NULL;
    if ((ret = read_message_payload(chunk, &msg)) != ERROR_SUCCESS) {
        if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
            srs_error("read message payload failed. ret=%d", ret);
        }
        return ret;
    }
    
    // not got an entire RTMP message, try next chunk.
    if (!msg) {
        srs_verbose("get partial message success. size=%d, "
                    "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), 
                chunk->header.message_type, chunk->header.payload_length,
                chunk->header.timestamp, chunk->header.stream_id);
        return ret;
    }
    
    *pmsg = msg;
    srs_info("get entire message success. size=%d, "
             "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
            (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), 
            chunk->header.message_type, chunk->header.payload_length,
            chunk->header.timestamp, chunk->header.stream_id);
            
    return ret;
}
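
The two-level lookup used for chunk streams (a preallocated array for small cids, a map for the rest) can be sketched as follows. This is a simplified illustration: `ChunkStream` and `ChunkStreamTable` are hypothetical names, the entries are stored by value rather than as heap pointers, and the cache size of 16 is taken from the comment in the function above.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical stand-in for SrsChunkStream: only the id is kept here.
struct ChunkStream {
    int cid;
    explicit ChunkStream(int c) : cid(c) {}
};

class ChunkStreamTable {
public:
    // Cache size assumed from the SRS_PERF_CHUNK_STREAM_CACHE (16) comment.
    static const int kCacheSize = 16;

    ChunkStreamTable() {
        cache_.reserve(kCacheSize);
        for (int i = 0; i < kCacheSize; ++i) cache_.emplace_back(i);
    }

    // Small ids hit the preallocated array; larger ids are created lazily in the map.
    ChunkStream* get(int cid) {
        assert(cid >= 0);
        if (cid < kCacheSize) return &cache_[static_cast<std::size_t>(cid)];
        std::map<int, ChunkStream>::iterator it = overflow_.find(cid);
        if (it == overflow_.end()) it = overflow_.emplace(cid, ChunkStream(cid)).first;
        return &it->second;
    }

    std::size_t overflow_size() const { return overflow_.size(); }

private:
    std::vector<ChunkStream> cache_;        // fixed-size fast path
    std::map<int, ChunkStream> overflow_;   // fallback for cid >= kCacheSize
};
```

Repeated calls with the same cid return the same object, which is what lets the partial payload of a multi-chunk message accumulate between calls.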

2.2 SrsProtocol::decode_message

int SrsProtocol::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
    *ppacket = NULL;
    
    int ret = ERROR_SUCCESS;
    
    srs_assert(msg != NULL);
    srs_assert(msg->payload != NULL);
    srs_assert(msg->size > 0);
    
    SrsStream stream;
    
    /* Initialize an SrsStream over the message payload. */
    // initialize the decode stream for all message,
    // it's ok for the initialize if fast and without memory copy.
    if ((ret = stream.initialize(msg->payload, msg->size)) != ERROR_SUCCESS) {
        srs_error("initialize stream failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("decode stream initialized success");
    
    // decode the packet.
    SrsPacket* packet = NULL;
    if ((ret = do_decode_message(msg->header, &stream, &packet)) != ERROR_SUCCESS) {
        srs_freep(packet);
        return ret;
    }
    
    // set to output ppacket only when success.
    *ppacket = packet;
    
    return ret;
}

2.2.1 SrsProtocol::do_decode_message

int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, 
    SrsPacket** ppacket)
{
    int ret = ERROR_SUCCESS;
    
    SrsPacket* packet = NULL;
    
    // decode specified packet type
    if (header.is_amf0_command() || header.is_amf3_command() 
        || header.is_amf0_data() || header.is_amf3_data()) {
        srs_verbose("start to decode AMF0/AMF3 command message.");
        
        // skip 1bytes to decode the amf3 command.
        if (header.is_amf3_command() && stream->require(1)) {
            srs_verbose("skip 1bytes to decode AMF3 command");
            stream->skip(1);
        }
        
        // amf0 command message.
        // need to read the command name.
        std::string command;
        /* Read the command name of the message. */
        if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {
            srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str());
        
        // result/error packet
        if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) {
            double transactionId = 0.0;
            if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) {
                srs_error("decode AMF0/AMF3 transcationId failed. ret=%d", ret);
                return ret;
            }
            srs_verbose("AMF0/AMF3 command id, transcationId=%.2f", transactionId);
            
            // reset stream, for header read completed.
            stream->skip(-1 * stream->pos());
            if (header.is_amf3_command()) {
                stream->skip(1);
            }
            
            // find the call name
            if (requests.find(transactionId) == requests.end()) {
                ret = ERROR_RTMP_NO_REQUEST;
                srs_error("decode AMF0/AMF3 request failed. ret=%d", ret);
                return ret;
            }
            
            std::string request_name = requests[transactionId];
            srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str());
            
            if (request_name == RTMP_AMF0_COMMAND_CONNECT) {
                srs_info("decode the AMF0/AMF3 response command(%s message).", 
                         request_name.c_str());
                *ppacket = packet = new SrsConnectAppResPacket();
                return packet->decode(stream);
            } else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) {
                srs_info("decode the AMF0/AMF3 response command(%s message).", 
                         request_name.c_str());
                *ppacket = packet = new SrsCreateStreamResPacket(0, 0);
                return packet->decode(stream);
            } else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
                || request_name == RTMP_AMF0_COMMAND_FC_PUBLISH
                || request_name == RTMP_AMF0_COMMAND_UNPUBLISH) {
                srs_info("decode the AMF0/AMF3 response command(%s message).", 
                         request_name.c_str());
                *ppacket = packet = new SrsFMLEStartResPacket(0);
                return packet->decode(stream);
            } else {
                ret = ERROR_RTMP_NO_REQUEST;
                srs_error("decode AMF0/AMF3 request failed. "
                    "request_name=%s, transactionId=%.2f, ret=%d", 
                    request_name.c_str(), transactionId, ret);
                return ret;
            }
        }
        
        // reset to zero(amf3 to 1) to restart decode.
        stream->skip(-1 * stream->pos());
        if (header.is_amf3_command()) {
            stream->skip(1);
        }
        
        /* Construct the class that matches the command name, then decode it. */
        // decode command object.
        if (command == RTMP_AMF0_COMMAND_CONNECT) {
            srs_info("decode the AMF0/AMF3 command(connect vhost/app message).");
            *ppacket = packet = new SrsConnectAppPacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_CREATE_STREAM) {
            srs_info("decode the AMF0/AMF3 command(createStream message).");
            *ppacket = packet = new SrsCreateStreamPacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_PLAY) {
            srs_info("decode the AMF0/AMF3 command(play message).");
            *ppacket = packet = new SrsPlayPacket();
            return packet->decode(stream);
        } else if(command == RTMP_AMF0_COMMAND_PAUSE) {
            srs_info("decode the AMF0/AMF3 command(pause message).");
            *ppacket = packet = new SrsPausePacket();
            return packet->decode(stream);
        } else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {
            srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message).");
            *ppacket = packet = new SrsFMLEStartPacket();
            return packet->decode(stream);
        } else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
            srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
            *ppacket = packet = new SrsFMLEStartPacket();
            return packet->decode(stream);
        } else if(command == RTMP_AMF0_COMMAND_PUBLISH) {
            srs_info("decode the AMF0/AMF3 command(publish message).");
            *ppacket = packet = new SrsPublishPacket();
            return packet->decode(stream);
        } else if(command == RTMP_AMF0_COMMAND_UNPUBLISH) {
            srs_info("decode the AMF0/AMF3 command(unpublish message).");
            *ppacket = packet = new SrsFMLEStartPacket();
            return packet->decode(stream);
        } else if(command == SRS_CONSTS_RTMP_SET_DATAFRAME || 
                  command == SRS_CONSTS_RTMP_ON_METADATA) {
            srs_info("decode the AMF0/AMF3 data(onMetaData message).");
            *ppacket = packet = new SrsOnMetaDataPacket();
            return packet->decode(stream);
        } else if(command == SRS_BW_CHECK_FINISHED
            || command == SRS_BW_CHECK_PLAYING
            || command == SRS_BW_CHECK_PUBLISHING
            || command == SRS_BW_CHECK_STARTING_PLAY
            || command == SRS_BW_CHECK_STARTING_PUBLISH
            || command == SRS_BW_CHECK_START_PLAY
            || command == SRS_BW_CHECK_START_PUBLISH
            || command == SRS_BW_CHECK_STOPPED_PLAY
            || command == SRS_BW_CHECK_STOP_PLAY
            || command == SRS_BW_CHECK_STOP_PUBLISH
            || command == SRS_BW_CHECK_STOPPED_PUBLISH
            || command == SRS_BW_CHECK_FINAL)
        {
            srs_info("decode the AMF0/AMF3 band width check message.");
            *ppacket = packet = new SrsBandwidthPacket();
            return packet->decode(stream);
        } else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) {
            srs_info("decode the AMF0/AMF3 closeStream message.");
            *ppacket = packet = new SrsCloseStreamPacket();
            return packet->decode(stream);
        } else if (header.is_amf0_command() || header.is_amf3_command()) {
            srs_info("decode the AMF0/AMF3 call message.");
            *ppacket = packet = new SrsCallPacket();
            return packet->decode(stream);
        }
        
        // default packet to drop message.
        srs_info("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
        *ppacket = packet = new SrsPacket();
        return ret;
    } else if (header.is_user_control_message()) {
        srs_verbose("start to decode user control message.");
        *ppacket = packet = new SrsUserControlPacket();
        return packet->decode(stream);
    } else if(header.is_window_ackledgement_size()) {
        srs_verbose("start to decode set ack window size message.");
        *ppacket = packet = new SrsSetWindowAckSizePacket();
        return packet->decode(stream);
    } else if(header.is_set_chunk_size()) {
        srs_verbose("start to decode set chunk size message.");
        *ppacket = packet = new SrsSetChunkSizePacket();
        return packet->decode(stream);
    } else {
        if (!header.is_set_peer_bandwidth() && !header.is_ackledgement()) {
            srs_trace("drop unknown message, type=%d", header.message_type);
        }
    }
    
    return ret;
}
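
The dispatch in do_decode_message has two layers: a `_result`/`_error` response is resolved through the transaction id of the request that was sent earlier, while any other command is matched by its own name. The sketch below shows that decision only. `PacketKind` and `classify` are hypothetical, and the string literals are the usual RTMP command names, assumed here to match the RTMP_AMF0_COMMAND_* constants; only a subset of the commands handled above is covered.

```cpp
#include <map>
#include <string>

// Hypothetical result type; SRS allocates the matching SrsPacket subclass instead.
enum class PacketKind {
    ConnectRes, CreateStreamRes, FMLEStartRes, NoRequest,
    Connect, CreateStream, Play, FMLEStart, Publish, Call
};

inline bool is_fmle_start(const std::string& name) {
    return name == "releaseStream" || name == "FCPublish" || name == "FCUnpublish";
}

// `requests` maps a transaction id to the name of the request sent with that id.
inline PacketKind classify(const std::string& command, double transaction_id,
                           const std::map<double, std::string>& requests) {
    if (command == "_result" || command == "_error") {
        // A response carries no command of its own: look up what was asked.
        std::map<double, std::string>::const_iterator it = requests.find(transaction_id);
        if (it == requests.end()) return PacketKind::NoRequest;
        const std::string& request_name = it->second;
        if (request_name == "connect") return PacketKind::ConnectRes;
        if (request_name == "createStream") return PacketKind::CreateStreamRes;
        if (is_fmle_start(request_name)) return PacketKind::FMLEStartRes;
        return PacketKind::NoRequest;
    }
    if (command == "connect") return PacketKind::Connect;
    if (command == "createStream") return PacketKind::CreateStream;
    if (command == "play") return PacketKind::Play;
    if (is_fmle_start(command)) return PacketKind::FMLEStart;
    if (command == "publish") return PacketKind::Publish;
    // Unrecognized commands (for example _checkbw) become a generic call.
    return PacketKind::Call;
}
```

On the server side of the flow discussed here, `releaseStream` takes the second path and maps to the FMLE start packet, which is why identify_client can recognize the publisher from it.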

As the flow at the beginning shows, after the server sends onBWDone it next receives the client's releaseStream command. The releaseStream/PublishStream/FCPublish/FCUnpublish commands are all represented by the SrsFMLEStartPacket class.

2.2.2 Constructing SrsFMLEStartPacket

/**
 * FMLE start publish: ReleaseStream/PublishStream/FCPublish/FCUnpublish
 */
SrsFMLEStartPacket::SrsFMLEStartPacket()
{
    /* Command name: releaseStream */
    command_name = RTMP_AMF0_COMMAND_RELEASE_STREAM;
    /* the transaction ID to get the response. */
    transaction_id = 0;
    /**
     * If there exists any command info this is set, else this is set to null type.
     * @remark, never be NULL, an AMF0 null instance.
     */
    command_object = SrsAmf0Any::null();
}

2.2.3 SrsFMLEStartPacket::decode

Parse the payload of the releaseStream message.

int SrsFMLEStartPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    /* Read the command name of this message. */
    if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret);
        return ret;
    }
    if (command_name.empty()
        || (command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM 
        && command_name != RTMP_AMF0_COMMAND_FC_PUBLISH
        && command_name != RTMP_AMF0_COMMAND_UNPUBLISH))
    {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode FMLE start command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
        return ret;
    }
    
    srs_info("amf0 decode FMLE start packet success");
    
    return ret;
}
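
To make the field order concrete, here is a minimal AMF0 reader that decodes the same four fields from a raw payload: string, number, null, string. It is a sketch under the AMF0 wire format (string marker 0x02 with a 16-bit big-endian length, number marker 0x00 with an 8-byte big-endian double, null marker 0x05). `Reader`, `FMLEStart`, and the helper functions are hypothetical and do not reproduce the SRS helpers or their error codes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Minimal forward-only reader over a byte buffer (a stand-in for SrsStream).
struct Reader {
    const std::vector<uint8_t>& buf;
    std::size_t pos;
    explicit Reader(const std::vector<uint8_t>& b) : buf(b), pos(0) {}
    bool require(std::size_t n) const {
        assert(pos <= buf.size());
        return buf.size() - pos >= n;
    }
};

// AMF0 string: marker 0x02, 16-bit big-endian length, then the bytes.
inline bool read_string(Reader& r, std::string& out) {
    if (!r.require(3) || r.buf[r.pos] != 0x02) return false;
    std::size_t len = (std::size_t(r.buf[r.pos + 1]) << 8) | r.buf[r.pos + 2];
    if (!r.require(3 + len)) return false;
    out.assign(reinterpret_cast<const char*>(r.buf.data() + r.pos + 3), len);
    r.pos += 3 + len;
    return true;
}

// AMF0 number: marker 0x00, then an 8-byte big-endian IEEE-754 double.
inline bool read_number(Reader& r, double& out) {
    static_assert(sizeof(double) == 8, "IEEE-754 double expected");
    if (!r.require(9) || r.buf[r.pos] != 0x00) return false;
    uint64_t bits = 0;
    for (std::size_t i = 0; i < 8; ++i) bits = (bits << 8) | r.buf[r.pos + 1 + i];
    std::memcpy(&out, &bits, sizeof out);
    r.pos += 9;
    return true;
}

// AMF0 null: the single marker byte 0x05.
inline bool read_null(Reader& r) {
    if (!r.require(1) || r.buf[r.pos] != 0x05) return false;
    r.pos += 1;
    return true;
}

struct FMLEStart {
    std::string command_name;
    double transaction_id;
    std::string stream_name;
};

// Same field order as decode() above: name, transaction id, null, stream name.
inline bool decode_fmle_start(const std::vector<uint8_t>& payload, FMLEStart& out) {
    Reader r(payload);
    return read_string(r, out.command_name) && read_number(r, out.transaction_id)
        && read_null(r) && read_string(r, out.stream_name);
}
```

Feeding it the bytes of a `releaseStream` command with transaction id 2 and stream name `live` fills the three fields accordingly; a payload with the fields in any other order makes the function return false.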

recv:releaseStream('live')

2.3 SrsRtmpServer::identify_fmle_publish_client

This function is called when the received message is parsed as releaseStream.

int SrsRtmpServer::identify_fmle_publish_client(SrsFMLEStartPacket* req, 
    SrsRtmpConnType& type, string& stream_name)
{
    int ret = ERROR_SUCCESS;
    
    /* Identify the client type as publish. */
    type = SrsRtmpConnFMLEPublish;
    /* The name of the stream the client publishes. */
    stream_name = req->stream_name;
    
    /* Respond to the releaseStream message. */
    // releaseStream response
    if (true) {
        SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
        if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
            srs_error("send releaseStream response message failed. ret=%d", ret);
            return ret;
        }
        srs_info("send releaseStream response message success.");
    }
    
    return ret;
}

This function sends the response to releaseStream. The packet sent is shown below:

send: response for releaseStream

2.3.1 Constructing SrsFMLEStartResPacket

/**
 * response for SrsFMLEStartPacket.
 */
SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
{
    /* Name of the response message: _result */
    command_name = RTMP_AMF0_COMMAND_RESULT;
    transaction_id = _transaction_id;
    /**
     * If there exists any command info this is set, else this is set to null type.
     * @remark, never be NULL, an AMF0 null instance.
     */
    command_object = SrsAmf0Any::null();
    /**
     * the optional args, set to undefined.
     * @remark, never be NULL, an AMF0 undefined instance.
     */
    args = SrsAmf0Any::undefined();
}

2.3.2 SrsFMLEStartResPacket::encode_packet

Build the payload of the releaseStream response message.

int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");
    
    if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");
    
    if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
        srs_error("encode command_object failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_object success.");
    
    if ((ret = srs_amf0_write_undefined(stream)) != ERROR_SUCCESS) {
        srs_error("encode args failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode args success.");
    
    srs_info("encode FMLE start response packet success.");
    
    return ret;
}
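
The resulting bytes can be reproduced with a small standalone encoder. The sketch below writes the same four fields in the same order, following the AMF0 wire format (string 0x02, number 0x00, null 0x05, undefined 0x06). `Bytes` and the `write_*` helpers are hypothetical names, not the SRS functions, and there is no error reporting because the output buffer grows as needed.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

typedef std::vector<uint8_t> Bytes;

// AMF0 string: marker 0x02, 16-bit big-endian length, then the bytes.
inline void write_string(Bytes& out, const std::string& s) {
    assert(s.size() <= 0xFFFF);  // longer values need the AMF0 long-string type
    out.push_back(0x02);
    out.push_back(static_cast<uint8_t>(s.size() >> 8));
    out.push_back(static_cast<uint8_t>(s.size() & 0xFF));
    out.insert(out.end(), s.begin(), s.end());
}

// AMF0 number: marker 0x00, then the double as 8 big-endian bytes.
inline void write_number(Bytes& out, double v) {
    static_assert(sizeof(double) == 8, "IEEE-754 double expected");
    uint64_t bits;
    std::memcpy(&bits, &v, sizeof bits);
    out.push_back(0x00);
    for (int shift = 56; shift >= 0; shift -= 8)
        out.push_back(static_cast<uint8_t>(bits >> shift));
}

inline void write_null(Bytes& out) { out.push_back(0x05); }       // AMF0 null marker
inline void write_undefined(Bytes& out) { out.push_back(0x06); }  // AMF0 undefined marker

// Same field order as encode_packet(): "_result", transaction id, null, undefined.
inline Bytes encode_fmle_start_res(double transaction_id) {
    Bytes out;
    write_string(out, "_result");
    write_number(out, transaction_id);
    write_null(out);
    write_undefined(out);
    return out;
}
```

For any transaction id the payload is 21 bytes long: 10 for the `_result` string, 9 for the number, and one each for the null and undefined markers.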

3. SrsSource::fetch_or_create

/**
 * create source when fetch from cache failed.
 * @param r the client request.
 * @param h the event handler for source.
 * @param pps the matches source, if success never be NULL.
 */
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
    int ret = ERROR_SUCCESS;
    
    SrsSource* source = NULL;
    /* Look up the stream_url formed from vhost/app/stream in the pool;
     * fetch() returns NULL when no matching SrsSource exists. */
    if ((source = fetch(r)) != NULL) {
        *pps = source;
        return ret;
    }
    
    /* Generate the stream_url from vhost/app/stream. */
    string stream_url = r->get_stream_url();
    string vhost = r->vhost;
    
    /* The pool must not already contain an entry for stream_url. */
    // should always not exists for create a source.
    srs_assert(pool.find(stream_url) == pool.end());
    
    /* Construct a new SrsSource. */
    source = new SrsSource();
    if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
        srs_freep(source);
        return ret;
    }
    
    /* Put the newly created source into the pool. */
    pool[stream_url] = source;
    srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
    
    /* Return the newly created source through pps. */
    *pps = source;
    
    return ret;
}

3.1 SrsSource::fetch

/** 
 * get the exists source, NULL when not exists.
 * update the request and return the exists source.
 */
SrsSource* SrsSource::fetch(SrsRequest* r)
{
    SrsSource* source = NULL;
    
    /* get the stream identify, vhost/app/stream. */
    string stream_url = r->get_stream_url();
    if (pool.find(stream_url) == pool.end()) {
        return NULL;
    }
    
    source = pool[stream_url];
    
    /* we always update the request of resource,
     * for origin auth is on, the token in request maybe invalid,
     * and we only need to update the token of request, it's simple. */
    source->_req->update_auth(r);
    
    return source;
}

3.2 Constructing the SrsSource class

Construct a live stream source.

/**
 * the time jitter algorithm:
 * 1. full, to ensure stream start at zero, and ensure stream monotonically increasing.
 * 2. zero, only ensure stream start at zero, ignore timestamp jitter.
 * 3. off, disable the time jitter algorithm, like atc.
 */
enum SrsRtmpJitterAlgorithm
{
    SrsRtmpJitterAlgorithmFULL = 0x01,
    SrsRtmpJitterAlgorithmZERO,
    SrsRtmpJitterAlgorithmOFF
};

SrsSource::SrsSource()
{
    /* _req: deep copy of client request. */
    _req = NULL;
    /* the time jitter algorithm for vhost. */
    jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
    /* whether use interlaced/mixed algorithm to correct timestamp.
     * Disabled at construction time. */
    mix_correct = false;
    mix_queue = new SrsMixQueue();
    
#ifdef SRS_AUTO_HLS
    /* Create an HLS handler. */
    hls = new SrsHls();
#endif
#ifdef SRS_AUTO_DVR
    /* Create a DVR handler. */
    dvr = new SrsDvr();
#endif
#ifdef SRS_AUTO_TRANSCODE
    /* Create a transcoding handler. */
    encoder = new SrsEncoder();
#endif
#ifdef SRS_AUTO_HDS
    hds = new SrsHds(this);
#endif
    
    /**
     * cache_sh_video: the cached video sequence header.
     * cache_sh_audio: the cached audio sequence header.
     */
    cache_metadata = cache_sh_video = cache_sh_audio = NULL;
    
    /* can publish, true when is not streaming */
    _can_publish = true;
    /**
     * source id,
     * for publish, it's the publish client id.
     * for edge, it's the edge ingest id.
     * when source id changed, for example, the edge reconnect,
     * invoke the on_source_id_changed() to let all clients know.
     *
     * _pre_source_id: previous source id.
     */
    _pre_source_id = _source_id = -1;
    /**
     * last die time, when all consumers quit and no publisher,
     * we will remove the source when source die.
     */
    die_at = -1;
    
    /* edge control service */
    play_edge = new SrsPlayEdge();
    publish_edge = new SrsPublishEdge();
    /* gop cache for client fast startup. */
    gop_cache = new SrsGopCache();
    /* for aggregate message */
    aggregate_stream = new SrsStream();
    
    /* whether stream is monotonically increase. */
    is_monotonicaly_increase = false;
    last_packet_time = 0;
    
    _srs_config->subscribe(this);
    /**
     * atc whether atc(use absolute time and donot adjust time),
     * directly use msg time and donot adjust if atc is true,
     * otherwise, adjust msg time to start from 0 to make flash happy.
     *
     * TODO: FIXME: to support reload atc.
     */
    atc = false;
}

3.3 SrsSource::initialize

int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
{
    int ret = ERROR_SUCCESS;
    
    srs_assert(h);
    srs_assert(!_req);
    
    handler = h;
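    /* Deep copy: duplicate everything in r into _req. */
    _req = r->copy();
    /* get_atc() returns false when the vhost does not set the atc option;
     * with atc false, message timestamps are adjusted to start from 0. */
    atc = _srs_config->get_atc(_req->vhost);
    
    /* The HLS and DVR initialization is not analyzed here. */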
#ifdef SRS_AUTO_HLS
    if ((ret = hls->initialize(this)) != ERROR_SUCCESS) {
        return ret;
    }
#endif

#ifdef SRS_AUTO_DVR
    if ((ret = dvr->initialize(this, _req)) != ERROR_SUCCESS) {
        return ret;
    }
#endif

    if ((ret = play_edge->initialize(this, _req)) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = publish_edge->initialize(this, _req)) != ERROR_SUCCESS) {
        return ret;
    }
    
    /* If the vhost does not set queue_length, the default of 30 is used. */
    double queue_size = _srs_config->get_queue_length(_req->vhost);
    publish_edge->set_queue_size(queue_size);
    
    jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost);
    mix_correct = _srs_config->get_mix_correct(_req->vhost);
    
    return ret;
    
}

3.4 SrsStatistic::on_client

/**
 * when got a client to publish/play stream,
 * @param id, the client srs id.
 * @param req, the client request object.
 * @param conn, the physical abstract connection object.
 * @param type, the type of connection.
 */
int SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn, 
    SrsRtmpConnType type)
{
    int ret = ERROR_SUCCESS;
    
    SrsStatisticVhost* vhost = create_vhost(req);
    SrsStatisticStream* stream = create_stream(vhost, req);
    
    // create client if not exists
    SrsStatisticClient* client = NULL;
    if (clients.find(id) == clients.end()) {
        client = new SrsStatisticClient();
        client->id = id;
        client->stream = stream;
        clients[id] = client;
    } else {
        client = clients[id];
    }
    
    // got client
    client->conn = conn;
    client->req = req;
    client->type = type;
    stream->nb_clients++;
    vhost->nb_clients++;
    
    return ret;
}

3.4.1 SrsStatistic::create_vhost

SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)
{
    SrsStatisticVhost* vhost = NULL;
    
    /**
     * rvhost: 
     * key: vhost url, value: vhost Object.
     * @remark a fast index for vhost.
     */
    // create vhost if not exists.
    if (rvhosts.find(req->vhost) == rvhosts.end()) {
        vhost = new SrsStatisticVhost();
        vhost->vhost = req->vhost;
        rvhosts[req->vhost] = vhost;
        /* vhosts - key: vhost id, value: vhost object. */
        vhosts[vhost->id] = vhost;
        return vhost;
    }
    
    vhost = rvhosts[req->vhost];
    
    return vhost;
}

3.4.2 SrsStatistic::create_stream

SrsStatisticStream* SrsStatistic::create_stream(SrsStatisticVhost* vhost, SrsRequest* req)
{
    std::string url = req->get_stream_url();
    
    SrsStatisticStream* stream = NULL;
    
    // create stream if not exists.
    if (rstreams.find(url) == rstreams.end()) {
        stream = new SrsStatisticStream();
        stream->vhost = vhost;
        stream->stream = req->stream;
        stream->app = req->app;
        stream->url = url;
        rstreams[url] = stream;
        streams[stream->id] = stream;
        return stream;
    }
    
    stream = rstreams[url];
    
    return stream;
}

4. SrsSource::set_cache

void SrsSource::set_cache(bool enabled)
{
    /* SrsGopCache* gop_cache: gop cache for client fast startup. */
    gop_cache->set(enabled);
}

4.1 SrsGopCache::set

/**
 * to enabled or disable the gop cache.
 */
void SrsGopCache::set(bool enabled)
{
    /* if disabled the gop cache, the client will wait for the next 
     * keyframe for h264, and will be black-screen. */
    enabled_gop_cache = enabled;
    
    if (!enabled) {
        srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size());
        clear();
        return;
    }
    
    srs_info("enable gop cache");
}
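
The effect of `set(false)` can be sketched with a toy cache. This is a simplified illustration under assumptions: `GopCache` here is a hypothetical class, packets are plain integers, and the rule that nothing is cached while disabled is assumed rather than shown in the listing above.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, simplified GOP cache: packets are plain ints here.
class GopCache {
public:
    GopCache() : enabled_(true) {}

    // Disabling the cache also drops whatever was cached so far.
    void set(bool enabled) {
        enabled_ = enabled;
        if (!enabled_) {
            packets_.clear();
        }
    }

    // Assumption: packets are only retained while the cache is enabled.
    void cache(int packet) {
        if (enabled_) {
            packets_.push_back(packet);
        }
    }

    std::size_t size() const { return packets_.size(); }

private:
    bool enabled_;
    std::vector<int> packets_;
};
```

With the cache disabled, a newly joined player has nothing to replay and must wait for the next keyframe from the publisher.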

5. SrsRtmpServer::start_fmle_publish

/**
 * when client type is publish, response with packets:
 * releaseStream response
 * FCPublish
 * FCPublish response
 * createStream response
 * onFCPublish(NetStream.Publish.Start)
 * onStatus(NetStream.Publish.Start)
 */
int SrsRtmpServer::start_fmle_publish(int stream_id)
{
    int ret = ERROR_SUCCESS;
    
    // FCPublish
    double fc_publish_tid = 0;
    if (true) {
        SrsCommonMessage* msg = NULL;
        SrsFMLEStartPacket* pkt = NULL;
        /* Expect one of ReleaseStream/FCPublish/FCUnpublish; any other message is
         * discarded until one of these arrives.
         * Given the flow shown at the beginning, this should be FCPublish. */
        if ((ret = expect_message<SrsFMLEStartPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("recv FCPublish message failed. ret=%d", ret);
            return ret;
        }
        srs_info("recv FCPublish request message success.");
        
        SrsAutoFree(SrsCommonMessage, msg);
        SrsAutoFree(SrsFMLEStartPacket, pkt);
        
        fc_publish_tid = pkt->transaction_id;
    }
    // FCPublish response
    if (true) {
        SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);
        if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
            srs_error("send FCPublish response message failed. ret=%d", ret);
            return ret;
        }
        srs_info("send FCPublish response message success.");
    }
    
    // createStream
    double create_stream_tid = 0;
    if (true) {
        SrsCommonMessage* msg = NULL;
        SrsCreateStreamPacket* pkt = NULL;
        if ((ret = expect_message<SrsCreateStreamPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("recv createStream message failed. ret=%d", ret);
            return ret;
        }
        srs_info("recv createStream request message success.");
        
        SrsAutoFree(SrsCommonMessage, msg);
        SrsAutoFree(SrsCreateStreamPacket, pkt);
        
        create_stream_tid = pkt->transaction_id;
    }
    // createStream response
    if (true) {
        SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, 
                                                                     stream_id);
        if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
            srs_error("send createStream response message failed. ret=%d", ret);
            return ret;
        }
        srs_info("send createStream response message success.");
    }
    
    // publish
    if (true) {
        SrsCommonMessage* msg = NULL;
        SrsPublishPacket* pkt = NULL;
        if ((ret = expect_message<SrsPublishPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("recv publish message failed. ret=%d", ret);
            return ret;
        }
        srs_info("recv publish request message success.");
        
        SrsAutoFree(SrsCommonMessage, msg);
        SrsAutoFree(SrsPublishPacket, pkt);
    }
    // publish response onFCPublish(NetStream.Publish.Start)
    if (true) {
        SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
        
        pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;
        pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));
        pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));
        
        if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
            srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", 
                      ret);
            return ret;
        }
        srs_info("send onFCPublish(NetStream.Publish.Start) message success.");
    }
    // publish response onStatus(NetStream.Publish.Start)
    if (true) {
        SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
        
        pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
        pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));
        pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));
        pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));
        
        if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
            srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", 
                      ret);
            return ret;
        }
        srs_info("send onStatus(NetStream.Publish.Start) message success.");
    }
    
    srs_info("FMLE publish success.");
    
    return ret;
}
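
The "discard until the expected message arrives" behaviour that start_fmle_publish relies on can be sketched as follows. This is only an illustration: `Message` and this free function `expect_message` are hypothetical, messages are matched by command name instead of by packet type, and a queue stands in for the network.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical message: only the command name is modelled.
struct Message {
    std::string command;
};

// Pop messages until one whose command equals `wanted` is found.
// Returns true and stores it in *out; returns false if the queue runs dry.
bool expect_message(std::deque<Message>& incoming, const std::string& wanted,
                    Message* out) {
    while (!incoming.empty()) {
        Message msg = incoming.front();
        incoming.pop_front();
        if (msg.command == wanted) {
            *out = msg;
            return true;
        }
        // Any other message is dropped and the loop keeps waiting.
    }
    return false;
}
```

In the flow at the top of the article, this is why an interleaved `_checkbw()` from the client does not disturb the FCPublish, createStream, publish sequence.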

5.1 FCPublish

5.1.1 Receiving FCPublish

After FCPublish is received, it is parsed as shown in the following code.

SrsProtocol::do_decode_message:

int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, 
    SrsPacket** ppacket)
{
    ...
    
    /* FCPublish */
    else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
        srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
        *ppacket = packet = new SrsFMLEStartPacket();
        return packet->decode(stream);
    } 
    
    ...
}

SrsFMLEStartPacket::decode

int SrsFMLEStartPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    /* Read the command name of the message, i.e. "FCPublish". */
    if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret);
        return ret;
    }
    if (command_name.empty() 
        || (command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM 
        && command_name != RTMP_AMF0_COMMAND_FC_PUBLISH
        && command_name != RTMP_AMF0_COMMAND_UNPUBLISH)
    ) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode FMLE start command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
        srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
        return ret;
    }
    
    srs_info("amf0 decode FMLE start packet success");
    
    return ret;
}
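
To make the byte layout behind these srs_amf0_read_* calls concrete, here is a minimal standalone decoder for the same four fields. It is a sketch, not the SRS code: `Reader`, `FmleStart` and the `read_*` helpers are hypothetical names, and it assumes `double` is a 64-bit IEEE 754 value on the host.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical cursor over a byte buffer; not the SrsStream API.
struct Reader {
    const std::vector<uint8_t>& buf;
    std::size_t pos;
    bool has(std::size_t n) const { return buf.size() - pos >= n; }
};

// AMF0 string: marker 0x02, 16-bit big-endian length, then the bytes.
bool read_string(Reader& r, std::string* out) {
    if (!r.has(3) || r.buf[r.pos] != 0x02) return false;
    std::size_t len = (static_cast<std::size_t>(r.buf[r.pos + 1]) << 8) | r.buf[r.pos + 2];
    if (!r.has(3 + len)) return false;
    out->assign(r.buf.begin() + r.pos + 3, r.buf.begin() + r.pos + 3 + len);
    r.pos += 3 + len;
    return true;
}

// AMF0 number: marker 0x00, then an 8-byte big-endian IEEE 754 double.
bool read_number(Reader& r, double* out) {
    if (!r.has(9) || r.buf[r.pos] != 0x00) return false;
    uint64_t bits = 0;
    for (int i = 0; i < 8; ++i) bits = (bits << 8) | r.buf[r.pos + 1 + i];
    std::memcpy(out, &bits, sizeof(bits));
    r.pos += 9;
    return true;
}

// AMF0 null: the single marker byte 0x05.
bool read_null(Reader& r) {
    if (!r.has(1) || r.buf[r.pos] != 0x05) return false;
    r.pos += 1;
    return true;
}

// Hypothetical result type holding the decoded fields.
struct FmleStart {
    std::string command;
    double tid;
    std::string stream;
};

// Same field order as the decode above: name, transaction id, null, stream name.
bool decode_fmle_start(const std::vector<uint8_t>& bytes, FmleStart* pkt) {
    Reader r{bytes, 0};
    return read_string(r, &pkt->command) && read_number(r, &pkt->tid)
        && read_null(r) && read_string(r, &pkt->stream);
}
```

A truncated buffer makes the corresponding `read_*` helper fail, which plays the role of the non-success return codes in the SRS version.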

5.1.2 FCPublish response

The FCPublish response is built with the SrsFMLEStartResPacket class, which is constructed as follows:

SrsFMLEStartResPacket constructor

/**
 * response for SrsFMLEStartPacket.
 */
SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
{
    /* _result */
    command_name = RTMP_AMF0_COMMAND_RESULT;
    transaction_id = _transaction_id;
    command_object = SrsAmf0Any::null();
    args = SrsAmf0Any::undefined();
}

SrsFMLEStartResPacket::encode_packet

/**
 * subpacket can override to encode the payload to stream.
 * @remark never invoke the super.encode_packet, it always failed.
 */
int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");
    
    if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");
    
    if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
        srs_error("encode command_object failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_object success.");
    
    if ((ret = srs_amf0_write_undefined(stream)) != ERROR_SUCCESS) {
        srs_error("encode args failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode args success.");
    
    
    srs_info("encode FMLE start response packet success.");
    
    return ret;
}
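
The bytes this encode_packet produces can be sketched with a few standalone writers. This is an illustration under assumptions: `Bytes`, the `write_*` helpers and `encode_fmle_start_res` are hypothetical names, strings are assumed shorter than 65536 bytes, and `double` is assumed to be a 64-bit IEEE 754 value.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

typedef std::vector<uint8_t> Bytes;

// AMF0 string: marker 0x02, 16-bit big-endian length, then the bytes.
void write_string(Bytes& out, const std::string& s) {
    out.push_back(0x02);
    out.push_back(static_cast<uint8_t>((s.size() >> 8) & 0xff));
    out.push_back(static_cast<uint8_t>(s.size() & 0xff));
    out.insert(out.end(), s.begin(), s.end());
}

// AMF0 number: marker 0x00, then an 8-byte big-endian IEEE 754 double.
void write_number(Bytes& out, double v) {
    uint64_t bits = 0;
    std::memcpy(&bits, &v, sizeof(bits));
    out.push_back(0x00);
    for (int shift = 56; shift >= 0; shift -= 8) {
        out.push_back(static_cast<uint8_t>((bits >> shift) & 0xff));
    }
}

// AMF0 null is the marker 0x05, undefined is the marker 0x06.
void write_null(Bytes& out) { out.push_back(0x05); }
void write_undefined(Bytes& out) { out.push_back(0x06); }

// Same field order as the response above: "_result", transaction id, null, undefined.
Bytes encode_fmle_start_res(double transaction_id) {
    Bytes out;
    write_string(out, "_result");
    write_number(out, transaction_id);
    write_null(out);
    write_undefined(out);
    return out;
}
```

The payload is 21 bytes for any transaction id: 10 for the command name, 9 for the number, and one each for null and undefined.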

send: FCPublish response

5.2 createStream

5.2.1 Receiving createStream

The createStream message is represented by the SrsCreateStreamPacket class, which is constructed as follows.

SrsCreateStreamPacket constructor

/**
 * createStream
 * The client sends this command to the server to create a logical
 * channel for message communication. The publishing of audio, video, and
 * metadata is carried out over stream channel created using the 
 * createStream command.
 */
SrsCreateStreamPacket::SrsCreateStreamPacket()
{
    /* createStream */
    command_name = RTMP_AMF0_COMMAND_CREATE_STREAM;
    /**
     * Transaction ID of the command.
     */
    transaction_id = 2;
    /**
     * If there exists any command info this is set, else this is set to null type.
     * @remark, never be NULL, an AMF0 null instance.
     */
    command_object = SrsAmf0Any::null();
}

After createStream is received, the message is decoded as follows:

SrsCreateStreamPacket::decode

int SrsCreateStreamPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
        srs_error("amf0 decode createStream command_name failed. ret=%d", ret);
        return ret;
    }
    if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_CREATE_STREAM) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode createStream command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
        srs_error("amf0 decode createStream transaction_id failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
        srs_error("amf0 decode createStream command_object failed. ret=%d", ret);
        return ret;
    }
    
    srs_info("amf0 decode createStream packet success");
    
    return ret;
}

5.2.2 createStream response

The createStream response message is built with the SrsCreateStreamResPacket class, which is constructed as follows:

SrsCreateStreamResPacket constructor

SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, 
        double _stream_id)
{
    /* _result */
    command_name = RTMP_AMF0_COMMAND_RESULT;
    /**
     * ID of the command that response belongs to.
     */
    transaction_id = _transaction_id;
    /**
     * If there exists any command info this is set, else this is set to null type.
     * @remark, never be NULL, an AMF0 null instance.
     */
    command_object = SrsAmf0Any::null();
    /* The return value is either a stream ID or an error information object. */
    stream_id = _stream_id;
}

Next, the payload of the createStream response message is encoded (i.e. packed).

SrsCreateStreamResPacket::encode_packet

int SrsCreateStreamResPacket::encode_packet(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");
    
    if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");
    
    if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
        srs_error("encode command_object failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_object success.");
    
    if ((ret = srs_amf0_write_number(stream, stream_id)) != ERROR_SUCCESS) {
        srs_error("encode stream_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode stream_id success.");
    
    
    srs_info("encode createStream response packet success.");
    
    return ret;
}

send: createStream response

5.3 publish

5.3.1 Receiving publish

The publish message is represented by the SrsPublishPacket class, which is constructed as follows:

SrsPublishPacket constructor

/**
 * FMLE/flash publish
 * Publish
 * The client sends the publish command to publish a named stream to the
 * server. Using this name, any client can play this stream and receive
 * the published audio, video, and data messages.
 */
SrsPublishPacket::SrsPublishPacket()
{
    /* Name of the command, set to "publish". */
    command_name = RTMP_AMF0_COMMAND_PUBLISH;
    /* Transaction ID set to 0. */
    transaction_id = 0;
    /**
     * Command information object does not exist. Set to null type.
     * @remark, never be NULL, an AMF0 null instance.
     */
    command_object = SrsAmf0Any::null();
    /**
     * Type of publishing. Set to "live", "record", or "append".
     *   record: The stream is published and the data is recorded to a new file. The file
     *           is stored on the server in a subdirectory within the directory that
     *           contains the server application. If the file already exists, it is
     *           overwritten.
     *   append: The stream is published and the data is appended to a file. If no file
     *           is found, it is created.
     *   live: Live data is published without recording it in a file.
     * @remark, SRS only support live.
     * @remark, optional, default to live.
     */
    type = "live";
}

recv: publish

The publish message is parsed by the following code.

SrsPublishPacket::decode

int SrsPublishPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
        srs_error("amf0 decode publish command_name failed. ret=%d", ret);
        return ret;
    }
    if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_PUBLISH) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 decode publish command_name failed. "
            "command_name=%s, ret=%d", command_name.c_str(), ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
        srs_error("amf0 decode publish transaction_id failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
        srs_error("amf0 decode publish command_object failed. ret=%d", ret);
        return ret;
    }
    
    /* Read the name of the stream being published. */
    if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
        srs_error("amf0 decode publish stream_name failed. ret=%d", ret);
        return ret;
    }
    
    /* Read the publish type if any bytes remain (it is optional); SRS only supports live. */
    if (!stream->empty() && (ret = srs_amf0_read_string(stream, type)) != ERROR_SUCCESS) {
        srs_error("amf0 decode publish type failed. ret=%d", ret);
        return ret;
    }
    
    srs_info("amf0 decode publish packet success");
    
    return ret;
}

5.3.2 publish response onFCPublish(NetStream.Publish.Start)

The onFCPublish response to publish is built with SrsOnStatusCallPacket, whose constructor is shown below.

SrsOnStatusCallPacket constructor

SrsOnStatusCallPacket::SrsOnStatusCallPacket()
{
    /* Name of command. Set to "onStatus" */
    command_name = RTMP_AMF0_COMMAND_ON_STATUS;
    /* Transaction ID set to 0. */
    transaction_id = 0;
    /**
     * Command information does not exist. Set to null type.
     * @remark, never be NULL, an AMF0 null instance.
     */
    args = SrsAmf0Any::null();
    /**
     * Name-value pairs that describe the response from the server. 
     * 'code','level', 'description' are names of few among such information.
     * @remark, never be NULL, an AMF0 object instance.
     */
    data = SrsAmf0Any::object();
}

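Note: the constructor defaults command_name to onStatus, but for the onFCPublish response start_fmle_publish overrides it with onFCPublish. A packet capture of the message:

send: onFCPublish

The payload of the onFCPublish message is encoded as follows.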

SrsOnStatusCallPacket::encode_packet

int SrsOnStatusCallPacket::encode_packet(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
        srs_error("encode command_name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode command_name success.");
    
    if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
        srs_error("encode transaction_id failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode transaction_id success.");
    
    if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
        srs_error("encode args failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode args success.");
    
    if ((ret = data->write(stream)) != ERROR_SUCCESS) {
        srs_error("encode data failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode data success.");
    
    srs_info("encode onStatus(Call) packet success.");
    
    return ret;
}
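
The `data->write(stream)` step serializes an AMF0 object. The layout can be sketched as below, assuming every property value is a string (true for the code, level and description fields set in start_fmle_publish). `Props`, `write_utf8` and `encode_status_object` are hypothetical names, not SRS functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

typedef std::vector<uint8_t> Bytes;
typedef std::vector<std::pair<std::string, std::string> > Props;

// Length-prefixed UTF-8 without a type marker (used for keys and string bodies).
void write_utf8(Bytes& out, const std::string& s) {
    out.push_back(static_cast<uint8_t>((s.size() >> 8) & 0xff));
    out.push_back(static_cast<uint8_t>(s.size() & 0xff));
    out.insert(out.end(), s.begin(), s.end());
}

// AMF0 object: marker 0x03, then key/value pairs, then the end marker 00 00 09.
Bytes encode_status_object(const Props& props) {
    Bytes out;
    out.push_back(0x03);
    for (std::size_t i = 0; i < props.size(); ++i) {
        write_utf8(out, props[i].first);   // property name, no type marker
        out.push_back(0x02);               // value type marker: string
        write_utf8(out, props[i].second);  // property value
    }
    out.push_back(0x00);
    out.push_back(0x00);
    out.push_back(0x09);
    return out;
}
```

A single `code` property set to `NetStream.Publish.Start` encodes to 36 bytes: 1 for the object marker, 6 for the key, 26 for the string value and 3 for the end marker.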

5.3.3 publish response onStatus(NetStream.Publish.Start)

This response message is also built with the SrsOnStatusCallPacket class, and its command name is onStatus. The packet capture is shown below.

send: onStatus

6. SrsRtmpConn::publishing

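Once the server has successfully responded to the publish message sent by obs, it enters SrsRtmpConn::publishing and starts handling the media data pushed by obs. For the detailed analysis, see SRS之SrsRtmpConn::publishing詳解.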
