Apple公司Darwin流式服務器源代碼分析

當前,伴隨着Internet的飛速發展,計算機網絡已經進入到每個普通人的家庭。在這個過程當中,一個值得咱們關注的現象是:Internet中存儲和傳輸內容的構成已經發生了本質的改變,從傳統的基於文本或少許圖像的主頁變爲大容量、富信息量的流式媒體信息。一份早在1998年提交的研究報告就曾指出,流式媒體統治Internet的潮流是不可抗拒的,該報告估計到2003年,存儲在網絡服務器上的內容超過50%的將是流式媒體信息。但今天看來,這個估計仍是有些保守了。
所謂的流式媒體簡單的講就是指人們經過網絡實時的收看多媒體信息:如音頻流、視頻流等。與流式媒體對應的傳統工做方式是下載+播放模式,即用戶首先下載多媒體文件,而後再在本地播放,這種方法的一個主要缺點是啓動延遲較大,例如一個30分鐘長的MPEG-I文件(至關於VCD質量),即便使用1.5Mbps的速率下載,也須要半個小時才能完成,這樣一個漫長的等待時間實在是沒法忍受。在窄帶網絡環境中,幾乎全部基於Internet的流式媒體產品都有着相似的工做原理:首先須要開發高效的壓縮編碼技術,並經過一套完整有效的傳輸體系將其發佈到用戶的桌面上。目前在流式媒體領域,有三種佔有主導地位的產品,它們分別是Apple公司的Quick Time、Microsoft公司的Media Server以及Real公司的Real System。本文將介紹QuickTime技術及其開放源代碼的Darwin流化服務器。
1     QuickTime技術介紹
Apple公司近日發佈了QuickTime 5及QuickTime Streaming Server 3(簡稱QTSS)。做爲客戶端的QuickTime 5是用於在Internet上對高質量音頻和視頻內容進行建立、播放及提供數字流的軟件,目前QuickTime在全世界的使用量已經超過1億5千萬份。QuickTime Streaming Server 3是Apple基於標準的、開放式源代碼的流式服務器軟件的新版本,它包括如下新功能:跳讀保護(Skip Protection),一項得到專利的特性組合,它能夠保證Internet上數字流的質量,防止中斷;全新的易於使用、基於Web的界面,用戶能夠在本地或遠程進行管理,實現服務器配置。做爲Internet流媒體聯盟(ISMA)的建立者之一,Apple不斷致力於開⒎弦到綾曜嫉牟泛圖際酰ü岣呋ゲ僮饜嶽從嘔沒У氖褂錳逖椋殼癚uickTime已被國際標準組織(ISO)選爲MPEG-4的基本文件格式,可預見Apple將有更多MPEG-4 產品和技術的推出。
QuickTime正迅速成爲世界領先的跨平臺多媒體技術,並且是迄今爲止惟一的開放式源代碼、基於標準的數字流解決方案。ZDNet在2000年9月對於三種流式媒體服務器的特徵比較說明了QTSS不只僅被技術開發者關注,並且能夠經過簡單的定製成爲成熟強大的產品,評測結果可見表1。
表1  ZDNet對三類產品的評測結果
服務器模塊    QTSS 2.01     Media Server 7       RealServer Basic 7
操做系統支持       Windows NT, 2000; FreeBSD; Linux; Mac OS; Solaris       Windows NT, 2000       Windows NT, 2000
併發流個數    2,000      2,000      25 free/3000 pro
現場直播和廣播    Yes  Yes  Yes
在線廣告支持      Yes  Yes  Yes
PPV/流加密   No / No   Yes / Yes Yes / Yes
分配流能力    No   Yes  Yes
SMIL標準支持     Yes  No   Yes
RTSP標準支持     Yes        No   Yes
多播支持       Yes  Yes  Yes
狀態報告       Yes  Yes  Yes
服務器日誌    Yes  Yes  Yes
防火牆和代理支持       Yes  Yes  Yes
遠程監控       Yes  Yes  Yes
客戶可使用QuickTime Player或其餘支持QuickTime的應用程序在Windows或Macintosh平臺上接收視頻流,並且QuickTime Player能夠從蘋果公司的網站上下載無償使用。若是安裝了QuickTime的插件,客戶還能夠直接經過瀏覽器收看。
客戶但願點播一個節目時,QuickTime Player或插件將向QTSS發送請求,指明要點播的節目名。若是該節目存在,QTSS將向客戶發送相應的視頻流。當客戶但願收看現場直播(或實時廣播)時,它首先從QTSS得到關於當前頻道的編碼格式、地址等相關信息,而後再接受該頻道的媒體流。
對於那些但願在Internet上實時流化視頻或音頻信息的用戶,QTSS服務器將是一個很好的選擇,經過它可實現多項任務,例如:
建立一個24小時在線的Internet廣播電臺;
現場實況轉播:如公司會議、體育節目等;
建立遠程學習站點:如可以點播視頻和演講; 
       圖1是一個利用QTSS服務器創建的現場直播場景。
