Darwin Streaming Server源碼分析

2     Darwin流化服務器介紹
DSS源代碼徹底採用標準C++語言寫成,編程風格很是優秀,每一個C++類都對應着一對和類同名的.h/.cpp文件。可是因爲大量採用了面向對象的概念,如繼承、多態等等;並且源文件和類至關多,因此不太容易講清楚。所以,讀者最好事先把代碼完整的過濾一兩遍,再配合本文,就能看得更清楚點。
其中,最爲重要的是基礎功能類庫(CommonUtilitiesLib)和流化服務器(StreamingServer)兩個工程,前者是整個系統的通用代碼工具箱,包括了線程管理、數據結構、網絡和文本分析等多個功能模塊。DSS和其餘相關的工具使用基礎功能類庫工程中定義的功能類實現如下三個目標:
(1)抽象出系統中相同或相似的功能,用於下降代碼的冗餘度;
(2)封裝基本功能,簡化高層編碼的複雜度;
(3)隔離開操做系統平臺相關的代碼。
而流化服務器工程中包含了DSS對多個國際標準的實現,是整個服務器的主工程。在本文中,咱們將重點分析這兩個工程中的核心代碼和模塊。另外,咱們還將簡單介紹利用DSS提供的開發接口(Module)擴展和定製服務器的方法。
DSS實現了四種IETF制定的國際標準,分別是:實時流傳輸協議RTSP(Real-time Streaming Protocol, RFC 2326)、實時傳輸協議(RTP Real-time Transfer Protocol,RFC 1889)、實時傳輸控制協議RTCP(Real-time Transport Control Protocol,RFC 1889)、會話描述協議SDP(Session Description Protocol,RFC 2327)。這四個標準是開發全部流式媒體產品都必須掌握的,所以在對相關代碼進行分析和二次開發以前,但願讀者瞭解上述四種協議的基本思想,上述協議樣本可從如下網站得到:http://www.ietf.org
3     基礎功能類庫(Common Utilities)
3.2   Tasks類
由於服務器從總體上採用了異步的運行模式,這就須要一種用於事件通訊的機制。舉例來講:一個RTSP鏈接對應的Socket端口監測到網絡上有數據到達,此時必須有一個模塊(或代碼)被通知(notify)去處理這些數據。爲此,DSS定義了Task及其相關類做爲實現這一通訊機制的核心。
在Task.h/cpp文件中,定義了三個主要的類,分別是:任務線程池類(TaskThreadPool Class)、任務線程類(TaskThread Class)以及任務類(Task Class)。
每一個Task對象有兩個主要的方法:Signal和Run。當服務器但願發送一個事件給某個Task對象時,就會調用Signal()方法;而Run()方法是在Task對象得到處理該事件的時間片後運行的,服務器中的大部分工做都是在不一樣Task對象的Run()函數中進行的。每一個Task對象的目標就是利用很小的且不會阻塞的時間片完成服務器指定某個工做。
任務線程類是上文介紹的OSThread類的一個子類,表明專門用於運行任務類的一個線程。在每一個任務線程對象內部都有一個OSQueue_Blocking類型的任務隊列,存儲該線程須要執行的任務。後面的分析能夠看到,服務器調用一個任務的Signal函數,實際上就是將該任務加入到某個任務線程類的任務隊列中去。另外,爲了統一管理這些任務線程,DSS還開發了任務線程池類,該類負責生成、刪除以及維護內部的任務線程列表。圖4描述了任務類的運行。
       下面咱們首先分析TashThread類,該類的定義以下:
class TaskThread : public OSThread     //OSThread的子類
{     //提示:全部的Task對象都將在TaskThread中運行
       1     public:
       2     TaskThread() :       OSThread(), fTaskThreadPoolElem(this){}  //構造函數
3     virtual                   ~TaskThread() { this->StopAndWaitForThread(); } //析構函數
       4     private:
              …
       5     virtual void     Entry();       //從OSThread重載的執行函數,仍然可以被子類重載
       6     Task*                   WaitForTask();    //檢測是否有該執行的任務
              
