7. RTP Packetizing and Sending
RTP transmission starts with MediaSink::startPlaying(). This makes sense when you think about it: the sink asks the source for data, so playback is kicked off by calling startPlaying() on the sink (much like DirectShow's pull mode).
Let's look at this function:
Boolean MediaSink::startPlaying(MediaSource& source,
                                afterPlayingFunc* afterFunc, void* afterClientData) {
  // The afterFunc parameter is called only when playing finishes.
  // Make sure we're not already being played:
  if (fSource != NULL) {
    envir().setResultMsg("This sink is already being played");
    return False;
  }
  // Make sure our source is compatible:
  if (!sourceIsCompatibleWithUs(source)) {
    envir().setResultMsg(
        "MediaSink::startPlaying(): source is not compatible!");
    return False;
  }
  // Remember some objects we will need later:
  fSource = (FramedSource*) &source;
  fAfterFunc = afterFunc;
  fAfterClientData = afterClientData;
  return continuePlaying();
}
To push the common work into the base class (so subclasses have less to write), a virtual function continuePlaying() was introduced. Let's take a look:
Boolean MultiFramedRTPSink::continuePlaying() {
  // Send the first packet.
  // (This will also schedule any future sends.)
  buildAndSendPacket(True);
  return True;
}
MultiFramedRTPSink is a frame-oriented class: it requires one complete frame from the source per read, hence the name. As you can see, continuePlaying() simply delegates to buildAndSendPacket(). Here is buildAndSendPacket():
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) {
  // This function mainly prepares the RTP header, leaving room for the
  // fields that depend on the actual frame data.
  fIsFirstPacket = isFirstPacket;
  // Set up the RTP header:
  unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
  rtpHdr |= (fRTPPayloadType << 16);
  rtpHdr |= fSeqNo; // sequence number
  fOutBuf->enqueueWord(rtpHdr); // append one word to the packet
  // Note where the RTP timestamp will go.
  // (We can't fill this in until we start packing payload frames.)
  fTimestampPosition = fOutBuf->curPacketSize();
  fOutBuf->skipBytes(4); // leave a hole in the buffer for the timestamp
  fOutBuf->enqueueWord(SSRC());
  // Allow for a special, payload-format-specific header following the
  // RTP header:
  fSpecialHeaderPosition = fOutBuf->curPacketSize();
  fSpecialHeaderSize = specialHeaderSize();
  fOutBuf->skipBytes(fSpecialHeaderSize);
  // Begin packing as many (complete) frames into the packet as we can:
  fTotalFrameSpecificHeaderSizes = 0;
  fNoFramesLeft = False;
  fNumFramesUsedSoFar = 0; // number of frames packed into this packet so far
  // The header is ready; now pack in the frame data:
  packFrame();
}
Continuing with packFrame():
void MultiFramedRTPSink::packFrame() {
  // First, see if we have an overflow frame that was too big for the last pkt
  if (fOutBuf->haveOverflowData()) {
    // If we already have frame data, use it. Overflow data is what was left
    // over from the previous packet, since one packet may not hold a whole frame.
    // Use this frame before reading a new one from the source
    unsigned frameSize = fOutBuf->overflowDataSize();
    struct timeval presentationTime = fOutBuf->overflowPresentationTime();
    unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds();
    fOutBuf->useOverflowData();
    afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
  } else {
    // No frame data at all; ask the source for some.
    // Normal case: we need to read a new frame from the source
    if (fSource == NULL)
      return;
    // Update some positions in the buffer:
    fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
    fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
    fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
    fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;
    // Get the next frame from the source:
    fSource->getNextFrame(fOutBuf->curPtr(), // where the new data should go
        fOutBuf->totalBytesAvailable(),      // space remaining in the buffer
        afterGettingFrame, // the source's read may be deferred via the task
                           // scheduler, so pass it the function to call once
                           // a frame has been obtained
        this,
        ourHandleClosure,  // called when the source ends (e.g. end of file)
        this);
  }
}
You can imagine what happens next: the source reads one frame from a file (or some device) and hands it back to the sink — not via a function return value, of course, but by invoking the afterGettingFrame callback. So let's look at afterGettingFrame():
void MultiFramedRTPSink::afterGettingFrame(void* clientData,
    unsigned numBytesRead, unsigned numTruncatedBytes,
    struct timeval presentationTime, unsigned durationInMicroseconds) {
  MultiFramedRTPSink* sink = (MultiFramedRTPSink*) clientData;
  sink->afterGettingFrame1(numBytesRead, numTruncatedBytes, presentationTime,
                           durationInMicroseconds);
}
Nothing much to see here; it merely forwards to the member function, so afterGettingFrame1() is where the real work happens:
void MultiFramedRTPSink::afterGettingFrame1(
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds) {
  if (fIsFirstPacket) {
    // Record the fact that we're starting to play now:
    gettimeofday(&fNextSendTime, NULL);
  }
  // If the buffer provided for a frame is too small, the frame gets
  // truncated; all we can do is warn the user.
  if (numTruncatedBytes > 0) {
    unsigned const bufferSize = fOutBuf->totalBytesAvailable();
    envir()
        << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
        << bufferSize
        << "). "
        << numTruncatedBytes
        << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
        << OutPacketBuffer::maxSize + numTruncatedBytes
        << ", *before* creating this 'RTPSink'. (Current value is "
        << OutPacketBuffer::maxSize << ".)\n";
  }
  unsigned curFragmentationOffset = fCurFragmentationOffset;
  unsigned numFrameBytesToUse = frameSize;
  unsigned overflowBytes = 0;
  // If frames have already been packed into this packet and no more may be
  // added, save the newly obtained frame data for later.
  // If we have already packed one or more frames into this packet,
  // check whether this new frame is eligible to be packed after them.
  // (This is independent of whether the packet has enough room for this
  // new frame; that check comes later.)
  if (fNumFramesUsedSoFar > 0) {
    // The packet already holds a frame and no new frame may follow it,
    // so just record the new one:
    if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment())
        || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) {
      // Save away this frame for next time:
      numFrameBytesToUse = 0;
      fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
          presentationTime, durationInMicroseconds);
    }
  }
  // Indicates whether what we just packed was the final fragment of the
  // previous frame:
  fPreviousFrameEndedFragmentation = False;
  // Next, compute how much of the obtained frame fits into the current
  // packet; the remainder is saved as overflow data.
  if (numFrameBytesToUse > 0) {
    // Check whether this frame overflows the packet
    if (fOutBuf->wouldOverflow(frameSize)) {
      // Don't use this frame now; instead, save it as overflow data, and
      // send it in the next packet instead. However, if the frame is too
      // big to fit in a packet by itself, then we need to fragment it (and
      // use some of it in this packet, if the payload format permits this.)
      if (isTooBigForAPacket(frameSize)
          && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
        // We need to fragment this frame, and use some of it now:
        overflowBytes = computeOverflowForNewFrame(frameSize);
        numFrameBytesToUse -= overflowBytes;
        fCurFragmentationOffset += numFrameBytesToUse;
      } else {
        // We don't use any of this frame now:
        overflowBytes = frameSize;
        numFrameBytesToUse = 0;
      }
      fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
          overflowBytes, presentationTime, durationInMicroseconds);
    } else if (fCurFragmentationOffset > 0) {
      // This is the last fragment of a frame that was fragmented over
      // more than one packet. Do any special handling for this case:
      fCurFragmentationOffset = 0;
      fPreviousFrameEndedFragmentation = True;
    }
  }
  if (numFrameBytesToUse == 0 && frameSize > 0) {
    // The packet already holds data and none of the new frame can be
    // added, so send it. (This case seems hard to hit!)
    // Send our packet now, because we have filled it up:
    sendPacketIfNecessary();
  } else {
    // Pack data into the packet.
    // Use this frame in our outgoing packet:
    unsigned char* frameStart = fOutBuf->curPtr();
    fOutBuf->increment(numFrameBytesToUse);
    // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes
    // Here's where any payload format specific processing gets done:
    doSpecialFrameHandling(curFragmentationOffset, frameStart,
        numFrameBytesToUse, presentationTime, overflowBytes);
    ++fNumFramesUsedSoFar;
    // Update the time at which the next packet should be sent, based
    // on the duration of the frame that we just packed into it.
    // However, if this frame has overflow data remaining, then don't
    // count its duration yet.
    if (overflowBytes == 0) {
      fNextSendTime.tv_usec += durationInMicroseconds;
      fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000;
      fNextSendTime.tv_usec %= 1000000;
    }
    // Send the packet if needed; otherwise keep packing.
    // Send our packet now if (i) it's already at our preferred size, or
    // (ii) (heuristic) another frame of the same size as the one we just
    // read would overflow the packet, or
    // (iii) it contains the last fragment of a fragmented frame, and we
    // don't allow anything else to follow this or
    // (iv) one frame per packet is allowed:
    if (fOutBuf->isPreferredSize()
        || fOutBuf->wouldOverflow(numFrameBytesToUse)
        || (fPreviousFrameEndedFragmentation
            && !allowOtherFramesAfterLastFragment())
        || !frameCanAppearAfterPacketStart(
            fOutBuf->curPtr() - frameSize, frameSize)) {
      // The packet is ready to be sent now
      sendPacketIfNecessary();
    } else {
      // There's room for more frames; try getting another:
      packFrame();
    }
  }
}
Now let's look at the function that actually sends the data:
void MultiFramedRTPSink::sendPacketIfNecessary() {
  if (fNumFramesUsedSoFar > 0) {
    // Send the packet:
#ifdef TEST_LOSS
    if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
    if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) {
      // if failure handler has been specified, call it
      if (fOnSendErrorFunc != NULL)
        (*fOnSendErrorFunc)(fOnSendErrorData);
    }
    ++fPacketCount;
    fTotalOctetCount += fOutBuf->curPacketSize();
    fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize
        - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;
    ++fSeqNo; // for next time
  }
  // If there is leftover (overflow) data, adjust the buffer:
  if (fOutBuf->haveOverflowData()
      && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) {
    // Efficiency hack: Reset the packet start pointer to just in front of
    // the overflow data (allowing for the RTP header and special headers),
    // so that we probably don't have to "memmove()" the overflow data
    // into place when building the next packet:
    unsigned newPacketStart = fOutBuf->curPacketSize()
        - (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
    fOutBuf->adjustPacketStart(newPacketStart);
  } else {
    // Normal case: Reset the packet start pointer back to the start:
    fOutBuf->resetPacketStart();
  }
  fOutBuf->resetOffset();
  fNumFramesUsedSoFar = 0;
  if (fNoFramesLeft) {
    // No data left at all, so finish up.
    // We're done:
    onSourceClosure(this);
  } else {
    // More data remains: pack and send again at the next send time.
    // We have more frames left to send. Figure out when the next frame
    // is due to start playing, then make sure that we wait this long before
    // sending the next packet.
    struct timeval timeNow;
    gettimeofday(&timeNow, NULL);
    int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
    int64_t uSecondsToGo = secsDiff * 1000000
        + (fNextSendTime.tv_usec - timeNow.tv_usec);
    if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
      uSecondsToGo = 0;
    }
    // Delay this amount of time:
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo,
        (TaskFunc*) sendNext, this);
  }
}
As you can see, a delayed task is used to postpone sending: the next build-and-send is scheduled for the proper time.
sendNext() in turn calls buildAndSendPacket() again, and round the loop we go.
To summarize, the call chain is:
startPlaying() -> continuePlaying() -> buildAndSendPacket() -> packFrame() -> getNextFrame() -> afterGettingFrame() -> afterGettingFrame1() -> sendPacketIfNecessary() -> scheduleDelayedTask(sendNext) -> buildAndSendPacket() -> ...
Finally, a note on how the packet buffer is used:
In MultiFramedRTPSink the frame data and the packet share a single buffer; a few extra variables mark which part of the buffer belongs to the packet and which part holds frame data (the data beyond the packet is called overflow data). Sometimes the overflow data is memmove()d to the start of the packet; other times the packet start pointer is simply moved to where the overflow data begins. So how is the buffer's size determined? It is computed from a caller-specified maximum packet size plus 60000. This is the part that confused me: if we fetch a whole frame from the source each time, the buffer should surely be at least as large as the largest frame, so why is it sized by packet? As we saw, when the buffer is too small the code merely prints a warning:
if (numTruncatedBytes > 0) {
  unsigned const bufferSize = fOutBuf->totalBytesAvailable();
  envir()
      << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
      << bufferSize
      << "). "
      << numTruncatedBytes
      << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
      << OutPacketBuffer::maxSize + numTruncatedBytes
      << ", *before* creating this 'RTPSink'. (Current value is "
      << OutPacketBuffer::maxSize << ".)\n";
}
Of course this is not treated as an error, but it can make the timestamp calculation inaccurate, or add complexity to timestamp handling on the source side (with one full frame per read, timestamps are easy to compute).