2     Darwin流化服務器介紹
Darwin Streaming Server(簡稱DSS)是QuickTime Streaming Server開放式源代碼的版本,同時支持FreeBSD、Linux、Solaris、Windows NT和Windows 2000等多個操做系統,是當前全部同類產品中支持平臺最多的一個。DSS的源代碼和相關文檔可從如下站點得到:http://www.apple.com
DSS源代碼徹底採用標準C++語言寫成,編程風格很是優秀,每一個C++類都對應着一對和類同名的.h/.cpp文件。可是因爲大量採用了面向對象的概念,如繼承、多態等等;並且源文件和類至關多,因此不太容易講清楚。所以,讀者最好事先把代碼完整的過濾一兩遍,再配合本文,就能看得更清楚點。
整個服務器包括多個子系統,分別存放在獨立的工程內,如圖2所示。
其中,最爲重要的是基礎功能類庫(CommonUtilitiesLib)和流化服務器(StreamingServer)兩個工程,前者是整個系統的通用代碼工具箱,包括了線程管理、數據結構、網絡和文本分析等多個功能模塊。DSS和其餘相關的工具使用基礎功能類庫工程中定義的功能類實現如下三個目標:
(1)抽象出系統中相同或相似的功能,用於下降代碼的冗餘度;
(2)封裝基本功能,簡化高層編碼的複雜度;
(3)隔離開操做系統平臺相關的代碼。
而流化服務器工程中包含了DSS對多個國際標準的實現,是整個服務器的主工程。在本文中,咱們將重點分析這兩個工程中的核心代碼和模塊。另外,咱們還將簡單介紹利用DSS提供的開發接口(Module)擴展和定製服務器的方法。
DSS實現了四種IETF制定的國際標準,分別是:實時流傳輸協議RTSP(Real-time Streaming Protocol, RFC 2326)、實時傳輸協議(RTP Real-time Transfer Protocol,RFC 1889)、實時傳輸控制協議RTCP(Real-time Transport Control Protocol,RFC 1889)、會話描述協議SDP(Session Description Protocol,RFC 2327)。這四個標準是開發全部流式媒體產品都必須掌握的,所以在對相關代碼進行分析和二次開發以前,但願讀者瞭解上述四種協議的基本思想,上述協議樣本可從如下網站得到:http://www.ietf.org
3     基礎功能類庫(Common Utilities)
3.1   OS類
Darwin Streaming Server支持包括Windows,Linux以及Solaris在內的多種操做系統平臺。咱們知道,Windows和Unix(或Unix-like)操做系統之間不管從內核仍是編程接口上都有着本質的區別,即便是Linux和Solaris,在編程接口上也大爲不一樣。爲此,DSS開發了多個用於處理時間、臨界區、信號量、事件、互斥量和線程等操做系統相關的類,這些類爲上層提供了統一的使用接口,但在內部卻須要針對不一樣的操做系統採用不一樣的方法實現。表2羅列出了DSS中的主要OS類和數據結構。
表2  DSS中的主要OS類和數據結構
類(數據結構)名       主要功能
OS   平臺相關的功能類,如內存分配、時間等
OSCond  狀態變量的基本功能和操做
OSMutex 互斥量的基本功能和操做
OSThread       線程類
OSFileSource  簡單文件類
OSQueue 隊列類
OSHashTable  哈希表類
OSHeap   堆類
OSRef     參考引用類
3.1.1       OSMutex/OSCond Class
在有多個線程併發運行的環境中,能同步不一樣線程的活動是很重要的,DSS開發了OSMutex和OSCond兩個類用以封裝不一樣操做系統對線程同步支持的差別。
咱們首先分析OSMutex類,這個類定義了廣義互斥量的基本操做,類定義以下:
class OSMutex
{
1     public:
2            OSMutex();         //構造函數
3            ~OSMutex();              //析構函數
4            inline void Lock();              //加鎖
5            inline void Unlock();    //解鎖
6            inline Bool16 TryLock();     //異步鎖,不管是否成功當即返回
7     private:
8     #ifdef __Win32__
9            CRITICAL_SECTION fMutex;  //臨界區
10           DWORD        fHolder;                     //擁有臨界區的線程id
11           UInt32           fHolderCount;             //進入臨界區線程數
              //其餘略…
}
在Windows平臺上,OSMutex類是經過臨界區(CRITICAL_SECTION)來實現的,第10行定義了臨界區變量fMutex。類實例化時構造函數調用InitializeCriticalSection(&fMutex)初始化臨界區變量,對應的在析構函數中調用DeleteCriticalSection(&fMutex)清除。
Lock()函數用於對互斥量加鎖,它調用私有方法RecursiveLock實現:
void        OSMutex::RecursiveLock()
{
       // 當前線程已經擁有互斥量,只需增長引用計數
1     if (OSThread::GetCurrentThreadID() == fHolder)
2     {
3            fHolderCount++;  //增長引用計數
4            return;
5     }
6     #ifdef __Win32__
7            ::EnterCriticalSection(&fMutex); //申請進入臨界區
8     #else
9            (void)pthread_mutex_lock(&fMutex);
10    #endif
11    Assert(fHolder == 0);
12    fHolder = OSThread::GetCurrentThreadID();    //更新臨界區擁有者標誌
13    fHolderCount++;  
14    Assert(fHolderCount == 1);
}
       第1行檢測若是當前線程已經擁有互斥量,就只需將內部計數fHolderCount加1,以便紀錄正在使用互斥量的方法數。若是當前線程尚未獲得互斥量,第7行調用EnterCriticalSection()函數申請進入臨界區;若是當前已經有其餘線程進入臨界區,該函數就會阻塞,使得當前線程進入睡眠狀態,直到佔用臨界區的線程調用LeaveCriticalSection(&fMutex)離開臨界區後纔可能被喚醒。一旦線程進入臨界區後,它將首先更新臨界區持有者標誌(第12行),同時將臨界區引用計數加1。
       注意到另一個函數TryLock(),該函數也是用於爲互斥量加鎖,但與Lock()不一樣的是,TryLock()函數爲用戶提供了異步調用互斥量的功能,這是由於它調用::TryEnterCriticalSection(&fMutex)函數申請進入緩衝區:若是臨界區沒有被任何線程擁有,該函數將臨界區的訪問區給予調用的線程,並返回TRUE,不然它將馬上返回FALSE。TryEnterCriticalSection()和EnterCriticalSection()函數的本質區別在於前者從不掛起線程。
接着分析OSCond類,該類定義了狀態變量(Condition Variable)的基本操做,類定義以下:
class OSCond 
{
1     public:
2            OSCond();   //構造函數
3            ~OSCond(); //析構函數
              
4            inline void      Signal();       //傳信函數
5            inline void      Wait(OSMutex* inMutex, SInt32 inTimeoutInMilSecs = 0);   
//等待傳信函數
6            inline void       Broadcast(); //廣播傳信函數
7     private:
8     #ifdef __Win32__
9            HANDLE                     fCondition;   //事件句柄
10           UInt32                         fWaitCount;  //等待傳信用戶數
//其餘略…
       }
       雖然同是用於線程同步,但OSCond類與OSMutex大不相同,後者用來控制對關鍵數據的訪問,而前者則經過發信號表示某一操做已經完成。在Windows平臺中,OSCond是經過事件(event)來實現的;構造函數調用CreateEvent()函數初始化事件句柄fCondition,而析構函數則調用CloseHandle()關閉句柄。
       OSCond的使用流程是這樣的:線程調用Wait(OSMutex* inMutex, SInt32 inTimeoutInMilSecs = 0)函數等待某個事件的發生,其中inTimeoutInMilSecs是最長等待時間,0表明無限長。Wait()函數內部調用了WaitForSingleObject (fCondition, theTimeout)函數,該函數告訴系統線程在等待由事件句柄fCondition標識的內核對象變爲有信號,參數theTimeout告訴系統線程最長願意等待多少毫秒。若是指定的內核對象在規定時間內沒有變爲有信號,系統就會喚醒該線程,讓它繼續執行。而函數Signal()正是用來使事件句柄fCondition有信號的。Signal()函數內部實現很簡單,只是簡單調用SetEvent函數將事件句柄設置爲有信號狀態。
       使用OSCond的過程當中存在一種需求,就是但願通知全部正在等待的用戶事件已經完成,而Signal()函數每次只能通知一個用戶,所以又開發了另一個廣播傳信函數以下:
inline void OSCond::Broadcast()
{     //提示:本函數至關循環調用Signal()函數
1     #ifdef __Win32__
2     UInt32 waitCount = fWaitCount;       //等待傳信的用戶數
3     for (UInt32 x = 0; x ; x++)        //循環爲每一個用戶傳信
4     {
5            BOOL theErr = ::SetEvent(fCondition);             //設置事件句柄爲有信號狀態
6            Assert(theErr == TRUE);
7     }
//此處略…
       }
       Broadcast首先統計全部等待傳信的用戶數(第2行),而後用一個循環爲每一個用戶傳信(第3~7)行。這種編程方法雖然不是很優雅(elegant),可是因爲Windows平臺上不支持廣播傳信功能(Linux和Solaris均支持),也只好如此。
3.1.2       OSThread Class
OSThread是DSS中最重要的類之一,它封裝而且定義了使用線程的方式,所以須要重點討論。OSThread類的定義以下:
class OSThread
{
1     public:
       // 必須在使用其餘OSThread函數前調用該初始化函數
2     static void              Initialize();
                            
3     OSThread(); //構造函數
4     virtual                                 ~OSThread();      //析構函數
       
       //子類繼承該純虛函數完成本身的工做
5     virtual     void                     Entry() = 0;  
       
6     void                     Start(); //啓動線程
7     void                     Join();  //等待線程運行完成後刪除
8     void                     Detach();     //使線程處於fDetached狀態
9     static void              ThreadYield();     //Windows平臺不用
10    static void              Sleep(UInt32 inMsec); //讓線程睡眠
       …
11    private:
       //標識線程的狀態
12    Bool16 fStopRequested:1;
13    Bool16 fRunning:1;
14    Bool16 fCancelThrown:1;
15    Bool16 fDetached:1;
16    Bool16 fJoined:1;
       …
17    static void              CallEntry(OSThread* thread);//調用子類重載的虛函數
18    #ifdef __Win32__
//使用_beginghreadex建立線程時的標準入口函數
19    static unsigned int WINAPI _Entry(LPVOID inThread); 
20    #else
21    static void*    _Entry(void* inThread); //unix下的入口函數
22    #endif
}
OSThread封裝了線程的基本功能,一個OSThread的實例表明一個線程。用戶經過繼承OSThread,而且重載其中的純虛函數Entry(第5行),從而將本身的任務交給該線程運行。OSThread內部運行機制比較複雜,爲此咱們用圖3所示的流程來描述其運行過程。
      另外,OSThread對於線程的狀態定義了一套完整的控制方法。當用戶調用start()函數後,按照上圖,最終將調用CallEntry()函數,而該函數在調用Entry()以前將線程設定爲運行狀態(thread->fRunning = true),當Entry()函數運行完後再設爲非運行狀態;在運行過程當中,用戶能夠經過StopAndWaitForThread()、join()、Detach()以及ThrowStopRequest()等函數改變線程其餘狀態變量。
