Live555 Analysis (Part 2): The Server Side

live555 supports both unicast and multicast. We first analyze the unicast streaming server, then the multicast streaming server.

1. The unicast streaming server:

     // Create the RTSP server:
        RTSPServer* rtspServer = NULL;
        // Normal case: Streaming from a built-in RTSP server:
        rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
        if (rtspServer == NULL) {
                *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
                exit(1);
        }

        *env << "...done initializing \n";

        if( streamingMode == STREAMING_UNICAST )
        {
                ServerMediaSession* sms = ServerMediaSession::createNew(*env, 
                                                                        H264StreamName[video_type], 
                                                                        H264StreamName[video_type], 
                                                                        streamDescription,
                                                                        streamingMode == STREAMING_MULTICAST_SSM);
                sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type], H264VideoBitrate));
                sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type]));
                
                rtspServer->addServerMediaSession(sms);

                char *url = rtspServer->rtspURL(sms);
                *env << "Play this stream using the URL:\t" << url << "\n";
                delete[] url;  
        }
      

      // Begin the LIVE555 event loop:
      env->taskScheduler().doEventLoop(&watchVariable); // does not return

Let's walk through it step by step:

1>  rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

RTSPServer*
RTSPServer::createNew(UsageEnvironment& env, Port ourPort,
              UserAuthenticationDatabase* authDatabase,
              unsigned reclamationTestSeconds)
{
        int ourSocket = -1;

        do {
                int ourSocket = setUpOurSocket(env, ourPort);
                if (ourSocket == -1) break;

                return new RTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
        } while (0);

        if (ourSocket != -1)  ::closeSocket(ourSocket);
        
        return NULL;
}
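  Inside createNew(), setUpOurSocket() builds the listening socket. A rough sketch of what it does, based on the stock live555 sources (error handling and the port-0 case trimmed; details vary by version):

int RTSPServer::setUpOurSocket(UsageEnvironment& env, Port& ourPort)
{
        int ourSocket = setupStreamSocket(env, ourPort); // create a TCP socket and bind it to ourPort
        if (ourSocket < 0) return -1;

        // Make sure we have a reasonably large send buffer, then start listening for connections:
        if (!increaseSendBufferTo(env, ourSocket, 50*1024) || listen(ourSocket, 20) < 0) {
                ::closeSocket(ourSocket);
                return -1;
        }

        return ourSocket;
}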

  createNew() thus creates the RTSP listening TCP socket bound to rtspServerPortNum and then constructs an RTSPServer instance around it. Next, the RTSPServer constructor:

RTSPServer::RTSPServer(UsageEnvironment& env,
               int ourSocket, Port ourPort,
               UserAuthenticationDatabase* authDatabase,
               unsigned reclamationTestSeconds)
                : Medium(env),
                fServerSocket(ourSocket), fServerPort(ourPort),
                fAuthDB(authDatabase), fReclamationTestSeconds(reclamationTestSeconds),
                fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)), 
                fSessionIdCounter(0) 
{
#ifdef USE_SIGNALS
        // Ignore the SIGPIPE signal, so that clients on the same host that are killed
        // don't also kill us:
        signal(SIGPIPE, SIG_IGN);
#endif

        // Arrange to handle connections from others:
        env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, (TaskScheduler::BackgroundHandlerProc*)&incomingConnectionHandler, this);
}

  The RTSPServer constructor initializes fServerMediaSessions to a newly created HashTable, fServerSocket to the TCP socket created above, and fServerPort to the listening port rtspServerPortNum. It also registers incomingConnectionHandler with the taskScheduler as the background read handler for fServerSocket. That handler waits for new client connections; whenever a connection is accept()ed, it creates an RTSPClientSession instance for that client.
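  The connection handler itself is small. Roughly, in the stock live555 sources (simplified, with error handling trimmed), it looks like this:

void RTSPServer::incomingConnectionHandler(void* instance, int /*mask*/)
{
        RTSPServer* server = (RTSPServer*)instance;
        server->incomingConnectionHandler1();
}

void RTSPServer::incomingConnectionHandler1()
{
        struct sockaddr_in clientAddr;
        SOCKLEN_T clientAddrLen = sizeof clientAddr;
        int clientSocket = accept(fServerSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
        if (clientSocket < 0) return; // nothing to do if accept() failed

        makeSocketNonBlocking(clientSocket);
        increaseSendBufferTo(envir(), clientSocket, 50*1024);

        // Hand the new connection over to its own session object:
        new RTSPClientSession(*this, ++fSessionIdCounter, clientSocket, clientAddr);
}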

  What does an RTSPClientSession have to provide? As you would expect: it must listen for the client's RTSP requests and answer them, return the requested stream's description for DESCRIBE, set up the RTP session for SETUP, tear the RTP session down for TEARDOWN, and so on.

RTSPServer::RTSPClientSession::RTSPClientSession(RTSPServer& ourServer, unsigned sessionId, int clientSocket, struct sockaddr_in clientAddr)
                                                   : fOurServer(ourServer), fOurSessionId(sessionId),
                                                    fOurServerMediaSession(NULL),
                                                    fClientSocket(clientSocket), fClientAddr(clientAddr),
                                                    fLivenessCheckTask(NULL),
                                                    fIsMulticast(False), fSessionIsActive(True), fStreamAfterSETUP(False),
                                                    fTCPStreamIdCount(0), fNumStreamStates(0), fStreamStates(NULL) 
{
        // Arrange to handle incoming requests:
        resetRequestBuffer();
        envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket,(TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this);
        noteLiveness();
}

  The constructor above initializes the session id to ++fSessionIdCounter, fClientSocket to the socket returned by accept() (clientSocket), and fClientAddr to the client address obtained from accept(). It also registers incomingRequestHandler with the taskScheduler as the background read handler for fClientSocket.

  incomingRequestHandler() simply forwards to the member function incomingRequestHandler1().
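  A minimal sketch of that trampoline, assuming the stock live555 BackgroundHandlerProc signature:

void RTSPServer::RTSPClientSession::incomingRequestHandler(void* instance, int /*mask*/)
{
        RTSPClientSession* session = (RTSPClientSession*)instance;
        session->incomingRequestHandler1();
}

  incomingRequestHandler1() itself is defined as follows: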

void RTSPServer::RTSPClientSession::incomingRequestHandler1() 
{
        noteLiveness();

        struct sockaddr_in dummy; // 'from' address, meaningless in this case
        Boolean endOfMsg = False;
        unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen];

        int bytesRead = readSocket(envir(), fClientSocket, ptr, fRequestBufferBytesLeft, dummy);
        if (bytesRead <= 0 || (unsigned)bytesRead >= fRequestBufferBytesLeft) {
                // Either the client socket has died, or the request was too big for us.
                // Terminate this connection:
#ifdef DEBUG
                fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes (of %d); terminating connection!\n", this, bytesRead, fRequestBufferBytesLeft);
#endif
                delete this;
                return;
        }
#ifdef DEBUG
        ptr[bytesRead] = '\0';
        fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes:%s\n", this, bytesRead, ptr);
#endif

        // Look for the end of the message: <CR><LF><CR><LF>
        unsigned char *tmpPtr = ptr;
        if (fRequestBytesAlreadySeen > 0) --tmpPtr;
        // in case the last read ended with a <CR>
        while (tmpPtr < &ptr[bytesRead-1]) {
                if (*tmpPtr == '\r' && *(tmpPtr+1) == '\n') {
                        if (tmpPtr - fLastCRLF == 2) { // This is it:
                                endOfMsg = 1;
                                break;
                        }
                        fLastCRLF = tmpPtr;
                }
                ++tmpPtr;
        }

        fRequestBufferBytesLeft -= bytesRead;
        fRequestBytesAlreadySeen += bytesRead;

        if (!endOfMsg) return; // subsequent reads will be needed to complete the request

        // Parse the request string into command name and 'CSeq',
        // then handle the command:
        fRequestBuffer[fRequestBytesAlreadySeen] = '\0';
        char cmdName[RTSP_PARAM_STRING_MAX];
        char urlPreSuffix[RTSP_PARAM_STRING_MAX];
        char urlSuffix[RTSP_PARAM_STRING_MAX];
        char cseq[RTSP_PARAM_STRING_MAX];
        if (!parseRTSPRequestString((char*)fRequestBuffer, fRequestBytesAlreadySeen,
                                                        cmdName, sizeof cmdName,
                                                        urlPreSuffix, sizeof urlPreSuffix,
                                                        urlSuffix, sizeof urlSuffix,
                                                        cseq, sizeof cseq)) 
        {
#ifdef DEBUG
                fprintf(stderr, "parseRTSPRequestString() failed!\n");
#endif
                handleCmd_bad(cseq);
        } else {
#ifdef DEBUG
                fprintf(stderr, "parseRTSPRequestString() returned cmdName \"%s\", urlPreSuffix \"%s\", urlSuffix \"%s\"\n", cmdName, urlPreSuffix, urlSuffix);
#endif
                if (strcmp(cmdName, "OPTIONS") == 0) {
                        handleCmd_OPTIONS(cseq);
                } else if (strcmp(cmdName, "DESCRIBE") == 0) {
                        printf("incomingRequestHandler1 ~~~~~~~~~~~~~~\n");
                        handleCmd_DESCRIBE(cseq, urlSuffix, (char const*)fRequestBuffer);
                } else if (strcmp(cmdName, "SETUP") == 0) {
                        handleCmd_SETUP(cseq, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
                } else if (strcmp(cmdName, "TEARDOWN") == 0
                                || strcmp(cmdName, "PLAY") == 0
                                || strcmp(cmdName, "PAUSE") == 0
                                || strcmp(cmdName, "GET_PARAMETER") == 0) {
                        handleCmd_withinSession(cmdName, urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
                } else {
                        handleCmd_notSupported(cseq);
                }
        }

#ifdef DEBUG
        fprintf(stderr, "sending response: %s", fResponseBuffer);
#endif
        send(fClientSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0);

        if (strcmp(cmdName, "SETUP") == 0 && fStreamAfterSETUP) {
                // The client has asked for streaming to commence now, rather than after a
                // subsequent "PLAY" command.  So, simulate the effect of a "PLAY" command:
                handleCmd_withinSession("PLAY", urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
        }

        resetRequestBuffer(); // to prepare for any subsequent request
        if (!fSessionIsActive) delete this;
}

  In this function we can see how each RTSP command is received, dispatched to its handler, and answered.
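  The simplest of these handlers, handleCmd_OPTIONS(), just formats a response into fResponseBuffer, which incomingRequestHandler1() then sends back. A sketch following the stock live555 sources (the list of advertised commands is written inline here for illustration and varies by version):

void RTSPServer::RTSPClientSession::handleCmd_OPTIONS(char const* cseq)
{
        snprintf((char*)fResponseBuffer, sizeof fResponseBuffer,
                 "RTSP/1.0 200 OK\r\nCSeq: %s\r\n%sPublic: %s\r\n\r\n",
                 cseq, dateHeader(), "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, GET_PARAMETER");
}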

2> ServerMediaSession* sms = ServerMediaSession::createNew(... ...)

   This creates a ServerMediaSession instance, initializing fStreamName to "h264_ch1", fInfoSDPString to "h264_ch1", fDescriptionSDPString to "RTSP/RTP stream from NETRA", fMiscSDPLines to NULL, fCreationTime to the current time, and fIsSSM to false.

3> sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(... ...);

  WISH264VideoServerMediaSubsession::createNew() creates the video subsession, a subclass of OnDemandServerMediaSubsession (analyzed earlier), which is what unicast streaming requires; fWISInput is initialized to *H264InputDevice[video_type].

  sms->addSubsession() then appends this WISH264VideoServerMediaSubsession instance to the session's subsession list; as the first subsession it becomes the head node (and the current tail, fSubsessionsTail).

4> sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(... ...);

  WISPCMAudioServerMediaSubsession::createNew() creates the audio subsession, likewise a subclass of OnDemandServerMediaSubsession (analyzed earlier) required for unicast streaming; fWISInput is initialized to *H264InputDevice[video_type].

  sms->addSubsession() links this WISPCMAudioServerMediaSubsession instance in as fSubsessionsTail->fNext, making it the new tail of the list.
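  Both addSubsession() calls go through the same list-append logic. A sketch of ServerMediaSession::addSubsession() as it appears in the stock live555 sources (the WIS-streamer copy may differ slightly):

Boolean ServerMediaSession::addSubsession(ServerMediaSubsession* subsession)
{
        if (subsession->fParentSession != NULL) return False; // it's already part of some session

        if (fSubsessionsTail == NULL) {
                fSubsessionsHead = subsession;        // first subsession: becomes the head of the list
        } else {
                fSubsessionsTail->fNext = subsession; // later subsessions: appended at the tail
        }
        fSubsessionsTail = subsession;

        subsession->fParentSession = this;
        subsession->fTrackNumber = ++fSubsessionCounter; // becomes "track1", "track2", ... in SETUP URLs
        return True;
}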

5> rtspServer->addServerMediaSession(sms)

  This adds sms to the rtspServer's fServerMediaSessions hash table, keyed by its stream name.
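  A sketch of that insertion, again following the stock live555 sources (handling of duplicate names may differ by version):

void RTSPServer::addServerMediaSession(ServerMediaSession* serverMediaSession)
{
        if (serverMediaSession == NULL) return;

        char const* sessionName = serverMediaSession->streamName();
        if (sessionName == NULL) sessionName = "";
        removeServerMediaSession(sessionName); // in case a session with this name already exists

        fServerMediaSessions->Add(sessionName, (void*)serverMediaSession);
}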

6> env->taskScheduler().doEventLoop(&watchVariable); 

  doEventLoop() was analyzed in the previous part; it mainly dispatches socket (background read) tasks and delayed tasks.
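  As a reminder, the loop itself is tiny (BasicTaskScheduler0 in the stock live555 sources):

void BasicTaskScheduler0::doEventLoop(char* watchVariable)
{
        // Repeatedly loop, handling readable sockets and due delayed tasks:
        while (1) {
                if (watchVariable != NULL && *watchVariable != 0) break;
                SingleStep(); // one select() pass over the registered sockets plus one delayed-task check
        }
}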

2. The multicast streaming server:

        // Create the RTSP server:
        RTSPServer* rtspServer = NULL;
        // Normal case: Streaming from a built-in RTSP server:
        rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
        if (rtspServer == NULL) {
                *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
                exit(1);
        }

        *env << "...done initializing \n";

        if( streamingMode == STREAMING_UNICAST )
        {
        ... ...
        }
        else
        {
                if (streamingMode == STREAMING_MULTICAST_SSM) 
                {
                        if (multicastAddress == 0) 
                                multicastAddress = chooseRandomIPv4SSMAddress(*env);
                } else if (multicastAddress != 0) {
                        streamingMode = STREAMING_MULTICAST_ASM;
                }

                struct in_addr dest; 
             dest.s_addr = multicastAddress;
        
                const unsigned char ttl = 255;

                // For RTCP:
                const unsigned maxCNAMElen = 100;
                unsigned char CNAME[maxCNAMElen + 1];
                gethostname((char *) CNAME, maxCNAMElen);
                CNAME[maxCNAMElen] = '\0';      // just in case

                ServerMediaSession* sms;
                sms = ServerMediaSession::createNew(*env, H264StreamName[video_type], H264StreamName[video_type], streamDescription,streamingMode == STREAMING_MULTICAST_SSM);
               
                /* VIDEO Channel initial */
                if(1)
                {
                        // Create 'groupsocks' for RTP and RTCP:
                    const Port rtpPortVideo(videoRTPPortNum);
                    const Port rtcpPortVideo(videoRTPPortNum+1);
                    
                    rtpGroupsockVideo = new Groupsock(*env, dest, rtpPortVideo, ttl);
                    rtcpGroupsockVideo = new Groupsock(*env, dest, rtcpPortVideo, ttl);
                    
                    if (streamingMode == STREAMING_MULTICAST_SSM) {
                            rtpGroupsockVideo->multicastSendOnly();
                            rtcpGroupsockVideo->multicastSendOnly();
                    }
                    
                    setVideoRTPSinkBufferSize();
                    sinkVideo = H264VideoRTPSink::createNew(*env, rtpGroupsockVideo,96, 0x42, "h264");    
                    
                    // Create (and start) a 'RTCP instance' for this RTP sink:
                    unsigned totalSessionBandwidthVideo = (Mpeg4VideoBitrate+500)/1000; // in kbps; for RTCP b/w share
                    rtcpVideo = RTCPInstance::createNew(*env, rtcpGroupsockVideo,
                                                         totalSessionBandwidthVideo, CNAME,
                                                         sinkVideo, NULL /* we're a server */ ,
                                                         streamingMode == STREAMING_MULTICAST_SSM);
                        
                    // Note: This starts RTCP running automatically
                    sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkVideo, rtcpVideo));

                    sourceVideo = H264VideoStreamFramer::createNew(*env, H264InputDevice[video_type]->videoSource());  
                        // Start streaming:
                        sinkVideo->startPlaying(*sourceVideo, NULL, NULL);
                }

                /* AUDIO Channel initial */
                if(1)
                {
                        // there's a separate RTP stream for audio
                        // Create 'groupsocks' for RTP and RTCP:
                        const Port rtpPortAudio(audioRTPPortNum);
                        const Port rtcpPortAudio(audioRTPPortNum+1);

                        rtpGroupsockAudio = new Groupsock(*env, dest, rtpPortAudio, ttl);
                        rtcpGroupsockAudio = new Groupsock(*env, dest, rtcpPortAudio, ttl);

                        if (streamingMode == STREAMING_MULTICAST_SSM) {
                                rtpGroupsockAudio->multicastSendOnly();
                                rtcpGroupsockAudio->multicastSendOnly();
                        }

                        if( audioSamplingFrequency == 16000 )
                                sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 96, audioSamplingFrequency, "audio", "PCMU", 1);
                        else
                                sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 0, audioSamplingFrequency, "audio", "PCMU", 1);

                        // Create (and start) a 'RTCP instance' for this RTP sink:
                        unsigned totalSessionBandwidthAudio = (audioOutputBitrate+500)/1000; // in kbps; for RTCP b/w share
                        rtcpAudio = RTCPInstance::createNew(*env, rtcpGroupsockAudio,
                                                            totalSessionBandwidthAudio, CNAME,
                                                            sinkAudio, NULL /* we're a server */,
                                                            streamingMode == STREAMING_MULTICAST_SSM);

                        // Note: This starts RTCP running automatically
                        sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkAudio, rtcpAudio));

                        sourceAudio = H264InputDevice[video_type]->audioSource();

                        // Start streaming:
                        sinkAudio->startPlaying(*sourceAudio, NULL, NULL);
                }

                rtspServer->addServerMediaSession(sms);

                {
                        struct in_addr dest;
                        dest.s_addr = multicastAddress;
                        char *url = rtspServer->rtspURL(sms);
                        //char *url2 = inet_ntoa(dest);
                        *env << "Mulicast Play this stream using the URL:\n\t" << url << "\n";
                        //*env << "2 Mulicast addr:\n\t" << url2 << "\n";
                        delete[] url;
                }
        }

        // Begin the LIVE555 event loop:
        env->taskScheduler().doEventLoop(&watchVariable); // does not return

1> rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

  Same as in the unicast analysis above.

2> sms = ServerMediaSession::createNew(... ...)

  Same as in the unicast analysis above.

3> Video

  1. Create Groupsock instances for the video RTP and RTCP streams; these provide the UDP sockets used for RTP/RTCP. It helps to understand ASM (any-source multicast) versus SSM (source-specific multicast, where receivers subscribe to one specific sender) here.

  2. Create the RTPSink instance (H264VideoRTPSink), which packs video data into RTP packets and transmits them.

  3. Create the RTCPInstance, which builds and transmits RTCP packets.

  4. Create a PassiveServerMediaSubsession instance and add it as the head node of the subsession list.

  5. Create the FramedSource instance (H264VideoStreamFramer wrapping the capture device), which fetches video data one frame at a time.

  6. Start sending RTP and RTCP data to the multicast address.

4> Audio

  1. Create Groupsock instances for the audio RTP and RTCP streams; these provide the UDP sockets used for RTP/RTCP. The same ASM/SSM distinction applies.

  2. Create the RTPSink instance (SimpleRTPSink), which packs audio data into RTP packets and transmits them.

  3. Create the RTCPInstance, which builds and transmits RTCP packets.

  4. Create a PassiveServerMediaSubsession instance and link it in as the next node of the subsession list.

  5. Create the FramedSource instance (the device's audioSource()), which fetches audio data one frame at a time.

  6. Start sending RTP and RTCP data to the multicast address.

5> rtspServer->addServerMediaSession(sms)

  Same as in the unicast analysis above.

6> env->taskScheduler().doEventLoop(&watchVariable)

  Same as in the unicast analysis above.

3. Differences between unicast and multicast

1> Socket creation: for multicast, the sockets are created right at startup, whereas for unicast the corresponding sockets are only created when a "SETUP" command is received.

2> startPlaying: multicast starts sending data to the multicast address immediately, whereas unicast only calls startPlaying once a "PLAY" command is received.

4. Analysis of startPlaying

  First, the multicast case:

  sinkVideo->startPlaying() is implemented neither in H264VideoRTPSink nor in RTPSink, but in MediaSink:

Boolean MediaSink::startPlaying(MediaSource& source,
                afterPlayingFunc* afterFunc,
                void* afterClientData) 
{
    // Make sure we're not already being played:
    if (fSource != NULL) {
        envir().setResultMsg("This sink is already being played");
        return False;
    }

    // Make sure our source is compatible:
    if (!sourceIsCompatibleWithUs(source)) {
        envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
        return False;
    }
    fSource = (FramedSource*)&source;

    fAfterFunc = afterFunc;
    fAfterClientData = afterClientData;
    
    return continuePlaying();
}

 

  Here we see a call to continuePlaying(). Where is that implemented? Since sinkVideo was created by H264VideoRTPSink::createNew() and is therefore an H264VideoRTPSink instance, the virtual call resolves to H264VideoRTPSink::continuePlaying().

Boolean H264VideoRTPSink::continuePlaying() 
{
    // First, check whether we have a 'fragmenter' class set up yet.
    // If not, create it now:
    if (fOurFragmenter == NULL) {
        fOurFragmenter = new H264FUAFragmenter(envir(), fSource, OutPacketBuffer::maxSize, ourMaxPacketSize() - 12/*RTP hdr size*/);
        fSource = fOurFragmenter;
    }
    
    //printf("function=%s line=%d\n",__func__,__LINE__);
    // Then call the parent class's implementation:
    return MultiFramedRTPSink::continuePlaying();
}

  So it ends up calling the parent class MultiFramedRTPSink's continuePlaying(); let's look at that implementation:

Boolean MultiFramedRTPSink::continuePlaying()
{
    // Send the first packet.
    // (This will also schedule any future sends.)
    buildAndSendPacket(True);
    return True;
}

  Here we find buildAndSendPacket(); its implementation:

void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) 
{
    // This function mainly prepares the RTP packet header, leaving room for fields that depend on the actual data.
    fIsFirstPacket = isFirstPacket;


    // Set up the RTP header:
    unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
    rtpHdr |= (fRTPPayloadType << 16);
    rtpHdr |= fSeqNo; // sequence number
    fOutBuf->enqueueWord(rtpHdr); // append one 32-bit word to the packet


    // Note where the RTP timestamp will go.
    // (We can't fill this in until we start packing payload frames.)
    fTimestampPosition = fOutBuf->curPacketSize();
    fOutBuf->skipBytes(4); // leave a hole for the timestamp in the buffer


    fOutBuf->enqueueWord(SSRC()); 


    // Allow for a special, payload-format-specific header following the
    // RTP header:
    fSpecialHeaderPosition = fOutBuf->curPacketSize();
    fSpecialHeaderSize = specialHeaderSize();
    fOutBuf->skipBytes(fSpecialHeaderSize);


    // Begin packing as many (complete) frames into the packet as we can:
    fTotalFrameSpecificHeaderSizes = 0;
    fNoFramesLeft = False;
    fNumFramesUsedSoFar = 0; // number of frames already packed into this packet
    // The header is ready; now pack frame data
    packFrame();
}

  Continuing with packFrame():

void MultiFramedRTPSink::packFrame()
{
    // First, see if we have an overflow frame that was too big for the last pkt
    if (fOutBuf->haveOverflowData()) {
        // If overflow frame data exists, use it first. Overflow data is what was left over from the previous packetization, since a single packet may not hold an entire frame.
        // Use this frame before reading a new one from the source
        unsigned frameSize = fOutBuf->overflowDataSize();
        struct timeval presentationTime = fOutBuf->overflowPresentationTime();
        unsigned durationInMicroseconds =fOutBuf->overflowDurationInMicroseconds();
        fOutBuf->useOverflowData();


        afterGettingFrame1(frameSize, 0, presentationTime,durationInMicroseconds);
    } else {
        // No frame data at all, so ask the source for some.
        // Normal case: we need to read a new frame from the source
        if (fSource == NULL)
            return;

        // Update some positions in the buffer
        fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
        fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
        fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
        fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

        // Get the next frame from the source
        fSource->getNextFrame(fOutBuf->curPtr(), // where the new data will be stored
                fOutBuf->totalBytesAvailable(),  // free space remaining in the buffer
                afterGettingFrame, // the source's read may be deferred to the task scheduler, so pass in the function to call once a frame has been obtained
                this,
                ourHandleClosure, // called when the source ends (e.g. the file has been read to the end)
                this);
    }
}

  fSource is defined in MediaSink and was assigned in MediaSink::startPlaying() to the sourceVideo argument we passed in. getNextFrame() itself is implemented once in FramedSource; it is not overridden, and it finishes by calling the virtual doGetNextFrame():

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
                afterGettingFunc* afterGettingFunc,
                void* afterGettingClientData,
                onCloseFunc* onCloseFunc,
                void* onCloseClientData) 
{
    // Make sure we're not already being read:
    if (fIsCurrentlyAwaitingData) {
        envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
        exit(1);
    }

    fTo = to;
    fMaxSize = maxSize;
    fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
    fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
    fAfterGettingFunc = afterGettingFunc;
    fAfterGettingClientData = afterGettingClientData;
    fOnCloseFunc = onCloseFunc;
    fOnCloseClientData = onCloseClientData;
    fIsCurrentlyAwaitingData = True;

    doGetNextFrame();
}

  sourceVideo was instantiated via H264VideoStreamFramer::createNew(), so the doGetNextFrame() that runs is the one implemented in H264VideoStreamFramer:

void H264VideoStreamFramer::doGetNextFrame()
{

  //fParser->registerReadInterest(fTo, fMaxSize);
  //continueReadProcessing();
  fInputSource->getNextFrame(fTo, fMaxSize,
                             afterGettingFrame, this,
                             FramedSource::handleClosure, this);
}

  fInputSource was initialized in H264VideoStreamFramer's base class to the H264InputDevice[video_type]->videoSource() passed in. VideoOpenFileSource derives from OpenFileSource, so this call goes through FramedSource::getNextFrame() once more, and this time the doGetNextFrame() it invokes is the one implemented in OpenFileSource, whose read path is incomingDataHandler1():

void OpenFileSource::incomingDataHandler1() {
    int ret;

    if (!isCurrentlyAwaitingData()) return; // we're not ready for the data yet

    ret = readFromFile();
    if (ret < 0) {
        handleClosure(this);
        fprintf(stderr,"In Grab Image, the source stops being readable!!!!\n");
    }
    else if (ret == 0) 
    {
        if( uSecsToDelay >= uSecsToDelayMax )
        {
            uSecsToDelay = uSecsToDelayMax;
        }else{
            uSecsToDelay *= 2; 
        }
        nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)incomingDataHandler, this);
    }
    else {
        nextTask() = envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)afterGetting, this);
    }
}

  Once a frame has been read, the delayed-task queue runs afterGetting(), which is implemented in the parent class FramedSource:

void FramedSource::afterGetting(FramedSource* source) 
{
    source->fIsCurrentlyAwaitingData = False;
    // indicates that we can be read again
    // Note that this needs to be done here, in case the "fAfterFunc"
    // called below tries to read another frame (which it usually will)

    if (source->fAfterGettingFunc != NULL) {
        (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                                   source->fFrameSize, 
                                   source->fNumTruncatedBytes,
                                   source->fPresentationTime,
                                   source->fDurationInMicroseconds);
    }
}

  The fAfterGettingFunc pointer was set in getNextFrame(); in MultiFramedRTPSink::packFrame() it was set to MultiFramedRTPSink::afterGettingFrame():

void MultiFramedRTPSink::afterGettingFrame(void* clientData, unsigned numBytesRead,
                                        unsigned numTruncatedBytes,
                                        struct timeval presentationTime,
                                        unsigned durationInMicroseconds) 
{
      MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData;
      sink->afterGettingFrame1(numBytesRead, numTruncatedBytes,
                           presentationTime, durationInMicroseconds);
}

  Continuing with the afterGettingFrame1() implementation:

void MultiFramedRTPSink::afterGettingFrame1(
        unsigned frameSize,
        unsigned numTruncatedBytes,
        struct timeval presentationTime,
        unsigned durationInMicroseconds)
{
    if (fIsFirstPacket) {
        // Record the fact that we're starting to play now:
        gettimeofday(&fNextSendTime, NULL);
    }


    // If the buffer provided for a frame is too small, the frame data gets truncated; all we can do is warn the user.
    if (numTruncatedBytes > 0) {


        unsigned const bufferSize = fOutBuf->totalBytesAvailable();
        envir()
                << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
                << bufferSize
                << ").  "
                << numTruncatedBytes
                << " bytes of trailing data was dropped!  Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
                << OutPacketBuffer::maxSize + numTruncatedBytes
                << ", *before* creating this 'RTPSink'.  (Current value is "
                << OutPacketBuffer::maxSize << ".)\n";
    }
    unsigned curFragmentationOffset = fCurFragmentationOffset;
    unsigned numFrameBytesToUse = frameSize;
    unsigned overflowBytes = 0;


    // If frames have already been packed into this packet and no more data may be
    // added to it, save the newly obtained frame for the next packet.
    // If we have already packed one or more frames into this packet,
    // check whether this new frame is eligible to be packed after them.
    // (This is independent of whether the packet has enough room for this
    // new frame; that check comes later.)
    if (fNumFramesUsedSoFar > 0) {
        // The packet already holds a frame and no new frame may follow it,
        // so just record the new frame for later.
        if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment())
                || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize))
        {
            // Save away this frame for next time:
            numFrameBytesToUse = 0;
            fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
                    presentationTime, durationInMicroseconds);
        }
    }
    
    // Indicates whether the data just packed was the final fragment of the previous frame.
    fPreviousFrameEndedFragmentation = False;


    // Next, compute how much of the obtained frame can go into the current packet;
    // whatever is left over is saved as overflow data.
    if (numFrameBytesToUse > 0) {
        // Check whether this frame overflows the packet
        if (fOutBuf->wouldOverflow(frameSize)) {
            // Don't use this frame now; instead, save it as overflow data, and
            // send it in the next packet instead.  However, if the frame is too
            // big to fit in a packet by itself, then we need to fragment it (and
            // use some of it in this packet, if the payload format permits this.)
            if (isTooBigForAPacket(frameSize)
                    && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
                // We need to fragment this frame, and use some of it now:
                overflowBytes = computeOverflowForNewFrame(frameSize);
                numFrameBytesToUse -= overflowBytes;
                fCurFragmentationOffset += numFrameBytesToUse;
            } else {
                // We don't use any of this frame now:
                overflowBytes = frameSize;
                numFrameBytesToUse = 0;
            }
            fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
                    overflowBytes, presentationTime, durationInMicroseconds);
        } else if (fCurFragmentationOffset > 0) {
            // This is the last fragment of a frame that was fragmented over
            // more than one packet.  Do any special handling for this case:
            fCurFragmentationOffset = 0;
            fPreviousFrameEndedFragmentation = True;
        }
    }


    
    if (numFrameBytesToUse == 0 && frameSize > 0) {
        // The packet already has data and none of this frame is being used, so send it now. (This case seems hard to hit in practice!)
        // Send our packet now, because we have filled it up:
        sendPacketIfNecessary();
    } else {
        // We need to pack data into the packet.
        
        // Use this frame in our outgoing packet:
        unsigned char* frameStart = fOutBuf->curPtr();
        fOutBuf->increment(numFrameBytesToUse);
        // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes


        // Here's where any payload format specific processing gets done:
        doSpecialFrameHandling(curFragmentationOffset, frameStart,
                numFrameBytesToUse, presentationTime, overflowBytes);


        ++fNumFramesUsedSoFar;


        // Update the time at which the next packet should be sent, based
        // on the duration of the frame that we just packed into it.
        // However, if this frame has overflow data remaining, then don't
        // count its duration yet.
        if (overflowBytes == 0) {
            fNextSendTime.tv_usec += durationInMicroseconds;
            fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000;
            fNextSendTime.tv_usec %= 1000000;
        }


        // Send the packet if appropriate; otherwise keep packing more data.
        // Send our packet now if (i) it's already at our preferred size, or
        // (ii) (heuristic) another frame of the same size as the one we just
        //      read would overflow the packet, or
        // (iii) it contains the last fragment of a fragmented frame, and we
        //      don't allow anything else to follow this or
        // (iv) one frame per packet is allowed:
        if (fOutBuf->isPreferredSize()
                || fOutBuf->wouldOverflow(numFrameBytesToUse)
                || (fPreviousFrameEndedFragmentation
                        && !allowOtherFramesAfterLastFragment())
                || !frameCanAppearAfterPacketStart(
                        fOutBuf->curPtr() - frameSize, frameSize)) {
            // The packet is ready to be sent now
            sendPacketIfNecessary();
        } else {
            // There's room for more frames; try getting another:
            packFrame();
        }
    }
}

Now let's look at the function that actually sends the data:

void MultiFramedRTPSink::sendPacketIfNecessary()
{
    // Send the packet
    if (fNumFramesUsedSoFar > 0) {
        // Send the packet:
#ifdef TEST_LOSS
        if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
        if (!fRTPInterface.sendPacket(fOutBuf->packet(),fOutBuf->curPacketSize())) {
            // if failure handler has been specified, call it
            if (fOnSendErrorFunc != NULL)
                (*fOnSendErrorFunc)(fOnSendErrorData);
        }
        ++fPacketCount;
        fTotalOctetCount += fOutBuf->curPacketSize();
        fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize
                - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;


        ++fSeqNo; // for next time
    }


    // If overflow data remains, adjust the buffer
    if (fOutBuf->haveOverflowData()
            && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) {
        // Efficiency hack: Reset the packet start pointer to just in front of
        // the overflow data (allowing for the RTP header and special headers),
        // so that we probably don't have to "memmove()" the overflow data
        // into place when building the next packet:
        unsigned newPacketStart = fOutBuf->curPacketSize()- 
                (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
        fOutBuf->adjustPacketStart(newPacketStart);
    } else {
        // Normal case: Reset the packet start pointer back to the start:
        fOutBuf->resetPacketStart();
    }
    fOutBuf->resetOffset();
    fNumFramesUsedSoFar = 0;


    if (fNoFramesLeft) {
        // No data left, so finish up
        // We're done:
        onSourceClosure(this);
    } else {
        // More data remains; pack and send another packet when the next one is due.
        // We have more frames left to send.  Figure out when the next frame
        // is due to start playing, then make sure that we wait this long before
        // sending the next packet.
        struct timeval timeNow;
        gettimeofday(&timeNow, NULL);
        int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
        int64_t uSecondsToGo = secsDiff * 1000000
                + (fNextSendTime.tv_usec - timeNow.tv_usec);
        if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
            uSecondsToGo = 0;
        }


        // Delay this amount of time:
        nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo,
                (TaskFunc*) sendNext, this);
    }
}

  After a packet has been sent, doEventLoop() later runs the scheduled task sendNext(), which sends another packet and keeps the cycle going. Audio data is sent in exactly the same way.
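  sendNext() is just a static trampoline back into buildAndSendPacket(); a sketch matching the stock live555 sources:

void MultiFramedRTPSink::sendNext(void* firstArg)
{
        MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg;
        sink->buildAndSendPacket(False); // no longer the first packet
}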

To summarize the call chain (drawing on 牛搞's write-up):

  Unicast data sending:
  For unicast, data is only sent once the client's "PLAY" command has been received; RTSPClientSession::handleCmd_PLAY() then calls:
void RTSPServer::RTSPClientSession 
::handleCmd_PLAY(ServerMediaSubsession* subsession, char const* cseq,
          char const* fullRequestStr) 
{

      ... ...


    fStreamStates[i].subsession->startStream(fOurSessionId,
                           fStreamStates[i].streamToken,
                           (TaskFunc*)noteClientLiveness,
                           this,
                           rtpSeqNum,
                           rtpTimestamp);
     ... ...
}
  startStream() is defined in OnDemandServerMediaSubsession:
void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
                        void* streamToken,
                        TaskFunc* rtcpRRHandler,
                        void* rtcpRRHandlerClientData,
                        unsigned short& rtpSeqNum,
                        unsigned& rtpTimestamp) 
{
        StreamState* streamState = (StreamState*)streamToken;
        Destinations* destinations
                = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));

        if (streamState != NULL) {
                streamState->startPlaying(destinations, rtcpRRHandler, rtcpRRHandlerClientData);
                if (streamState->rtpSink() != NULL) {
                        rtpSeqNum = streamState->rtpSink()->currentSeqNo();
                        rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
                }
        }
}

  startPlaying() here is implemented in StreamState:

void StreamState::startPlaying(Destinations* dests,
           TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData) 
{
    if (dests == NULL) return;
    
    if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
        if (fRTPSink != NULL) {
            fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
            fAreCurrentlyPlaying = True;
        } else if (fUDPSink != NULL) {
            fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
            fAreCurrentlyPlaying = True;
        }
    }

    if (fRTCPInstance == NULL && fRTPSink != NULL) {
        // Create (and start) a 'RTCP instance' for this RTP sink:
        fRTCPInstance = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
                                            fTotalBW, (unsigned char*)fMaster.fCNAME,
                                            fRTPSink, NULL /* we're a server */);
        // Note: This starts RTCP running automatically
    }

    if (dests->isTCP) {
        // Change RTP and RTCP to use the TCP socket instead of UDP:
        if (fRTPSink != NULL) {
            fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
        }
        if (fRTCPInstance != NULL) {
            fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
            fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
                                            rtcpRRHandler, rtcpRRHandlerClientData);
        }
    } else {
        // Tell the RTP and RTCP 'groupsocks' about this destination
        // (in case they don't already have it):
        if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
        if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
        if (fRTCPInstance != NULL) {
            fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
                                            rtcpRRHandler, rtcpRRHandlerClientData);
        }
    }
}

  This in turn calls startPlaying() on the RTPSink; RTPSink does not override it, so the call goes straight to MediaSink::startPlaying(). From there, capture, packetization, and sending proceed exactly as in the multicast case described above.
