libevent入門教程:Echo Server based on libevent - Blog of Felix021 - 日,泯然衆人矣。

libevent入門教程:Echo Server based on libevent

轉載請註明出自 http://www.felix021.com/blog/read.php?2068 ,如是轉載文則註明原出處,謝謝:)php

花了兩天的時間在libevent上,想總結下,就以寫簡單tutorial的方式吧,貌似沒有一篇簡單的說明,讓人立刻就能上手用的。html


首先給出官方文檔吧: http://libevent.org ,首頁有個Programming with Libevent,裏面是一節一節的介紹libevent,可是感受信息量太大了,並且仍是英文的-。-(固然,若是想好好用libevent,看看仍是頗有必要的),還有個Reference,大體就是對各個版本的 libevent 使用 doxgen 生成的文檔,用來查函數原型和基本用法什麼的。web


下面假定已經學習過基本的 socket 編程(socket,bind,listen,accept,connect,recv,send,close),而且對異步/callback有基本認識。編程


基本的 socket 編程是阻塞/同步的,每一個操做除非已經完成或者出錯纔會返回,這樣對於每個請求,要使用一個線程或者單獨的進程去處理,系統資源無法支撐大量的請求(所謂 c10k problem?),例如內存:默認狀況下每一個線程須要佔用2~8M的棧空間。posix定義了可使用異步的 select 系統調用,可是由於其採用了輪詢的方式來判斷某個fd是否變成 active,效率不高[O(n)],鏈接數一多,也仍是撐不住。因而各系統分別提出了基於異步/callback的系統調用,例如Linux的 epollBSDkqueueWindowsIOCP。因爲在內核層面作了支持,因此能夠用 `O(1)的效率查找到active的fd。基本上,libevent就是對這些高效 IO 的封裝,提供統一的API,簡化開發。windows


libevent簡介:

默認狀況下是單線程的(能夠配置成多線程,若是有須要的話),每一個線程有且只有一個 event_base ,對應一個 struct event_base 結構體(以及附於其上的事件管理器),用來 schedule 託管給它的一系列 event,能夠和操做系統的進程管理類比,固然,要更簡單一點。當一個事件發生後, event_base 會在合適的時間(不必定是當即)去調用綁定在這個事件上的函數(傳入一些預約義的參數,以及在綁定時指定的一個參數),直到這個函數執行完,再返回schedule其餘事件。緩存

//建立一個event_base 
  
struct event_base *base = event_base_new();  
  
assert(base != NULL );

event_base 內部有一個循環,循環阻塞在 epoll/kqueue 等系統調用上,直到有一個一些事件發生,而後去處理這些事件。固然,這些事件要被綁定在這個 event_base上 。每一個事件對應一個struct event,能夠是監聽一個fd或者 POSIX 信號量之類(這裏只講fd了,其餘的看manual吧)。struct event 使用 event_new 來建立和綁定,使用 event_add來啓用:服務器

//建立並綁定一個event
struct event *listen_event;網絡


//參數:event_base, 監聽的fd,事件類型及屬性,綁定的回調函數,給回調函數的參數
listen_event = event_new(base, listener, EV_READ|EV_PERSIST, callback_func, ( void *)base);多線程


//參數:event,超時時間( struct timeval *類型的, NULL 表示無超時設置)
event_add(listen_event, NULL );併發

注:libevent支持的事件及屬性包括(使用bitfield實現,因此要用 | 來讓它們合體)

  • EV_TIMEOUT: 超時

  • EV_READ: 只要網絡緩衝中還有數據,回調函數就會被觸發

  • EV_WRITE: 只要塞給網絡緩衝的數據被寫完,回調函數就會被觸發

  • EV_SIGNAL: POSIX 信號量,參考manual吧

  • EV_PERSIST: 不指定這個屬性的話,回調函數被觸發後事件會被刪除

  • EV_ET: Edge-Trigger邊緣觸發,參考EPOLL_ET

而後須要啓動event_base的循環,這樣才能開始處理髮生的事件。循環的啓動使用 event_base_dispatch ,循環將一直持續,直到再也不有須要關注的事件,或者是遇到 event_loopbreak()/ event_loopexit() 函數。

//啓動事件循環
event_base_dispatch(base);

接下來關注下綁定到 event 的回調函數 callback_func:傳遞給它的是一個 socket fd 、一個 event類型及屬性 bit_field 、以及傳遞給 event_new 的最後一個參數(去上面幾行回顧一下,把 event_base 給傳進來了,實際上更多地是分配一個結構體,把相關的數據都撂進去,而後丟給event_new,在這裏就能取獲得了)。其原型是:

typedef void (* event_callback_fn)(evutil_socket_t sockfd, short event_type, void *arg)


對於一個服務器而言,上面的流程大概是這樣組合的:

  1. listener = socket(),bind(),listen(),設置nonblocking(POSIX系統中可以使用fcntl設置,windows不須要設置,實際上libevent提供了統一的包裝evutil_make_socket_nonblocking)

  2. 建立一個event_base

  3. 建立一個event,將該socket託管給event_base,指定要監聽的事件類型,並綁定上相應的回調函數(及須要給它的參數)。對於listener socket來講,只須要監聽EV_READ|EV_PERSIST

  4. 啓用該事件

  5. 進入事件循環


  1. (異步) 當有client發起請求的時候,調用該回調函數,進行處理。

QUESTION : 爲何不在listen完立刻調用accept,得到客戶端鏈接之後再丟給event_base呢?這個問題先想一想噢。

ANSWER : 回調函數要作什麼事情呢?固然是處理client的請求了。首先要accept,得到一個能夠與client通訊的sockfd,而後……調用recv/send嗎?錯!大錯特錯!若是直接調用recv/send的話,這個線程就阻塞在這個地方了,若是這個客戶端很是的陰險(好比一直不發消息,或者網絡很差,總是丟包),libevent就只能等它,無法處理其餘的請求了——因此應該建立一個新的event來託管這個sockfd。

magichan 註釋: 這段話應該是說 accpet 得到一個 socket 時,不會放在這個回調函數自己
處理這個套接字發過來的數據,而是把該套接字綁定到一個的新 event 中,再放回事件循環之中。
若是該 socket 有數據發送過來,那麼就觸發新的 event 處理數據。


libevent 1.× 版本套接字發送和接受 解決的方案

在老版本libevent上的實現,比較羅嗦[若是不想詳細瞭解的話,看下一部分]。

對於服務器但願先從client獲取數據的狀況,大體流程是這樣的:

  1. 將這個sockfd設置爲 nonblocking

  2. 建立2個event:

event_read,綁上sockfd的 EV_READ|EV_PERSIST,設置回調函數和參數(後面提到的struct)

event_write,綁上sockfd的 EV_WRITE|EV_PERSIST,設置回調函數和參數(後面提到的struct)

  1. 啓用 event_read 事件


  1. (異步) 等待 event_read 事件的發生, 調用相應的回調函數。這裏麻煩來了:回調函數用recv讀入的數據,不能直接用 send 丟給 sockfd 了事——由於 sockfd 是 nonblocking 的,丟給它的話,不能保證正確(爲何呢?)。因此須要一個本身管理的緩存用來保存讀入的數據中(在accept之後就建立一個struct,做爲第2步回調函數的arg傳進來),在合適的時間(好比遇到換行符)啓用 event_write 事件【event_add(event_write, NULL)】,等待 EV_WRITE 事件的觸發


  1. (異步) 當 event_write 事件的回調函數被調用的時候,往 sockfd 寫入數據,而後刪除 event_write 事件【event_del(event_write)】,等待event_read事件的下一次執行。

以上步驟比較晦澀,具體代碼可參考官方文檔裏面的【Example: A low-level ROT13 server with Libevent】

libevent 2.× 版本套接字發送和接受 解決的方案

因爲須要本身管理緩衝區,且過程晦澀難懂,而且不兼容於Windows的IOCP,因此libevent2開始,提供了bufferevent這個神器,用來提供更加優雅、易用的API。struct bufferevent 內建了兩個 event(read/write) 和對應的緩衝區【struct evbuffer *input, *output】,並提供相應的函數用來操做緩衝區(或者直接操做bufferevent)。每當有數據被讀入input的時候,read_cb函數被調用;每當utput被輸出完的時候,write_cb被調用;在網絡IO操做出現錯誤的狀況(鏈接中斷、超時、其餘錯誤),error_cb被調用。因而上一部分的步驟被簡化爲:

  1. 設置 sockfd 爲 nonblocking

  2. 使用 bufferevent_socket_new 建立一個 struct bufferevent *bev,關聯該sockfd,託管給event_base

  3. 使用 bufferevent_setcb(bev, read_cb, write_cb, error_cb, (void *)arg) 將EV_READ/EV_WRITE 對應的函數

  4. 使用 bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST) 來啓用 read/write 事件


  1. (異步)

  • read_cb 裏面從 input 讀取數據,處理完畢後塞到 output 裏(會被自動寫入到sockfd)

  • write_cb 裏面(須要作什麼嗎?對於一個echo server來講,read_cb 就足夠了)

  • 在 error_cb 裏面處理遇到的錯誤 能夠 使用 bufferevent_set_timeouts(bev, struct timeval *READ, struct timeval *WRITE) 來設置讀寫超時, 在 error_cb 裏面處理超時。

  • read_cbwrite_cb 的原型是

void read_or_write_callback(struct bufferevent *bev, void *arg)
  • error_cb 的原型是

void error_cb(struct bufferevent *bev, short error, void *arg) //這個是event的標準回調函數原型 

能夠從bev中用libevent的API提取出event_base、sockfd、input/output等相關數據,詳情RTFM~

因而代碼簡化到只須要幾行的read_cb和error_cb函數便可:

void read_cb( struct bufferevent *bev, void *arg) {  
  
 char line[256];  
  
 int n;  
  
evutil_socket_t fd = bufferevent_getfd(bev);  
  
 while (n = bufferevent_read(bev, line, 256), n > 0)  
  
bufferevent_write(bev, line, n);  
  
}  
  
  
  
void error_cb( struct bufferevent *bev, short event, void *arg) {  
  
bufferevent_free(bev);  
  
}

因而一個支持大併發量的echo server就成型了!下面附上無註釋的echo server源碼,110行,多抄幾遍,就能徹底弄懂啦!更復雜的例子參見官方文檔裏面的【Example: A simpler ROT13 server with Libevent】

# include <stdio.h> 
  
# include <stdlib.h> 
  
# include <errno.h> 
  
# include <assert.h> 
  
  
  
# include <event2/event.h> 
  
# include <event2/bufferevent.h> 
  
  
  
# define LISTEN_PORT 9999 
  
# define LISTEN_BACKLOG 32 
  
  
  
void do_accept(evutil_socket_t listener, short event, void *arg);  
  
void read_cb( struct bufferevent *bev, void *arg);  
  
void error_cb( struct bufferevent *bev, short event, void *arg);  
  
void write_cb( struct bufferevent *bev, void *arg);  
  
  
  
int main( int argc, char *argv[]) {  
  
 int ret;  
  
evutil_socket_t listener;  
  
listener = socket(AF_INET, SOCK_STREAM, 0);  
  
assert(listener > 0);  
  
evutil_make_listen_socket_reuseable(listener);  
  
  
  
 struct sockaddr_in sin;  
  
sin.sin_family = AF_INET;  
  
sin.sin_addr.s_addr = 0;  
  
sin.sin_port = htons(LISTEN_PORT);  
  
  
  
 if (bind(listener, ( struct sockaddr *)&sin, sizeof (sin)) < 0) {  
  
perror("bind");  
  
 return 1;  
  
}  
  
  
  
 if (listen(listener, LISTEN_BACKLOG) < 0) {  
  
perror("listen");  
  
 return 1;  
  
}  
  
  
  
 printf ("Listening...\n");  
  
  
  
evutil_make_socket_nonblocking(listener);  
  
  
  
 struct event_base *base = event_base_new();  
  
assert(base != NULL );  
  
 struct event *listen_event;  
  
listen_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, ( void *)base);  
  
event_add(listen_event, NULL );  
  
event_base_dispatch(base);  
  
  
  
 printf ("The End.");  
  
 return 0;  
  
}  
  
  
  
void do_accept(evutil_socket_t listener, short event, void *arg) {  
  
 struct event_base *base = ( struct event_base *)arg;  
  
evutil_socket_t fd;  
  
 struct sockaddr_in sin;  
  
socklen_t slen = sizeof (sin);  
  
fd = accept(listener, ( struct sockaddr *)&sin, &slen);  
  
 if (fd < 0) {  
  
perror("accept");  
  
 return ;  
  
}  
  
 if (fd > FD_SETSIZE) { //這個 if 是參考了那個ROT13的例子,貌似是官方的疏漏,從select-based例子裏抄過來忘了改 
  
perror("fd > FD_SETSIZE\n");  
  
 return ;  
  
}  
  
  
  
 printf ("ACCEPT: fd = %u\n", fd);  
  
  
  
 struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);  
  
bufferevent_setcb(bev, read_cb, NULL , error_cb, arg);  
  
bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);  
  
}  
  
  
  
void read_cb( struct bufferevent *bev, void *arg) {  
  
# define MAX_LINE 256 
  
 char line[MAX_LINE+1];  
  
 int n;  
  
evutil_socket_t fd = bufferevent_getfd(bev);  
  
  
  
 while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {  
  
line[n] = '\0';  
  
 printf ("fd=%u, read line: %s\n", fd, line);  
  
  
  
bufferevent_write(bev, line, n);  
  
}  
  
}  
  
  
  
void write_cb( struct bufferevent *bev, void *arg) {}  
  
  
  
void error_cb( struct bufferevent *bev, short event, void *arg) {  
  
evutil_socket_t fd = bufferevent_getfd(bev);  
  
 printf ("fd = %u, ", fd);  
  
 if (event & BEV_EVENT_TIMEOUT) {  
  
 printf ("Timed out\n"); // if bufferevent_set_timeouts() called 
  
}  
  
 else  if (event & BEV_EVENT_EOF) {  
  
 printf ("connection closed\n");  
  
}  
  
 else  if (event & BEV_EVENT_ERROR) {  
  
 printf ("some other error\n");  
  
}  
  
bufferevent_free(bev);  
  
}

--

相關文章
相關標籤/搜索