3.1.3       OSHashTable/OSQueue/OSHeap/OSRef Class
DSS定義了幾個通用的較爲複雜的數據結構,它們都以類的方式封裝。這些數據結構不但貫穿於DSS的全部源代碼,並且因爲其封裝的十分好,讀者能夠在看懂源代碼的基礎上很容易的將它們從DSS的工程中抽取出來,構建本身的基礎類庫,爲未來的開發工做打下良好的基礎。另外,對這些基礎數據結構源代碼的研究將提升咱們對於面向對象技術的掌握和領會。
       最主要的數據結構有四種:哈希表(OSHashTable)、隊列(OSQueue)、堆(OSHeap)和對象引用表(OSRef)。前三種是咱們在編程中大量使用的數據結構,而對象引用表則是相似於COM/DCOM組件編程中IUNKOWN接口功能的數據結構,它首先爲每一個對象創建了一個字符串形式的ID,以便於經過這個ID找到對象(相似於QueryInterface);另外OSRef類還爲每一個對象實例創建了引用計數,只有一個對象再也不被任何人引用,纔可能被釋放(相似於AddRef和Release)。
       鑑於這幾個類在結構上有類似之處,下面咱們將分析OSHashTable的源代碼,以便可以幫助讀者更好的理解其餘幾個類。OSHashTable的代碼以下:
       template
       class OSHashTable {
       /*提示:OSHashTable被設計成爲一個類模版,兩個輸入參數分別爲:class T:實際的對象類;class K:用於爲class T計算哈希表鍵值的功能類。*/
1     public:
2     OSHashTable( UInt32 size )   //構造函數,入參是哈希表中對象的最大個數
3     {
4            fHashTable = new ( T*[size] );   //申請分配size個哈希對象class T的空間
5            Assert( fHashTable );
6            memset( fHashTable, 0, sizeof(T*) * size );      //初始化
7            fSize = size;
/*下面的代碼決定用哪一種方式爲哈希表的鍵值計算索引;
若是哈希表的大小不是2的冪,只好採用對fSize求餘的方法;
不然能夠直接用掩碼的方式,這種方式相對速度更快*/
8            fMask = fSize - 1;
9            if ((fMask & fSize) != 0)             //fSize不是2的冪
10                  fMask = 0;
11           fNumEntries = 0; //當前對象數
12    }
13    ~OSHashTable()     //析構函數
14    {
15           delete [] fHashTable;   //釋放空間
16    }
//下面介紹向哈希表中添加一個class T對象的源代碼
17    void Add( T* entry ) {
18           Assert( entry->fNextHashEntry == NULL );
                     /*利用功能類class K,計算class T對象的哈希鍵值,其計算方法由用戶在class K中定義*/
       19           K key( entry );     
20           UInt32 theIndex = ComputeIndex( key.GetHashKey() );//利用鍵值計算索引
21           entry->fNextHashEntry = fHashTable[ theIndex ]; //在新加對象中存儲索引值
22           fHashTable[ theIndex ] = entry; //將該對象插入到索引指定的位置
23           fNumEntries++;   /
24    }
//下面介紹從哈希表中刪除一個class T對象的源代碼
25    void Remove( T* entry )
26    {
//首先從哈希表中找到待刪除的對象
//一、計算哈希鍵值和其對應的對象索引
27           key( entry );        
28           UInt32 theIndex = ComputeIndex( key.GetHashKey() );   
29           T* elem = fHashTable[ theIndex ];
30           T* last = NULL;
/*二、經過對象索引查找對象,若是不是要找的對象,接着找下一個,直到找到爲止。這是由於,存放的時候就是按照這種模式計算索引的。*/
31           while (elem && elem != entry) { 
32                  last = elem;
33                  elem = elem->fNextHashEntry;
34           }
              //找到該對象,將其刪除
35           if ( elem )       
36           {
37                  Assert(elem);
38                  if (last)    
39                         last->fNextHashEntry = elem->fNextHashEntry;
40                  else //elem在頭部
41                         fHashTable[ theIndex ] = elem->fNextHashEntry;
42                  elem->fNextHashEntry = NULL;
43                  fNumEntries--;
44           }
45    }
//下面介紹從哈希表中查找一個class T對象的方法
46    T* Map( K* key )  //入參爲哈希鍵值
47    {
48                  UInt32 theIndex = ComputeIndex( key->GetHashKey() ); //計算索引
49                  T* elem = fHashTable[ theIndex ];     //找到索引對應的對象
50                  while (elem) {
51                         K elemKey( elem );
52                         if (elemKey =*key) //檢查是否找對
53                                break;
54                         elem = elem->fNextHashEntry;   //若是不是,繼續找下一個
55                  }
56                  return elem;
57           }
//如下略…
}
       以上介紹了哈希表的構造以及三種基本操做:添加、刪除和查詢。另外,DSS還定義了OSHashTableIter類用於枚舉OSHashTable中的class T對象;其中主要的操做有First和Next等,限於篇幅,此處就再也不詳述。
3.2   Tasks類
由於服務器從總體上採用了異步的運行模式,這就須要一種用於事件通訊的機制。舉例來講:一個RTSP鏈接對應的Socket端口監測到網絡上有數據到達,此時必須有一個模塊(或代碼)被通知(notify)去處理這些數據。爲此,DSS定義了Task及其相關類做爲實現這一通訊機制的核心。
在Task.h/cpp文件中,定義了三個主要的類,分別是:任務線程池類(TaskThreadPool Class)、任務線程類(TaskThread Class)以及任務類(Task Class)。
每一個Task對象有兩個主要的方法:Signal和Run。當服務器但願發送一個事件給某個Task對象時,就會調用Signal()方法;而Run()方法是在Task對象得到處理該事件的時間片後運行的,服務器中的大部分工做都是在不一樣Task對象的Run()函數中進行的。每一個Task對象的目標就是利用很小的且不會阻塞的時間片完成服務器指定某個工做。
任務線程類是上文介紹的OSThread類的一個子類,表明專門用於運行任務類的一個線程。在每一個任務線程對象內部都有一個OSQueue_Blocking類型的任務隊列,存儲該線程須要執行的任務。後面的分析能夠看到,服務器調用一個任務的Signal函數,實際上就是將該任務加入到某個任務線程類的任務隊列中去。另外,爲了統一管理這些任務線程,DSS還開發了任務線程池類,該類負責生成、刪除以及維護內部的任務線程列表。圖4描述了任務類的運行。
       下面咱們首先分析TashThread類,該類的定義以下:
class TaskThread : public OSThread     //OSThread的子類
{     //提示:全部的Task對象都將在TaskThread中運行
       1     public:
       2     TaskThread() :       OSThread(), fTaskThreadPoolElem(this){}  //構造函數
3     virtual                   ~TaskThread() { this->StopAndWaitForThread(); } //析構函數
       4     private:
              …
       5     virtual void     Entry();       //從OSThread重載的執行函數,仍然可以被子類重載
       6     Task*                   WaitForTask();    //檢測是否有該執行的任務
              
