kqueue 是 FreeBSD 上的一種的多路複用機制。它是針對傳統的 select/poll 處理大量的文件描述符性能較低效而開發出來的。註冊一批描述符到 kqueue 之後,當其中的描述符狀態發生變化時,kqueue 將一次性通知應用程序哪些描述符可讀、可寫或出錯了。html
kqueue 支持多種類型的文件描述符,包括 socket、信號、定時器、AIO、VNODE、PIPE。本文重點討論 kqueue 如何控制 socket 描述符。其中 kqueue 對 AIO,POSIX 的異步 IO 系列的支持,是異步行爲完成通知機制之一。另外兩種常見的機制是異步信號和線程例程。用 kqueue 的明顯好處是完成事件的處理線程能夠靈活地指定。linux
本文重點在於 kqueue 技術自己。一些基礎的知識點,好比 socket API 和經常使用的 Unix數據結構將不做講解,有須要的讀者請先閱讀 UNIX 網絡編程方面書籍。ios
kqueue APIs程序員
kqueue 提供 kqueue()、kevent() 兩個系統調用和 struct kevent 結構。編程
kqueue 主要功能數組
經過 kevent() 提供三個主要的行爲功能。在下面小節中將會用到這兩個主要功能。性能優化
註冊 / 反註冊服務器
注意 kevent() 中的 neventlist 這個輸入參數,當將其設爲 0,且傳入合法的 changelist 和 nchangelist,就會將 changelist 中的事件註冊到 kqueue 中。網絡
當關閉某文件描述符時,與之關聯的事件會被自動地從 kqueue 移除。數據結構
容許 / 禁止過濾器事件
經過 flags EV_ENABLE 和 EV_DISABLE 使過濾器事件有效或無效。這個功能在利用 EVFILT_WRITE 發送數據時很是有用。
等待事件通知
將 nchangelist 設置成 0,固然要傳入其它合法的參數,當 kevent 非錯誤和超時返回時,在 eventlist 和 neventlist 中就保存可用事件集合。
kqueue()
int kqueue(void)
生成一個內核事件隊列,返回該隊列的文件描述索。其它 API 經過該描述符操做這個 kqueue。生成的多個 kqueue 的結構相似圖 1 所示。
圖 1. kqueue 隊列結構
kevent()
int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout);
kevent 提供向內核註冊 / 反註冊事件和返回就緒事件或錯誤事件: kq: kqueue 的文件描述符。 changelist: 要註冊 / 反註冊的事件數組; nchanges: changelist 的元素個數。 eventlist: 知足條件的通知事件數組; nevents: eventlist 的元素個數。 timeout: 等待事件到來時的超時時間,0,馬上返回;NULL,一直等待;有一個具體值,等待 timespec 時間值。 返回值:可用事件的個數。
struct kevent
struct kevent { uintptr_t ident; /* 事件 ID */ short filter; /* 事件過濾器 */ u_short flags; /* 行爲標識 */ u_int fflags; /* 過濾器標識值 */ intptr_t data; /* 過濾器數據 */ void *udata; /* 應用透傳數據 */ }; 在一個 kqueue 中,{ident, filter} 肯定一個惟一的事件。
ident
事件的 id,實際應用中,通常設置爲文件描述符。
filter
能夠將 kqueue filter 看做事件。內核檢測 ident 上註冊的 filter 的狀態,狀態發生了變化,就通知應用程序。kqueue 定義了較多的 filter,本文只介紹 Socket 讀寫相關的 filter。
EVFILT_READ
TCP 監聽 socket,若是在完成的鏈接隊列 ( 已收三次握手最後一個 ACK) 中有數據,此事件將被通知。收到該通知的應用通常調用 accept(),且可經過 data 得到完成隊列的節點個數。 流或數據報 socket,當協議棧的 socket 層接收緩衝區有數據時,該事件會被通知,而且 data 被設置成可讀數據的字節數。
EVFILT_WRIT
當 socket 層的寫入緩衝區可寫入時,該事件將被通知;data 指示目前緩衝區有多少字節空閒空間。
E
flags
EV_ADD
指示加入事件到 kqueue。
EV_DELETE
指示將傳入的事件從 kqueue 中移除。
EV_ENABLE
過濾器事件可用,註冊一個事件時,默認是可用的。
EV_DISABLE
過濾器事件不可用,當內部描述可讀或可寫時,將不通知應用程序。第 5 小節有這個 flag 的用法介紹。
EV_ERROR
一個輸出參數,當 changelist 中對應的描述符處理出錯時,將輸出這個 flag。應用程序要判斷這個 flag,不然可能出現 kevent 不斷地提示某個描述符出錯,卻沒將這個描述符從 kq 中清除。處理 EV_ERROR 相似下面的代碼: if (events[i].flags & EV_ERROR) close(events[i].ident); fflags 過濾器相關的一個輸入輸出類型標識,有時候和 data 結合使用。
data
過濾器相關的數據值,請看 EVFILT_READ 和 EVFILT_WRITE 描述。
udata
應用自定義數據,註冊的時候傳給 kernel,kernel 不會改變此數據,當有事件通知時,此數據會跟着返回給應用。
EV_SET
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
struct kevent 的初始化的輔助操做。
一個服務器示例
例子實現了一個只有較簡單通訊功能的但有性能保證的服務器。在下面各個清單中只寫出關鍵性的代碼,錯誤處理的代碼未寫出,完整的代碼請參考附帶的源碼:kqueue.cpp。
註冊事件到 kqueue
清單 1. 註冊事件
73 bool Register(int kq, int fd) 74 { 75 struct kevent changes[1]; 76 EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL); 77 78 int ret = kevent(kq, changes, 1, NULL, 0, NULL); 81 82 return true; 83 } Register 將 fd 註冊到 kq 中。註冊的方法是經過 kevent() 將 eventlist 和 neventlist 置成 NULL 和 0 來達到的。
建立監聽 socket 和 kqueue,等待內核事件通知
清單 2. 建立監聽
27 int main(int argc, char* argv[]) 28 { 29 listener_ = CreateListener(); 32 33 int kq = kqueue(); 34 if (!Register(kq, listener_)) 39 40 WaitEvent(kq); 41 42 return 0; 43 } 85 void WaitEvent(int kq) 86 { 87 struct kevent events[MAX_EVENT_COUNT]; 88 while (true) 89 { 90 int ret = kevent(kq, NULL, 0, events, MAX_EVENT_COUNT, NULL); 96 97 HandleEvent(kq, events, ret); 98 } 99 }
29~40,建立監聽 socket,將監聽 socket 註冊到 kq,而後等待事件。 90,這一行就是 kevent 事件等待方法,將 changelist 和 nchangelist 分別置成 NULL 和 0,而且傳一個足夠大的 eventlist 空間給內核。當有事件過來時,kevent 返回,這時調用 HandleEvent 處理可用事件。
struct kevent data 字段在 accept 和 recv 時的用法
清單 3. 接收數據
101 void HandleEvent(int kq, struct kevent* events, int nevents) 102 { 103 for (int i = 0; i < nevents; i++) 104 { 105 int sock = events[i].ident; 106 int data = events[i].data; 107 108 if (sock == listener_) 109 Accept(kq, data); 110 else 111 Receive(sock, data); 112 } 113 } 114 115 void Accept(int kq, int connSize) 116 { 117 for (int i = 0; i < connSize; i++) 118 { 119 int client = accept(listener_, NULL, NULL); 125 126 if (!Register(kq, client)) 131 } 132 } 133 134 void Receive(int sock, int availBytes) 135 { 136 int bytes = recv(sock, buf_, availBytes, 0); 145 Enqueue(buf_, bytes); 146 }
108~111,根據 events.ident 的類型來調用 Accept() 或 Receive()。這裏要注意的是 events[i].data。
117~126,對於監聽 socket,data 表示鏈接完成隊列中的元素 ( 已經收到三次握手最後一個 ACK) 個數。119 行演示了這種用法,accept data 次。126 行將 accept 成功的 socket 註冊到 kq。
136~145,對於流 socket,data 表示協議棧 socket 層的接收緩衝區可讀數據的字節數。recv 時顯示地指定接收 availBytes 字節 ( 就是 data)。這個功能點將對 recv 和 send 的性能提高有積極的做用,第 4 小節將這方面的討論。145 行表示將收到的數據入緩衝隊列。
EVFILT_WRITE 用法
上面的例子沒有涉及寫事件的用法,這一小節簡單介紹一下經過 WRITE 事件自動地實現發送數據的方法。
kqueue 默認是水平觸發模式,當某個描述符的事件知足某種條件時,若是應用程序不處理對應的事件,kqueue 將會不斷地通知應用程序此描述符知足某種狀態了。以 EVFILT_WRITE 舉例,見圖 2。
圖 2. WRITE 通知流程
在某種情形下,應用程序需要禁止 kqueue 不斷地通知某個描述符的「可寫」狀態。將已註冊的 {ident, filter} 的 flags 設置成 EV_DISABLE 就達到這個目的。實現方法相似清單 4。
清單 4. 實現方法
struct kevent changes[1];
EV_SET(&changes[0], fd, EVFILT_WRITE, EV_DISABLE, 0, 0, NULL);
kevent(kq, changes, 1, NULL, 0, NULL);
將上面代碼中的 EV_DISABLE替換成 EV_ENABLE表示事件是可用的。
接下來,考慮一個實際的服務器應用,請見圖 3。
圖 3. 某個服務器應用
邏輯處理線程將處理結果寫到發送隊列,通訊線程將其讀出並經過 kqueue EVFILT_WRITE 機制發送。兩者具體流程請見圖 4。
圖 4. 邏輯流程
具體的代碼相對較大,將不在這裏列出。在 Speed 庫 demos/fb_tcp_server 有這種用法的代碼例子。特別強調一下,兩個線程中 writeEnable 變量和 EVFILTE_WRITE 狀態的設置是有嚴格的順序要求的。現代編譯器優化和處理器執行指令時都有可能打亂指令順序。有一種叫內存屏障(memory barrier)的技術能夠保證程序語句的編譯和執行順序,在 Linux 內核設計與實現中介紹了這一技術。
另外,這個例子能夠作性能優化,當發送隊列爲空時,將必定長度的數據直接經過 send()API 非阻塞地發送,未發送完的數據再寫入到發送隊列。這樣避免了大部分的數據拷貝。
阻塞與非阻塞 IO
用過 select 和 epoll 的讀者,通常將 socket IO 設置成非阻塞模式,以提升讀寫性能的同時,避免 IO 讀寫不當心被鎖定。
爲了達到某種目的,甚至有人會經過 getsocketopt 來偷看 socket 讀緩衝區的數據大小或寫緩區
可用空間的大小。kqueue 開發人員考慮到這些現狀,在 kevent 返回時,將讀寫緩衝區的可讀字
節數或可寫空間大小告訴應用程序。基於這個特性,使用 kqueue 的應用通常不使用非阻塞 IO。每次讀時,根據 kevent 返回的可讀字節大小,將接收緩衝區中的數據一次性讀完;而發送數據時,也根據 kevent 返回的寫緩衝區可寫空間的大小,一次只發可寫空間大小的數據。
結束語
本文介紹了 FreeBSD kqueue 這種多路複用 IO 模型的用法,重點介紹了 kqueue 對 Sockets IO 的控制和事件通知過程。有必定網絡編程基礎的程序員學習本文後,結合給出的例子就能開發出有必定性能保證的 FreeBSD 應用服務器了。
附:
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/event.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
const std::string IP = "192.168.79.18";
const int PORT = 4312;
const int MAX_EVENT_COUNT = 5000;
const int MAX_RECV_BUFF = 65535;
int listener_;
char buf_[MAX_RECV_BUFF];
int CreateListener();
bool Register(int kq, int fd);
void WaitEvent(int kq);
void HandleEvent(int kq, struct kevent* events, int nevents);
void Accept(int kq, int connSize);
void Receive(int sock, int availBytes);
void Enqueue(const char* buf, int bytes);
int main(int argc, char* argv[])
{
listener_ = CreateListener();
if (listener_ == -1)
return -1;
int kq = kqueue();
if (!Register(kq, listener_))
{
std::cerr << "Register listener to kq failed.\n";
return -1;
}
WaitEvent(kq);
return 0;
}
int CreateListener()
{
int sock = socket(PF_INET, SOCK_STREAM, 0);
if (sock == -1)
{
std::cerr << "socket() failed:" << errno << std::endl;
return -1;
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
addr.sin_addr.s_addr = inet_addr(IP.c_str());
if (bind(sock, (struct sockaddr*)&addr, sizeof(struct sockaddr)) == -1)
{
std::cerr << "bind() failed:" << errno << std::endl;
return -1;
}
if (listen(sock, 5) == -1)
{
std::cerr << "listen() failed:" << errno << std::endl;
return -1;
}
return sock;
}
bool Register(int kq, int fd)
{
struct kevent changes[1];
EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
int ret = kevent(kq, changes, 1, NULL, 0, NULL);
if (ret == -1)
return false;
return true;
}
void WaitEvent(int kq)
{
struct kevent events[MAX_EVENT_COUNT];
while (true)
{
int ret = kevent(kq, NULL, 0, events, MAX_EVENT_COUNT, NULL);
if (ret == -1)
{
std::cerr << "kevent failed!\n";
continue;
}
HandleEvent(kq, events, ret);
}
}
void HandleEvent(int kq, struct kevent* events, int nevents)
{
for (int i = 0; i < nevents; i++)
{
int sock = events[i].ident;
int data = events[i].data;
if (sock == listener_)
Accept(kq, data);
else
Receive(sock, data);
}
}
void Accept(int kq, int connSize)
{
for (int i = 0; i < connSize; i++)
{
int client = accept(listener_, NULL, NULL);
if (client == -1)
{
std::cerr << "Accept failed.\n";
continue;
}
if (!Register(kq, client))
{
std::cerr << "Register client failed.\n";
return;
}
}
}
void Receive(int sock, int availBytes)
{
int bytes = recv(sock, buf_, availBytes, 0);
if (bytes == 0 || bytes == -1)
{
close(sock);
std::cerr << "client close or recv failed.\n";
return;
}
// Write buf to the receive queue.
Enqueue(buf_, bytes);
}
void Enqueue(const char* buf, int bytes)
{
}