本節將介紹第一個實現具體傳輸功能的類TSocket,這個類是基於TCP socket實現TTransport的接口。下面具體介紹這個類的相關函數功能實現。
1.構造函數
分析一個類的功能首先看它的定義和構造函數實現,先看看它的定義:編程
class TSocket : public TVirtualTransport<TSocket> { ......}
由定義能夠看書TSocket繼承至虛擬傳輸類,而且把本身當作模板參數傳遞過去,因此從虛擬傳輸類繼承下來的虛擬函數(如read_virt)調用非虛擬函數(如read)就是TSocket本身實現的。
TSocket類的構造函數有4個,固然還有一個析構函數。四個構造函數就是根據不一樣的參數來構造,它們的聲明以下:緩存
TSocket();//全部參數都默認 TSocket(std::string host, int port);//根據主機名和端口構造一個socket TSocket(std::string path);//構造unix域的一個socket TSocket(int socket);//構造一個原始的unix句柄socket
四個構造函數分別用於不一樣的狀況下來產生不一樣的TSocket對象,不過這些構造函數都只是簡單的初始化一些最基本的成員變量,而沒有真正的鏈接socket。它們初始化的變量基本以下:服務器
TSocket::TSocket() : host_(""), port_(0), path_(""), socket_(-1), connTimeout_(0), sendTimeout_(0), recvTimeout_(0), lingerOn_(1), lingerVal_(0), noDelay_(1), maxRecvRetries_(5) { recvTimeval_.tv_sec = (int)(recvTimeout_/1000); recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; }
大部分簡單的參數都採用初始化列表初始化了,須要簡單計算的就放在函數體內初始化,其餘幾個都是這種狀況。下面須要單獨介紹一下的是unix domain socket。
socket API本來是爲網絡通信設計的,但後來在socket的框架上發展出一種IPC機制,就是UNIX Domain Socket。雖然網絡socket也可用於同一臺主機的進程間通信(經過loopback地址127.0.0.1),可是UNIX Domain Socket用於IPC更有效率:不須要通過網絡協議棧,不須要打包拆包、計算校驗和、維護序號和應答等,只是將應用層數據從一個進程拷貝到另外一個進程。這是由於,IPC機制本質上是可靠的通信,而網絡協議是爲不可靠的通信設計的。UNIX Domain Socket也提供面向流和麪向數據包兩種API接口,相似於TCP和UDP,可是面向消息的UNIX Domain Socket也是可靠的,消息既不會丟失也不會順序錯亂。
UNIX Domain Socket是全雙工的,API接口語義豐富,相比其它IPC機制有明顯的優越性,目前已成爲使用最普遍的IPC機制,好比X Window服務器和GUI程序之間就是經過UNIX Domain Socket通信的。
使用UNIX Domain Socket的過程和網絡socket十分類似,也要先調用socket()建立一個socket文件描述符,address family指定爲AF_UNIX,type能夠選擇SOCK_DGRAM或SOCK_STREAM,protocol參數仍然指定爲0便可。
UNIX Domain Socket與網絡socket編程最明顯的不一樣在於地址格式不一樣,用結構體sockaddr_un表示,網絡編程的socket地址是IP地址加端口號,而UNIX Domain Socket的地址是一個socket類型的文件在文件系統中的路徑,這個socket文件由bind()調用建立,若是調用bind()時該文件已存在,則bind()錯誤返回。
打開鏈接函數open
首先看這個函數的代碼實現,以下:網絡
void TSocket::open() { if (isOpen()) {//若是已經打開就直接返回 return; } if (! path_.empty()) {//若是unix路徑不爲空就打開unix domian socket unix_open(); } else { local_open();//打開通用socket } }
Open函數又根據路徑爲不爲空(不爲空就是unix domain socket)調用相應的函數來繼續打開鏈接,首先看看打開unix domain socket,代碼以下:框架
void TSocket::unix_open(){ if (! path_.empty()) {//保證path_不爲空 // Unix Domain SOcket does not need addrinfo struct, so we pass NULL openConnection(NULL);//調用真正的打開鏈接函數 } }
由代碼能夠看出,真正實現打開鏈接的函數是openConnection,這個函數根據傳遞的參數來決定是不是打開unix domain socket,實現代碼以下(這個函數代碼比較多,其中除了錯誤部分代碼省略):dom
void TSocket::openConnection(struct addrinfo *res) { if (isOpen()) { return;//若是已經打開了直接返回 } if (! path_.empty()) {//根據路徑是否爲空建立不一樣的socket socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);//建立unix domain socket } else { socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);//建立通用的網絡通訊socket } if (sendTimeout_ > 0) {//若是發生超時設置大於0就調用設置發送超時函數設置發送超時 setSendTimeout(sendTimeout_); } if (recvTimeout_ > 0) {//若是接收超時設置大於0就調用設置接收超時函數設置接收超時 setRecvTimeout(recvTimeout_); } setLinger(lingerOn_, lingerVal_);//設置優雅斷開鏈接或關閉鏈接參數 setNoDelay(noDelay_);//設置無延時 #ifdef TCP_LOW_MIN_RTO if (getUseLowMinRto()) {//設置是否使用較低的最低TCP重傳超時 int one = 1; setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one)); } #endif //若是超時已經存在設置鏈接爲非阻塞 int flags = fcntl(socket_, F_GETFL, 0);//獲得socket_的標識 if (connTimeout_ > 0) {//超時已經存在 if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {//設置爲非阻塞 } } else { if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {//設置爲阻塞 } } // 鏈接socket int ret; if (! path_.empty()) {//unix domain socket #ifndef _WIN32 //window不支持 struct sockaddr_un address; socklen_t len; if (path_.length() > sizeof(address.sun_path)) {//path_長度不能超過最長限制 } address.sun_family = AF_UNIX; snprintf(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str()); len = sizeof(address); ret = connect(socket_, (struct sockaddr *) &address, len);//鏈接unix domain socket #else //window不支持unix domain socket #endif } else { ret = connect(socket_, res->ai_addr, res->ai_addrlen);//鏈接通用的非unix domain socket } if (ret == 0) {//失敗了就會執行後面的代碼,用poll來監聽寫事件 goto done;//成功了就直接跳轉到完成處 } struct pollfd fds[1];//定於用於poll的描述符 std::memset(fds, 0 , sizeof(fds));//初始化爲0 fds[0].fd = socket_;//描述符爲socket fds[0].events = POLLOUT;//接收寫事件 ret = poll(fds, 1, connTimeout_);//調用poll,有一個超時值 if (ret > 0) { // 確保socket已經被鏈接而且沒有錯誤被設置 int val; socklen_t lon; lon = sizeof(int); int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);//獲得錯誤選項參數 if (val == 0) {// socket沒有錯誤也直接到完成處了 goto done; } } else if (ret == 0) {// socket 超時 //相應處理代碼省略 } else { // poll()出錯了,相應處理代碼省略 } done: fcntl(socket_, F_SETFL, flags);//設置socket到原來的模式了(阻塞) if (path_.empty()) {//若是是unix domain socket就設置緩存地址 setCachedAddress(res->ai_addr, res->ai_addrlen); } }
上面這個函數代碼確實比較長,不過還好都是比較簡單的代碼實現,沒有什麼很繞的代碼,整個流程也很清晰,在代碼中也有比較詳細的註釋了。下面繼續看通用socket打開函數local_open(它也真正的執行打開功能也是調用上面剛纔介紹的那個函數,只是傳遞了具體的地址信息):socket
void TSocket::local_open(){ #ifdef _WIN32 TWinsockSingleton::create();//兼容window平臺 #endif // _WIN32 if (isOpen()) {//打開了就直接返回 return; } if (port_ < 0 || port_ > 0xFFFF) {//驗證端口是否爲有效值 throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid"); } struct addrinfo hints, *res, *res0; res = NULL; res0 = NULL; int error; char port[sizeof("65535")]; std::memset(&hints, 0, sizeof(hints));//內存設置爲0 hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; sprintf(port, "%d", port_); error = getaddrinfo(host_.c_str(), port, &hints, &res0);//根據主機名獲得全部網卡地址信息 // 循環遍歷全部的網卡地址信息,直到有一個成功打開 for (res = res0; res; res = res->ai_next) { try { openConnection(res);//調用打開函數 break;//成功就退出循環 } catch (TTransportException& ttx) { if (res->ai_next) {//異常處理,是否還有下一個地址,有就繼續 close(); } else { close(); freeaddrinfo(res0); // 清除地址信息內存和資源 throw;//拋出異常 } } } freeaddrinfo(res0);//釋放地址結構內存 }
整個local_open函數就是根據主機名獲得全部的網卡信息,而後依次嘗試打開,直到打開一個爲止就退出循環,若是全部都不成功就拋出一個異常信息。
讀函數read
在實現讀函數的時候須要注意區分返回錯誤爲EAGAIN的狀況,由於當超時和系統資源耗盡都會產生這個錯誤(沒有明顯的特徵能夠區分它們),因此Thrift在實現的時候設置一個最大的嘗試次數,若是超過這個了這個次數就認爲是系統資源耗盡了。下面具體看看read函數的實現,代碼以下(省略一些參數檢查和錯誤處理的代碼):函數
uint32_t TSocket::read(uint8_t* buf, uint32_t len) { int32_t retries = 0;//重試的次數 uint32_t eagainThresholdMicros = 0; if (recvTimeout_) {//若是設置了接收超時時間,那麼計算最大時間間隔來判斷是否系統資源耗盡 eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2); } try_again: struct timeval begin; if (recvTimeout_ > 0) { gettimeofday(&begin, NULL);//獲得開始時間 } else { begin.tv_sec = begin.tv_usec = 0;//默認爲0,不須要時間來判斷是超時了 } int got = recv(socket_, cast_sockopt(buf), len, 0);//從socket接收數據 int errno_copy = errno; //保存錯誤代碼 ++g_socket_syscalls;//系統調用次數統計加1 if (got < 0) {//若是讀取錯誤 if (errno_copy == EAGAIN) {//是否爲EAGAIN if (recvTimeout_ == 0) {//若是沒有設置超時時間,那麼就是資源耗盡錯誤了!拋出異常 throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (unavailable resources)"); } struct timeval end; gettimeofday(&end, NULL);//獲得結束時間,會改變errno,因此前面須要保存就是這個緣由 uint32_t readElapsedMicros = (((end.tv_sec - begin.tv_sec) * 1000 * 1000)//計算消耗的時間 + (((uint64_t)(end.tv_usec - begin.tv_usec)))); if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) { if (retries++ < maxRecvRetries_) {//重試次數還小於最大重試次數 usleep(50);//睡眠50毫秒 goto try_again;//再次嘗試從socket讀取數據 } else {//不然就認爲是資源不足了 throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (unavailable resources)"); } } else {//推測爲超時了 throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (timed out)"); } } if (errno_copy == EINTR && retries++ < maxRecvRetries_) {//若是是中斷而且重試次數沒有超過 goto try_again;//那麼重試 } #if defined __FreeBSD__ || defined __MACH__ if (errno_copy == ECONNRESET) {//FreeBSD和MACH特殊處理錯誤代碼 return 0; } #endif #ifdef _WIN32 if(errno_copy == WSAECONNRESET) {//win32平臺處理錯誤代碼 return 0; // EOF } #endif return got; }
整個讀函數其實沒有什麼特別的,主要的任務就是錯誤狀況的處理,從這裏能夠看出其實實現一個功能是很容易的,可是要作到穩定和容錯性確實須要發很大功夫。
寫函數write
寫函數和讀函數實現差很少,主要的代碼仍是在處理錯誤上面,還有一點不一樣的是寫函數寫的內容可能一次沒有發送完畢,因此是在一個while循環中一直髮送直到指定的內容所有發送完畢。代碼實現以下:oop
void TSocket::write(const uint8_t* buf, uint32_t len) { uint32_t sent = 0;//記錄已經發送了的字節數 while (sent < len) {//是否已經發送了指定的字節長度 uint32_t b = write_partial(buf + sent, len - sent);//調部分寫入函數 if (b == 0) {//發送超時過時了 throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired"); } sent += b;//已經發送的字節數 } } 上面的函數尚未這種的調用send函數發送寫入的內容,而是調用部分寫入函數write_partial寫入,這個函數實現以下: uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) { uint32_t sent = 0; int flags = 0; #ifdef MSG_NOSIGNAL //使用這個代替SIGPIPE 錯誤,代替咱們檢查返回EPIPE錯誤條件和關閉socket的狀況 flags |= MSG_NOSIGNAL;//設置這個標誌位 #endif int b = send(socket_, const_cast_sockopt(buf + sent), len - sent, flags);//發送數據 ++g_socket_syscalls;//系統調用計數加1 if (b < 0) { //錯誤處理 if (errno == EWOULDBLOCK || errno == EAGAIN) { return 0;//應該阻塞錯誤直接返回 } int errno_copy = errno;//保存錯誤代碼 if (errno_copy == EPIPE || errno_copy == ECONNRESET || errno_copy == ENOTCONN) { close();//鏈接錯誤關閉掉socket } } return b;//返回寫入的字節數 }
這個寫入的實現邏輯和過程也是很是簡單的,只是須要考慮到各類錯誤的狀況而且相應的處理之。
其餘函數
TSocket類還有一些其餘函數,不過功能都比較簡單,好比設置一些超時和獲得一些成員變量值的函數,哪些函數通常都是幾句代碼完成了。ui