       7     OSQueueElem        fTaskThreadPoolElem;       //對應的線程池對象
       8     OSHeap                        fHeap; //紀錄任務運行時間的堆,用於WaitForTask函數
              /*關鍵數據結構:任務隊列;在Task的Signal函數中直接調用fTaskQueue對象的EnQueue函數將本身加入任務隊列*/
       9     OSQueue_Blocking fTaskQueue; 
              //此處略…
       }
       做爲OSThread的子類,TaskThread重載了Entry函數,一旦TaskThread的對象被實例化,便運行該函數。Entry()函數的主要任務就是調用WaitForTask()函數監測任務隊列,若是發現新任務,就在規定時間內執行;不然,就被阻塞。下面咱們簡要分析Entry()函數的流程:
       void TaskThread::Entry()
{
       1     Task* theTask = NULL; //空任務
       
       2     while (true) //線程循環執行
       3     {     //監測是否有須要執行的任務,若是有就返回該任務;不然阻塞;
       4            theTask = this->WaitForTask(); 
       5            Assert(theTask != NULL);
              
6            Bool16 doneProcessingEvent = false; //還沒有處理事件
              
7            while (!doneProcessingEvent)
       8            {
       9            theTask->fUseThisThread = NULL; // 對任務的調度獨立於線程
       10           SInt64 theTimeout = 0;      //Task中Run函數的返回值,重要
                     //核心部分:運行任務,根據返回值判斷任務進度
       11           if (theTask->fWriteLock)
       12           {     //若是任務中有寫鎖,須要使用寫互斥量,不然可能形成死鎖
       13                  OSMutexWriteLocker mutexLocker(&TaskThreadPool::sMutexRW);
       14                  theTimeout = theTask->Run();   //運行任務,獲得返回值
       15                  theTask->fWriteLock = false;
       16           }
       17           else
       18           {     //使用讀互斥量
       19                  OSMutexReadLocker mutexLocker(&TaskThreadPool::sMutexRW);
       20                  theTimeout = theTask->Run();   //運行任務,獲得返回值
       21           }
       22           //監測Task中Run()函數的返回值,共有三種狀況
       23           //一、返回負數,代表任務已經徹底結束
       24           if (theTimeout        25           {
       26                  delete theTask;     //刪除Task對象
       27                  theTask = NULL;
       28                  doneProcessingEvent = true;
       19           }
       30           //二、返回0,代表任務但願在下次傳信時被再次當即執行
       31           else if (theTimeout=0)
       32           {
       33                  doneProcessingEvent = compare_and_store(Task::kAlive, 0, &theTask->fEvents);
       34                  if (doneProcessingEvent)
       35                         theTask = NULL; 
       36           }
                     //三、返回正數,代表任務但願在等待theTimeout時間後再次執行
       37           else
       38           {
                     /*將該任務加入到Heap中,而且紀錄它但願等待的時間。Entry()函數將經過waitfortask()函數進行檢測,若是等待的時間到了,就再次運行該任務*/
       39                  theTask->fTimerHeapElem.SetValue(OS::Milliseconds() + theTimeout);
       40                  fHeap.Insert(&theTask->fTimerHeapElem);
       41                  (void)atomic_or(&theTask->fEvents, Task::kIdleEvent);//設置Idle事件
       42                  doneProcessingEvent = true;
       43           }
              //此處略…
       }
       注意,若是Task的Run()函數返回值TimeOut爲正數,意味着該任務是一個週期性的工做,例如發送數據的視頻泵(pump),須要每隔必定時間就發出必定量的視頻數據,直至整個節目結束。爲此,在第38~43行,將該任務加入到堆fHeap中去,而且標記該任務下次運行的時間爲TimeOut毫秒以後。未來經過調用WaitForTask()函數就能檢測到該任務是否到達規定的運行時間,WaitForTask()函數的代碼以下:
       Task* TaskThread::WaitForTask()
{
       1     while (true)
       2     {     //獲得當前時間,該函數爲靜態函數,定義見OS.h
       3            SInt64 theCurrentTime = OS::Milliseconds(); 
                     /*若是堆中有任務,且任務已經到執行時間,返回該任務。 PeekMin函數見OSHeap.h,竊聽堆中第一個元素(但不取出)*/
4     if ((fHeap.PeekMin() != NULL) && (fHeap.PeekMin()->GetValue() 從堆中取出第一個任務返回
5                   return (Task*)fHeap.ExtractMin()->GetEnclosingObject();
              //若是堆中有任務,可是還沒有到執行時間,計算須要等待的時間
       6            SInt32 theTimeout = 0;
       7            if (fHeap.PeekMin() != NULL)      //計算還需等待的時間
       8                   theTimeout = fHeap.PeekMin()->GetValue() - theCurrentTime;
       9            Assert(theTimeout >= 0);
              
              //等待theTimeout時間後從堆中取出任務返回
       10           OSQueueElem* theElem = fTaskQueue.DeQueueBlocking(this, theTimeout);
       11           if (theElem != NULL)
       12                  return (Task*)theElem->GetEnclosingObject();
       13    }     
}
       void Task::Signal(EventFlags events)
{
              …
              // fUseThisThread用於指定該任務運行的任務線程
       1     if (fUseThisThread != NULL)       //存在指定任務線程
                     //將該任務加入到指定任務線程的任務隊列中
       2            fUseThisThread->fTaskQueue.EnQueue(&fTaskQueueElem);
              //不存在指定的任務線程,隨機選擇一個任務線程運行該任務
3     else
       4     {
                     //從線程池中隨機選擇一個任務線程
       5            unsigned int theThread = atomic_add(&sThreadPicker, 1);
       6            theThread %= TaskThreadPool::sNumTaskThreads;
                     //將該任務加入到上面選擇的任務線程的任務隊列中
       7            TaskThreadPool::sTaskThreadArray[theThread]-> fTaskQueue.EnQueue (&fTaskQueueElem);
       8            }
       }
       至此咱們已經將DSS的線程和任務運行機制分析完了,這種由事件去觸發任務的概念已經被集成到了DSS的各個子系統中。例如,在DSS中常常將一個Task對象和一個Socket對象關聯在一塊兒,當Socket對象收到事件(經過select()函數),相對應的Task對象就會被傳信(經過Signal()函數);而包含着處理代碼的Run()函數就將在某個任務線程中運行。
       所以,經過使用這些Task對象,咱們就可讓全部鏈接都使用一個線程來處理,這也是DSS的缺省配置方法。
3.3   Socket類
做爲一個典型的網絡服務器,DSS源代碼中的Socket編程部分是其精華之一。DSS定義了一系列Socket類用於屏蔽不一樣平臺在TCP/UDP編程接口和使用方法上的差別。DSS中的Socket類通常都採用異步模式的(即非阻塞的),並且可以向對應的Task對象傳信(Signal),這點咱們在上一節介紹過。Socket類中具備表明性的類是:EventContext、EventThread、Socket、UDPSocket、TCPSocket以及TCPListenerSocket等等,它們之間的繼承關係見圖5。
       在eventcontext.h/.cpp文件中,定義了兩個類:EventContext類和EventThread類。 Event Context提供了檢測Unix式的文件描述符(Socket就是一種文件描述符)產生的事件(一般是EV_RE 或 EV_WR)的能力,同時還能夠傳信指定的任務。EventThread類是OSThread類的子類,它自己很簡單,只是重載了OSThread的純虛函數Entry(),用以監控全部的Socket端口是否有數據到來,其代碼分析以下:
       void EventThread::Entry()
{
/*該結構定義在ev.h中,記錄Socket描述符和在該描述符上發生的事件*/
       1     struct eventreq theCurrentEvent;       
       2     ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );   //初始化該結構
       
       3     while (true)
4     {
//首先監聽Socket端口的事件
       5            int theErrno = EINTR;
       6            while (theErrno=EINTR)
       7            {
8     #if MACOSXEVENTQUEUE //Macos平臺
       9                   int theReturnValue = waitevent(&theCurrentEvent, NULL);
10    #else       //其餘平臺
              /*調用select_waitevent函數監聽全部的Socket端口,直到有事件發生爲止*/
       11                  int theReturnValue = select_waitevent(&theCurrentEvent, NULL);
12    #endif     
              …
              //有事件發生,喚醒相應的Socket端口
13    if (theCurrentEvent.er_data != NULL)
       14    {
                     //經過事件中的標識找到相應的對象參考指針
       15           StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
       16           OSRef* ref = fRefTable.Resolve(&idStr);
       17           if (ref != NULL)
       18           {     //經過參考指針獲得EventContext對象
       19                  EventContext* theContext = (EventContext*)ref->GetObject();
                            //利用EventContext對象的ProcessEvent方法傳信對應的Task
       20                  theContext->ProcessEvent(theCurrentEvent.er_eventbits);
       21                  fRefTable.Release(ref);       //減小引用計數
       22           }
//此處略…
}
       上述代碼有兩點須要注意:首先在第11行,調用select_waitevent函數監聽全部Socket端口的事件。該函數在Windows平臺上是採用WSAAsyncSelect(異步選擇)模型實現的。具體實現是:系統首先建立一個窗口類,該類專門用於接受消息;在每一個Socket端口建立後,調用WSAsyncSelect函數,同時將上述窗口類的句柄做爲參數傳入;未來這些Socket端口有事件發生時,Windows就會自動將這些事件映射爲標準的Windows消息發送給窗口類,此時select_waitevent函數經過檢查消息就可以得到對應Socket端口發生的事件。對於Windows平臺下Socket的異步編程技術細節請參閱《Windows網絡編程技術》一書。
       另外,在第20行調用的EventContext對象的ProcessEvent函數實現上很簡單,只有一行代碼:fTask->Signal(Task::kReadEvent);其中fTask爲該EventContext對象對應的Task對象;ProcessEvent函數向Task對象傳信,以便及時處理剛剛發生的Socket事件。
       與EventThread對應的EventContext對象負責維護指定的描述符,其主要函數包括InitNonBlocking、CleanUp和RequestEvent等。其中InitNonBlocking函數調用Socket API ioctlsocket將用戶指定的描述符設置爲異步,CleanUp函數用於關閉該描述符;另外,用戶經過RequestEvent函數申請對該描述符中某些事件的監聽,如前所述,該函數內部調用了WSAsyncSelect來實現這一功能。
       Socket Class、UDPSocket Class和TCPSocketClass三個類都是EventContext的子類,它們封裝了TCP和UDP的部分實現,同時擴展了EventContext中的事件,但都沒有改變其運行機制,所以此處再也不詳述,留給讀者自行分析。咱們要爲你們分析的是另一個比較複雜的Socket類TCPListenerSocket類。TCPListenerSocket用於監聽TCP端口,當一個新鏈接請求到達後,該類將賦予這個新鏈接一個Socket對象和一個Task對象的配對。首先分析TCPListenerSocket類的主要定義以下:
       class TCPListenerSocket : public TCPSocket, public IdleTask
{
/*提示:該類從有兩個基類,因此它既是一個事件監聽者,同時也是一個任務Task。做爲一個任務,給TCPListenerObject發送Kill事件就能夠刪除它*/
       1     public:
       2            TCPListenerSocket() :   TCPSocket(NULL, Socket::kNonBlockingSocketType), IdleTask(), fAddr(0), fPort(0), fOutOfDescriptors(false) {}  //構造函數
       3            virtual ~TCPListenerSocket() {}   //析構函數
              
                     //addr爲地址,port爲端口號,初始化函數自動監聽TCP端口
       4            OS_Error              Initialize(UInt32 addr, UInt16 port);
                     //子類必須重載該純虛函數,用於創建新鏈接時生成任務對象
       5            virtual Task*   GetSessionTask(TCPSocket** outSocket) = 0;
       6            virtual SInt64  Run();  //重載Task的Run函數,子類仍可重載
                     
       7     private:
                     //重載EventContext的ProcessEvent函數,用於產生Socket和Task對象配對
8            virtual void ProcessEvent(int eventBits);
       9            OS_Error       Listen(UInt32 queueLength);
//其餘略…
}
       前面咱們分析得知,EventContext類經過ProcessEvent函數來實現對任務的傳信工做,但在TCPListenerSocket 中,ProcessEvent函數被重載用來建立Socket和Task對象得配對,該函數的實現以下:
       void TCPListenerSocket::ProcessEvent(int /*eventBits*/)
{     /*提示:該函數運行於系統惟一的EventThread線程中,因此要儘可能快速,以避免佔用過多的系統資源*/
              //此處略去部分定義…
       1     Task* theTask = NULL;     //Task對象
       2     TCPSocket* theSocket = NULL;       //Socket對象
       
              //建立對象配對
       3     while (true)
       4     {     //accept鏈接
       5            int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size); 
       6            if (osSocket == -1) //監聽端口出錯
       7            {     //此處略去出錯處理     }
                     //用子類重載的GetSessionTask函數建立Task對象
       8            if ((theTask = this->GetSessionTask(&theSocket))=NULL) //建立出錯
       9                   close(osSocket);
       10           else  //建立成功,接着建立Socket對象
       11           {     
       12                  Assert(osSocket != EventContext::kInvalidFileDesc);
                            //此處略去部分對新建鏈接端口的設置(setsockopt函數)
                            //建立新的Socket對象
       13                  theSocket->Set(osSocket, &addr);
       14                  theSocket->InitNonBlocking(osSocket); //初始化
       15                  theSocket->SetTask(theTask); //設置對應的任務
       16           theSocket->RequestEvent(EV_RE); //新對象監聽讀事件
       17           }
       18    }
              //處理完一次鏈接請求後,TCPListenerSocket對象還要接着監聽
       19    this->RequestEvent(EV_RE);
}
       對Socket類的分析基本完成了,從中咱們能夠發現,DSS對於網絡傳信和任務調度之間的處理很是精密,環環相扣,在某種程度上甚至是有些過a於花哨。可是這些基本類是上層RTSP/RTP等服務器子系統編碼的基礎,所以但願讀者可以從本質上掌握這些代碼。
4     核心功能庫(Server Core)
4.1 RTSP 子系統
       RTSP標準是實時流控制協議(Real-Time Streaming Protocol RFC2326)的簡稱,它被客戶和流式媒體服務器用來交換對媒體的控制信息。圖6是RTSP基本操做的描述。
再給出一個RTSP協議的例子以下:
       DSS開發了一個RTSP子系統來支持標準的RTSP協議,本節將分析這些源代碼。
       首先,DSS定義了一個TCPListenerSocket類的子類RTSPListenerSocket,用於監聽RTSP鏈接請求。RTSPListenerSocket類作的惟一一件事就是重載了GetSessionTask函數,當客戶的鏈接請求到達後,它建立了一個Socket對象和RTSPSession對象的配對。RTSPSession對象是Task類的子類,是專門用於處理RTSP請求的任務類。
       如圖7所示,RTSP鏈接創建後,服務器會爲每一個客戶維護一個Socket對象和RTSPSession對象的配對;當客戶的RTSP請求到達時,Socket對象就會調用RTSPSession對象的Signal方法傳信,即將RTSPSession對象加入到TaskThread對象的任務隊列中去;而當時間片到來,TaskThread線程就會調用RTSPSession對象的Run方法,這個方法就會處理客戶發送過來的RTSP請求。所以,下面咱們將主要分析RTSPSession的Run方法。
       爲了跟蹤當前處理的狀況,RTSPSession類內部定義了多個狀態,而Run方法其實就是經過在這些狀態之間不斷切換,同時對客戶的RTSP請求作出不一樣的處理。
                     enum
                     {
                     //RTSPSession的基本狀態
                     kReadingRequest= 0,
                     kFilteringRequest= 1,
                     kRoutingRequest= 2,
                     kAuthenticatingRequest= 3,
                     kPreprocessingRequest= 4,
                     kProcessingRequest= 5,
                     kSendingResponse= 6,
                     kPostProcessingRequest       = 7,
                     kCleaningUp= 8,
              
                     //當RTSP協議經過HTTP隧道實現時將用到下面的狀態
       kWaitingToBindHTTPTunnel = 9,         
kSocketHasBeenBoundIntoHTTPTunnel = 10,
kHTTPFilteringRequest = 11,               
                     kReadingFirstRequest = 12,                 
                     kHaveNonTunnelMessage = 13                          
              }
       另外,值得注意的是,DSS提供一種稱爲Module的二次開發模式,開發人員能夠編寫新的Module而且註冊其但願運行的狀態,系統就會在相應的狀態下調用該Module,從而將控制權暫時交給二次開發的代碼,以便加強系統的功能。簡單起見,下面咱們將分析不存在客戶模塊的Run()函數源代碼。首先分析其主框架以下:
       SInt64 RTSPSession::Run()
{
       1     EventFlags events = this->GetEvents();     //取出事件
       2     QTSS_Error err = QTSS_NoErr;
       3     QTSSModule* theModule = NULL;
       4     UInt32 numModules = 0;
       
       // 設定當前的Module狀態
       5     OSThread::GetCurrent()->SetThreadData(&fModuleState);
       
       //檢查該鏈接是否超時,若是是就設定狀態斷掉該鏈接
       6     if ((events & Task::kTimeoutEvent) || (events & Task::kKillEvent))
       7            fLiveSession = false;
              
       8     while (this->IsLiveSession()) //若是鏈接還沒有拆除,執行狀態機
9     {
              /* 提示:下面是RTSPSession的狀態機。由於在處理RTSP請求過程當中,有多個地方須要Run方法返回以便繼續監聽新的事件。爲此,咱們須要跟蹤當前的運行狀態,以便在被打斷後還能回到原狀態*/
       10           switch (fState)
       11           {
       12                  case 狀態1: //處理略
13    case 狀態2: //處理略…
14    case 狀態n: //處理略
       15           }     //此處略…
       }
       Run函數的主框架比較簡單,其核心就在於10~15的狀態機,所以咱們但願按照客戶請求到達而且被處理的主要流程爲讀者描述該狀態機的運轉。
       1第一次請求到達進入kReadingFirstRequest狀態,該狀態主要負責從RTSPRequestStream類的對象fInputStream中讀出客戶的RTSP請求,其處理以下:
              case kReadingFirstRequest:
              {
              1     if ((err = fInputStream.ReadRequest())=QTSS_NoErr)
              2     {/* RequestStream返回QTSS_NoErr意味着全部數據已經從Socket中讀出,但尚不能構成一個完整的請求,所以必須等待更多的數據到達*/
              3            fInputSocketP->RequestEvent(EV_RE); //接着請求監聽讀事件
              4            return 0;      //Run函數返回,等待下一個事件發生
              5     }
              6     if ((err != QTSS_RequestArrived) && (err != E2BIG))
              7     {//出錯,中止處理
              8            Assert(err > 0); 
              9            Assert(!this->IsLiveSession());
              10           break;
              11    }
                     //請求已經徹底到達,轉入kHTTPFilteringRequest狀態
              12    if (err = QTSS_RequestArrived)
              13           fState = kHTTPFilteringRequest;
                     //接收緩衝區溢出,轉入kHaveNonTunnelMessage狀態
       14    if (err=E2BIG)
              15           fState = kHaveNonTunnelMessage;
              }
              continue;
       2正常狀況下,在得到一個完整的RTSP請求後(上第12行),系統將進入kHTTPFilteringRequest狀態該狀態檢查RTSP鏈接是否須要通過HTTP代理實現;如不須要,轉入kHaveNonTunnelMessage狀態。
       3進入kHaveNonTunnelMessage狀態後,系統建立了RTSPRequest類的對象fRequest,該對象解析客戶的RTSP請求,並保存各類屬性。fRequest對象被傳遞給其餘狀態處理。
       4接着進入kFilteringRequest狀態,二次開發人員能夠經過編寫Module對客戶的請求作出特殊處理。若是客戶的請求爲正常的RTSP請求,系統調用SetupRequest函數創建用於管理數據傳輸的RTPSession類對象,其源代碼分析以下:
       void RTSPSession::SetupRequest()
{
       // 首先分析RTSP請求,細節見RTSPRequest.h/.cpp
       1     QTSS_Error theErr = fRequest->Parse();
2     if (theErr != QTSS_NoErr)   
       3            return;
       
              //OPTIONS請求,簡單發回標準OPTIONS響應便可
4     if (fRequest->GetMethod() = qtssOptionsMethod)
       5     {//此處略去部分處理代碼…
6     }
              
       //DESCRIBE請求,必須保證已經有了SessionID
       7     if (fRequest->GetMethod() = qtssDescribeMethod)
       8     {
       9            if (fRequest->GetHeaderDictionary()->GetValue(qtssSessionHeader)->Len > 0)
       10           {
       11                  (void)QTSSModuleUtils::SendErrorResponse(fRequest, qtssClientHeaderFieldNotValid, qtssMsgNoSesIDOnDescribe);
12                  return;
       13           }
14    }
       
              //查找該請求的RTPSession
       15    OSRefTable* theMap = QTSServerInterface::GetServer()->GetRTPSessionMap();
       16    theErr = this->FindRTPSession(theMap);
       17    if (theErr != QTSS_NoErr)
       18           return;
       
       //若是未查找到,創建一個新的RTPSession
       19    if (fRTPSession= NULL)
       20    {
       21           theErr = this->CreateNewRTPSession(theMap);
       22           if (theErr != QTSS_NoErr)
       23                  return;
       24    }
              //此處略…
}
       5進入kRoutingRequest狀態,調用二次開發人員加入的Module,用於將該請求路由(Routing)出去。缺省狀況下,系統自己對此狀態不作處理。
       6進入kAuthenticatingRequest狀態,調用二次開發人員加入的安全模塊,主要用於客戶身份驗證以及其餘如規則的處理。讀者若是但願開發具備商業用途的流式媒體服務器,該模塊必須進行二次開發。
       7進入kPreprocessingRequest和kProcessingRequest及kPostProcessingRequest狀態,這三種狀態都是經過調用系統自帶或二次開發人員添加的Module來處理RTSP請求,例如系統提供了QTSSReflector Module、QTSSSplitter Module以及QTSSFile Module等模塊。其中比較重要的QTSSFile Module屬於QTLib庫的部分,此處再也不詳述。
       8進入kSendingResponse狀態,用於發送對客戶RTSP請求處理完成以後的響應。系統在該狀態調用了fOutputStream.Flush()函數將在fOutputStream中還沒有發出的請求響應經過Socket端口徹底發送出去。
       9進入kCleaningUp狀態,清除全部上次處理的數據,並將狀態設置爲kReadingRequest等待下次請求到達。
       RTSPSession的主流程分析完了,但輔助其操做的多個RTSP類還須要讀者自行分析,它們分別是:RTSPSessionInterface Class、RTSPRequest Class、RTSPRequestInterface Class、RTSPRequestStream Class以及RTSPResponseStream Class等等。
4.2 RTP子系統
       RTP標準是實時傳輸協議(Real-Time Transfer Protocol)的簡稱,它被客戶和流式媒體服務器用來處理流式媒體數據的傳輸。在介紹RTSP的運行流程時,咱們發現RTSPSession對象經過調用SetupRequest函數爲客戶創建RTPSession對象。RTPSession類是Task類的子類,所以它重載了Task類的Run函數,該函數經過調用FileModule.cpp文件中的SendPacket()函數向客戶發送RTP協議打包的流式媒體數據。當客戶經過利用RTSP向RTSPSession對象發出PLAY命令後,RTSPSession對象將調用RTPSession對象的Play()函數。Play函數準備好須要打包發送的數據後,利用Task類的Signal函數傳信RTPSession對象,使其被加入某個TaskThread的任務隊列,從而運行其Run函數。
另外,對同一個節目中的每個獨立的RTP流(如音頻流或視頻流等),DSS都定義了一個RTPStream類與之對應;顯然一個RTPSession對象可能包含多個RTPStream對象。整個RTP子系統的核心運行流程見圖8。
       下面,咱們首先分析RTPSession中Run()函數的用法:
       SInt64 RTPSession::Run()
{ //提示:該函數代碼在TaskThread內運行
1     EventFlags events = this->GetEvents(); //取出事件
2     QTSS_RoleParams theParams;
       //提供給其餘Module運行的參數,第一個成員是對象自己
       3     theParams.clientSessionClosingParams.inClientSession = this;        
       //設定本身爲當前運行的線程
       4     OSThread::GetCurrent()->SetThreadData(&fModuleState);
              /*若是事件是通知RTPSession對象死亡,就準備自殺。可能致使這種狀況的有兩種事件:自殺kKillEvent;超時kTimeoutEvent*/
       5     if ((events & Task::kKillEvent) || (events & Task::kTimeoutEvent) || (fModuleDoingAsyncStuff))
       6     {     //處理對象自殺代碼,此處略…
       7            return –1;     //返回出錯信息,這樣析構函數就會被調用,從而讓對象徹底死亡
       8     }
              //若是正處於暫停(PAUSE)狀態,什麼都不作就返回,等待PLAY命令
       9     if ((fState == qtssPausedState) || (fModule == NULL))
       10           return 0;
       
              //下面代碼負責發送數據
       11    {     //對Session互斥量加鎖,防止發送數據過程當中RTSP請求到來
       12           OSMutexLocker locker(&fSessionMutex);
                     //設定數據包發送時間,防止被提早發送
       13           theParams.rtpSendPacketsParams.inCurrentTime = OS::Milliseconds();
       14           if (fPlayTime > theParams.rtpSendPacketsParams.inCurrentTime) //未到發送時間
       15                  theParams.rtpSendPacketsParams.outNextPacketTime=fPlayTime- theParams.rtpSendPacketsParams.inCurrentTime; //計算還需多長時間纔可運行
       16           else
       17           {     //下次運行時間的缺缺省值爲0
       18                  theParams.rtpSendPacketsParams.outNextPacketTime = 0;
                     // 設置Module狀態
       19                  fModuleState.eventRequested = false;
       20                  Assert(fModule != NULL);
                            //調用QTSS_RTPSendPackets_Role內的函數發送數據,見FileModule.cpp
       21                  (void)fModule->CallDispatch(QTSS_RTPSendPackets_Role, &theParams);
                            //將返回值從負數改成0,不然任務對象就會被TaskThread刪除
       22                  if (theParams.rtpSendPacketsParams.outNextPacketTime        23                         theParams.rtpSendPacketsParams.outNextPacketTime = 0;
       24           }
       25    }
              //返回下一次但願被運行的時間;返回值含義見前文的分析
       26    return theParams.rtpSendPacketsParams.outNextPacketTime;
}
       從上面分析可見,正常狀態下Run函數的返回值有兩種:若是返回值爲正數,表明下一次發送數據包的時間,規定時間到來的時候,TaskThread線程會自動調用Run函數;若是返回值等於0,在下次任何事件發生時,Run函數就會被調用,這種狀況每每發生在全部數據都已經發送完成或者該RTPSession對象將要被殺死的時候。
       在第21行咱們看到,Run函數調用了QTSSFileModule中的QTSS_RTPSendPackets_Role發送數據。在QTSSFileModule.cpp文件的QTSSFileModule_Main函數內,系統又調用了SendPackets函數,這纔是真正發送RTP數據包的函數,咱們對其代碼分析以下:
       QTSS_Error SendPackets(QTSS_RTPSendPackets_Params* inParams)
{
              //此處略去部分定義…
       //獲得要發送數據的FileSession對象,其定義見QTSSFileModule.cpp文件
       1     FileSession** theFile = NULL;
       2     UInt32 theLen = 0;
       3     QTSS_Error theErr = QTSS_GetValuePtr(inParams->inClientSession, sFileSessionAttr, 0, (void**)&theFile, &theLen); 
       4     if ((theErr != QTSS_NoErr) || (theLen != sizeof(FileSession*))) //出錯
       5     {     //設定出錯緣由,而後斷掉鏈接,並返回
       6            QTSS_CliSesTeardownReason reason = qtssCliSesTearDownServerInternalErr;
       7            (void) QTSS_SetValue(inParams->inClientSession, qtssCliTeardownReason, 0, &reason, sizeof(reason));
       8            (void)QTSS_Teardown(inParams->inClientSession);
       9            return QTSS_RequestFailed;
       10    }
       //該節目文件中音頻所能忍受的最大延遲
       11    maxDelayToleranceForStream = (*theFile)->fMaxAudioDelayTolerance;
       
       12    while (true)
       13    {     
                     //不存在待發送數據包,多是文件還沒有打開
       14           if ((*theFile)->fNextPacket == NULL)
       15           {
       16                  void* theCookie = NULL;
                            //得到第一個數據包,theTransmitTime爲傳輸數據花費的時間
       17                  Float64 theTransmitTime = (*theFile)->fFile.GetNextPacket(&(*theFile)->fNextPacket, &(*theFile)->fNextPacketLen, &theCookie);
       18                  if ( QTRTPFile::errNoError != (*theFile)->fFile.Error() )
                            {//讀數據出錯,斷掉鏈接,返回。此處略 }
                            …
       19                  (*theFile)->fStream = (QTSS_RTPStreamObject)theCookie; //獲得RTPStream對象
       20                  (*theFile)->fPacketPlayTime = (*theFile)->fAdjustedPlayTime + ((SInt64)(theTransmitTime * 1000)); //推遲theTransmitTime長度的播放時間
       21                  (*theFile)->fPacketWasJustFetched = true;       
       22                  if ((*theFile)->fNextPacket != NULL)
       23                  {     // 判斷流格式
       24                         QTSS_RTPPayloadType* thePayloadType = NULL;
       25                         QTSS_Error theErr = QTSS_GetValuePtr( (*theFile)->fStream, qtssRTPStrPayloadType, 0, (void**)&thePayloadType, &theLen );
                                   //設定視頻流可忍受的最大延遲時間
       26                         if (*thePayloadType == qtssVideoPayloadType)
       27                         maxDelayToleranceForStream = (*theFile)->fMaxVideoDelayTolerance;
       28                  }
       29           }
              
                     //仍無數據,說明全部數據已經傳輸完成了
       30           if ((*theFile)->fNextPacket = NULL)
       31           {     //向fStream中寫入長度爲0的空數據,以便強制緩衝區刷新
       32                  (void)QTSS_Write((*theFile)->fStream, NULL, 0, NULL, qtssWriteFlagsIsRTP);
       33                  inParams->outNextPacketTime = qtssDontCallSendPacketsAgain;
       34                  return QTSS_NoErr; //完成任務返回
       35           }
                     //提示:開始發送RTP數據包
                     //計算當前時間和該段數據應該發送的時間之間的相對間隔
       36           SInt64 theRelativePacketTime = (*theFile)->fPacketPlayTime - inParams->inCurrentTime;  // inCurrentTime = OS::Milliseconds();
              
       37           SInt32 currentDelay = theRelativePacketTime * -1L; //計算傳輸延遲
       38           theErr =  QTSS_SetValue( (*theFile)->fStream, qtssRTPStrCurrentPacketDelay, 0, ¤tDelay, sizeof(currentDelay) ); //保存該延遲
                     //若是延遲過大,就丟棄該包,等待發送下一個數據包
       39           if (theRelativePacketTime > sMaxAdvSendTimeInMsec)
       40           {
       41                  Assert( theRelativePacketTime > 0 );
       42                  inParams->outNextPacketTime = theRelativePacketTime;
       43                  return QTSS_NoErr;
       44           }
                     //此處略去部分處理視頻質量的代碼…
                     // 發送當前數據包
       45           QTSS_Error writeErr = QTSS_Write((*theFile)->fStream, (*theFile)->fNextPacket, (*theFile)->fNextPacketLen, NULL, qtssWriteFlagsIsRTP);
                     
//其他代碼略…
}
       RTP子系統是DSS中最爲複雜的部分之一,這是由於發送RTP數據包的過程不但涉及到網絡接口,並且和文件系統有着密切的關係。DSS的一個重要特徵就是可以將線索化(Hinted)過的QuickTime文件經過RTSP和RTP協議流化出去。全部分析這些文件的代碼都被提取出來而且封裝在QTFile庫中。這種封裝方式使得系統的各個部分都變得簡單:QTFile負責處理文件的分析;而DSS其餘部分負責處理網絡和協議。服務器中的RTPFileModule調用QTFile庫檢索索引過的QuickTime文件的數據包和元數據。QTFile庫的講解超出了本文的範圍,可是但願讓DSS支持其餘媒體格式的讀者可以掌握它的實現機制。
5  DSS二次開發接口:Module開發流程
       做爲一個運行於多個操做系統平臺的開發源代碼的服務器,DSS提供了一種稱爲Module的二次開發接口。使用這個開發接口,咱們能夠充分利用服務器的可擴展性及其實現的多種協議,而且可以保證和未來版本兼容。DSS中的許多核心功能也是以Module的方式預先實現而且編譯的,所以能夠說對Module的支持已經被設計到DSS的內核中去了。
       下面咱們將分析DSS的一個內嵌Module:QTSSFileModule的源代碼來講明Module的編程方式,QTSSFileModule的實如今QTSSFileModule.cpp文件中。
       每一個QTSS Module必須實現兩個函數:
首先,每一個QTSS Module必須實現一個主函數,服務器調用該函數用於啓動和初始化模塊中的QTSS函數;QTSSFileModule主函數的實現以下:
QTSS_Error QTSSFileModule_Main(void* inPrivateArgs)
{
       return _stublibrary_main(inPrivateArgs, QTSSFileModuleDispatch);
}
其中QTSSFileModuleDispatch是Module必須實現的分發函數名。
另外一個須要實現的是分發函數,服務器調用該函數實現某個特殊任務。此時,服務器將向分發函數傳入任務的名字和一個任務相關的參數塊。QTSSFileModule分發函數的實現以下:
QTSS_Error QTSSFileModuleDispatch(QTSS_Role inRole, QTSS_RoleParamPtr inParamBlock)
{     //根據傳入的任務名稱和入參執行相應的處理函數
       switch (inRole)      //任務名稱
       {
              case QTSS_Register_Role:
                     return Register(&inParamBlock->regParams);
              case QTSS_Initialize_Role:
                     return Initialize(&inParamBlock->initParams);
              case QTSS_RereadPrefs_Role:
                     return RereadPrefs();
              case QTSS_RTSPRequest_Role:
                     return ProcessRTSPRequest(&inParamBlock->rtspRequestParams);
              case QTSS_RTPSendPackets_Role:
                     return SendPackets(&inParamBlock->rtpSendPacketsParams);
              case QTSS_ClientSessionClosing_Role:
                     return DestroySession(&inParamBlock->clientSessionClosingParams);
       }
       return QTSS_NoErr;
}
       其中,分發函數的入參是一個聯合,它根據任務名稱的不一樣,具體的數據結構也不一樣,下面是該數據結構的定義:
       typedef union
{
              QTSS_Register_Params                             regParams;
              QTSS_Initialize_Params                            initParams;
              QTSS_ErrorLog_Params                           errorParams;
              //此處略去其餘多個數據結構…
} QTSS_RoleParams, *QTSS_RoleParamPtr;
       DSS提供了兩種方式把咱們本身開發的Module添加到服務器中:一種稱爲靜態模塊(Static Module),該方式將咱們開發的Module代碼直接編譯到內核中去;另外一種稱爲動態模塊(Dynamic Module),該方式將咱們開發的Module單獨編譯稱爲一個動態庫,而後修改配置,使服務器在啓動時將其加載。圖9描述了DSS啓動和關閉時模塊調用流程。
       當服務器啓動時,它首先裝載沒有被編譯進內核的動態模塊,而後才裝載被編譯進內核的靜態模塊;因爲現有的大部分系統功能都是以靜態模塊的方式存在的,若是你但願用本身的模塊替換某個系統功能,最好是編寫一個動態模塊,由於它們將早於靜態模塊被裝載。
       不管是靜態模塊仍是動態模塊,它們的代碼都是相同的,惟一的不一樣就是它們的編譯方式。首先爲了將靜態模塊編譯到服務器中,咱們必須修改QTSServer.cpp文件中的QTSServer::LoadCompiledInModules,並向其中加入如下代碼:
       QTSSModule*       myModule=new QTSSModule(*_XYZ_*);
       (void)myModule->Initialize(&sCallbacks,&_XYZMAIN_);
       (void)AddModule(MyModule);
       其中,XYZ是靜態模塊的名字,而XYZMAIN則是其主函數入口。
       動態模塊的編譯方法以下:首先單獨編譯動態模塊爲一個動態共享庫;將該共享庫與QTSS API stub library連接到一塊兒;最後將結果文件放置到/usr/sbin/QTSSModules目錄中去。此後,服務器在啓動時就將自動調用該動態模塊。編程

相關文章
相關標籤/搜索