GitChat 做者:范蠡
原文:C++ 高性能服務器網絡框架設計細節
關注微信公衆號:「GitChat 技術雜談」 一本正經的講技術前端
【不要錯過文末彩蛋】java
這篇文章咱們將介紹服務器的開發,並從多個方面探究如何開發一款高性能高併發的服務器程序。須要注意的是通常大型服務器,其複雜程度在於其業務,而不是在於其代碼工程的基本框架。react
大型服務器通常有多個服務組成,可能會支持CDN,或者支持所謂的「分佈式」等,這篇文章不會介紹這些東西,由於無論結構多麼複雜的服務器,都是由單個服務器組成的。因此這篇文章的側重點是討論單個服務程序的結構,並且這裏的結構指的也是單個服務器的網絡通訊層結構,若是你能真正地理解了我所說的,那麼在這個基礎的結構上面開展任何業務都是能夠的,也能夠將這種結構擴展成複雜的多個服務器組,例如「分佈式」服務。linux
文中的代碼示例雖然是以C++爲例,但一樣適合Java(我本人也是Java開發者),原理都是同樣的,只不過Java可能在基本的操做系統網絡通訊API的基礎上用虛擬機包裹了一層接口而已(Java甚至可能基於一些經常使用的網絡通訊框架思想提供了一些現成的API,例如NIO)。有鑑於此,這篇文章不討論那些大而空、泛泛而談的技術術語,而是講的是實實在在的能指導讀者在實際工做中實踐的編碼方案或優化已有編碼的方法。另外這裏討論的技術同時涉及windows和linux兩個平臺。git
所謂高性能就是服務器能流暢地處理各個客戶端的鏈接並儘可能低延遲地應答客戶端的請求;所謂高併發,不只指的是服務器能夠同時支持多的客戶端鏈接,並且這些客戶端在鏈接期間內會不斷與服務器有數據來往。網絡上常常有各類網絡庫號稱單個服務能同時支持百萬甚至千萬的併發,而後我實際去看了下,結果發現只是能同時支持不少的鏈接而已。程序員
若是一個服務器能單純地接受n個鏈接(n可能很大),可是不能有條不紊地處理與這些鏈接之間的數據來往也沒有任何意義,這種服務器框架只是「玩具型」的,對實際生產和應用沒有任何意義。github
這篇文章將從兩個方面來介紹,一個是服務器中的基礎的網絡通訊部件;另一個是,如何利用這些基礎通訊部件整合成一個完整的高效的服務器框架。注意:本文如下內容中的客戶端是相對概念,指的是鏈接到當前討論的服務程序的終端,因此這裏的客戶端既多是咱們傳統意義上的客戶端程序,也多是鏈接該服務的其餘服務器程序。面試
###1、網絡通訊部件redis
按上面介紹的思路,咱們先從服務程序的網絡通訊部件開始介紹。數據庫
####須要解決的問題
既然是服務器程序確定會涉及到網絡通訊部分,那麼服務器程序的網絡通訊模塊要解決哪些問題?目前,網絡上有不少網絡通訊框架,如libevent、boost asio、ACE,但都網絡通訊的常見的技術手段都大同小異,至少要解決如下問題:
如何檢測有新客戶端鏈接?
如何接受客戶端鏈接?
如何檢測客戶端是否有數據發來?
如何收取客戶端發來的數據?
如何檢測鏈接異常?發現鏈接異常以後,如何處理?
如何給客戶端發送數據?
如何在給客戶端發完數據後關閉鏈接?
稍微有點網絡基礎的人,都能回答上面說的其中幾個問題,好比接收客戶端鏈接用socket API的accept函數,收取客戶端數據用recv函數,給客戶端發送數據用send函數,檢測客戶端是否有新鏈接和客戶端是否有新數據能夠用IO multiplexing技術(IO複用)的select、poll、epoll等socket API。確實是這樣的,這些基礎的socket API構成了服務器網絡通訊的地基,無論網絡通訊框架設計的如何巧妙,都是在這些基礎的socket API的基礎上構建的。可是如何巧妙地組織這些基礎的socket API,纔是問題的關鍵。咱們說服務器很高效,支持高併發,實際上只是一個技術實現手段,無論怎樣,從軟件開發的角度來說無非就是一個程序而已,因此,只要程序能最大可能地知足「儘可能減小等待或者不等待」這一原則就是高效的,也就是說高效不是「忙的忙死,閒的閒死」,而是你們均可以閒着,可是若是有活要幹,你們儘可能一塊兒幹,而不是一部分忙着依次作事情123456789,另一部分閒在那裏無所事事。說的可能有點抽象,下面咱們來舉一些例子具體來講明一下。
例如:
默認狀況下,recv函數若是沒有數據的時候,線程就會阻塞在那裏;
默認狀況下,send函數,若是tcp窗口不是足夠大,數據發不出去也會阻塞在那裏;
connect函數默認鏈接另一端的時候,也會阻塞在那裏;
又或者是給對端發送一份數據,須要等待對端回答,若是對方一直不該答,當前線程就阻塞在這裏。
以上都不是高效服務器的開發思惟方式,由於上面的例子都不知足「儘可能減小等待」的原則,爲何必定要等待呢?有沒用一種方法,這些過程不須要等待,最好是不只不須要等待,並且這些事情完成以後能通知我。這樣在這些原本用於等待的cpu時間片內,我就能夠作一些其餘的事情。有,也就是咱們下文要討論的IO Multiplexing技術(IO複用技術)。
####幾種IO複用機制的比較
目前windows系統支持select、WSAAsyncSelect、WSAEventSelect、完成端口(IOCP),linux系統支持select、poll、epoll。這裏咱們不具體介紹每一個具體的函數的用法,咱們來討論一點深層次的東西,以上列舉的API函數能夠分爲兩個層次:
層次一: select和poll
層次二: WSAAsyncSelect、WSAEventSelect、完成端口(IOCP)、epoll
爲何這麼分呢?先來介紹第一層次,select和poll函數本質上仍是在必定時間內主動去查詢socket句柄(多是一個也多是多個)上是否有事件,好比可讀事件,可寫事件或者出錯事件,也就是說咱們仍是須要每隔一段時間內去主動去作這些檢測,若是在這段時間內檢測出一些事件來,咱們這段時間就算沒白花,可是假若這段時間內沒有事件呢?咱們只能是作無用功了,說白了,仍是在浪費時間,由於假如一個服務器有多個鏈接,在cpu時間片有限的狀況下,咱們花費了必定的時間檢測了一部分socket鏈接,卻發現它們什麼事件都沒有,而在這段時間內咱們卻有一些事情須要處理,那咱們爲何要花時間去作這個檢測呢?把這個時間用在作咱們須要作的事情很差嗎?因此對於服務器程序來講,要想高效,咱們應該儘可能避免花費時間主動去查詢一些socket是否有事件,而是等這些socket有事件的時候告訴咱們去處理。這也就是層次二的各個函數作的事情,它們實際至關於變主動查詢是否有事件爲當有事件時,系統會告訴咱們,此時咱們再去處理,也就是「好鋼用在刀刃」上了。只不過層次二的函數通知咱們的方式是各不相同,好比WSAAsyncSelect是利用windows窗口消息隊列的事件機制來通知咱們設定的窗口過程函數,IOCP是利用GetQueuedCompletionStatus返回正確的狀態,epoll是epoll_wait函數返回而已。
例如,connect函數鏈接另一端,若是用於鏈接socket是非阻塞的,那麼connect雖然不能馬上鍊接完成,可是也是會馬上返回,無需等待,等鏈接完成以後,WSAAsyncSelect會返回FD_CONNECT
事件告訴咱們鏈接成功,epoll會產生EPOLLOUT事件,咱們也能知道鏈接完成。甚至socket有數據可讀時,WSAAsyncSelect產生FD_READ事件,epoll產生EPOLLIN事件,等等。因此有了上面的討論,咱們就能夠獲得網絡通訊檢測可讀可寫或者出錯事件的正確姿式。這是我這裏提出的第二個原則:儘可能減小作無用功的時間。這個在服務程序資源夠用的狀況下可能體現不出來什麼優點,可是若是有大量的任務要處理,這裏就成了性能的一個瓶頸。
####檢測網絡事件的正確姿式
根據上面的介紹,第一,爲了不無心義的等待時間,第二,不採用主動查詢各個socket的事件,而是採用等待操做系統通知咱們有事件的狀態的策略。咱們的socket都要設置成非阻塞的。在此基礎上咱們回到欄目(一)中提到的七個問題:
如何檢測有新客戶端鏈接?
如何接受客戶端鏈接?
默認accept函數會阻塞在那裏,若是epoll檢測到偵聽socket上有EPOLLIN事件,或者WSAAsyncSelect檢測到有FD_ACCEPT事件,那麼就代表此時有新鏈接到來,這個時候調用accept函數,就不會阻塞了。固然產生的新socket你應該也設置成非阻塞的。這樣咱們就能在新socket上收發數據了。
如何檢測客戶端是否有數據發來?
如何收取客戶端發來的數據?
同理,咱們也應該在socket上有可讀事件的時候纔去收取數據,這樣咱們調用recv或者read函數時不用等待,至於一次性收多少數據好呢?咱們能夠根據本身的需求來決定,甚至你能夠在一個循環裏面反覆recv或者read,對於非阻塞模式的socket,若是沒有數據了,recv或者read也會馬上返回,錯誤碼EWOULDBLOCK會代表當前已經沒有數據了。示例:
bool CIUSocket::Recv()
{
int nRet = 0;
while(true)
{
char buff[512];
nRet = ::recv(m_hSocket, buff, 512, 0);
if(nRet == SOCKET_ERROR) //一旦出現錯誤就馬上關閉Socket
{
if (::WSAGetLastError() == WSAEWOULDBLOCK)
break;
else
return false;
}
else if(nRet < 1)
return false;
m_strRecvBuf.append(buff, nRet);
::Sleep(1);
}
return true;
}複製代碼
如何檢測鏈接異常?發現鏈接異常以後,如何處理?
一樣當咱們收到異常事件後例如EPOLLERR或關閉事件FD_CLOSE,咱們就知道了有異常產生,咱們對異常的處理通常就是關閉對應的socket。另外,若是send/recv或者read/write函數對一個socket進行操做時,若是返回0,那說明對端已經關閉了socket,此時這路鏈接也不必存在了,咱們也能夠關閉對應的socket。
如何給客戶端發送數據?
這也是一道常見的網絡通訊面試題,某一年的騰訊後臺開發職位就問到過這樣的問題。給客戶端發送數據,比收數據要稍微麻煩一點,也是須要講點技巧的。首先咱們不能像註冊檢測數據可讀事件同樣一開始就註冊檢測數據可寫事件,由於若是檢測可寫的話,通常狀況下只要對端正常收取數據,咱們的socket就都是可寫的,若是咱們設置監聽可寫事件,會致使頻繁地觸發可寫事件,可是咱們此時並不必定有數據須要發送。因此正確的作法是:若是有數據要發送,則先嚐試着去發送,若是發送不了或者只發送出去部分,剩下的咱們須要將其緩存起來,而後再設置檢測該socket上可寫事件,下次可寫事件產生時,再繼續發送,若是仍是不能徹底發出去,則繼續設置偵聽可寫事件,如此往復,一直到全部數據都發出去爲止。一旦全部數據都發出去之後,咱們要移除偵聽可寫事件,避免無用的可寫事件通知。不知道你注意到沒有,若是某次只發出去部分數據,剩下的數據應該暫且存起來,這個時候咱們就須要一個緩衝區來存放這部分數據,這個緩衝區咱們稱爲「發送緩衝區」。發送緩衝區不只存放本次沒有發完的數據,還用來存放在發送過程當中,上層又傳來的新的須要發送的數據。爲了保證順序,新的數據應該追加在當前剩下的數據的後面,發送的時候從發送緩衝區的頭部開始發送。也就是說先來的先發送,後來的後發送。
如何在給客戶端發完數據後關閉鏈接?
這個問題比較難處理,由於這裏的「發送完」不必定是真正的發送完,咱們調用send或者write函數即便成功,也只是向操做系統的協議棧裏面成功寫入數據,至於可否被髮出去、什麼時候被髮出去很難判斷,發出去對方是否收到就更難判斷了。因此,咱們目前只能簡單地認爲send或者write返回咱們發出數據的字節數大小,咱們就認爲「發完數據」了。而後調用close等socket API關閉鏈接。固然,你也能夠調用shutdown函數來實現所謂的「半關閉」。關於關閉鏈接的話題,咱們再單獨開一個小的標題來專門討論一下。
####被動關閉鏈接和主動關閉鏈接
在實際的應用中,被動關閉鏈接是因爲咱們檢測到了鏈接的異常事件,好比EPOLLERR,或者對端關閉鏈接,send或recv返回0,這個時候這路鏈接已經沒有存在必要的意義了,咱們被迫關閉鏈接。
而主動關閉鏈接,是咱們主動調用close/closesocket來關閉鏈接。好比客戶端給咱們發送非法的數據,好比一些網絡攻擊的嘗試性數據包。這個時候出於安全考慮,咱們關閉socket鏈接。
####發送緩衝區和接收緩衝區
上面已經介紹了發送緩衝區了,並說明了其存在的意義。接收緩衝區也是同樣的道理,當收到數據之後,咱們能夠直接進行解包,可是這樣並很差,理由一:除非一些約定俗稱的協議格式,好比http協議,大多數服務器的業務的協議都是不一樣的,也就是說一個數據包裏面的數據格式的解讀應該是業務層的事情,和網絡通訊層應該解耦,爲了網絡層更加通用,咱們沒法知道上層協議長成什麼樣子,由於不一樣的協議格式是不同的,它們與具體的業務有關。理由二:即便知道協議格式,咱們在網絡層進行解包處理對應的業務,若是這個業務處理比較耗時,好比須要進行復雜的運算,或者鏈接數據庫進行帳號密碼驗證,那麼咱們的網絡線程會須要大量時間來處理這些任務,這樣其它網絡事件可能無法及時處理。鑑於以上二點,咱們確實須要一個接收緩衝區,將收取到的數據放到該緩衝區裏面去,並由專門的業務線程或者業務邏輯去從接收緩衝區中取出數據,並解包處理業務。
說了這麼多,那發送緩衝區和接收緩衝區該設計成多大的容量?這是一個老生常談的問題了,由於咱們常常遇到這樣的問題:預分配的內存過小不夠用,太大的話可能會形成浪費。怎麼辦呢?答案就是像string、vector同樣,設計出一個能夠動態增加的緩衝區,按需分配,不夠還能夠擴展。
須要特別注意的是,這裏說的發送緩衝區和接收緩衝區是每個socket鏈接都存在一個。這是咱們最多見的設計方案。
####協議的設計
除了一些通用的協議,如http、ftp協議之外,大多數服務器協議都是根據業務制定的。協議設計好了,數據包的格式就根據協議來設置。咱們知道tcp/ip協議是流式數據,因此流式數據就是像流水同樣,數據包與數據包之間沒有明顯的界限。好比A端給B端連續發了三個數據包,每一個數據包都是50個字節,B端可能先收到10個字節,再收到140個字節;或者先收到20個字節,再收到20個字節,再收到110個字節;也可能一次性收到150個字節。這150個字節能夠以任何字節數目組合和次數被B收到。因此咱們討論協議的設計第一個問題就是如何界定包的界限,也就是接收端如何知道每一個包數據的大小。目前經常使用有以下三種方法:
固定大小,這種方法就是假定每個包的大小都是固定字節數目,例如上文中討論的每一個包大小都是50個字節,接收端每收氣50個字節就當成一個包。
指定包結束符,例如以一個\r\n(換行符和回車符)結束,這樣對端只要收到這樣的結束符,就能夠認爲收到了一個包,接下來的數據是下一個包的內容。
指定包的大小,這種方法結合了上述兩種方法,通常包頭是固定大小,包頭中有一個字段指定包
體或者整個大的大小,對端收到數據之後先解析包頭中的字段獲得包體或者整個包的大小,而後根據這個大小去界定數據的界線。
協議要討論的第二個問題是,設計協議的時候要儘可能方便解包,也就是說協議的格式字段應該儘可能清晰明瞭。
協議要討論的第三個問題是,根據協議組裝的單個數據包應該儘可能小,注意這裏指的是單個數據包,這樣有以下好處:第1、對於一些移動端設備來講,其數據處理能力和帶寬能力有限,小的數據不只能加快處理速度,同時節省大量流量費用;第2、若是單個數據包足夠小的話,對頻繁進行網絡通訊的服務器端來講,能夠大大減少其帶寬壓力,其所在的系統也能使用更少的內存。試想:假如一個股票服務器,若是一隻股票的數據包是100個字節或者1000個字節,那一樣是10000只股票區別呢?
協議要討論的第四個問題是,對於數值類型,咱們應該顯式地指定數值的長度,好比long型,在32位機器上是32位4個字節,可是若是在64位機器上,就變成了64位8個字節了。這樣一樣是一個long型,發送方和接收方可能由於機器位數的不一樣會用不一樣的長度去解碼。因此建議最好,在涉及到跨平臺使用的協議最好顯式地指定協議中整型字段的長度,好比int3二、int64等等。下面是一個協議的接口的例子,固然java程序員應該很熟悉這樣的接口:
class BinaryReadStream
{
private:
const char* const ptr;
const size_t len;
const char* cur;
BinaryReadStream(const BinaryReadStream&);
BinaryReadStream& operator=(const BinaryReadStream&);
public:
BinaryReadStream(const char* ptr, size_t len);
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool IsEmpty() const;
bool ReadString(string* str, size_t maxlen, size_t& outlen);
bool ReadCString(char* str, size_t strlen, size_t& len);
bool ReadCCString(const char** str, size_t maxlen, size_t& outlen);
bool ReadInt32(int32_t& i);
bool ReadInt64(int64_t& i);
bool ReadShort(short& i);
bool ReadChar(char& c);
size_t ReadAll(char* szBuffer, size_t iLen) const;
bool IsEnd() const;
const char* GetCurrent() const{ return cur; }
public:
bool ReadLength(size_t & len);
bool ReadLengthWithoutOffset(size_t &headlen, size_t & outlen);
};
class BinaryWriteStream
{
public:
BinaryWriteStream(string* data);
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool WriteCString(const char* str, size_t len);
bool WriteString(const string& str);
bool WriteDouble(double value, bool isNULL = false);
bool WriteInt64(int64_t value, bool isNULL = false);
bool WriteInt32(int32_t i, bool isNULL = false);
bool WriteShort(short i, bool isNULL = false);
bool WriteChar(char c, bool isNULL = false);
size_t GetCurrentPos() const{ return m_data->length(); }
void Flush();
void Clear();
private:
string* m_data;
};複製代碼
其中BinaryWriteStream是編碼協議的類,BinaryReadStream是解碼協議的類。能夠按下面這種方式來編碼和解碼。
編碼:
std::string outbuf;
BinaryWriteStream writeStream(&outbuf);
writeStream.WriteInt32(msg_type_register);
writeStream.WriteInt32(m_seq);
writeStream.WriteString(retData);
writeStream.Flush();複製代碼
解碼:
BinaryReadStream readStream(strMsg.c_str(), strMsg.length());
int32_t cmd;
if (!readStream.ReadInt32(cmd))
{
return false;
}
//int seq;
if (!readStream.ReadInt32(m_seq))
{
return false;
}
std::string data;
size_t datalength;
if (!readStream.ReadString(&data, 0, datalength))
{
return false;
}複製代碼
###2、服務器程序結構的組織
上面的六個標題,咱們討論了不少具體的細節問題,如今是時候討論將這些細節組織起來了。根據個人我的經驗,目前主流的思想是one thread one loop+reactor模式(也有proactor模式)的策略。通俗點說就是一個線程一個循環,即在一個線程的函數裏面不斷地循環依次作一些事情,這些事情包括檢測網絡事件、解包數據產生業務邏輯。咱們先從最簡單地來講,設定一些線程在一個循環裏面作網絡通訊相關的事情,僞碼以下:
while(退出標誌)
{
//IO複用技術檢測socket可讀事件、出錯事件
//(若是有數據要發送,則也檢測可寫事件)
//若是有可讀事件,對於偵聽socket則接收新鏈接;
//對於普通socket則收取該socket上的數據,收取的數據存入對應的接收緩衝區,若是出錯則關閉鏈接;
//若是有數據要發送,有可寫事件,則發送數據
//若是有出錯事件,關閉該鏈接
}複製代碼
另外設定一些線程去處理接收到的數據,並解包處理業務邏輯,這些線程能夠認爲是業務線程了,僞碼以下:
//從接收緩衝區中取出數據解包,分解成不一樣的業務來處理 複製代碼
上面的結構是目前最通用的服務器邏輯結構,可是能不能再簡化一下或者說再綜合一下呢?咱們試試,你想過這樣的問題沒有:假如如今的機器有兩個cpu(準確的來講應該是兩個核),咱們的網絡線程數量是2個,業務邏輯線程也是2個,這樣可能存在的狀況就是:業務線程運行的時候,網絡線程並無運行,它們必須等待,若是是這樣的話,幹嗎要多建兩個線程呢?除了程序結構上可能稍微清楚一點,對程序性能沒有任何實質性提升,並且白白浪費cpu時間片在線程上下文切換上。因此,咱們能夠將網絡線程與業務邏輯線程合併,合併後的僞碼看起來是這樣子的:
while(退出標誌)
{
//IO複用技術檢測socket可讀事件、出錯事件
//(若是有數據要發送,則也檢測可寫事件)
//若是有可讀事件,對於偵聽socket則接收新鏈接;
//對於普通socket則收取該socket上的數據,收取的數據存入對應的接收緩衝區,若是出錯則關閉鏈接;
//若是有數據要發送,有可寫事件,則發送數據
//若是有出錯事件,關閉該鏈接
//從接收緩衝區中取出數據解包,分解成不一樣的業務來處理
}複製代碼
你沒看錯,其實就是簡單的合併,合併以後和不只能夠達到原來合併前的效果,並且在沒有網絡IO事件的時候,能夠及時處理咱們想處理的一些業務邏輯,而且減小了沒必要要的線程上下文切換時間。
咱們再更進一步,甚至咱們能夠在這個while循環增長其它的一些任務的處理,好比程序的邏輯任務隊列、定時器事件等等,僞碼以下:
while(退出標誌)
{
//定時器事件處理
//IO複用技術檢測socket可讀事件、出錯事件
//(若是有數據要發送,則也檢測可寫事件)
//若是有可讀事件,對於偵聽socket則接收新鏈接;
//對於普通socket則收取該socket上的數據,收取的數據存入對應的接收緩衝區,若是出錯則關閉鏈接;
//若是有數據要發送,有可寫事件,則發送數據
//若是有出錯事件,關閉該鏈接
//從接收緩衝區中取出數據解包,分解成不一樣的業務來處理
//程序自定義任務1
//程序自定義任務2
}複製代碼
注意:之因此將定時器事件的處理放在網絡IO事件的檢測以前,是由於避免定時器事件過時時間太長。假如放在後面的話,可能前面的處理耗費了一點時間,等處處理定時器事件時,時間間隔已通過去了很多時間。雖然這樣處理,也無法保證定時器事件百分百精確,可是能儘可能保證。固然linux系統下提供eventfd這樣的定時器對象,全部的定時器對象就能像處理socket這樣的fd同樣統一成處理。這也是網絡庫libevent的思想很像,libevent將socket、定時器、信號封裝成統一的對象進行處理。
說了這麼多理論性的東西,咱們來一款流行的開源網絡庫muduo來講明吧(做者:陳碩),原庫是基於boost的,我改爲了C++11的版本,並修改了一些bug,在此感謝原做者陳碩。
上文介紹的核心線程函數的while循環位於eventloop.cpp中:
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
if (frameFunctor_)
{
frameFunctor_();
}
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}複製代碼
poller_->poll
利用epoll分離網絡事件,而後接着處理分離出來的網絡事件,每個客戶端socket對應一個鏈接,即一個TcpConnection和Channel通道對象。currentActiveChannel->handleEvent(pollReturnTime)根據是可讀、可寫、出錯事件來調用對應的處理函數,這些函數都是回調函數,程序初始化階段設置進來的:
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
LOG_TRACE << reventsToString();
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
//當是偵聽socket時,readCallback_指向Acceptor::handleRead
//當是客戶端socket時,調用TcpConnection::handleRead
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
//若是是鏈接狀態服的socket,則writeCallback_指向Connector::handleWrite()
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}複製代碼
固然,這裏利用了Channel對象的「多態性」,若是是普通socket,可讀事件就會調用預先設置的回調函數;可是若是是偵聽socket,則調用Aceptor對象的handleRead()
來接收新鏈接:
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
//newConnectionCallback_實際指向TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of // accept()ing when you can't" in libev's doc. // By Marc Lehmann, author of livev. if (errno == EMFILE) { ::close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); ::close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } }複製代碼
主循環裏面的業務邏輯處理對應:
doPendingFunctors();
if (frameFunctor_)
{
frameFunctor_();
}
[cpp] view plain copy
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}複製代碼
這裏增長業務邏輯是增長執行任務的函數指針的,增長的任務保存在成員變量pendingFunctors_
中,這個變量是一個函數指針數組(vector對象),執行的時候,調用每一個函數就能夠了。上面的代碼先利用一個棧變量將成員變量pendingFunctors_
裏面的函數指針換過來,接下來對這個棧變量進行操做就能夠了,這樣減小了鎖的粒度。由於成員變量pendingFunctors_
在增長任務的時候,也會被用到,設計到多個線程操做,因此要加鎖,增長任務的地方是:
void EventLoop::queueInLoop(const Functor& cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}複製代碼
而frameFunctor_就更簡單了,就是經過設置一個函數指針就能夠了。固然這裏有個技巧性的東西,即增長任務的時候,爲了可以當即執行,使用喚醒機制,經過往一個fd裏面寫入簡單的幾個字節,來喚醒epoll,使其馬上返回,由於此時沒有其它的socke有事件,這樣接下來就執行剛纔添加的任務了。
咱們看一下數據收取的邏輯:
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
//messageCallback_指向CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receiveTime)
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}複製代碼
將收到的數據放到接收緩衝區裏面,未來咱們來解包:
void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)
{
while (true)
{
//不夠一個包頭大小
if (pBuffer->readableBytes() < (size_t)sizeof(msg))
{
LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);
return;
}
//不夠一個整包大小
msg header;
memcpy(&header, pBuffer->peek(), sizeof(msg));
if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))
return;
pBuffer->retrieve(sizeof(msg));
std::string inbuf;
inbuf.append(pBuffer->peek(), header.packagesize);
pBuffer->retrieve(header.packagesize);
if (!Process(conn, inbuf.c_str(), inbuf.length()))
{
LOG_WARN << "Process error, close TcpConnection";
conn->forceClose();
}
}// end while-loop
}複製代碼
先判斷接收緩衝區裏面的數據是否夠一個包頭大小,若是夠再判斷夠不夠包頭指定的包體大小,若是仍是夠的話,接着在Process函數裏面處理該包。
再看看發送數據的邏輯:
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
}
assert(remaining <= len);
if (!faultError && remaining > 0)
{
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}複製代碼
若是剩餘的數據remaining大於則調用channel_->enableWriting();開始監聽可寫事件,可寫事件處理以下:
[cpp] view plain copy
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}複製代碼
若是發送完數據之後調用channel_->disableWriting();移除監聽可寫事件。
不少讀者可能一直想問,文中不是說解包數據並處理邏輯是業務代碼而非網絡通訊的代碼,你這裏貌似都混在一塊兒了,其實沒有,這裏實際的業務代碼處理都是框架曾提供的回調函數裏面處理的,具體怎麼處理,由框架使用者——業務層本身定義。
總結起來,實際上就是一個線程函數裏一個loop那麼點事情,不信你再看我曾經工做上的一個交易系統服務器項目代碼:
void CEventDispatcher::Run()
{
m_bShouldRun = true;
while(m_bShouldRun)
{
DispatchIOs();
SyncTime();
CheckTimer();
DispatchEvents();
}
}
void CEpollReactor::DispatchIOs()
{
DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;
if (HandleOtherTask())
{
dwSelectTimeOut = 0;
}
struct epoll_event ev;
CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();
for(; itor!=m_mapEventHandlerId.end(); itor++)
{
CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;
if(pEventHandler == NULL){
continue;
}
ev.data.ptr = pEventHandler;
ev.events = 0;
int nReadID, nWriteID;
pEventHandler->GetIds(&nReadID, &nWriteID);
if (nReadID > 0)
{
ev.events |= EPOLLIN;
}
if (nWriteID > 0)
{
ev.events |= EPOLLOUT;
}
epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);
}
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeOut/1000);
for (int i=0; i<nfds; i++)
{
struct epoll_event &evref = events[i];
CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;
if ((evref.events|EPOLLIN)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
{
pEventHandler->HandleInput();
}
if ((evref.events|EPOLLOUT)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
{
pEventHandler->HandleOutput();
}
}
}
void CEventDispatcher::DispatchEvents()
{
CEvent event;
CSyncEvent *pSyncEvent;
while(m_queueEvent.PeekEvent(event))
{
int nRetval;
if(event.pEventHandler != NULL)
{
nRetval = event.pEventHandler->HandleEvent(event.nEventID, event.dwParam, event.pParam);
}
else
{
nRetval = HandleEvent(event.nEventID, event.dwParam, event.pParam);
}
if(event.pAdd != NULL) //同步消息
{
pSyncEvent=(CSyncEvent *)event.pAdd;
pSyncEvent->nRetval = nRetval;
pSyncEvent->sem.UnLock();
}
}
}複製代碼
再看看蘑菇街開源的TeamTalk的源碼(代碼下載地址:github.com/baloonwj/Te…
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
fd_set read_set, write_set, excep_set;
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = wait_timeout * 1000; // 10 millisecond
if(running)
return;
running = true;
while (running)
{
_CheckTimer();
_CheckLoop();
if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count)
{
Sleep(MIN_TIMER_DURATION);
continue;
}
m_lock.lock();
memcpy(&read_set, &m_read_set, sizeof(fd_set));
memcpy(&write_set, &m_write_set, sizeof(fd_set));
memcpy(&excep_set, &m_excep_set, sizeof(fd_set));
m_lock.unlock();
int nfds = select(0, &read_set, &write_set, &excep_set, &timeout);
if (nfds == SOCKET_ERROR)
{
log("select failed, error code: %d", GetLastError());
Sleep(MIN_TIMER_DURATION);
continue; // select again
}
if (nfds == 0)
{
continue;
}
for (u_int i = 0; i < read_set.fd_count; i++)
{
//log("select return read count=%d\n", read_set.fd_count);
SOCKET fd = read_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnRead();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < write_set.fd_count; i++)
{
//log("select return write count=%d\n", write_set.fd_count);
SOCKET fd = write_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnWrite();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < excep_set.fd_count; i++)
{
//log("select return exception count=%d\n", excep_set.fd_count);
SOCKET fd = excep_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnClose();
pSocket->ReleaseRef();
}
}
}
}複製代碼
再看filezilla,一款ftp工具的服務器端,它採用的是Windows的WSAAsyncSelect模型(代碼下載地址:
github.com/baloonwj/fi…):
//Processes event notifications sent by the sockets or the layers
static LRESULT CALLBACK WindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
if (message>=WM_SOCKETEX_NOTIFY)
{
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
if (message < static_cast<UINT>(WM_SOCKETEX_NOTIFY+pWnd->m_nWindowDataSize)) //Index is within socket storage
{
//Lookup socket and verify if it's valid CAsyncSocketEx *pSocket=pWnd->m_pAsyncSocketExWindowData[message - WM_SOCKETEX_NOTIFY].m_pSocket; SOCKET hSocket = wParam; if (!pSocket) return 0; if (hSocket == INVALID_SOCKET) return 0; if (pSocket->m_SocketData.hSocket != hSocket) return 0; int nEvent = lParam & 0xFFFF; int nErrorCode = lParam >> 16; //Dispatch notification if (!pSocket->m_pFirstLayer) { //Dispatch to CAsyncSocketEx instance switch (nEvent) { case FD_READ: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_READ; break; } else if (pSocket->GetState() == attached) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; // Ignore further FD_READ events after FD_CLOSE has been received if (pSocket->m_SocketData.onCloseCalled) break; #endif //NOSOCKETSTATES #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_READ) { pSocket->OnReceive(nErrorCode); } break; case FD_FORCEREAD: //Forceread does not check if there's data waiting
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_FORCEREAD;
break;
}
else if (pSocket->GetState() == attached)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_WRITE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_WRITE;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_WRITE)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnSend(nErrorCode);
}
break;
case FD_CONNECT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting)
{
if (nErrorCode && pSocket->m_SocketData.nextAddr)
{
if (pSocket->TryNextProtocol())
break;
}
pSocket->SetState(connected);
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_CONNECT)
pSocket->OnConnect(nErrorCode);
#ifndef NOSOCKETSTATES
if (!nErrorCode)
{
if ((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected)
pSocket->OnSend(0);
}
pSocket->m_nPendingEvents = 0;
#endif
break;
case FD_ACCEPT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() != listening && pSocket->GetState() != attached)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_ACCEPT)
pSocket->OnAccept(nErrorCode);
break;
case FD_CLOSE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() != connected && pSocket->GetState() != attached)
break;
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
{
if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
pSocket->m_SocketData.onCloseCalled = true;
pSocket->OnReceive(WSAESHUTDOWN);
break;
}
}
pSocket->SetState(nErrorCode ? aborted : closed);
#endif //NOSOCKETSTATES
pSocket->OnClose(nErrorCode);
break;
}
}
else //Dispatch notification to the lowest layer
{
if (nEvent == FD_READ)
{
// Ignore further FD_READ events after FD_CLOSE has been received
if (pSocket->m_SocketData.onCloseCalled)
return 0;
DWORD nBytes;
if (!pSocket->IOCtl(FIONREAD, &nBytes))
nErrorCode = WSAGetLastError();
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if (nEvent == FD_CLOSE)
{
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
{
if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(FD_READ, 0);
return 0;
}
}
pSocket->m_SocketData.onCloseCalled = true;
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
}
return 0;
}
else if (message == WM_USER) //Notification event sent by a layer
{
//Verify parameters, lookup socket and notification message
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage
{
return 0;
}
CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket;
CAsyncSocketExLayer::t_LayerNotifyMsg *pMsg = (CAsyncSocketExLayer::t_LayerNotifyMsg *)lParam;
if (!pMsg || !pSocket || pSocket->m_SocketData.hSocket != pMsg->hSocket)
{
delete pMsg;
return 0;
}
int nEvent=pMsg->lEvent&0xFFFF;
int nErrorCode=pMsg->lEvent>>16;
//Dispatch to layer
if (pMsg->pLayer)
pMsg->pLayer->CallEvent(nEvent, nErrorCode);
else
{
//Dispatch to CAsyncSocketEx instance
switch (nEvent)
{
case FD_READ:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_READ;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_FORCEREAD: //Forceread does not check if there's data waiting #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_FORCEREAD; break; } else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_READ) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnReceive(nErrorCode); } break; case FD_WRITE: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_WRITE; break; } else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_WRITE) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnSend(nErrorCode); } break; case FD_CONNECT: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting) pSocket->SetState(connected); else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_CONNECT) pSocket->OnConnect(nErrorCode); #ifndef NOSOCKETSTATES if (!nErrorCode) { if (((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ)) pSocket->OnReceive(0); if (((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ)) pSocket->OnReceive(0); if (((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_WRITE)) pSocket->OnSend(0); } pSocket->m_nPendingEvents = 0; #endif //NOSOCKETSTATES break; case FD_ACCEPT: #ifndef NOSOCKETSTATES if ((pSocket->GetState() == listening || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_ACCEPT)) #endif //NOSOCKETSTATES { pSocket->OnAccept(nErrorCode); } break; case FD_CLOSE: #ifndef NOSOCKETSTATES if ((pSocket->GetState() == connected || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_CLOSE)) { pSocket->SetState(nErrorCode?aborted:closed); #else { #endif //NOSOCKETSTATES pSocket->OnClose(nErrorCode); } break; } } delete pMsg; return 0; } else if (message == WM_USER+1) { // WSAAsyncGetHostByName reply // Verify parameters ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd = (CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd); if (!pWnd) return 0; CAsyncSocketEx *pSocket = NULL; for (int i = 0; i < pWnd->m_nWindowDataSize; ++i) { pSocket = pWnd->m_pAsyncSocketExWindowData[i].m_pSocket; if (pSocket && pSocket->m_hAsyncGetHostByNameHandle && pSocket->m_hAsyncGetHostByNameHandle == (HANDLE)wParam && pSocket->m_pAsyncGetHostByNameBuffer) break; } if (!pSocket || !pSocket->m_pAsyncGetHostByNameBuffer) return 0; int nErrorCode = lParam >> 16; if (nErrorCode) { pSocket->OnConnect(nErrorCode); return 0; } SOCKADDR_IN sockAddr{}; sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = ((LPIN_ADDR)((LPHOSTENT)pSocket->m_pAsyncGetHostByNameBuffer)->h_addr)->s_addr; sockAddr.sin_port = htons(pSocket->m_nAsyncGetHostByNamePort); BOOL res = pSocket->Connect((SOCKADDR*)&sockAddr, sizeof(sockAddr)); delete [] pSocket->m_pAsyncGetHostByNameBuffer; pSocket->m_pAsyncGetHostByNameBuffer = 0; pSocket->m_hAsyncGetHostByNameHandle = 0; if (!res) if (GetLastError() != WSAEWOULDBLOCK) pSocket->OnConnect(GetLastError()); return 0; } else if (message == WM_USER + 2) { //Verify parameters, lookup socket and notification message //Verify parameters if (!hWnd) return 0; CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); if (!pWnd) return 0; if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage return 0; CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket; if (!pSocket) return 0; // Process pending callbacks std::list<t_callbackMsg> tmp; tmp.swap(pSocket->m_pendingCallbacks); pSocket->OnLayerCallback(tmp); for (auto & cb : tmp) { delete [] cb.str; } } else if (message == WM_TIMER) { if (wParam != 1) return 0; ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd && pWnd->m_pThreadData); if (!pWnd || !pWnd->m_pThreadData) return 0; if (pWnd->m_pThreadData->layerCloseNotify.empty()) { KillTimer(hWnd, 1); return 0; } CAsyncSocketEx* socket = pWnd->m_pThreadData->layerCloseNotify.front(); pWnd->m_pThreadData->layerCloseNotify.pop_front(); if (pWnd->m_pThreadData->layerCloseNotify.empty()) KillTimer(hWnd, 1); if (socket) PostMessage(hWnd, socket->m_SocketData.nSocketIndex + WM_SOCKETEX_NOTIFY, socket->m_SocketData.hSocket, FD_CLOSE); return 0; } return DefWindowProc(hWnd, message, wParam, lParam); }複製代碼
上面截取的代碼段,若是你對這些項目不是很熟悉的話,估計你也沒有任何興趣去細細看每一行代碼邏輯。可是你必定要明白我所說的這個結構的邏輯,基本上目前主流的網絡框架都是這套原理。好比filezilla的網絡通訊層一樣也被用在大名鼎鼎的電驢(easyMule)中。
關於單個服務程序的框架,我已經介紹完了,若是你能徹底理解我要表達的意思,我相信你也能構建出一套高性能服務程序來。
另外,服務器框架也能夠在上面的設計思路的基礎上增長不少有意思的細節,好比流量控制。舉另外 一個我實際作過的項目中的例子吧:
通常實際項目中,當客戶端鏈接數目比較多的時候,服務器在處理網絡數據的時候,若是同時有多個socket上有數據要處理,因爲cpu核數有限,根據上面先檢測iO事件再處理IO事件可能會出現工做線程一直處理前幾個socket的事件,直到前幾個socket處理完畢後再處理後面幾個socket的數據。這就至關於,你去飯店吃飯,你們都點了菜,可是有些桌子上一直在上菜,而有些桌子上一直沒有菜。這樣確定很差,咱們來看下如何避免這種現象:
int CFtdEngine::HandlePackage(CFTDCPackage *pFTDCPackage, CFTDCSession *pSession)
{
//NET_IO_LOG0("CFtdEngine::HandlePackage\n");
FTDC_PACKAGE_DEBUG(pFTDCPackage);
if (pFTDCPackage->GetTID() != FTD_TID_ReqUserLogin)
{
if (!IsSessionLogin(pSession->GetSessionID()))
{
SendErrorRsp(pFTDCPackage, pSession, 1, "客戶未登陸");
return 0;
}
}
CalcFlux(pSession, pFTDCPackage->Length()); //統計流量
REPORT_EVENT(LOG_DEBUG, "Front/Fgateway", "登陸請求%0x", pFTDCPackage->GetTID());
int nRet = 0;
switch(pFTDCPackage->GetTID())
{
case FTD_TID_ReqUserLogin:
///huwp:20070608:檢查太高版本的API將被禁止登陸
if (pFTDCPackage->GetVersion()>FTD_VERSION)
{
SendErrorRsp(pFTDCPackage, pSession, 1, "Too High FTD Version");
return 0;
}
nRet = OnReqUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
case FTD_TID_ReqCheckUserLogin:
nRet = OnReqCheckUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
case FTD_TID_ReqSubscribeTopic:
nRet = OnReqSubscribeTopic(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
}
return 0;
}複製代碼
當有某個socket上有數據可讀時,接着接收該socket上的數據,對接收到的數據進行解包,而後調用CalcFlux(pSession, pFTDCPackage->Length())進行流量統計:
void CFrontEngine::CalcFlux(CSession *pSession, const int nFlux)
{
TFrontSessionInfo *pSessionInfo = m_mapSessionInfo.Find(pSession->GetSessionID());
if (pSessionInfo != NULL)
{
//流量控制改成計數
pSessionInfo->nCommFlux ++;
///若流量超過規定,則掛起該會話的讀操做
if (pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux)
{
pSession->SuspendRead(true);
}
}
}複製代碼
該函數會先讓某個鏈接會話(Session)處理的包數量遞增,接着判斷是否超過最大包數量,則設置讀掛起標誌:
void CSession::SuspendRead(bool bSuspend)
{
m_bSuspendRead = bSuspend;
}複製代碼
這樣下次將會從檢測的socket列表中排除該socket:
void CEpollReactor::RegisterIO(CEventHandler *pEventHandler)
{
int nReadID, nWriteID;
pEventHandler->GetIds(&nReadID, &nWriteID);
if (nWriteID != 0 && nReadID ==0)
{
nReadID = nWriteID;
}
if (nReadID != 0)
{
m_mapEventHandlerId[pEventHandler] = nReadID;
struct epoll_event ev;
ev.data.ptr = pEventHandler;
if(epoll_ctl(m_fdEpoll, EPOLL_CTL_ADD, nReadID, &ev) != 0)
{
perror("epoll_ctl EPOLL_CTL_ADD");
}
}
}
void CSession::GetIds(int *pReadId, int *pWriteId)
{
m_pChannelProtocol->GetIds(pReadId,pWriteId);
if (m_bSuspendRead)
{
*pReadId = 0;
}
}複製代碼
也就是說再也不檢測該socket上是否有數據可讀。而後在定時器裏1秒後重置該標誌,這樣這個socket上有數據的話又能夠從新檢測到了:
const int SESSION_CHECK_TIMER_ID = 9;
const int SESSION_CHECK_INTERVAL = 1000;
SetTimer(SESSION_CHECK_TIMER_ID, SESSION_CHECK_INTERVAL);
void CFrontEngine::OnTimer(int nIDEvent)
{
if (nIDEvent == SESSION_CHECK_TIMER_ID)
{
CSessionMap::iterator itor = m_mapSession.Begin();
while (!itor.IsEnd())
{
TFrontSessionInfo *pFind = m_mapSessionInfo.Find((*itor)->GetSessionID());
if (pFind != NULL)
{
CheckSession(*itor, pFind);
}
itor++;
}
}
}
void CFrontEngine::CheckSession(CSession *pSession, TFrontSessionInfo *pSessionInfo)
{
///從新開始計算流量
pSessionInfo->nCommFlux -= pSessionInfo->nMaxCommFlux;
if (pSessionInfo->nCommFlux < 0)
{
pSessionInfo->nCommFlux = 0;
}
///若流量超過規定,則掛起該會話的讀操做
pSession->SuspendRead(pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux);
}複製代碼
這就至關與飯店裏面先給某一桌客人上一些菜,讓他們先吃着,等上了一些菜以後不會再給這桌繼續上菜了,而是給其它空桌上菜,你們都吃上後,繼續回來給原先的桌子繼續上菜。實際上咱們的飯店都是這麼作的。上面的例子是單服務流量控制的實現的一個很是好的思路,它保證了每一個客戶端都能均衡地獲得服務,而不是一些客戶端等好久纔有響應。固然,這樣的技術不能適用於有順序要求的業務,例如銷售系統,這些系統通常是先下單先獲得的。
另外如今的服務器爲了加快IO操做,大量使用緩存技術,緩存其實是以空間換取時間的策略。對於一些反覆使用的,可是不常常改變的信息,若是從原始地點加載這些信息就比較耗時的數據(好比從磁盤中、從數據庫中),咱們就可使用緩存。因此時下像redis、leveldb、fastdb等各類內存數據庫大行其道。若是你要從事服務器開發,你至少須要掌握它們中的幾種。
這是我在gitchat上的首篇文章,限於篇幅有限,不少細節不可能展開來敘述,同時這裏就再也不講述分佈式的服務器的設計技巧了,後面若是條件容許會給你們帶來更多的技術分享。同時感謝gitchat提供這樣一個與你們交流的平臺。
鑑於筆者能力和經驗有限,文中不免有錯漏之處,歡迎提意見。