       7     OSQueueElem        fTaskThreadPoolElem;       //對應的線程池對象
       8     OSHeap                        fHeap; //紀錄任務運行時間的堆,用於WaitForTask函數
              /*關鍵數據結構:任務隊列;在Task的Signal函數中直接調用fTaskQueue對象的EnQueue函數將本身加入任務隊列*/
       9     OSQueue_Blocking fTaskQueue; 
              //此處略…
       }
       做爲OSThread的子類,TaskThread重載了Entry函數,一旦TaskThread的對象被實例化,便運行該函數。Entry()函數的主要任務就是調用WaitForTask()函數監測任務隊列,若是發現新任務,就在規定時間內執行;不然,就被阻塞。下面咱們簡要分析Entry()函數的流程:
       void TaskThread::Entry()
{
       1     Task* theTask = NULL; //空任務
       
       2     while (true) //線程循環執行
       3     {     //監測是否有須要執行的任務,若是有就返回該任務;不然阻塞;
       4            theTask = this->WaitForTask(); 
       5            Assert(theTask != NULL);
              
6            Bool16 doneProcessingEvent = false; //還沒有處理事件
              
7            while (!doneProcessingEvent)
       8            {
       9            theTask->fUseThisThread = NULL; // 對任務的調度獨立於線程
       10           SInt64 theTimeout = 0;      //Task中Run函數的返回值,重要
                     //核心部分:運行任務,根據返回值判斷任務進度
       11           if (theTask->fWriteLock)
       12           {     //若是任務中有寫鎖,須要使用寫互斥量,不然可能形成死鎖
       13                  OSMutexWriteLocker mutexLocker(&TaskThreadPool::sMutexRW);
       14                  theTimeout = theTask->Run();   //運行任務,獲得返回值
       15                  theTask->fWriteLock = false;
       16           }
       17           else
       18           {     //使用讀互斥量
       19                  OSMutexReadLocker mutexLocker(&TaskThreadPool::sMutexRW);
       20                  theTimeout = theTask->Run();   //運行任務,獲得返回值
       21           }
       22           //監測Task中Run()函數的返回值,共有三種狀況
       23           //一、返回負數,代表任務已經徹底結束
       24           if (theTimeout        25           {
       26                  delete theTask;     //刪除Task對象
       27                  theTask = NULL;
       28                  doneProcessingEvent = true;
       19           }
       30           //二、返回0,代表任務但願在下次傳信時被再次當即執行
       31           else if (theTimeout=0)
       32           {
       33                  doneProcessingEvent = compare_and_store(Task::kAlive, 0, &theTask->fEvents);
       34                  if (doneProcessingEvent)
       35                         theTask = NULL; 
       36           }
                     //三、返回正數,代表任務但願在等待theTimeout時間後再次執行
       37           else
       38           {
                     /*將該任務加入到Heap中,而且紀錄它但願等待的時間。Entry()函數將經過waitfortask()函數進行檢測,若是等待的時間到了,就再次運行該任務*/
       39                  theTask->fTimerHeapElem.SetValue(OS::Milliseconds() + theTimeout);
       40                  fHeap.Insert(&theTask->fTimerHeapElem);
       41                  (void)atomic_or(&theTask->fEvents, Task::kIdleEvent);//設置Idle事件
       42                  doneProcessingEvent = true;
       43           }
              //此處略…
       }
       注意,若是Task的Run()函數返回值TimeOut爲正數,意味着該任務是一個週期性的工做,例如發送數據的視頻泵(pump),須要每隔必定時間就發出必定量的視頻數據,直至整個節目結束。爲此,在第38~43行,將該任務加入到堆fHeap中去,而且標記該任務下次運行的時間爲TimeOut毫秒以後。未來經過調用WaitForTask()函數就能檢測到該任務是否到達規定的運行時間,WaitForTask()函數的代碼以下:
       Task* TaskThread::WaitForTask()
{
       1     while (true)
       2     {     //獲得當前時間,該函數爲靜態函數,定義見OS.h
       3            SInt64 theCurrentTime = OS::Milliseconds(); 
                     /*若是堆中有任務,且任務已經到執行時間,返回該任務。 PeekMin函數見OSHeap.h,竊聽堆中第一個元素(但不取出)*/
4     if ((fHeap.PeekMin() != NULL) && (fHeap.PeekMin()->GetValue() 從堆中取出第一個任務返回
5                   return (Task*)fHeap.ExtractMin()->GetEnclosingObject();
              //若是堆中有任務,可是還沒有到執行時間,計算須要等待的時間
       6            SInt32 theTimeout = 0;
       7            if (fHeap.PeekMin() != NULL)      //計算還需等待的時間
       8                   theTimeout = fHeap.PeekMin()->GetValue() - theCurrentTime;
       9            Assert(theTimeout >= 0);
              
              //等待theTimeout時間後從堆中取出任務返回
       10           OSQueueElem* theElem = fTaskQueue.DeQueueBlocking(this, theTimeout);
       11           if (theElem != NULL)
       12                  return (Task*)theElem->GetEnclosingObject();
       13    }     
}
       上文曾經提到,Task對象內有兩個方法:Signal和Run。Run函數是一個虛函數,由Task的子類重載,它的用法咱們在分析TaskThread的Entry()函數和WaitForTask()函數中已經討論了。而另外一個Signal()函數也十分重要:服務器經過調用該函數將Task加入TaskThread,而且執行Run()函數。Signal()函數的核心部分以下:
       void Task::Signal(EventFlags events)
{
              …
              // fUseThisThread用於指定該任務運行的任務線程
       1     if (fUseThisThread != NULL)       //存在指定任務線程
                     //將該任務加入到指定任務線程的任務隊列中
       2            fUseThisThread->fTaskQueue.EnQueue(&fTaskQueueElem);
              //不存在指定的任務線程,隨機選擇一個任務線程運行該任務
3     else
       4     {
                     //從線程池中隨機選擇一個任務線程
       5            unsigned int theThread = atomic_add(&sThreadPicker, 1);
       6            theThread %= TaskThreadPool::sNumTaskThreads;
                     //將該任務加入到上面選擇的任務線程的任務隊列中
       7            TaskThreadPool::sTaskThreadArray[theThread]-> fTaskQueue.EnQueue (&fTaskQueueElem);
       8            }
       }
       至此咱們已經將DSS的線程和任務運行機制分析完了,這種由事件去觸發任務的概念已經被集成到了DSS的各個子系統中。例如,在DSS中常常將一個Task對象和一個Socket對象關聯在一塊兒,當Socket對象收到事件(經過select()函數),相對應的Task對象就會被傳信(經過Signal()函數);而包含着處理代碼的Run()函數就將在某個任務線程中運行。
       所以,經過使用這些Task對象,咱們就可讓全部鏈接都使用一個線程來處理,這也是DSS的缺省配置方法。
3.3   Socket類
做爲一個典型的網絡服務器,DSS源代碼中的Socket編程部分是其精華之一。DSS定義了一系列Socket類用於屏蔽不一樣平臺在TCP/UDP編程接口和使用方法上的差別。DSS中的Socket類通常都採用異步模式的(即非阻塞的),並且可以向對應的Task對象傳信(Signal),這點咱們在上一節介紹過。Socket類中具備表明性的類是:EventContext、EventThread、Socket、UDPSocket、TCPSocket以及TCPListenerSocket等等,它們之間的繼承關係見圖5。
       在eventcontext.h/.cpp文件中,定義了兩個類:EventContext類和EventThread類。 Event Context提供了檢測Unix式的文件描述符(Socket就是一種文件描述符)產生的事件(一般是EV_RE 或 EV_WR)的能力,同時還能夠傳信指定的任務。EventThread類是OSThread類的子類,它自己很簡單,只是重載了OSThread的純虛函數Entry(),用以監控全部的Socket端口是否有數據到來,其代碼分析以下:
       void EventThread::Entry()
{
/*該結構定義在ev.h中,記錄Socket描述符和在該描述符上發生的事件*/
       1     struct eventreq theCurrentEvent;       
       2     ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );   //初始化該結構
       
       3     while (true)
4     {
//首先監聽Socket端口的事件
       5            int theErrno = EINTR;
       6            while (theErrno=EINTR)
       7            {
8     #if MACOSXEVENTQUEUE //Macos平臺
       9                   int theReturnValue = waitevent(&theCurrentEvent, NULL);
10    #else       //其餘平臺
              /*調用select_waitevent函數監聽全部的Socket端口,直到有事件發生爲止*/
       11                  int theReturnValue = select_waitevent(&theCurrentEvent, NULL);
12    #endif     
              …
              //有事件發生,喚醒相應的Socket端口
13    if (theCurrentEvent.er_data != NULL)
       14    {
                     //經過事件中的標識找到相應的對象參考指針
       15           StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
       16           OSRef* ref = fRefTable.Resolve(&idStr);
       17           if (ref != NULL)
       18           {     //經過參考指針獲得EventContext對象
       19                  EventContext* theContext = (EventContext*)ref->GetObject();
                            //利用EventContext對象的ProcessEvent方法傳信對應的Task
       20                  theContext->ProcessEvent(theCurrentEvent.er_eventbits);
       21                  fRefTable.Release(ref);       //減小引用計數
       22           }
//此處略…
}
       上述代碼有兩點須要注意:首先在第11行,調用select_waitevent函數監聽全部Socket端口的事件。該函數在Windows平臺上是採用WSAAsyncSelect(異步選擇)模型實現的。具體實現是:系統首先建立一個窗口類,該類專門用於接受消息;在每一個Socket端口建立後,調用WSAsyncSelect函數,同時將上述窗口類的句柄做爲參數傳入;未來這些Socket端口有事件發生時,Windows就會自動將這些事件映射爲標準的Windows消息發送給窗口類,此時select_waitevent函數經過檢查消息就可以得到對應Socket端口發生的事件。對於Windows平臺下Socket的異步編程技術細節請參閱《Windows網絡編程技術》一書。
       另外,在第20行調用的EventContext對象的ProcessEvent函數實現上很簡單,只有一行代碼:fTask->Signal(Task::kReadEvent);其中fTask爲該EventContext對象對應的Task對象;ProcessEvent函數向Task對象傳信,以便及時處理剛剛發生的Socket事件。
       與EventThread對應的EventContext對象負責維護指定的描述符,其主要函數包括InitNonBlocking、CleanUp和RequestEvent等。其中InitNonBlocking函數調用Socket API ioctlsocket將用戶指定的描述符設置爲異步,CleanUp函數用於關閉該描述符;另外,用戶經過RequestEvent函數申請對該描述符中某些事件的監聽,如前所述,該函數內部調用了WSAsyncSelect來實現這一功能。
       Socket Class、UDPSocket Class和TCPSocketClass三個類都是EventContext的子類,它們封裝了TCP和UDP的部分實現,同時擴展了EventContext中的事件,但都沒有改變其運行機制,所以此處再也不詳述,留給讀者自行分析。咱們要爲你們分析的是另一個比較複雜的Socket類TCPListenerSocket類。TCPListenerSocket用於監聽TCP端口,當一個新鏈接請求到達後,該類將賦予這個新鏈接一個Socket對象和一個Task對象的配對。首先分析TCPListenerSocket類的主要定義以下:
       class TCPListenerSocket : public TCPSocket, public IdleTask
{
/*提示:該類從有兩個基類,因此它既是一個事件監聽者,同時也是一個任務Task。做爲一個任務,給TCPListenerObject發送Kill事件就能夠刪除它*/
       1     public:
       2            TCPListenerSocket() :   TCPSocket(NULL, Socket::kNonBlockingSocketType), IdleTask(), fAddr(0), fPort(0), fOutOfDescriptors(false) {}  //構造函數
       3            virtual ~TCPListenerSocket() {}   //析構函數
              
                     //addr爲地址,port爲端口號,初始化函數自動監聽TCP端口
       4            OS_Error              Initialize(UInt32 addr, UInt16 port);
                     //子類必須重載該純虛函數,用於創建新鏈接時生成任務對象
       5            virtual Task*   GetSessionTask(TCPSocket** outSocket) = 0;
       6            virtual SInt64  Run();  //重載Task的Run函數,子類仍可重載
                     
       7     private:
                     //重載EventContext的ProcessEvent函數,用於產生Socket和Task對象配對
8            virtual void ProcessEvent(int eventBits);
       9            OS_Error       Listen(UInt32 queueLength);
//其餘略…
}
       前面咱們分析得知,EventContext類經過ProcessEvent函數來實現對任務的傳信工做,但在TCPListenerSocket 中,ProcessEvent函數被重載用來建立Socket和Task對象得配對,該函數的實現以下:
       void TCPListenerSocket::ProcessEvent(int /*eventBits*/)
{     /*提示:該函數運行於系統惟一的EventThread線程中,因此要儘可能快速,以避免佔用過多的系統資源*/
              //此處略去部分定義…
       1     Task* theTask = NULL;     //Task對象
       2     TCPSocket* theSocket = NULL;       //Socket對象
       
              //建立對象配對
       3     while (true)
       4     {     //accept鏈接
       5            int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size);
       6            if (osSocket == -1) //監聽端口出錯
       7            {     //此處略去出錯處理     }
                     //用子類重載的GetSessionTask函數建立Task對象
       8            if ((theTask = this->GetSessionTask(&theSocket))=NULL) //建立出錯
       9                   close(osSocket);
       10           else  //建立成功,接着建立Socket對象
       11           {     
       12                  Assert(osSocket != EventContext::kInvalidFileDesc);
                            //此處略去部分對新建鏈接端口的設置(setsockopt函數)
                            //建立新的Socket對象
       13                  theSocket->Set(osSocket, &addr);
       14                  theSocket->InitNonBlocking(osSocket); //初始化
       15                  theSocket->SetTask(theTask); //設置對應的任務
       16           theSocket->RequestEvent(EV_RE); //新對象監聽讀事件
       17           }
       18    }
              //處理完一次鏈接請求後,TCPListenerSocket對象還要接着監聽
       19    this->RequestEvent(EV_RE);
}
       對Socket類的分析基本完成了,從中咱們能夠發現,DSS對於網絡傳信和任務調度之間的處理很是精密,環環相扣,在某種程度上甚至是有些過a於花哨。可是這些基本類是上層RTSP/RTP等服務器子系統編碼的基礎,所以但願讀者可以從本質上掌握這些代碼。
4     核心功能庫(Server Core)
4.1 RTSP 子系統
       RTSP標準是實時流控制協議(Real-Time Streaming Protocol RFC2326)的簡稱,它被客戶和流式媒體服務器用來交換對媒體的控制信息。圖6是RTSP基本操做的描述。
再給出一個RTSP協議的例子以下:
       DSS開發了一個RTSP子系統來支持標準的RTSP協議,本節將分析這些源代碼。
       首先,DSS定義了一個TCPListenerSocket類的子類RTSPListenerSocket,用於監聽RTSP鏈接請求。RTSPListenerSocket類作的惟一一件事就是重載了GetSessionTask函數,當客戶的鏈接請求到達後,它建立了一個Socket對象和RTSPSession對象的配對。RTSPSession對象是Task類的子類,是專門用於處理RTSP請求的任務類。
       如圖7所示,RTSP鏈接創建後,服務器會爲每一個客戶維護一個Socket對象和RTSPSession對象的配對;當客戶的RTSP請求到達時,Socket對象就會調用RTSPSession對象的Signal方法傳信,即將RTSPSession對象加入到TaskThread對象的任務隊列中去;而當時間片到來,TaskThread線程就會調用RTSPSession對象的Run方法,這個方法就會處理客戶發送過來的RTSP請求。所以,下面咱們將主要分析RTSPSession的Run方法。
       爲了跟蹤當前處理的狀況,RTSPSession類內部定義了多個狀態,而Run方法其實就是經過在這些狀態之間不斷切換,同時對客戶的RTSP請求作出不一樣的處理。
                     enum
                     {
                     //RTSPSession的基本狀態
                     kReadingRequest= 0,
                     kFilteringRequest= 1,
                     kRoutingRequest= 2,
                     kAuthenticatingRequest= 3,
                     kPreprocessingRequest= 4,
                     kProcessingRequest= 5,
                     kSendingResponse= 6,
                     kPostProcessingRequest       = 7,
                     kCleaningUp= 8,
              
                     //當RTSP協議經過HTTP隧道實現時將用到下面的狀態
       kWaitingToBindHTTPTunnel = 9,         
kSocketHasBeenBoundIntoHTTPTunnel = 10,
kHTTPFilteringRequest = 11,               
                     kReadingFirstRequest = 12,                 
                     kHaveNonTunnelMessage = 13                          
              }
       另外,值得注意的是,DSS提供一種稱爲Module的二次開發模式,開發人員能夠編寫新的Module而且註冊其但願運行的狀態,系統就會在相應的狀態下調用該Module,從而將控制權暫時交給二次開發的代碼,以便加強系統的功能。簡單起見,下面咱們將分析不存在客戶模塊的Run()函數源代碼。首先分析其主框架以下:
       SInt64 RTSPSession::Run()
{
       1     EventFlags events = this->GetEvents();     //取出事件
       2     QTSS_Error err = QTSS_NoErr;
       3     QTSSModule* theModule = NULL;
       4     UInt32 numModules = 0;
       
       // 設定當前的Module狀態
       5     OSThread::GetCurrent()->SetThreadData(&fModuleState);
       
       //檢查該鏈接是否超時,若是是就設定狀態斷掉該鏈接
       6     if ((events & Task::kTimeoutEvent) || (events & Task::kKillEvent))
       7            fLiveSession = false;
              
       8     while (this->IsLiveSession()) //若是鏈接還沒有拆除,執行狀態機
9     {
              /* 提示:下面是RTSPSession的狀態機。由於在處理RTSP請求過程當中,有多個地方須要Run方法返回以便繼續監聽新的事件。爲此,咱們須要跟蹤當前的運行狀態,以便在被打斷後還能回到原狀態*/
       10           switch (fState)
       11           {
       12                  case 狀態1: //處理略
13    case 狀態2: //處理略…
14    case 狀態n: //處理略
       15           }     //此處略…
       }
       Run函數的主框架比較簡單,其核心就在於10~15的狀態機,所以咱們但願按照客戶請求到達而且被處理的主要流程爲讀者描述該狀態機的運轉。
       1第一次請求到達進入kReadingFirstRequest狀態,該狀態主要負責從RTSPRequestStream類的對象fInputStream中讀出客戶的RTSP請求,其處理以下:
              case kReadingFirstRequest:
              {
              1     if ((err = fInputStream.ReadRequest())=QTSS_NoErr)
              2     {/* RequestStream返回QTSS_NoErr意味着全部數據已經從Socket中讀出,但尚不能構成一個完整的請求,所以必須等待更多的數據到達*/
              3            fInputSocketP->RequestEvent(EV_RE); //接着請求監聽讀事件
              4            return 0;      //Run函數返回,等待下一個事件發生
              5     }
              6     if ((err != QTSS_RequestArrived) && (err != E2BIG))
              7     {//出錯,中止處理
              8            Assert(err > 0); 
              9            Assert(!this->IsLiveSession());
              10           break;
              11    }
                     //請求已經徹底到達,轉入kHTTPFilteringRequest狀態
              12    if (err = QTSS_RequestArrived)
              13           fState = kHTTPFilteringRequest;
                     //接收緩衝區溢出,轉入kHaveNonTunnelMessage狀態
       14    if (err=E2BIG)
              15           fState = kHaveNonTunnelMessage;
              }
              continue;
       2正常狀況下,在得到一個完整的RTSP請求後(上第12行),系統將進入kHTTPFilteringRequest狀態該狀態檢查RTSP鏈接是否須要通過HTTP代理實現;如不須要,轉入kHaveNonTunnelMessage狀態。
       3進入kHaveNonTunnelMessage狀態後,系統建立了RTSPRequest類的對象fRequest,該對象解析客戶的RTSP請求,並保存各類屬性。fRequest對象被傳遞給其餘狀態處理。
       4接着進入kFilteringRequest狀態,二次開發人員能夠經過編寫Module對客戶的請求作出特殊處理。若是客戶的請求爲正常的RTSP請求,系統調用SetupRequest函數創建用於管理數據傳輸的RTPSession類對象,其源代碼分析以下:
       void RTSPSession::SetupRequest()
{
       // 首先分析RTSP請求,細節見RTSPRequest.h/.cpp
       1     QTSS_Error theErr = fRequest->Parse();
2     if (theErr != QTSS_NoErr)   
       3            return;
       
              //OPTIONS請求,簡單發回標準OPTIONS響應便可
4     if (fRequest->GetMethod() = qtssOptionsMethod)
       5     {//此處略去部分處理代碼…
6     }
              
       //DESCRIBE請求,必須保證已經有了SessionID
       7     if (fRequest->GetMethod() = qtssDescribeMethod)
       8     {
       9            if (fRequest->GetHeaderDictionary()->GetValue(qtssSessionHeader)->Len > 0)
       10           {
       11                  (void)QTSSModuleUtils::SendErrorResponse(fRequest, qtssClientHeaderFieldNotValid, qtssMsgNoSesIDOnDescribe);
12                  return;
       13           }
14    }
       
              //查找該請求的RTPSession
       15    OSRefTable* theMap = QTSServerInterface::GetServer()->GetRTPSessionMap();
       16    theErr = this->FindRTPSession(theMap);
       17    if (theErr != QTSS_NoErr)
       18           return;
       
       //若是未查找到,創建一個新的RTPSession
       19    if (fRTPSession= NULL)
       20    {
       21           theErr = this->CreateNewRTPSession(theMap);
       22           if (theErr != QTSS_NoErr)
       23                  return;
       24    }
              //此處略…
}
       5進入kRoutingRequest狀態,調用二次開發人員加入的Module,用於將該請求路由(Routing)出去。缺省狀況下,系統自己對此狀態不作處理。
       6進入kAuthenticatingRequest狀態,調用二次開發人員加入的安全模塊,主要用於客戶身份驗證以及其餘如規則的處理。讀者若是但願開發具備商業用途的流式媒體服務器,該模塊必須進行二次開發。
       7進入kPreprocessingRequest和kProcessingRequest及kPostProcessingRequest狀態,這三種狀態都是經過調用系統自帶或二次開發人員添加的Module來處理RTSP請求,例如系統提供了QTSSReflector Module、QTSSSplitter Module以及QTSSFile Module等模塊。其中比較重要的QTSSFile Module屬於QTLib庫的部分,此處再也不詳述。
       8進入kSendingResponse狀態,用於發送對客戶RTSP請求處理完成以後的響應。系統在該狀態調用了fOutputStream.Flush()函數將在fOutputStream中還沒有發出的請求響應經過Socket端口徹底發送出去。
       9進入kCleaningUp狀態,清除全部上次處理的數據,並將狀態設置爲kReadingRequest等待下次請求到達。
       RTSPSession的主流程分析完了,但輔助其操做的多個RTSP類還須要讀者自行分析,它們分別是:RTSPSessionInterface Class、RTSPRequest Class、RTSPRequestInterface Class、RTSPRequestStream Class以及RTSPResponseStream Class等等。
4.2 RTP子系統
       RTP標準是實時傳輸協議(Real-Time Transfer Protocol)的簡稱,它被客戶和流式媒體服務器用來處理流式媒體數據的傳輸。在介紹RTSP的運行流程時,咱們發現RTSPSession對象經過調用SetupRequest函數爲客戶創建RTPSession對象。RTPSession類是Task類的子類,所以它重載了Task類的Run函數,該函數經過調用FileModule.cpp文件中的SendPacket()函數向客戶發送RTP協議打包的流式媒體數據。當客戶經過利用RTSP向RTSPSession對象發出PLAY命令後,RTSPSession對象將調用RTPSession對象的Play()函數。Play函數準備好須要打包發送的數據後,利用Task類的Signal函數傳信RTPSession對象,使其被加入某個TaskThread的任務隊列,從而運行其Run函數。
另外,對同一個節目中的每個獨立的RTP流(如音頻流或視頻流等),DSS都定義了一個RTPStream類與之對應;顯然一個RTPSession對象可能包含多個RTPStream對象。整個RTP子系統的核心運行流程見圖8。
       下面,咱們首先分析RTPSession中Run()函數的用法:
       SInt64 RTPSession::Run()
{ //提示:該函數代碼在TaskThread內運行
1     EventFlags events = this->GetEvents(); //取出事件
2     QTSS_RoleParams theParams;
       //提供給其餘Module運行的參數,第一個成員是對象自己
       3     theParams.clientSessionClosingParams.inClientSession = this;        
       //設定本身爲當前運行的線程
       4     OSThread::GetCurrent()->SetThreadData(&fModuleState);
              /*若是事件是通知RTPSession對象死亡,就準備自殺。可能致使這種狀況的有兩種事件:自殺kKillEvent;超時kTimeoutEvent*/
       5     if ((events & Task::kKillEvent) || (events & Task::kTimeoutEvent) || (fModuleDoingAsyncStuff))
       6     {     //處理對象自殺代碼,此處略…
       7            return –1;     //返回出錯信息,這樣析構函數就會被調用,從而讓對象徹底死亡
       8     }
              //若是正處於暫停(PAUSE)狀態,什麼都不作就返回,等待PLAY命令
       9     if ((fState == qtssPausedState) || (fModule == NULL))
       10           return 0;
       
              //下面代碼負責發送數據
       11    {     //對Session互斥量加鎖,防止發送數據過程當中RTSP請求到來
       12           OSMutexLocker locker(&fSessionMutex);
                     //設定數據包發送時間,防止被提早發送
       13           theParams.rtpSendPacketsParams.inCurrentTime = OS::Milliseconds();
       14           if (fPlayTime > theParams.rtpSendPacketsParams.inCurrentTime) //未到發送時間
       15                  theParams.rtpSendPacketsParams.outNextPacketTime=fPlayTime- theParams.rtpSendPacketsParams.inCurrentTime; //計算還需多長時間纔可運行
       16           else
       17           {     //下次運行時間的缺缺省值爲0
       18                  theParams.rtpSendPacketsParams.outNextPacketTime = 0;
                     // 設置Module狀態
       19                  fModuleState.eventRequested = false;
       20                  Assert(fModule != NULL);
                            //調用QTSS_RTPSendPackets_Role內的函數發送數據,見FileModule.cpp
       21                  (void)fModule->CallDispatch(QTSS_RTPSendPackets_Role, &theParams);
                            //將返回值從負數改成0,不然任務對象就會被TaskThread刪除
       22                  if (theParams.rtpSendPacketsParams.outNextPacketTime        23                         theParams.rtpSendPacketsParams.outNextPacketTime = 0;
       24           }
       25    }
              //返回下一次但願被運行的時間;返回值含義見前文的分析
       26    return theParams.rtpSendPacketsParams.outNextPacketTime;
}
       從上面分析可見,正常狀態下Run函數的返回值有兩種:若是返回值爲正數,表明下一次發送數據包的時間,規定時間到來的時候,TaskThread線程會自動調用Run函數;若是返回值等於0,在下次任何事件發生時,Run函數就會被調用,這種狀況每每發生在全部數據都已經發送完成或者該RTPSession對象將要被殺死的時候。
       在第21行咱們看到,Run函數調用了QTSSFileModule中的QTSS_RTPSendPackets_Role發送數據。在QTSSFileModule.cpp文件的QTSSFileModule_Main函數內,系統又調用了SendPackets函數,這纔是真正發送RTP數據包的函數,咱們對其代碼分析以下:
       QTSS_Error SendPackets(QTSS_RTPSendPackets_Params* inParams)
{
              //此處略去部分定義…
       //獲得要發送數據的FileSession對象,其定義見QTSSFileModule.cpp文件
       1     FileSession** theFile = NULL;
       2     UInt32 theLen = 0;
       3     QTSS_Error theErr = QTSS_GetValuePtr(inParams->inClientSession, sFileSessionAttr, 0, (void**)&theFile, &theLen);
       4     if ((theErr != QTSS_NoErr) || (theLen != sizeof(FileSession*))) //出錯
       5     {     //設定出錯緣由,而後斷掉鏈接,並返回
       6            QTSS_CliSesTeardownReason reason = qtssCliSesTearDownServerInternalErr;
       7            (void) QTSS_SetValue(inParams->inClientSession, qtssCliTeardownReason, 0, &reason, sizeof(reason));
       8            (void)QTSS_Teardown(inParams->inClientSession);
       9            return QTSS_RequestFailed;
       10    }
       //該節目文件中音頻所能忍受的最大延遲
       11    maxDelayToleranceForStream = (*theFile)->fMaxAudioDelayTolerance;
       
       12    while (true)
       13    {     
                     //不存在待發送數據包,多是文件還沒有打開
       14           if ((*theFile)->fNextPacket == NULL)
       15           {
       16                  void* theCookie = NULL;
                            //得到第一個數據包,theTransmitTime爲傳輸數據花費的時間
       17                  Float64 theTransmitTime = (*theFile)->fFile.GetNextPacket(&(*theFile)->fNextPacket, &(*theFile)->fNextPacketLen, &theCookie);
       18                  if ( QTRTPFile::errNoError != (*theFile)->fFile.Error() )
                            {//讀數據出錯,斷掉鏈接,返回。此處略 }
                            …
       19                  (*theFile)->fStream = (QTSS_RTPStreamObject)theCookie; //獲得RTPStream對象
       20                  (*theFile)->fPacketPlayTime = (*theFile)->fAdjustedPlayTime + ((SInt64)(theTransmitTime * 1000)); //推遲theTransmitTime長度的播放時間
       21                  (*theFile)->fPacketWasJustFetched = true;       
       22                  if ((*theFile)->fNextPacket != NULL)
       23                  {     // 判斷流格式
       24                         QTSS_RTPPayloadType* thePayloadType = NULL;
       25                         QTSS_Error theErr = QTSS_GetValuePtr( (*theFile)->fStream, qtssRTPStrPayloadType, 0, (void**)&thePayloadType, &theLen );
                                   //設定視頻流可忍受的最大延遲時間
       26                         if (*thePayloadType == qtssVideoPayloadType)
       27                         maxDelayToleranceForStream = (*theFile)->fMaxVideoDelayTolerance;
       28                  }
       29           }
              
                     //仍無數據,說明全部數據已經傳輸完成了
       30           if ((*theFile)->fNextPacket = NULL)
       31           {     //向fStream中寫入長度爲0的空數據,以便強制緩衝區刷新
       32                  (void)QTSS_Write((*theFile)->fStream, NULL, 0, NULL, qtssWriteFlagsIsRTP);
       33                  inParams->outNextPacketTime = qtssDontCallSendPacketsAgain;
       34                  return QTSS_NoErr; //完成任務返回
       35           }
                     //提示:開始發送RTP數據包
                     //計算當前時間和該段數據應該發送的時間之間的相對間隔
       36           SInt64 theRelativePacketTime = (*theFile)->fPacketPlayTime - inParams->inCurrentTime;  // inCurrentTime = OS::Milliseconds();
              
       37           SInt32 currentDelay = theRelativePacketTime * -1L; //計算傳輸延遲
       38           theErr =  QTSS_SetValue( (*theFile)->fStream, qtssRTPStrCurrentPacketDelay, 0, ¤tDelay, sizeof(currentDelay) ); //保存該延遲
                     //若是延遲過大,就丟棄該包,等待發送下一個數據包
       39           if (theRelativePacketTime > sMaxAdvSendTimeInMsec)
       40           {
       41                  Assert( theRelativePacketTime > 0 );
       42                  inParams->outNextPacketTime = theRelativePacketTime;
       43                  return QTSS_NoErr;
       44           }
                     //此處略去部分處理視頻質量的代碼…
                     // 發送當前數據包
       45           QTSS_Error writeErr = QTSS_Write((*theFile)->fStream, (*theFile)->fNextPacket, (*theFile)->fNextPacketLen, NULL, qtssWriteFlagsIsRTP);
                     
//其他代碼略…
}
       RTP子系統是DSS中最爲複雜的部分之一,這是由於發送RTP數據包的過程不但涉及到網絡接口,並且和文件系統有着密切的關係。DSS的一個重要特徵就是可以將線索化(Hinted)過的QuickTime文件經過RTSP和RTP協議流化出去。全部分析這些文件的代碼都被提取出來而且封裝在QTFile庫中。這種封裝方式使得系統的各個部分都變得簡單:QTFile負責處理文件的分析;而DSS其餘部分負責處理網絡和協議。服務器中的RTPFileModule調用QTFile庫檢索索引過的QuickTime文件的數據包和元數據。QTFile庫的講解超出了本文的範圍,可是但願讓DSS支持其餘媒體格式的讀者可以掌握它的實現機制。
5  DSS二次開發接口:Module開發流程
       做爲一個運行於多個操做系統平臺的開發源代碼的服務器,DSS提供了一種稱爲Module的二次開發接口。使用這個開發接口,咱們能夠充分利用服務器的可擴展性及其實現的多種協議,而且可以保證和未來版本兼容。DSS中的許多核心功能也是以Module的方式預先實現而且編譯的,所以能夠說對Module的支持已經被設計到DSS的內核中去了。
       下面咱們將分析DSS的一個內嵌Module:QTSSFileModule的源代碼來講明Module的編程方式,QTSSFileModule的實如今QTSSFileModule.cpp文件中。
       每一個QTSS Module必須實現兩個函數:
首先,每一個QTSS Module必須實現一個主函數,服務器調用該函數用於啓動和初始化模塊中的QTSS函數;QTSSFileModule主函數的實現以下:
QTSS_Error QTSSFileModule_Main(void* inPrivateArgs)
{
       return _stublibrary_main(inPrivateArgs, QTSSFileModuleDispatch);
}
其中QTSSFileModuleDispatch是Module必須實現的分發函數名。
另外一個須要實現的是分發函數,服務器調用該函數實現某個特殊任務。此時,服務器將向分發函數傳入任務的名字和一個任務相關的參數塊。QTSSFileModule分發函數的實現以下:
QTSS_Error QTSSFileModuleDispatch(QTSS_Role inRole, QTSS_RoleParamPtr inParamBlock)
{     //根據傳入的任務名稱和入參執行相應的處理函數
       switch (inRole)      //任務名稱
       {
              case QTSS_Register_Role:
                     return Register(&inParamBlock->regParams);
              case QTSS_Initialize_Role:
                     return Initialize(&inParamBlock->initParams);
              case QTSS_RereadPrefs_Role:
                     return RereadPrefs();
              case QTSS_RTSPRequest_Role:
                     return ProcessRTSPRequest(&inParamBlock->rtspRequestParams);
              case QTSS_RTPSendPackets_Role:
                     return SendPackets(&inParamBlock->rtpSendPacketsParams);
              case QTSS_ClientSessionClosing_Role:
                     return DestroySession(&inParamBlock->clientSessionClosingParams);
       }
       return QTSS_NoErr;
}
       其中,分發函數的入參是一個聯合,它根據任務名稱的不一樣,具體的數據結構也不一樣,下面是該數據結構的定義:
       typedef union
{
              QTSS_Register_Params                             regParams;
              QTSS_Initialize_Params                            initParams;
              QTSS_ErrorLog_Params                           errorParams;
              //此處略去其餘多個數據結構…
} QTSS_RoleParams, *QTSS_RoleParamPtr;
       DSS提供了兩種方式把咱們本身開發的Module添加到服務器中:一種稱爲靜態模塊(Static Module),該方式將咱們開發的Module代碼直接編譯到內核中去;另外一種稱爲動態模塊(Dynamic Module),該方式將咱們開發的Module單獨編譯稱爲一個動態庫,而後修改配置,使服務器在啓動時將其加載。圖9描述了DSS啓動和關閉時模塊調用流程。
       當服務器啓動時,它首先裝載沒有被編譯進內核的動態模塊,而後才裝載被編譯進內核的靜態模塊;因爲現有的大部分系統功能都是以靜態模塊的方式存在的,若是你但願用本身的模塊替換某個系統功能,最好是編寫一個動態模塊,由於它們將早於靜態模塊被裝載。
       不管是靜態模塊仍是動態模塊,它們的代碼都是相同的,惟一的不一樣就是它們的編譯方式。首先爲了將靜態模塊編譯到服務器中,咱們必須修改QTSServer.cpp文件中的QTSServer::LoadCompiledInModules,並向其中加入如下代碼:
       QTSSModule*       myModule=new QTSSModule(*_XYZ_*);
       (void)myModule->Initialize(&sCallbacks,&_XYZMAIN_);
       (void)AddModule(MyModule);
       其中,XYZ是靜態模塊的名字,而XYZMAIN則是其主函數入口。
       動態模塊的編譯方法以下:首先單獨編譯動態模塊爲一個動態共享庫;將該共享庫與QTSS API stub library連接到一塊兒;最後將結果文件放置到/usr/sbin/QTSSModules目錄中去。此後,服務器在啓動時就將自動調用該動態模塊。
6  結束語
DSS是一項十分龐大的工程,並且隨着新版本的不斷推出和功能的加強,其內容也愈來愈豐富。限於篇幅,本文只是介紹了一些筆者認爲比較重要的模塊或類,但願可以配合讀者更好的掌握DSS的精髓。
咱們之因此研究DSS的源代碼,基本上有兩個目標:一是但願利用DSS做爲平臺進行二次開發,如增長對媒體格式的支持,增長客戶身份認證,增長對媒體內容的管理等模塊,使DSS成爲一個符合實際需求的實用系統。抱此目的的讀者在掌握DSS總體流程的基礎上,應着重於其二次開發平臺(如Module)以及底層文件和媒體格式支持庫的研究。另外一類讀者可能但願經過研究DSS源代碼,掌握在Internet環境中處理流式媒體的關鍵技術,以便爲未來開發相關底層應用作準備。對於這些讀者,筆者認爲須要下更多的功夫去研究DSS源代碼中的許多細節部分:例如高級網絡編程(Socket)、多線程之間的通訊、任務調度、系統資源(CPU、磁盤等)的合理利用以及用於流式媒體的多個標準協議(RTP/RTCP、RTSP、SDP)的具體實現等等。
做爲三大主要流式媒體應用中惟一一個開放源代碼的產品,DSS讓開發人員可以從最底層研究流式媒體技術,事實上,當前國內外許多公司正是在DSS的基礎上開發了本身的流式媒體相關產品。可是須要指出,做爲一個開放源代碼的工程,DSS的分發和開發須遵循蘋果公司給出的一份版權文件(Apple Public Source License),但願進行商業化開發的讀者應該仔細研讀,該文件可從如下網址得到:http://www.publicsource.apple.com。編程

相關文章
相關標籤/搜索