live555支持單播和組播,咱們先分析單播的流媒體服務端,後面分析組播的流媒體服務端。服務器
1、單播的流媒體服務端:session
// Create the RTSP server: RTSPServer* rtspServer = NULL; // Normal case: Streaming from a built-in RTSP server: rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL); if (rtspServer == NULL) { *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n"; exit(1); } *env << "...done initializing \n"; if( streamingMode == STREAMING_UNICAST ) { ServerMediaSession* sms = ServerMediaSession::createNew(*env, H264StreamName[video_type], H264StreamName[video_type], streamDescription, streamingMode == STREAMING_MULTICAST_SSM); sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type], H264VideoBitrate)); sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type])); rtspServer->addServerMediaSession(sms); char *url = rtspServer->rtspURL(sms); *env << "Play this stream using the URL:\t" << url << "\n"; delete[] url; }
// Begin the LIVE555 event loop:
env->taskScheduler().doEventLoop(&watchVariable); // does not returnapp
咱們一步一步分析:less
1> rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);dom
RTSPServer* RTSPServer::createNew(UsageEnvironment& env, Port ourPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds) { int ourSocket = -1; do { int ourSocket = setUpOurSocket(env, ourPort); if (ourSocket == -1) break; return new RTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds); } while (0); if (ourSocket != -1) ::closeSocket(ourSocket); return NULL; }
此函數首先建立一個rtsp協議的socket,而且監聽rtspServerPortNum端口,建立RTSPServer類的實例。下面咱們看下RTSPServer的構造函數:socket
RTSPServer::RTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds) : Medium(env), fServerSocket(ourSocket), fServerPort(ourPort), fAuthDB(authDatabase), fReclamationTestSeconds(reclamationTestSeconds), fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)), fSessionIdCounter(0) { #ifdef USE_SIGNALS // Ignore the SIGPIPE signal, so that clients on the same host that are killed // don't also kill us: signal(SIGPIPE, SIG_IGN); #endif // Arrange to handle connections from others: env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, (TaskScheduler::BackgroundHandlerProc*)&incomingConnectionHandler, this); }
RTSPServer構造函數,初始化fServerMediaSessions爲建立的HashTable,初始化fServerSocket爲咱們前面建立的tcp socket,fServerPort爲咱們監聽的端口rtspServerPortNum,而且向taskScheduler註冊fServerSocket的任務函數incomingConnectionHandler,這個任務函數主要監聽是否有新的客服端鏈接accept,若是有新的客服端接入,建立RTSPClientSession的實例。tcp
RTSPClientSession要提供什麼功能呢?能夠想象:須要監聽客戶端的rtsp請求並回應它,須要在DESCRIBE請求中返回所請求的流的信息,須要在SETUP請求中創建起RTP會話,須要在TEARDOWN請求中關閉RTP會話,等等...ide
RTSPServer::RTSPClientSession::RTSPClientSession(RTSPServer& ourServer, unsigned sessionId, int clientSocket, struct sockaddr_in clientAddr) : fOurServer(ourServer), fOurSessionId(sessionId), fOurServerMediaSession(NULL), fClientSocket(clientSocket), fClientAddr(clientAddr), fLivenessCheckTask(NULL), fIsMulticast(False), fSessionIsActive(True), fStreamAfterSETUP(False), fTCPStreamIdCount(0), fNumStreamStates(0), fStreamStates(NULL) { // Arrange to handle incoming requests: resetRequestBuffer(); envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket,(TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this); noteLiveness(); }
上面這個函數是RTSPClientSession的構造函數,初始化sessionId爲++fSessionIdCounter,初始化fClientSocket爲accept建立的socket(clientSocket),初始化fClientAddr爲accept接收的客服端地址,也向taskScheduler註冊了fClientSocket的認爲函數incomingRequestHandler。函數
incomingRequestHandler會調用incomingRequestHandler1,incomingRequestHandler1函數定義以下:oop
void RTSPServer::RTSPClientSession::incomingRequestHandler1() { noteLiveness(); struct sockaddr_in dummy; // 'from' address, meaningless in this case Boolean endOfMsg = False; unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen]; int bytesRead = readSocket(envir(), fClientSocket, ptr, fRequestBufferBytesLeft, dummy); if (bytesRead <= 0 || (unsigned)bytesRead >= fRequestBufferBytesLeft) { // Either the client socket has died, or the request was too big for us. // Terminate this connection: #ifdef DEBUG fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes (of %d); terminating connection!\n", this, bytesRead, fRequestBufferBytesLeft); #endif delete this; return; } #ifdef DEBUG ptr[bytesRead] = '\0'; fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes:%s\n", this, bytesRead, ptr); #endif // Look for the end of the message: <CR><LF><CR><LF> unsigned char *tmpPtr = ptr; if (fRequestBytesAlreadySeen > 0) --tmpPtr; // in case the last read ended with a <CR> while (tmpPtr < &ptr[bytesRead-1]) { if (*tmpPtr == '\r' && *(tmpPtr+1) == '\n') { if (tmpPtr - fLastCRLF == 2) { // This is it: endOfMsg = 1; break; } fLastCRLF = tmpPtr; } ++tmpPtr; } fRequestBufferBytesLeft -= bytesRead; fRequestBytesAlreadySeen += bytesRead; if (!endOfMsg) return; // subsequent reads will be needed to complete the request // Parse the request string into command name and 'CSeq', // then handle the command: fRequestBuffer[fRequestBytesAlreadySeen] = '\0'; char cmdName[RTSP_PARAM_STRING_MAX]; char urlPreSuffix[RTSP_PARAM_STRING_MAX]; char urlSuffix[RTSP_PARAM_STRING_MAX]; char cseq[RTSP_PARAM_STRING_MAX]; if (!parseRTSPRequestString((char*)fRequestBuffer, fRequestBytesAlreadySeen, cmdName, sizeof cmdName, urlPreSuffix, sizeof urlPreSuffix, urlSuffix, sizeof urlSuffix, cseq, sizeof cseq)) { #ifdef DEBUG fprintf(stderr, "parseRTSPRequestString() failed!\n"); #endif handleCmd_bad(cseq); } else { #ifdef DEBUG fprintf(stderr, "parseRTSPRequestString() returned cmdName \"%s\", urlPreSuffix \"%s\", urlSuffix \"%s\"\n", cmdName, urlPreSuffix, urlSuffix); #endif if (strcmp(cmdName, "OPTIONS") == 0) { handleCmd_OPTIONS(cseq); } else if (strcmp(cmdName, "DESCRIBE") == 0) { printf("incomingRequestHandler1 ~~~~~~~~~~~~~~\n"); handleCmd_DESCRIBE(cseq, urlSuffix, (char const*)fRequestBuffer); } else if (strcmp(cmdName, "SETUP") == 0) { handleCmd_SETUP(cseq, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer); } else if (strcmp(cmdName, "TEARDOWN") == 0 || strcmp(cmdName, "PLAY") == 0 || strcmp(cmdName, "PAUSE") == 0 || strcmp(cmdName, "GET_PARAMETER") == 0) { handleCmd_withinSession(cmdName, urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer); } else { handleCmd_notSupported(cseq); } } #ifdef DEBUG fprintf(stderr, "sending response: %s", fResponseBuffer); #endif send(fClientSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0); if (strcmp(cmdName, "SETUP") == 0 && fStreamAfterSETUP) { // The client has asked for streaming to commence now, rather than after a // subsequent "PLAY" command. So, simulate the effect of a "PLAY" command: handleCmd_withinSession("PLAY", urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer); } resetRequestBuffer(); // to prepare for any subsequent request if (!fSessionIsActive) delete this; }
此函數,咱們能夠看到rtsp的協議的各個命令的接收處理和應答。
2> ServerMediaSession* sms = ServerMediaSession::createNew(... ...)
建立ServerMediaSession類的實例,初始化fStreamName爲"h264_ch1",fInfoSDPString爲"h264_ch1",fDescriptionSDPString爲"RTSP/RTP stream from NETRA",fMiscSDPLines爲null,fCreationTime獲取的時間,fIsSSM爲false。
3> sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(... ...);
WISH264VideoServerMediaSubsession::createNew():這個函數的主要目的是建立OnDemandServerMediaSubsession類的實例,這個類在前面已經分析,是單播時候必須建立的,初始化fWISInput爲*H264InputDevice[video_type]。
sms->addSubsession() 是將WISH264VideoServerMediaSubsession類的實例加入到fSubsessionsTail鏈表首節點中。
4> sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(... ...);
WISPCMAudioServerMediaSubsession::createNew():這個函數的主要目的是建立OnDemandServerMediaSubsession類的實例,這個類在前面已經分析,是單播時候必須建立的,初始化fWISInput爲*H264InputDevice[video_type]。
sms->addSubsession() 是將WISPCMAudioServerMediaSubsession類的實例加入到fSubsessionsTail->fNext中。
5> rtspServer->addServerMediaSession(sms)
將rtspServer加入到fServerMediaSessions的哈希表中。
6> env->taskScheduler().doEventLoop(&watchVariable);
這個doEventLoop在前面已經分析過,主要處理socket任務和延遲任務。
2、組播的流媒體服務器:
// Create the RTSP server: RTSPServer* rtspServer = NULL; // Normal case: Streaming from a built-in RTSP server: rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL); if (rtspServer == NULL) { *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n"; exit(1); } *env << "...done initializing \n"; if( streamingMode == STREAMING_UNICAST ) { ... ... } else { if (streamingMode == STREAMING_MULTICAST_SSM) { if (multicastAddress == 0) multicastAddress = chooseRandomIPv4SSMAddress(*env); } else if (multicastAddress != 0) { streamingMode = STREAMING_MULTICAST_ASM; } struct in_addr dest; dest.s_addr = multicastAddress; const unsigned char ttl = 255; // For RTCP: const unsigned maxCNAMElen = 100; unsigned char CNAME[maxCNAMElen + 1]; gethostname((char *) CNAME, maxCNAMElen); CNAME[maxCNAMElen] = '\0'; // just in case ServerMediaSession* sms; sms = ServerMediaSession::createNew(*env, H264StreamName[video_type], H264StreamName[video_type], streamDescription,streamingMode == STREAMING_MULTICAST_SSM); /* VIDEO Channel initial */ if(1) { // Create 'groupsocks' for RTP and RTCP: const Port rtpPortVideo(videoRTPPortNum); const Port rtcpPortVideo(videoRTPPortNum+1); rtpGroupsockVideo = new Groupsock(*env, dest, rtpPortVideo, ttl); rtcpGroupsockVideo = new Groupsock(*env, dest, rtcpPortVideo, ttl); if (streamingMode == STREAMING_MULTICAST_SSM) { rtpGroupsockVideo->multicastSendOnly(); rtcpGroupsockVideo->multicastSendOnly(); } setVideoRTPSinkBufferSize(); sinkVideo = H264VideoRTPSink::createNew(*env, rtpGroupsockVideo,96, 0x42, "h264"); // Create (and start) a 'RTCP instance' for this RTP sink: unsigned totalSessionBandwidthVideo = (Mpeg4VideoBitrate+500)/1000; // in kbps; for RTCP b/w share rtcpVideo = RTCPInstance::createNew(*env, rtcpGroupsockVideo, totalSessionBandwidthVideo, CNAME, sinkVideo, NULL /* we're a server */ , streamingMode == STREAMING_MULTICAST_SSM); // Note: This starts RTCP running automatically sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkVideo, rtcpVideo)); sourceVideo = H264VideoStreamFramer::createNew(*env, H264InputDevice[video_type]->videoSource());
// Start streaming: sinkVideo->startPlaying(*sourceVideo, NULL, NULL); } /* AUDIO Channel initial */ if(1) { // there's a separate RTP stream for audio // Create 'groupsocks' for RTP and RTCP: const Port rtpPortAudio(audioRTPPortNum); const Port rtcpPortAudio(audioRTPPortNum+1); rtpGroupsockAudio = new Groupsock(*env, dest, rtpPortAudio, ttl); rtcpGroupsockAudio = new Groupsock(*env, dest, rtcpPortAudio, ttl); if (streamingMode == STREAMING_MULTICAST_SSM) { rtpGroupsockAudio->multicastSendOnly(); rtcpGroupsockAudio->multicastSendOnly(); } if( audioSamplingFrequency == 16000 ) sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 96, audioSamplingFrequency, "audio", "PCMU", 1); else sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 0, audioSamplingFrequency, "audio", "PCMU", 1); // Create (and start) a 'RTCP instance' for this RTP sink: unsigned totalSessionBandwidthAudio = (audioOutputBitrate+500)/1000; // in kbps; for RTCP b/w share rtcpAudio = RTCPInstance::createNew(*env, rtcpGroupsockAudio, totalSessionBandwidthAudio, CNAME, sinkAudio, NULL /* we're a server */, streamingMode == STREAMING_MULTICAST_SSM); // Note: This starts RTCP running automatically sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkAudio, rtcpAudio)); sourceAudio = H264InputDevice[video_type]->audioSource(); // Start streaming: sinkAudio->startPlaying(*sourceAudio, NULL, NULL); } rtspServer->addServerMediaSession(sms); { struct in_addr dest; dest.s_addr = multicastAddress; char *url = rtspServer->rtspURL(sms); //char *url2 = inet_ntoa(dest); *env << "Mulicast Play this stream using the URL:\n\t" << url << "\n"; //*env << "2 Mulicast addr:\n\t" << url2 << "\n"; delete[] url; } } // Begin the LIVE555 event loop: env->taskScheduler().doEventLoop(&watchVariable); // does not return
1> rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
同前面單播的分析同樣。
2> sms = ServerMediaSession::createNew(... ...)
同前面單播的分析同樣。
3> 視頻
1. 建立視頻rtp、rtcp的Groupsock類的實例,實現rtp和rtcp的udp通訊socket。這裏應該瞭解下ASM和SSM。
2. 建立RTPSink類的實例,實現視頻數據的RTP打包傳輸。
3. 建立RTCPInstance類的實例,實現RTCP打包傳輸。
4. 建立PassiveServerMediaSubsession類的實例,並加入到fSubsessionsTail鏈表中的首節點。
5. 建立FramedSource類的實例,實現一幀視頻數據的獲取。
5. 開始發送RTP和RTCP數據到組播地址。
4> 音頻
1. 建立音頻rtp、rtcp的Groupsock類的實例,實現rtp和rtcp的udp通訊socket。這裏應該瞭解下ASM和SSM。
2. 建立RTPSink類的實例,實現音頻數據的RTP打包傳輸。
3. 建立RTCPInstance類的實例,實現RTCP打包傳輸。
4. 建立PassiveServerMediaSubsession類的實例,並加入到fSubsessionsTail鏈表中的下一個節點。
5. 建立FramedSource類的實例,實現一幀音頻數據的獲取。
5. 開始發送RTP和RTCP數據到組播地址。
5> rtspServer->addServerMediaSession(sms)
同前面單播的分析同樣。
6> env->taskScheduler().doEventLoop(&watchVariable)
同前面單播的分析同樣。
3、單播和組播的區別
1> 建立socket的時候,組播一開始就建立了,而單播的則是根據收到的「SETUP」命令建立相應的socket。
2> startPlaying的時候,組播一開始就發送數據到組播地址,而單播則是根據收到的「PLAY」命令開始startPlaying。
4、startPlaying分析
首先分析組播:
sinkVideo->startPlaying()實現不在H264VideoRTPSink類中,也不在RTPSink類中,而是在MediaSink類中實現:
Boolean MediaSink::startPlaying(MediaSource& source, afterPlayingFunc* afterFunc, void* afterClientData) { // Make sure we're not already being played: if (fSource != NULL) { envir().setResultMsg("This sink is already being played"); return False; } // Make sure our source is compatible: if (!sourceIsCompatibleWithUs(source)) { envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!"); return False; } fSource = (FramedSource*)&source; fAfterFunc = afterFunc; fAfterClientData = afterClientData; return continuePlaying(); }
這裏發現調用了continuePlaying()函數,那這個函數在哪裏實現的呢?由於sinkVideo是經過 H264VideoRTPSink::createNew()實現,返回的H264VideoRTPSink類的實例,所以咱們能夠斷定這個continuePlaying()在H264VideoRTPSink類實現。
Boolean H264VideoRTPSink::continuePlaying() { // First, check whether we have a 'fragmenter' class set up yet. // If not, create it now: if (fOurFragmenter == NULL) { fOurFragmenter = new H264FUAFragmenter(envir(), fSource, OutPacketBuffer::maxSize, ourMaxPacketSize() - 12/*RTP hdr size*/); fSource = fOurFragmenter; } //printf("function=%s line=%d\n",__func__,__LINE__); // Then call the parent class's implementation: return MultiFramedRTPSink::continuePlaying(); }
看到這裏咱們發現調用的是MultiFramedRTPSink類的成員函數continuePlaying,看下這個函數的實現:
Boolean MultiFramedRTPSink::continuePlaying() { // Send the first packet. // (This will also schedule any future sends.) buildAndSendPacket(True); return True; }
這裏咱們發現了buildAndSendPacket(),這個函數實現:
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) { //此函數中主要是準備rtp包的頭,爲一些須要跟據實際數據改變的字段留出位置。 fIsFirstPacket = isFirstPacket; // Set up the RTP header: unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later) rtpHdr |= (fRTPPayloadType << 16); rtpHdr |= fSeqNo; // sequence number fOutBuf->enqueueWord(rtpHdr);//向包中加入一個字 // Note where the RTP timestamp will go. // (We can't fill this in until we start packing payload frames.) fTimestampPosition = fOutBuf->curPacketSize(); fOutBuf->skipBytes(4); // leave a hole for the timestamp 在緩衝中空出時間戳的位置 fOutBuf->enqueueWord(SSRC()); // Allow for a special, payload-format-specific header following the // RTP header: fSpecialHeaderPosition = fOutBuf->curPacketSize(); fSpecialHeaderSize = specialHeaderSize(); fOutBuf->skipBytes(fSpecialHeaderSize); // Begin packing as many (complete) frames into the packet as we can: fTotalFrameSpecificHeaderSizes = 0; fNoFramesLeft = False; fNumFramesUsedSoFar = 0; // 一個包中已打入的幀數。 //頭準備好了,再打包幀數據 packFrame(); }
繼續看packFrame():
void MultiFramedRTPSink::packFrame() { // First, see if we have an overflow frame that was too big for the last pkt if (fOutBuf->haveOverflowData()) { //若是有幀數據,則使用之。OverflowData是指上次打包時剩下的幀數據,由於一個包可能容納不了一個幀。 // Use this frame before reading a new one from the source unsigned frameSize = fOutBuf->overflowDataSize(); struct timeval presentationTime = fOutBuf->overflowPresentationTime(); unsigned durationInMicroseconds =fOutBuf->overflowDurationInMicroseconds(); fOutBuf->useOverflowData(); afterGettingFrame1(frameSize, 0, presentationTime,durationInMicroseconds); } else { //一點幀數據都沒有,跟source要吧。 // Normal case: we need to read a new frame from the source if (fSource == NULL) return; //更新緩衝中的一些位置 fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize(); fCurFrameSpecificHeaderSize = frameSpecificHeaderSize(); fOutBuf->skipBytes(fCurFrameSpecificHeaderSize); fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize; //從source獲取下一幀 fSource->getNextFrame(fOutBuf->curPtr(),//新數據存放開始的位置 fOutBuf->totalBytesAvailable(),//緩衝中空餘的空間大小 afterGettingFrame, //由於可能source中的讀數據函數會被放在任務調度中,因此把獲取幀後應調用的函數傳給source this, ourHandleClosure, //這個是source結束時(好比文件讀完了)要調用的函數。 this); } }
fSource定義在MediaSink類中,在這個類中startPlaying()函數中,給fSource賦值爲傳入的參數sourceVideo,sourceVideo實現getNextFrame()函數在FramedSource中,這是一個虛函數:
void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize, afterGettingFunc* afterGettingFunc, void* afterGettingClientData, onCloseFunc* onCloseFunc, void* onCloseClientData) { // Make sure we're not already being read: if (fIsCurrentlyAwaitingData) { envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n"; exit(1); } fTo = to; fMaxSize = maxSize; fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame() fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame() fAfterGettingFunc = afterGettingFunc; fAfterGettingClientData = afterGettingClientData; fOnCloseFunc = onCloseFunc; fOnCloseClientData = onCloseClientData; fIsCurrentlyAwaitingData = True; doGetNextFrame(); }
sourceVideo經過實現H264VideoStreamFramer::createNew()實例化,發現doGetNextFrame()函數實如今H264VideoStreamFramer類中:
void H264VideoStreamFramer::doGetNextFrame() { //fParser->registerReadInterest(fTo, fMaxSize); //continueReadProcessing(); fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, FramedSource::handleClosure, this); }
這fInputSource在H264VideoStreamFramer的基類StreamParser中被初始化爲傳入的參數H264InputDevice[video_type]->videoSource(),VideoOpenFileSource類繼承OpenFileSource類,所以這個doGetNextFrame再一次FramedSource類中的getNextFrame()函數,此次getNextFrame函數中調用的doGetNextFrame()函數則是在OpenFileSource類實現的:
void OpenFileSource::incomingDataHandler1() { int ret; if (!isCurrentlyAwaitingData()) return; // we're not ready for the data yet ret = readFromFile(); if (ret < 0) { handleClosure(this); fprintf(stderr,"In Grab Image, the source stops being readable!!!!\n"); } else if (ret == 0) { if( uSecsToDelay >= uSecsToDelayMax ) { uSecsToDelay = uSecsToDelayMax; }else{ uSecsToDelay *= 2; } nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)incomingDataHandler, this); } else { nextTask() = envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)afterGetting, this); } }
獲取一幀數據後,執行延遲隊列中的afterGetting()函數,此函數實現父類FramedSource中:
void FramedSource::afterGetting(FramedSource* source) { source->fIsCurrentlyAwaitingData = False; // indicates that we can be read again // Note that this needs to be done here, in case the "fAfterFunc" // called below tries to read another frame (which it usually will) if (source->fAfterGettingFunc != NULL) { (*(source->fAfterGettingFunc))(source->fAfterGettingClientData, source->fFrameSize, source->fNumTruncatedBytes, source->fPresentationTime, source->fDurationInMicroseconds); } }
fAfterGettingFunc函數指針在getNextFrame()函數被賦值,在MultiFramedRTPSink::packFrame() 函數中,被賦值MultiFramedRTPSink::afterGettingFrame():
void MultiFramedRTPSink::afterGettingFrame(void* clientData, unsigned numBytesRead, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData; sink->afterGettingFrame1(numBytesRead, numTruncatedBytes, presentationTime, durationInMicroseconds); }
繼續看afterGettingFrame1實現:
void MultiFramedRTPSink::afterGettingFrame1( unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { if (fIsFirstPacket) { // Record the fact that we're starting to play now: gettimeofday(&fNextSendTime, NULL); } //若是給予一幀的緩衝不夠大,就會發生截斷一幀數據的現象。但也只能提示一下用戶 if (numTruncatedBytes > 0) { unsigned const bufferSize = fOutBuf->totalBytesAvailable(); envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size (" << bufferSize << "). " << numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least " << OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'. (Current value is " << OutPacketBuffer::maxSize << ".)\n"; } unsigned curFragmentationOffset = fCurFragmentationOffset; unsigned numFrameBytesToUse = frameSize; unsigned overflowBytes = 0; //若是包只已經打入幀數據了,而且不能再向這個包中加數據了,則把新得到的幀數據保存下來。 // If we have already packed one or more frames into this packet, // check whether this new frame is eligible to be packed after them. // (This is independent of whether the packet has enough room for this // new frame; that check comes later.) if (fNumFramesUsedSoFar > 0) { //若是包中已有了一個幀,而且不容許再打入新的幀了,則只記錄下新的幀。 if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) { // Save away this frame for next time: numFrameBytesToUse = 0; fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize, presentationTime, durationInMicroseconds); } } //表示當前打入的是不是上一個幀的最後一塊數據。 fPreviousFrameEndedFragmentation = False; //下面是計算獲取的幀中有多少數據能夠打到當前包中,剩下的數據就做爲overflow數據保存下來。 if (numFrameBytesToUse > 0) { // Check whether this frame overflows the packet if (fOutBuf->wouldOverflow(frameSize)) { // Don't use this frame now; instead, save it as overflow data, and // send it in the next packet instead. However, if the frame is too // big to fit in a packet by itself, then we need to fragment it (and // use some of it in this packet, if the payload format permits this.) if (isTooBigForAPacket(frameSize) && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) { // We need to fragment this frame, and use some of it now: overflowBytes = computeOverflowForNewFrame(frameSize); numFrameBytesToUse -= overflowBytes; fCurFragmentationOffset += numFrameBytesToUse; } else { // We don't use any of this frame now: overflowBytes = frameSize; numFrameBytesToUse = 0; } fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse, overflowBytes, presentationTime, durationInMicroseconds); } else if (fCurFragmentationOffset > 0) { // This is the last fragment of a frame that was fragmented over // more than one packet. Do any special handling for this case: fCurFragmentationOffset = 0; fPreviousFrameEndedFragmentation = True; } } if (numFrameBytesToUse == 0 && frameSize > 0) { //若是包中有數據而且沒有新數據了,則發送之。(這種狀況好像很難發生啊!) // Send our packet now, because we have filled it up: sendPacketIfNecessary(); } else { //須要向包中打入數據。 // Use this frame in our outgoing packet: unsigned char* frameStart = fOutBuf->curPtr(); fOutBuf->increment(numFrameBytesToUse); // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes // Here's where any payload format specific processing gets done: doSpecialFrameHandling(curFragmentationOffset, frameStart, numFrameBytesToUse, presentationTime, overflowBytes); ++fNumFramesUsedSoFar; // Update the time at which the next packet should be sent, based // on the duration of the frame that we just packed into it. // However, if this frame has overflow data remaining, then don't // count its duration yet. if (overflowBytes == 0) { fNextSendTime.tv_usec += durationInMicroseconds; fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000; fNextSendTime.tv_usec %= 1000000; } //若是須要,就發出包,不然繼續打入數據。 // Send our packet now if (i) it's already at our preferred size, or // (ii) (heuristic) another frame of the same size as the one we just // read would overflow the packet, or // (iii) it contains the last fragment of a fragmented frame, and we // don't allow anything else to follow this or // (iv) one frame per packet is allowed: if (fOutBuf->isPreferredSize() || fOutBuf->wouldOverflow(numFrameBytesToUse) || (fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart( fOutBuf->curPtr() - frameSize, frameSize)) { // The packet is ready to be sent now sendPacketIfNecessary(); } else { // There's room for more frames; try getting another: packFrame(); } } }
看一下發送數據的函數:
void MultiFramedRTPSink::sendPacketIfNecessary() { //發送包 if (fNumFramesUsedSoFar > 0) { // Send the packet: #ifdef TEST_LOSS if ((our_random()%10) != 0) // simulate 10% packet loss ##### #endif if (!fRTPInterface.sendPacket(fOutBuf->packet(),fOutBuf->curPacketSize())) { // if failure handler has been specified, call it if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData); } ++fPacketCount; fTotalOctetCount += fOutBuf->curPacketSize(); fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes; ++fSeqNo; // for next time } //若是還有剩餘數據,則調整緩衝區 if (fOutBuf->haveOverflowData() && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) { // Efficiency hack: Reset the packet start pointer to just in front of // the overflow data (allowing for the RTP header and special headers), // so that we probably don't have to "memmove()" the overflow data // into place when building the next packet: unsigned newPacketStart = fOutBuf->curPacketSize()- (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize()); fOutBuf->adjustPacketStart(newPacketStart); } else { // Normal case: Reset the packet start pointer back to the start: fOutBuf->resetPacketStart(); } fOutBuf->resetOffset(); fNumFramesUsedSoFar = 0; if (fNoFramesLeft) { //若是再沒有數據了,則結束之 // We're done: onSourceClosure(this); } else { //若是還有數據,則在下一次須要發送的時間再次打包發送。 // We have more frames left to send. Figure out when the next frame // is due to start playing, then make sure that we wait this long before // sending the next packet. struct timeval timeNow; gettimeofday(&timeNow, NULL); int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec; int64_t uSecondsToGo = secsDiff * 1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec); if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative: uSecondsToGo = 0; } // Delay this amount of time: nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*) sendNext, this); } }
當一幀數據發送完,在doEventLoop()函數執行任務函數sendNext(),繼續發送一包,進行下一個循環。音頻數據的發送也是如此。
總結一下調用過程(參考牛搞大神):
單播數據發送:
單播的時候,只有收到客服端的「PLAY」的命令時,纔開始發送數據,在RTSPClientSession類中handleCmd_PLAY()函數中調用
void RTSPServer::RTSPClientSession ::handleCmd_PLAY(ServerMediaSubsession* subsession, char const* cseq, char const* fullRequestStr) { ... ... fStreamStates[i].subsession->startStream(fOurSessionId, fStreamStates[i].streamToken, (TaskFunc*)noteClientLiveness, this, rtpSeqNum, rtpTimestamp); ... ... }
startStream()函數定義在OnDemandServerMediaSubsession類中:
void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId, void* streamToken, TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData, unsigned short& rtpSeqNum, unsigned& rtpTimestamp)
{ StreamState* streamState = (StreamState*)streamToken; Destinations* destinations = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId)); if (streamState != NULL) { streamState->startPlaying(destinations, rtcpRRHandler, rtcpRRHandlerClientData); if (streamState->rtpSink() != NULL) { rtpSeqNum = streamState->rtpSink()->currentSeqNo(); rtpTimestamp = streamState->rtpSink()->presetNextTimestamp(); } } }
startPlaying函數實如今StreamState類中:
void StreamState::startPlaying(Destinations* dests, TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData) { if (dests == NULL) return; if (!fAreCurrentlyPlaying && fMediaSource != NULL) { if (fRTPSink != NULL) { fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this); fAreCurrentlyPlaying = True; } else if (fUDPSink != NULL) { fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this); fAreCurrentlyPlaying = True; } } if (fRTCPInstance == NULL && fRTPSink != NULL) { // Create (and start) a 'RTCP instance' for this RTP sink: fRTCPInstance = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs, fTotalBW, (unsigned char*)fMaster.fCNAME, fRTPSink, NULL /* we're a server */); // Note: This starts RTCP running automatically } if (dests->isTCP) { // Change RTP and RTCP to use the TCP socket instead of UDP: if (fRTPSink != NULL) { fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId); } if (fRTCPInstance != NULL) { fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId); fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId, rtcpRRHandler, rtcpRRHandlerClientData); } } else { // Tell the RTP and RTCP 'groupsocks' about this destination // (in case they don't already have it): if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort); if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort); if (fRTCPInstance != NULL) { fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort, rtcpRRHandler, rtcpRRHandlerClientData); } } }
這個函數就會去調用RTPSink類中的startPlaying()函數,可是RTPSink沒有實現,直接調用父類MediaSink中的startPlaying函數。後面就跟組播同樣的採集,組包,發送數據了。