本文內容大體翻譯自 libevent-book, 但不是照本翻譯. 成文時, libevent最新的穩定版爲 2.1.8 stable. 即本文如無特殊說明, 全部描述均以 2.1.8 stable 版本爲準.git
本文爲系列文章的第一篇, 對應libevent-book的 chapter 0 + chapter 1 + R0 + R1程序員
這個文檔是對libevent的介紹與指導, 閱讀文檔須要你具備如下的能力:github
這裏首先要解釋四個名詞: 阻塞, 非阻塞, 同步, 異步. 它們都是修飾"接口"的形容詞, 或者說的土一點, 它們都是修飾"函數"的形容詞.redis
同步, 仍是異步, 是從"消息通訊"的視角去描述這個接口的行爲. 而所謂的消息通訊, 你能夠簡單的把"函數"想象成一個淘寶客服, 把"調用方"想象成你本身. 調用函數的過程其實就是三步:編程
你從這個角度去看, 函數調用, 就是消息通訊的過程, 你發送消息給函數, 函數通過一番運算思考, 把結果再回發給你.小程序
所謂的同步, 異步, 指的是:windows
異步客服須要解決一個問題: 當真正的運算結果得出以後, 被調用的客服如何通知做爲調用方的你, 取走答案. 在淘寶客戶端上, 是經過手機的震動消息提醒, 是經過聊天框的紅點.api
因此, 關於同步, 和異步, 這裏作一個稍微正式一點的總結:數組
這裏咱們着眼於消息的傳遞, 通信方式, 也就是站在函數的角度去看, 結果是如何傳遞給調用方的. 同步接口, 運算結果在"函數調用"這個場景下就返回給了調用方. 異步接口: 運算結果在"函數調用"這個場景以後的某個不定的時刻, 經過某種通知方式, 傳遞給調用方.安全
整個過程當中咱們忽略了一件事: 就是, 在函數執行運算的過程當中, 調用方在幹什麼. 也是是, 在淘寶客服心裏思考如何回覆你的時候, 你在幹什麼.
這就引出了阻塞與非阻塞:
換句話說:
這是兩個維度上的邏輯概念, 這兩個維度互相有必定的干涉, 並非徹底正交的兩個維度, 這樣, 既然是兩個維度, 那麼就有四種組合.
異步, 非阻塞: 調用方發起調用直至獲得結果以前這段時間, 能夠作其它事情. 被調函數接收到參數後當即返回, 但在以後的某一個時間點才把運算結果傳遞給調用方. 這提及來很繞口, 舉個栗子, 仍是客服:
能夠看到
還有一個點要給你們介紹到, 就是回調函數. 在上面講過, 異步調用, 須要函數以某種機制, 在運算結果得出以後, 將運算結果傳遞給調用方. 但回調函數又繞了一個彎.
假設沒有回調函數機制, 異步流程就是:
這個流程裏顧客作了兩件事:
而淘寶客服只作了一件事:
而有了回調機制後, 異步流程就是這樣的:
這個流程裏, 顧客作了兩件事:
淘寶客服只作了一件事:
而消息監控方, 也就是祕書, 作了一件事:
這就是回調函數的一個生動的例子, 回調函數機制中有了一個調用結果監控方, 就是祕書, 這個角色承擔着很是重要的職責: 便是在函數返回結果以後, 調用對應的回調函數. 回調機制通常都實如今異步調用框架之中, 對於寫代碼的人來講是透明的, 它簡化了調用方的職責與智力負擔, 必定程度上抽象了代碼邏輯, 簡化了編程模型(注意: 是必定程度上!). 有了回調機制:
不過正所謂回調一時爽, 調試火葬廠. 寫過JavaScript的同窗對這一點必定是深有體會. 當程序不能正確運行的時候, 調試很蛋疼. 異步框架自己因爲函數返回時機不肯定, 調試就比較蛋疼, 再加上回調機制, 那真是火葬廠了. 特別是回調嵌套回調, 裏面套個七八層的時候, 那真是把圖靈從墳裏挖出來也沒用的絕望場景.
咱們先來看一段經典的同步且阻塞的HTTP客戶端程序:
#include <netinet/in.h> // for socketaddr_in #include <sys/socket.h> // for socket functions #include <netdb.h> // for gethostbyname #include <sys/errno.h> // for errno #include <unistd.h> #include <string.h> #include <stdio.h> int main(int argc, char ** argv) { const char query[] = "GET / HTTP/1.0\r\n" "Host: www.baidu.com\r\n" "\r\n"; const char hostname[] = "www.baidu.com"; struct sockaddr_in sin; struct hostent * h; const char * cp; int fd; ssize_t n_written, remaining; char buf[4096]; /* * Look up the IP address for the hostname. * Watch out; this isn't threadsafe on most platforms. */ h = gethostbyname(hostname); if(!h) { fprintf(stderr, "E: gethostbyname(%s) failed. ErrMsg: %s\n", hostname, hstrerror(h_errno)); return -__LINE__; } if(h->h_addrtype != AF_INET) { fprintf(stderr, "E: gethostbyname(%s) returned an non AF_INET address.\n", hostname); return -__LINE__; } /* * Allocate a new socket */ fd = socket(AF_INET, SOCK_STREAM, 0); if(fd < 0) { fprintf(stderr, "E: socket failed: %s\n", strerror(errno)); return -__LINE__; } /* * Connect to the remote host */ sin.sin_family = AF_INET; sin.sin_port = htons(80); sin.sin_addr = *((struct in_addr *)(h->h_addr)); if(connect(fd, (struct sockaddr *)(&sin), sizeof(sin)) != 0) { fprintf(stderr, "E: connect to %s failed: %s\n", hostname, strerror(errno)); close(fd); return -__LINE__; } /* * Write the query * XXX Can send succeed partially? */ cp = query; remaining = strlen(query); while(remaining) { n_written = send(fd, cp, remaining, 0); if(n_written < 0) { fprintf(stderr, "E: send failed: %s\n", strerror(errno)); close(fd); return -__LINE__; } remaining -= n_written; cp += n_written; } /* * Get an answer back */ while(1) { ssize_t result = recv(fd, buf, sizeof(buf), 0); if(result == 0) { break; } else if(result < 0) { fprintf(stderr, "recv failed: %s\n", strerror(errno)); close(fd); return -__LINE__; } fwrite(buf, 1, result, stdout); } close(fd); return 0; }
在上面的示例代碼裏, 大部分有關網絡與IO的函數調用, 都是阻塞式的. 好比gethostbyname
, 在DNS解析成功域名以前是不返回的(或者解析失敗了會返回失敗), connect
函數, 在與對端主機成功創建TCP鏈接以前是不返回的(或者鏈接失敗), 再好比recv
與send
函數, 在成功操做, 或明確失敗以前, 也是不返回的.
阻塞式IO確實比較土, 上面的程序編譯運行的時候, 若是你網絡情況很差, 可能會卡一兩秒纔會讀到百度的首頁, 這卡就是由於阻塞IO的緣故. 固然, 雖然比較土, 但像這樣的場合, 使用阻塞IO是沒什麼問題的. 但假如你想寫一個程序同時讀取兩個網站的首頁的話, 就比較麻煩了: 由於你不知道哪一個網站會先響應你的請求.. 你能夠寫一些, 好比像下面這樣的, 很土的代碼:
char buf[4096]; int i, n; while(i_still_want_to_read()) { for(i = 0; i < n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if(n == 0) { handle_close(fd[i]); } else if(n < 0) { handle_error(fd[i], errno); } else { handle_input(fd[i], buf, n); } } }
若是你的fd[]
數組裏有兩個網站的鏈接, fd[0]
接着百度, fd[1]
接着hao123, 假如hao123正常響應了, 能夠從fd[1]
裏讀出數據了, 但百度的服務器被李老闆炸了, 響應不了了, 這時, 上面的代碼就會卡在i==0
時循環裏的n = recv(fd[0], buf, sizeof(buf), 0)
這條語句中, 直到李老闆把服務器修好. 這就很蛋疼.
固然, 你能夠用多線程解決這個問題, 多數狀況下, 你有一個問題, 你嘗試使用多線程解決, 而後你多個了有問題.
上面是一個冷笑話, 多線程或多進程是一個解決方案, 一般狀況下, 最簡單的套路是使用一個線程或進程去創建TCP鏈接, 而後鏈接創建成功後, 爲每一個鏈接建立獨立的線程或進程來進行IO讀寫. 這樣即便一個網站抽風了, 也只阻塞屬於它本身的那個讀寫線程或進程, 不會影響到其它網站的響應.
下面是另一個例子程序, 這是一個服務端程序, 監聽40173端口上的TCP鏈接請求, 而後把客戶端發送的數據按ROT13法再回寫給客戶端, 一次處理一行數據. 這個程序使用Unix上的fork()
函數爲每一個客戶端的鏈接建立一個獨立的處理進程.
#include <netinet/in.h> // for sockaddr_in #include <sys/socket.h> // for socket functions #include <sys/errno.h> // for errno #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #define MAX_LINE 16384 char rot13_char(char c) { if( (c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M') ) { return c+13; } else if( (c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z') ) { return c-13; } else { return c; } } void child(int fd) { char outbuf[MAX_LINE + 1]; // extra byte for '\0' size_t outbuf_used = 0; ssize_t result; while(1) { char ch; result = recv(fd, &ch, 1, 0); if(result == 0) { break; } else if(result == -1) { perror("read"); break; } if(outbuf_used < sizeof(outbuf)) { outbuf[outbuf_used++] = rot13_char(ch); } if(ch == '\n') { send(fd, outbuf, outbuf_used, 0); outbuf_used = 0; continue; } } } void run(void) { int listener; struct sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); if(bind(listener, (struct sockaddr *)(&sin), sizeof(sin)) < 0) { perror("bind"); return; } if(listen(listener, 16) < 0) { perror("listen"); return; } while(1) { struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr *)(&ss), &slen); if(fd < 0) { perror("accept"); } else { if(fork() == 0) { child(fd); exit(0); } } } } int main(int argc, char ** argv) { run(); return 0; }
你可使用下面的命令行, 經過netcat
工具向本機的40713發送數據, 來試驗一下上面的服務端代碼:
printf "abcdefghijklmnopqrstuvwxyz\n" | nc -4 -w1 localhost 40713
多進程或多線程確實是一個還算是比較優雅的, 應對併發鏈接的解決方案. 這種解決方案的缺陷是: 進程或線程的建立是有開銷的, 在某些平臺上, 這個開銷仍是比較大的. 這裏優化的方案是使用線程, 並使用線程池策略. 若是你的機器須要處理上千上萬的併發鏈接, 這就意味着你須要建立成千上萬個線程, 想象一下, 服務器通常也就十幾個核心, 64個不得了了, 若是有五千併發鏈接, 5000個線程排除輪64個核心的大米, 線程調度確定是個大開銷.
這個時候咱們就須要瞭解一下非阻塞了, 經過下面的Unix調用, 能夠將一個文件描述符設置爲"非阻塞"的. 明確一下: "非阻塞"描述的是IO函數的行爲, 將一個文件描述符設置爲"非阻塞"的, 實際上是指, 在這個文件描述符上執行IO操做, 函數的行爲會變成非阻塞的.
fcntl(fd, F_SETFL, O_NONBLOCK);
當這個文件描述符是socket的文件描述符時, 咱們通常也會直接稱, "把一個socket設置爲非阻塞". 將一個socket設置爲非阻塞以後, 在對應的文件描述符上, 不管是執行網絡編程相關的函數, 仍是執行IO相關的函數, 函數行爲都會變成非阻塞的, 即函數在調用以後就當即返回: 要麼當即返回成功, 要把當即告訴調用者: "暫時不可用, 請稍後再試"
有了非阻塞這種手段, 咱們就能夠改寫咱們的訪問網頁程序了: 咱們這時能夠正確的處理同時下載兩個網站的數據的需求了. 代碼片段以下:
int i, n; char buf[1024]; for(i = 0; i < n_sockets; ++i) { fcntl(fd[i], F_SETFL, O_NONBLOCK); } while(i_still_want_to_read) { for(int i = 0; i < n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if(n == 0) { handle_close(fd[i]); // peer was closed } else if(n < 0) { if(errno == EAGAIN) { // do nothing, the kernel didn't have any data for us to read // retry } else { handle_error(fd[i], errno); } } else { handle_input(fd[i], buf, n); // read success } } }
這樣寫確實解決了問題, 可是, 在對端網站尚未成功響應的那幾百毫秒裏, 這段代碼將會瘋狂的死循環, 會把你的一個核心佔滿. 這是一個很蛋疼的解決方案, 緣由是: 對於真正的數據什麼時候到達, 咱們沒法肯定, 只能開個死循環輪詢.
舊式的改進方案是使用一個叫select()
的系統調用函數. select()
函數內部維護了三個集合:
select()
函數在這三個集合有至少一個集合不爲空的時候返回. 若是三個集合都爲空, 那麼select()
函數將阻塞.
下面是使用select()
改進後的代碼片段:
fd_set readset; int i, n; char buf[1024]; while(i_still_want_to_read) { int maxfd = -1; FD_ZERO(&readset); // add all of the interesting fds to readset for(i = 0; i < n_sockets; ++i) { if(fd[i] > maxfd) { maxfd = fd[i]; } FD_SET(fd[i], &readset): } select(maxfd+1, &readset, NULL, NULL, NULL); for(int i = 0; i < n_sockets; ++i) { if(FDD_ISSET(fd[i], &readset)) { n = recv(fd[i], &readset); if(n == 0) { handle_close(fd[i]); } else if(n < 0) { if(errno == EAGAIN) { // the kernel didn't have any data for us to read } else { handle_error(fd[i], errno); } } else { handle_input(fd[i], buf, n); } } } }
使用select()
改進了程序, 但select()
蛋疼的地方在於: 它只告訴你, 三集合中有數據了, 可是: 哪一個fd可讀, 哪一個fd可寫, 哪一個fd有異常, 這些具體的信息, 它仍是沒告訴你. 若是你的fd數量很少, OK, 上面的代碼沒什麼問題, 但若是你持有着上千個併發鏈接, 那每次select()
返回時, 你都須要把全部fd都輪一遍.
下面是使用select()
調用對rot13服務端示例代碼的重構
#include <netinet/in.h> // for sockaddr_in #include <sys/socket.h> // for socket functions #include <sys/errno.h> // for errno #include <fcntl.h> // for fcntl #include <sys/select.h> // for select #include <assert.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #define MAX_LINE 16384 char rot13_char(char c) { if( (c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M') ) { return c+13; } else if( (c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z') ) { return c-13; } else { return c; } } struct fd_state{ char buffer[MAX_LINE]; size_t buffer_used; int writing; size_t n_written; size_t write_upto; }; struct fd_state * alloc_fd_state(void) { struct fd_state * state = malloc(sizeof(struct fd_state)); if(!state) { return NULL; } state->buffer_used = state->n_written = state->writing = state->write_upto = 0; return state; } void free_fd_state(struct fd_state * state) { free(state); } void make_nonblocking(int fd) { fcntl(fd, F_SETFL, O_NONBLOCK); } int do_read(int fd, struct fd_state * state) { char buf[1024]; int i; ssize_t result; while(1) { result = recv(fd, buf, sizeof(buf), 0); if(result <= 0) { break; } for(int i = 0; i < result; ++i) { if(state->buffer_used < sizeof(state->buffer)) { state->buffer[state->buffer_used++] = rot13_char(buf[i]); } if(buf[i] == '\n') { state->writing = 1; state->write_upto = state->buffer_used; } } } if(result == 0) { return 1; } else if(result < 0) { if(errno == EAGAIN) { return 0; } return -1; } return 0; } int do_write(int fd, struct fd_state * state) { while(state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if(result < 0) { if(errno == EAGAIN) { return 0; } return -1; } assert(result != 0); state->n_written += result; } if(state->n_written == state->buffer_used) { state->n_written = state->write_upto = state->buffer_used = 0; } state->writing = 0; return 0; } void run(void) { int listener; struct fd_state * state[FD_SETSIZE]; struct sockaddr_in sin; int i, maxfd; fd_set readset, writeset, exset; sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); for(i = 0; i < FD_SETSIZE; ++i) { state[i] = NULL; } listener = socket(AF_INET, SOCK_STREAM, 0); make_nonblocking(listener); int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); if(bind(listener, (struct sockaddr *)(&sin), sizeof(sin)) < 0) { perror("bind"); return; } if(listen(listener, 16) < 0) { perror("listen"); return; } FD_ZERO(&readset); FD_ZERO(&writeset); FD_ZERO(&exset); while(1) { maxfd = listener; FD_ZERO(&readset); FD_ZERO(&writeset); FD_ZERO(&exset); FD_SET(listener, &readset); for(i = 0; i < FD_SETSIZE; ++i) { if(state[i]) { if(i > maxfd) { maxfd = i; } FD_SET(i, &readset); if(state[i]->writing) { FD_SET(i, &writeset); } } } if(select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0) { perror("select"); return; } if(FD_ISSET(listener, &readset)) { struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr *)(&ss), &slen); if(fd < 0) { perror("accept"); } else if(fd > FD_SETSIZE) { close(fd); } else { make_nonblocking(fd); state[fd] = alloc_fd_state(); assert(state[fd]); } } for(i = 0; i < maxfd + 1; ++i) { int r = 0; if(i == listener) { continue; } if(FD_ISSET(i, &readset)) { r = do_read(i, state[i]); } if(r == 0 && FD_ISSET(i, &writeset)) { r = do_write(i, state[i]); } if(r) { free_fd_state(state[i]); state[i] = NULL; close(i); } } } } int main(int argc, char ** argv) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }
但這樣還不夠好: FD_SETSIZE
是一個很大的值, 至少不小於1024. 當要監聽的fd的值比較大的時候, 就很噁心, 遍歷會遍歷不少次. 對於非阻塞IO接口來說, select
是一個很粗糙的解決方案, 這個系統調用提供的功能比較薄弱, 只能說是夠用, 但接口確實太屎了, 很差用, 性能也堪優.
不一樣的操做系統平臺上提供了不少select
的替代品, 它們都用於配套非阻塞IO接口來使單線程程序也有必定的併發能力. 這些替代品有poll()
, epoll()
, kqueue()
, evports
和/dev/poll
. 而且這些替代品的性能都比select()
要好的多. 但比較蛋疼的是, 上面提到的全部接口, 幾乎都不是跨平臺的. epoll()
是Linux獨有的, kqueue()
是BSD系列(包括OS X)獨有的. evports
和/dev/poll
是Solaris獨有的. 是的, select()
屬於POSIX標準的一部分, 但就是性能捉急. 也就是說, 若是你寫的程序想跨平臺, 高性能, 你就得本身寫一層抽象, 把不一樣平臺對於IO多路複用的底層統一塊兒來: 這也就是Libevent乾的事情.
libevent的低級API爲IO多路複用提供了統一的接口, 其底層實如今不一樣的操做系統平臺上都是最高效的實現.
下面, 咱們將使用libevent對上面的程序進行重構. 注意: fd_sets
不見了, 取而代之的是一個叫event_base
的結構體.
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; size_t n_written; size_t write_upto; struct event *read_event; struct event *write_event; }; struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd) { struct fd_state *state = malloc(sizeof(struct fd_state)); if (!state) return NULL; state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); if (!state->read_event) { free(state); return NULL; } state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state); if (!state->write_event) { event_free(state->read_event); free(state); return NULL; } state->buffer_used = state->n_written = state->write_upto = 0; assert(state->write_event); return state; } void free_fd_state(struct fd_state *state) { event_free(state->read_event); event_free(state->write_event); free(state); } void do_read(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; char buf[1024]; int i; ssize_t result; while (1) { assert(state->write_event); result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) break; for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer)) state->buffer[state->buffer_used++] = rot13_char(buf[i]); if (buf[i] == '\n') { assert(state->write_event); event_add(state->write_event, NULL); state->write_upto = state->buffer_used; } } } if (result == 0) { free_fd_state(state); } else if (result < 0) { if (errno == EAGAIN) // XXXX use evutil macro return; perror("recv"); free_fd_state(state); } } void do_write(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; while (state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { if (errno == EAGAIN) // XXX use evutil macro return; free_fd_state(state); return; } assert(result != 0); state->n_written += result; } if (state->n_written == state->buffer_used) state->n_written = state->write_upto = state->buffer_used = 1; event_del(state->write_event); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { // XXXX eagain?? perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */ } else { struct fd_state *state; evutil_make_socket_nonblocking(fd); state = alloc_fd_state(base, fd); assert(state); /*XXX err*/ assert(state->write_event); event_add(state->read_event, NULL); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if (!base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char **v) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }
總之:
evutil_socket_t
類型的使用, 與evutil_make_socket_nonblocking()
函數的使用, 均是爲也跨平臺兼容性. 使用這些類型名與工具函數, 使得在windows平臺上代碼也能跑起來.如今, 你看, 異步IO+事件處理(或者叫多路IO複用), 是單線程單進程程序取得併發能力的最佳途徑, 而libevent則是把多平臺的IO多路複用庫給你抽象統一成一層接口了. 這樣代寫的代碼不須要改動, 就能夠運行在多個平臺上.
這樣就有了三個問題:
epoll
, kqueue
, evport
, select
等. 爲何, 我須要使用libevent呢?epoll``select``evport``kqueue
等都不同.答案在這裏:
select
與epoll
: 當網絡可讀寫時, 通知應用程序去讀去寫. 而windows上IOCP的設計思路是: 當網絡可讀可寫時不通知應用程序, 而是先完成讀與寫, 再通知應用程序, 應用程序直接拿到的就是數據. 當在libevent 2提供的bufferevents
系列接口中, 它將*nix平臺下的設計, 改巴改巴改爲了IOCP式的. 使用這個系列的接口不可避免的, 對*nix平臺有性能損失(這和asio封裝網絡庫是同樣的作法), 但實話講, IOCP式的設計確實對程序員更友好, 代碼可讀性高了很多.總的來講, 你應該在以下的場合使用libevent
下面是使用bufferevents
系列接口, 以IOCP式風格對以前例子代碼的重構, 體驗一下更人性的事件處理方式:
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <event2/buffer.h> #include <event2/bufferevent.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } void readcb(struct bufferevent *bev, void *ctx) { struct evbuffer *input, *output; char *line; size_t n; int i; input = bufferevent_get_input(bev); output = bufferevent_get_output(bev); while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) { for (i = 0; i < n; ++i) line[i] = rot13_char(line[i]); evbuffer_add(output, line, n); evbuffer_add(output, "\n", 1); free(line); } if (evbuffer_get_length(input) >= MAX_LINE) { /* Too long; just process what there is and go on so that the buffer * doesn't grow infinitely long. */ char buf[1024]; while (evbuffer_get_length(input)) { int n = evbuffer_remove(input, buf, sizeof(buf)); for (i = 0; i < n; ++i) buf[i] = rot13_char(buf[i]); evbuffer_add(output, buf, n); } evbuffer_add(output, "\n", 1); } } void errorcb(struct bufferevent *bev, short error, void *ctx) { if (error & BEV_EVENT_EOF) { /* connection has been closed, do any clean up here */ /* ... */ } else if (error & BEV_EVENT_ERROR) { /* check errno to see what error occurred */ /* ... */ } else if (error & BEV_EVENT_TIMEOUT) { /* must be a timeout event handle, handle it */ /* ... */ } bufferevent_free(bev); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); } else { struct bufferevent *bev; evutil_make_socket_nonblocking(fd); bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, readcb, NULL, errorcb, NULL); bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); bufferevent_enable(bev, EV_READ|EV_WRITE); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if (!base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char ** argv) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }
說實話也沒人性到哪裏去, 底層庫就是這樣, libevent仍是太基礎了. 算不上十分友好的輪子.
如今咱們要正式介紹Libevent
epoll
, BSD上用kqueue
epoll
要好一點.evutil
通用類型定義, 跨平臺相關的通用定義, 以及一些通用小函數event and event_base
核心模塊. 事件庫. *nix風格的事件模型: 在socket可讀可寫時通知應用程序.bufferevent
對核心事件庫的再一層封裝, IOCP式的事件模型: 在數據已讀已寫後通知應用程序evbuffer
這是bufferevent
模塊內部使用的緩衝區實現.evhttp
簡單的HTTP C/S實現evdns
簡單的 DNS C/S實現evrpc
簡單的 RPC實現總的來講, 做爲使用者, 須要關心的是:
evutil
是須要關心的event and event_base
核心庫的用法便可.bufferevent
和evbuffer
, 對於核心庫event and event_base
, 能夠不關心evhttp
, evdns
, evrpc
, 如無須要, 能夠不用關心如下是在連接你的代碼的時候, 你須要瞭解的二進制庫.
libevent_core
包含event and event_base
, evutil
, evbuffer
, bufferevent
中的全部函數libevent_extra
包含協議相關的函數. 包括 HTTP/DNS/RPC 等. 若是你用不到 evhttp/evdns/evrpc
裏的函數, 那麼這個庫不用連接.libevent
滿清遺老, 包含了上面兩個庫裏的全部函數. 官方不建議在使用libevent 2.0以上的版本時連接這個庫. 這是個懶人庫.libevent_pthreads
若是你編寫多線程應用程序. 那麼這個庫裏包含了基於POSIX線程庫的相關函數實現. 若是你沒有用到libevent中有關的多線程函數, 那麼這個庫不用連接. 之前這些函數是劃分在libevent_core
中的, 後來被單獨割出來了.注意: 這個庫不是全平臺的.libevent_openssl
這個庫裏的與OpenSSL相關的函數實現. 若是你沒有用到libevent中有關OpenSSL的函數, 那麼這個庫不用連接. 之前這些函數也算在libevent_core
中, 最後也割出來了. 注意: 這個庫也不是全平臺的libevent中的頭文件分爲三類, 全部頭文件都位於event2
目錄下. 也就是說在代碼中你應當這樣寫:
#include <event2/xxxx> #include <event2/xxxx> #include <event2/xxxx> #include <event2/xxxx>
具體有哪些頭文件在後續章節會詳細介紹, 目前只介紹這個分類:
_struct.h
官方建議你們使用版本2, 但有時候這個世界就是不那麼讓人舒服, 若是你須要和版本1的歷史代碼打交道, 你能夠參照下面的對照表: 老頭文件與新頭文件的對照表
舊頭文件 | 新頭文件 |
---|---|
event.h | event2/event*.h, event2/buffer*.h, event2/bufferevent*.h, event2/tag*.h |
evdns.h | event2/dns*.h |
evhttp.h | event2/http*.h |
evrpc.h | event2/rpc*.h |
evutil.h | event2/util*.h |
在當前的2.0版本中, 老的舊頭文件實際上是不須要替換的, 這些舊頭文件依然存在. 但仍是建議將他們替換成新頭文件, 由於說不定50年後libevent升級到3.0版本, 這些舊頭文件就被扔了.
另外還有一些點須要你注意:
libevent
, 裏面是libevent的全部實現. 現在這些實現被分割到了 libevent_core
和libevent_extra
兩個庫中.官方對待老版本是這樣建議的:
我對老版本的態度是這樣的: 能幹活就好. 沒有特殊緣由, 我是不會作代碼遷移的. 而且考慮到應用場景, 有時候用老版本也挺好的.
1.4.x版本的libevent被大量項目使用, 其實挺穩定的, 官方不建議使用, 只是官方再也不在1.4版本上再加特性修bug了. 1.4版本最後的一個小版本號就中止在7上不動了. 而對於1.3版本, 確實不該該再碰了.
libevent有幾項全局設定, 若是你須要改動這幾項設定, 那麼確保在代碼初始化的時候設定好值, 一旦你的代碼流程開始了, 調用了第一個libevent中的任何函數, 後續強烈建議不要再更改設定值, 不然會引發不可預知的後果.
libevent默認狀況下將把錯誤與警告日誌寫進stderr
, 而且若是你須要一些libevent內部的調試日誌的話, 也能夠經過更改設定來讓其輸出調試日誌, 以在程序崩潰時提供更多的參考信息. 這些行爲均可以經過自行實現日誌函數進行更改. 下面是libevent相關的日誌接口.
// 如下是日誌級別 #define EVENT_LOG_DEBUG 0 #define EVENT_LOG_MSG 1 #define EVENT_LOG_WARN 2 #define EVENT_LOG_ERR 3 // 如下是已經被廢棄的日誌級別定義 /* Deprecated; see note at the end of this section */ #define _EVENT_LOG_DEBUG EVENT_LOG_DEBUG #define _EVENT_LOG_MSG EVENT_LOG_MSG #define _EVENT_LOG_WARN EVENT_LOG_WARN #define _EVENT_LOG_ERR EVENT_LOG_ERR // 這個是一個函數指針類型別名, 指向日誌輸出函數 // 日誌輸出函數應當是一個雙參, 無返回值的函數, 第一個參數severity爲日誌級別, 第二個參數爲日誌字符串 typedef void (*event_log_cb)(int severity, const char *msg); // 這是用戶自定義設置日誌處理函數的接口. 若是調用該函數時入參設置爲NULL // 則將採用默認行爲 void event_set_log_callback(event_log_cb cb);
好比下面我須要改寫libevent記錄日誌的方式:
#include <event2/event.h> #include <stdio.h> // 丟棄日誌 static void discard_cb(int severity, const char * msg) { // 這個日誌函數內部什麼也不作 } // 將日誌記錄至文件 static FILE * logfile = NULL; static void write_to_file_cb(int severity, const char * msg) { const char * s; if(!logfile) { return; } switch(severity) { case EVENT_LOG_DEBUG: s = "[DBG]"; break; case EVENT_LOG_MSG: s = "[MSG]"; break; case EVENT_LOG_WARN: s = "[WRN]"; break; case EVENT_LOG_ERR: s = "[ERR]"; break; default: s = "[???]"; break; } fprintf(logfile, "[%s][%s][%s] %s\n", __FILE__, __func__, s, msg); } void suppress_logging(void) { event_set_log_callback(discard_cb); } void set_logfile(FILE * f) { logfile = f; event_set_log_callback(write_to_file_cb); }
注意: 在自定義的日誌輸出函數中, 不要調用其它libevent中的函數! 好比, 若是你要把日誌遠程輸出至網絡socket上去, 你還使用了bufferevent來輸出你的日誌, 在目前的libevent版本中, 這會致使一些奇怪的bug. libevent官方也認可這是一個設計上沒有考慮到的點, 這可能在後續版本中被移除, 但截止目前的2.1.8 stable版本, 這個問題都尚未解決. 不要做死.
默認狀況下的日誌級別是EVENT_LOG_MSG
, 也就是說EVENT_LOG_DEBUG
級別的日誌不會調用至日誌輸出函數. 要讓libevent輸出調試級別的日誌, 請使用下面的接口:
#define EVENT_DBG_NONE 0 #define EVENT_DBG_ALL 0xffffffffu // 若是傳入 EVENT_DBG_NONE, 將保持默認狀態: 不輸出調試日誌 // 若是傳入 EVENT_DEG_ALL, 將開啓調試日誌的輸出 void event_enable_debug_logging(ev_uint32_t which);
調試日誌很詳盡, 一般狀況下對於libevent的使用者而言是沒有輸出的必要的. 由於要用到調試級別日誌的場合, 是你百般無奈, 開始懷疑libevent自己有bug的時候. 雖然從宏的命名上, 彷彿還存在着 EVENT_DGB_SOMETHING
這樣, 能夠單獨控制某個模塊的調試日誌輸出的參數, 但實際上並無: 調試日誌要麼全開, 要麼全關. 沒有中間地帶. 官方宣稱可能在後續的版本中細化調試日誌的控制.
而若是你要控制其它日誌級別的輸出與否, 請自行實現日誌輸出函數. 好比忽略掉EVENT_LOG_MSG
級別的日誌之類的. 上面的接口只是控制"若是產生了調試日誌, libevent調用或不調用日誌輸出函數"而已.
上面有關日誌的接口均定義在<event2/event.h>
中.
event_enable_debug_logging()
接口在2.1.1版本以後纔有_DEBUG_LOG_XXX
, 但如今已經廢棄掉了這種定義, 在新版本中請使用不帶下劃線開頭的版本.當libevent檢測到有致命的內部錯誤發生時(好比踩內存了之類的不可恢復的錯誤), 默認行爲是調用exit()
或abort()
. 出現這種狀況99.99的緣由是使用者自身的代碼出現了嚴重的bug, 另外0.01%的緣由是libevent自身有bug.
若是你但願在進程退出以前作點額外的事情, 寫幾行帶fxxk的日誌之類的, libevent提供了相關的入口, 這能夠改寫libevent對待致命錯誤的默認行爲.
typedef void (*event_fatal_cb)(int err); void event_set_fatal_callback(event_fatal_cb cb);
注意, 不要試圖強行恢復這種致命錯誤, 也就是說, 雖然libevent給你提供了這麼個接口, 但不要在註冊的函數中試圖讓進程繼續執行. 由於這個時候libevent內部已經有坑了, 若是繼續強行恢復, 結果是不可預知的. 換個說法: 這個函數應該提供的是臨終遺言, 而不該該試圖救死扶傷.
這個函數也定義在 <event2/event.h>
中, 在2.0.3版本以後可用.
默認狀況下, libevent使用的是標準C庫中的內存管理函數, 即malloc()
, realloc()
, free()
等. libevent容許你使用其它的內存管理庫, 好比tcmalloc
或jemalloc
. 相關接口以下:
void event_set_mem_functions(void *(*malloc_fn)(size_t sz), void *(*realloc_fn)(void *ptr, size_t sz), void (*free_fn)(void *ptr));
接口的第一個參數是內存分配函數指針, 第二個參數是內存重分配函數指針, 第三個參數是內存釋放函數指針.
下面是一個使用的例子:
#include <event2/event.h> #include <sys/types.h> #include <stdlib.h> union alignment { size_t sz; void * ptr; double dbl; }; #define ALIGNMENT sizeof(union alignment) #define OUTPTR(ptr) (((char *)ptr) + ALIGNMENT) #define INPTR(ptr) (((char *)ptr) - ALIGNMENT) static size_t total_allocated = 0; static void * my_malloc(size_t sz) { void * chunk = malloc(sz + ALIGNMENT); if(!chunk) return chunk; total_allocated += sz; *(size_t *)chunk = sz; return OUTPTR(chunk); } static void * my_realloc(void * ptr, size_t sz) { size_t old_size = 0; if(ptr) { ptr = INPTR(ptr); old_size = *(size_t*)ptr; } ptr = realloc(ptr, sz + ALIGNMENT); if(!ptr) { return NULL; } *(size_t *)ptr = sz; total_allocated = total_allocated - old_size + sz; return OUTPTR(ptr); } static void my_free(void * ptr) { ptr = INPTR(ptr); total_allocated -= *(size_t *)ptr; free(ptr); } void start_counting_bytes(void) { event_set_mem_functions( my_malloc, my_realloc, my_free ); }
上面這個例子中, 提供了一種記錄全局內存使用量的簡單方案, 非線程安全.
對於自定義內存管理接口, 須要注意的有:
sz
個字節可用.realloc(NULL, sz)
這種狀況: 即, 使之行爲等同於 malloc(sz)
. 也必須正確處理realloc(ptr, 0)
這種狀況: 即, 使之行爲與free(ptr)
相同且返回NULL.free(NULL)
: 什麼也不作.malloc(0)
: 返回NULL.free()
版本與libevent內部使用的內存管理函數是一致的. 也就是說: 若是要操做libevent相關的內存區域, 請確保相關的內存處理函數和libevent內部使用的內在管理函數是一致的. 或者簡單一點: 若是你決定使用某個內存管理庫, 那麼在整個項目範圍內都使用它, 這樣最簡單, 不容易出亂子. 不然應該盡力避免在外部操做libevent建立的內存區域.event_set_mem_functions()
接口也定義在<event2/event.h>
中, 在2.0.2版本後可用.
須要注意的是: libevent在編譯安裝的時候, 能夠關閉event_set_mem_functions()
這個特性. 若是關閉了這個特性, 而在項目中又使用了這個特性, 那麼在項目編譯時, 編譯將報錯. 若是要檢測當前引入的libevent庫是否啓用了這個功能, 能夠經過檢測宏EVENT_SET_MEM_FUNCTIONS_IMPLEMENTED
宏是否被定義來判斷.
多線程程序設計裏的數據訪問是個大難題. 目前的版本里, libevent支持了多線程編程, 但這個支持法呢, 怎麼講呢, 使用者仍是須要知道很多細節才能正確的寫出多線程應用. libevent中的數據結構分爲三類:
libevent_pthreads
庫連接你的程序, 那麼這些結構的實例在多線程環境中必定的安全的. 你想讓它不安全都沒辦法.雖然libevent爲你寫了一些加鎖解鎖的無聊代碼, 你沒必要要手動爲每一個對象加鎖了, 但libevent仍是須要你指定加鎖的函數. 就像你能夠爲libevent指定其它的內存管理庫同樣. 注意這也是一個全局設定, 請遵循咱們一再強調的使用規則: 進程初始化時就定好, 後續不準再更改.
若是你使用的是POSIX線程庫, 或者標準的windows原生線程庫, 那麼簡單了一些. 設置加解鎖函數只須要一行函數調用, 接口以下:
#ifdef WIN32 int evthread_use_windows_threads(void); #define EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED #endif #ifdef _EVENT_HAVE_PTHREADS int evthread_use_pthreads(); #define EVTHREAD_USE_PTHREADS_IMPLEMENTED #endif
這兩個函數在成功時都返回0, 失敗時返回-1.
這只是加解鎖. 但若是你想要自定義的是整個線程庫, 那麼你就須要手動指定以下的函數與結構定義
這裏須要注意的是: libevent並不會爲你寫哪怕一行的多線程代碼, libevent內部也不會去建立線程. 你要使用多線程, OK, 你用哪一種線程庫都行, 沒問題. 但你須要將配套的鎖/條件變量/線程檢測函數以及相關定義告訴libevent, 這樣libevent纔會知道如何在多線程環境中保護本身的實例, 以供你在多線程環境中安全的訪問.
evthread_use_xxx_threads()
以後, 把你本身的鎖函數或者條件變量函數提供給libevent就行了. 注意這種狀況下, 在你的程序的其它地方也須要使用你指定的鎖或條件變量.下面是相關的接口
// 鎖模式是 lock 與 unlock 函數的參數, 它指定了加鎖解鎖時的一些額外信息 // 若是調用 lock 或 unlock 時的鎖都不知足下面的三種模式, 參數傳0便可 #define EVTHREAD_WRITE 0x04 // 鎖模式: 僅對讀寫鎖使用: 獲取或釋放寫鎖 #define EVTHREAD_READ 0x08 // 鎖模式: 僅對讀寫鎖使用: 獲取或釋放讀鎖 #define EVTHREAD_TRY 0x10 // 鎖模式: 僅在加鎖時使用: 僅在能夠當即加鎖的時候纔去加鎖. // 若當前不可加鎖, 則lock函數當即返回失敗, 而不是阻塞 // 鎖類型是 alloc 與 free 函數的參數, 它指定了建立與銷燬的鎖的類型 // 鎖類型能夠是 EVTHREAD_LOCKTYPE_XXX 之一或者爲0 // 全部支持的鎖類型均須要被登記在 supported_locktypes 中, 若是支持多種鎖, 則多個宏之間用 | 連結構成該字段的值 // 當鎖類型爲0時, 指的是普通的, 非遞歸鎖 #define EVTHREAD_LOCKTYPE_RECURSIVE 1 // 鎖類型: 遞歸鎖, 你必須提供一種遞歸鎖給libevent使用 #define EVTHREAD_LOCKTYPE_READWRITE 2 // 鎖類型: 讀寫鎖, 在2.0.4版本以前, libevent內部沒有使用到讀寫鎖 #define EVTHREAD_LOCK_API_VERSION 1 // 將你要用的有關鎖的全部信息放在這個結構裏 struct evthread_lock_callbacks { int lock_api_version; // 必須與宏 EVTHREAD_LOCK_API_VERSION的值一致 unsigned supported_locktypes; // 必須是宏 EVTHREAD_LOCKTYPE_XXX 的或組合, 或爲0 void *(*alloc)(unsigned locktype); // 鎖分配, 須要指定鎖類型 void (*free)(void *lock, unsigned locktype); // 鎖銷燬, 須要指定鎖類型 int (*lock)(unsigned mode, void *lock); // 加鎖, 須要指定鎖模式 int (*unlock)(unsigned mode, void *lock); // 解鎖, 須要指定鎖模式 }; // 調用該函數以設置相關鎖函數 int evthread_set_lock_callbacks(const struct evthread_lock_callbacks *); // 調該函數以設置線程ID檢測函數 void evthread_set_id_callback(unsigned long (*id_fn)(void)); // 將你要用的有關條件變量的全部信息都放在這個結構裏 struct evthread_condition_callbacks { int condition_api_version; void *(*alloc_condition)(unsigned condtype); void (*free_condition)(void *cond); int (*signal_condition)(void *cond, int broadcast); int (*wait_condition)(void *cond, void *lock, const struct timeval *timeout); }; // 經過該函數以設置相關的條件變量函數 int evthread_set_condition_callbacks( const struct evthread_condition_callbacks *);
要探究具體如何使用這些函數, 請看libevent源代碼中的evthread_pthread.c
與evthread_win32.c
文件.
對於大多數普通用戶來講, 只須要調用一下evthread_use_windows_threads()
或evthread_use_pthreads()
就好了.
上面這些函數均定義在 <event2/thread.h>
中. 在2.0.4版本後這些函數纔可用. 2.0.1至2.0.3版本中使用了一些舊接口, event_use_pthreads()
等. 有關條件變量的相關接口直至2.0.7版本纔可用, 引入條件變量是爲了解決以前libevent出現的死鎖問題.
libevent自己能夠被編譯成不支持鎖的二進制庫, 用這種二進制庫連接你的多線程代碼, bomshakalaka, 跑不起來. 這算是個無用知識點.
另外額外注意: 多線程程序, 而且還使用了POSIX線程庫和配套的鎖, 那麼你須要連接libevent_pthreads
. windows平臺則不用.
libevent有一個額外的特性叫"鎖調試", 開啓這種特性後, libevent將把它內部有關鎖的全部調用都再包裝一層, 以檢測/獲取在鎖調用過程當中出現的錯誤, 好比:
若是出現了上述錯誤, 則libevent會致使進程退出, 並附送一個斷言錯誤
要開啓這個特性, 調用下面的接口:
void evthread_enable_lock_debugging(void); #define evthread_enable_lock_debuging() evthread_enable_lock_debugging()
注意, 這也是一個全局設置項, 請遵循: 一次設置, 初始化時就設置, 永不改動的規則.
這個特性在2.0.4版本中開始支持, 當時接口函數名拼寫錯誤了, 少寫了一個g: evthread_enable_lock_debuging()
. 後來在2.1.2版本中把這個錯誤的拼寫修正過來了. 但仍是兼容了以前的錯誤拼寫.
這個特性吧, 很明顯是libevent內部開發時使用的. 如今開放出來估計是考慮到, 若是你的代碼中出現了一個bug是由libevent內部加解鎖失誤致使的, 那麼用個特性能夠定位到libevent內部. 不然你很難把鍋甩給libevent. 固然這種狀況不多見.
libevent是一個比較薄的庫, 薄的好處是性能很好, 壞處是沒有在接口上對使用者作過多的約束. 這就致使一些二把刀使用者常常會錯誤的使用libevent. 常見的智障行爲有:
這種錯誤其實挺難發現的, 爲了解決這個痛點, libevent額外開發了一個新特性: 在發生上述狀況的時候, libevent給你報錯.
但這是一個會額外消耗資源的特性, libevent內部實際上是追蹤了每一個事件結構的初始化與銷燬, 因此僅在開發測試的時候打開它, 發現問題, 解決問題. 在實際部署的時候, 不要使用這個特性. 開啓這個特性的接口以下:
void event_enable_debug_mode(void);
再不厭其煩的講一遍: 全局設定, 初始化時設定, 一次設定, 永不更改.
這個特性開啓後, 也有一個比較蛋疼的事情: 就是若是你的代碼裏大量使用了event_assign()
來建立事件結構, 可能你的程序在這個特性下會OOM掛掉..緣由是: libevent能夠經過對event_new()
和event_free()
的追蹤來檢測事件結構實例是否未被初始化, 或者被屢次初始化, 或者被非法使用. 可是對於event_assign()
拷貝來的事件結構, 這追蹤就無能爲力了, 而且蛋疼的是event_assign()
仍是淺拷貝. 這樣, 若是你的代碼裏大量的使用了event_assign()
, 這就會致使內置的的追蹤功能一旦追上車就下不來了, 完事車太多就OOM掛掉了.
爲了不在這個特性下因爲追蹤event_assign()
建立的事件實例(或許這裏叫實例已經不合適了, 應該叫句柄)而致使程序OOM, 能夠調用下面的函數以解除對這種事件實例的追蹤, 以免OOM
void event_debug_unassign(struct event * ev);
這樣, 調試模式下, 相關的追蹤檢測就會放棄追蹤由event_assign
建立的事件. 因此你看, 這個特性也不是萬能的, 有缺陷, 湊合用吧. 在不開啓調試模式下, 調用event_debug_unassign()
函數沒有任何影響
下面是一個例子:
#include <event2/event.h> #include <event2/event_struct.h> #include <stdlib.h> void cb(evutil_socket_t fd, short what, void *ptr) { struct event *ev = ptr; if (ev) // 經過判斷入參是否爲NULL, 來確認入參攜帶的事件實例是event_new來的仍是event_assign來的 event_debug_unassign(ev); // 若是是event_assign來的, 那麼就放棄對它的追蹤 } /* * 下面是一個簡單的循環, 等待fd1與fd2同時可讀 */ void mainloop(evutil_socket_t fd1, evutil_socket_t fd2, int debug_mode) { struct event_base *base; struct event event_on_stack, *event_on_heap; // 一個是棧上的事件實例, 一個是堆上的事件實例 if (debug_mode) event_enable_debug_mode(); // 開啓調試模式 base = event_base_new(); event_on_heap = event_new(base, fd1, EV_READ, cb, NULL); // 經過event_new來建立堆上的實例, 並把事件回調的入參設置爲NULL event_assign(&event_on_stack, base, fd2, EV_READ, cb, &event_on_stack); // 經過event_assign來初始化棧上的實例, 並把事件回調的入參設置爲事件實例自身的指針 event_add(event_on_heap, NULL); event_add(&event_on_stack, NULL); event_base_dispatch(base); event_free(event_on_heap); event_base_free(base); }
這個例子也寫的比較蛋疼, 湊合看吧.
另外, 調試模式下的詳情調試信息, 只能經過在編譯時額外定義宏USE_DEBUG
來附加. 即在編譯時加上-DUSE_DEBUG
來開啓. 加上這個編譯時的宏定義後, libevent就會輸出一大坨有關其內部流程的詳情日誌, 包括但不限於
這些詳情不能經過調用API的方式開啓或關閉. 而開啓調試模式的API, 在2.0.4版本後纔可用.
接口很簡單, 以下:
#define LIBEVENT_VERSION_NUMBER 0x02000300 #define LIBEVENT_VERSION "2.0.3-alpha" const char *event_get_version(void); // 獲取字符串形式的版本信息 ev_uint32_t event_get_version_number(void); // 獲取值形式的版本信息
值形式的版本信息由一個uint32_t
類型存儲, 從高位到低位, 每8位表明一個版本號. 好比 0x02000300
表明的版本號就是02.00.03.00
. 三級版本號後可能還有一個小版本號, 好比就存在過一個2.0.1.18
的版本
下面是一個在編譯期檢查libevent版本的寫法, 若版本小於2.0.1, 則編譯不經過. 須要注意的是, 編譯期檢查的是宏裏的值, 若是你的項目構建比較混亂, 極可能出現頭文件的版本, 和最終連接的二進制庫的版本不一致的狀況. 因此編譯期檢查也不必定靠譜
#include <event2/event.h> #if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x02000100 #error "This version of Libevent is not supported; Get 2.0.1-alpha or later." #endif int make_sandwich(void) { /* Let's suppose that Libevent 6.0.5 introduces a make-me-a sandwich function. */ #if LIBEVENT_VERSION_NUMBER >= 0x06000500 evutil_make_me_a_sandwich(); return 0; #else return -1; #endif }
下面是一個在運行時檢查libdvent版本的寫法. 檢查運行期的版本是經過函數調用檢查的, 這就保證了返回的版本號必定是連接進的庫的版本號. 這個比較靠譜. 另外須要注意的是, 數值形式的版本號在libevent2.0.1以後才提供. 因此只能比較蠢的用比較字符串的方式去判斷版本號
#include <event2/event.h> #include <string.h> int check_for_old_version(void) { const char *v = event_get_version(); /* This is a dumb way to do it, but it is the only thing that works before Libevent 2.0. */ if (!strncmp(v, "0.", 2) || !strncmp(v, "1.1", 3) || !strncmp(v, "1.2", 3) || !strncmp(v, "1.3", 3)) { printf("Your version of Libevent is very old. If you run into bugs," " consider upgrading.\n"); return -1; } else { printf("Running with Libevent version %s\n", v); return 0; } } int check_version_match(void) { ev_uint32_t v_compile, v_run; v_compile = LIBEVENT_VERSION_NUMBER; v_run = event_get_version_number(); if ((v_compile & 0xffff0000) != (v_run & 0xffff0000)) { printf("Running with a Libevent version (%s) very different from the " "one we were built with (%s).\n", event_get_version(), LIBEVENT_VERSION); return -1; } return 0; }
接口和宏的定義位於 <event2/event.h>
中, 字符串形式的版本號在1.0版本就提供了, 數值形式的版本號直至2.0.1才提供
就算你手動釋放了全部在程序代碼初始化時建立的libevent對象, 在程序退出以前, 也依然有一些內置的, 對使用者不可見的libevent內部實例以及一些全局配置實例存在着, 而且存在在堆區. 通常狀況下不用管它們: 程序都退出了, 釋放不釋放有什麼區別呢? 反正操做系統會幫你清除的. 但有時你想引入一些第三方的分析工具, 好比檢測內存泄漏的工具時, 就會致使這些工具誤報內存泄漏.
你能夠簡單的調一下下面這個函數, 完成一鍵徹底清除:
void libevent_global_shutdown(void);
注意哦: 這個函數不會幫你釋放你本身調用libevent接口建立出來的對象哦! 還沒那麼智能哦!
另外, 很顯然的一點是, 當調用了這個函數以後, 再去調用其它libevent接口, 可能會出現異常哦! 因此沒事不要調用它, 若是你調用它, 那麼必定是自殺前的最後一秒.
函數定義在<event2/event.h>
中, 2.1.1版本後可用