轉自:http://blog.sina.com.cn/s/blog_9f1496990102vshz.htmlhtml
原文:http://www.lvtao.net/c/631.htmllinux
Libevent介紹windows
libevent是一個事件觸發的網絡庫,適用於windows、linux、bsd等多種平臺,內部使用select、epoll、kqueue等系統調用管理事件機制。著名分佈式緩存軟件memcached也是libevent based,並且libevent在使用上能夠作到跨平臺,並且根據libevent官方網站上公佈的數據統計,彷佛也有着非凡的性能。緩存
libevent官方網站 http://libevent.org/安全
英文文檔 http://www.wangafu.net/~nickm/libevent-book/服務器
中文文檔 http://www.cppblog.com/mysileng/category/20374.html網絡
Libevent是基於事件的網絡庫。說的通俗點,例如個人客戶端鏈接到服務端屬於一個鏈接的事件,當這個事件觸發的時候就會去處理。多線程
該文章閱讀過程當中,請結合下面的socket例子,可能會更加清晰的理解每個接口的用法。socket
event_base分佈式
1. 建立event_base
event_base是event(事件,後面會講event)的一個集合。event_base中存放你是監聽是否就緒的event。通常狀況下一個線程一個event_base,多個線程的狀況下須要開多個event_base。
event_base主要是用來管理和實現事件的監聽循環。
通常狀況下直接new一個event_base就能夠知足大部分需求了,若是須要配置參數的,能夠參見libevent官網。
建立方法:
struct event_base *event_base_new(void);
銷燬方法:
void event_base_free(struct event_base *base);
從新初始化:
int event_reinit(struct event_base *base);
2. 查看IO模型
IO多路複用模型中,有多種方法能夠供咱們選擇,可是這些模型是在不一樣的平臺下面的: select poll epoll kqueue devpoll evport win32
當咱們建立一個event_base的時候,libevent會自動爲咱們選擇最快的IO多路複用模型,Linux下通常會用epoll模型。
下面這個方法主要是用來獲取IO模型的名稱。
const char *event_base_get_method(const struct event_base *base);
3. 銷燬event_base
void event_base_free(struct event_base *base);
4. 事件循環 event loop
咱們上面說到 event_base是一組event的集合,咱們也能夠將event事件註冊到這個集合中。當須要事件監聽的時候,咱們就須要對這個event_base進行循環。
下面這個函數很是重要,會在內部不斷的循環監聽註冊上來的事件。
int event_base_dispatch(struct event_base *base);
返回值:0 表示成功退出 -1 表示存在錯誤信息。
還能夠用這個方法:
#define EVLOOP_ONCE 0x01 #define EVLOOP_NONBLOCK 0x02 #define EVLOOP_NO_EXIT_ON_EMPTY 0x04 int event_base_loop(struct event_base *base, int flags);
event_base_loop這個方法會比event_base_dispatch這個方法更加靈活一些。
EVLOOP_ONCE: 阻塞直到有一個活躍的event,而後執行完活躍事件的回調就退出。
EVLOOP_NONBLOCK : 不阻塞,檢查哪一個事件準備好,調用優先級最高的那一個,而後退出。
0:若是參數填了0,則只有事件進來的時候纔會調用一次事件的回調函數,比較經常使用
事件循環中止的狀況:
1. event_base中沒有事件event
2. 調用event_base_loopbreak(),那麼事件循環將中止
3. 調用event_base_loopexit(),那麼事件循環將中止
4. 程序錯誤,異常退出
兩個退出的方法:
// 這兩個函數成功返回 0 失敗返回 -1 // 指定在 tv 時間後中止事件循環 // 若是 tv == NULL 那麼將無延時的中止事件循環 int event_base_loopexit(struct event_base *base,const struct timeval *tv); // 當即中止事件循環(而不是無延時的中止) int event_base_loopbreak(struct event_base *base);
兩個方法區別:
1. event_base_loopexit(base, NULL) 若是當前正在爲多個活躍事件調用回調函數,那麼不會當即退出,而是等到全部的活躍事件的回調函數都執行完成後才退出事件循環
2. event_base_loopbreak(base) 若是當前正在爲多個活躍事件調用回調函數,那麼當前正在調用的回調函數會被執行,而後立刻退出事件循環,而並不處理其餘的活躍事件了
5. event_base的例子:
#include <<span>stdio.h> #include <<span>stdlib.h> #include <<span>unistd.h> #include <<span>sys/types.h> #include <<span>sys/socket.h> #include <<span>netinet/in.h> #include <<span>arpa/inet.h> #include <<span>string.h> #include <<span>fcntl.h> #include <<span>event2/event.h> #include <<span>event2/bufferevent.h> int main() { puts("init a event_base!"); struct event_base *base; //定義一個event_base base = event_base_new(); //初始化一個event_base const char *x = event_base_get_method(base); //查看用了哪一個IO多路複用模型,linux一下用epoll printf("METHOD:%s\n", x); int y = event_base_dispatch(base); //事件循環。由於咱們這邊沒有註冊事件,因此會直接退出 event_base_free(base); //銷燬libevent return 1; }
返回:
event 事件
event_base是事件的集合,負責事件的循環,以及集合的銷燬。而event就是event_base中的基本單元:事件。
咱們舉一個簡單的例子來理解事件。例如咱們的socket來進行網絡開發的時候,都會使用accept這個方法來阻塞監聽是否有客戶端socket鏈接上來,若是客戶端鏈接上來,則會建立一個線程用於服務端與客戶端進行數據的交互操做,而服務端會繼續阻塞等待下一個客戶端socket鏈接上來。客戶端鏈接到服務端實際就是一種事件。
1. 建立一個事件event
struct event *event_new(struct event_base *base, evutil_socket_t fd,short what, event_callback_fn cb,void *arg);
參數:
1. base:即event_base
2. fd:文件描述符。
3. what:event關心的各類條件。
4. cb:回調函數。
5. arg:用戶自定義的數據,能夠傳遞到回調函數中去。
libevent是基於事件的,也就是說只有在事件到來的這種條件下才會觸發當前的事件。例如:
1. fd文件描述符已準備好可寫或者可讀
2. fd立刻就準備好可寫和可讀。
3. 超時的狀況 timeout
4. 信號中斷
5. 用戶觸發的事件
what參數 event各類條件:
// 超時 #define EV_TIMEOUT 0x01 // event 相關的文件描述符能夠讀了 #define EV_READ 0x02 // event 相關的文件描述符能夠寫了 #define EV_WRITE 0x04 // 被用於信號檢測(詳見下文) #define EV_SIGNAL 0x08 // 用於指定 event 爲 persistent 持久類型。當事件執行完畢後,不會被刪除,繼續保持pending等待狀態; // 若是是非持久類型,則回調函數執行完畢後,事件就會被刪除,想要從新使用這個事件,就必須將這個事件繼續添加event_add #define EV_PERSIST 0x10 // 用於指定 event 會被邊緣觸發 #define EV_ET 0x20
2. 釋放event_free
真正的釋放event的內存。
void event_free(struct event *event);
event_del 清理event的內存。這個方法並非真正意義上的釋放內存。
當函數會將事件轉爲 非pending和非activing的狀態。
int event_del(struct event *event);
3. 註冊event
該方法將用於向event_base註冊事件。
參數:ev 爲事件指針;tv 爲時間指針。當tv = NULL的時候則無超時時間。
函數返回:0表示成功 -1 表示失敗。
int event_add(struct event *ev, const struct timeval *tv);
tv時間結構例子:
struct timeval five_seconds = {5, 0}; event_add(ev1, &five_seconds);
4.event_assign
event_new每次都會在堆上分配內存。有些場景下並非每次都須要在堆上分配內存的,這個時候咱們就能夠用到event_assign方法。
已經初始化或者處於 pending 的 event,首先須要調用 event_del() 後再調用 event_assign()。這個時候就能夠重用這個event了。
// 此函數用於初始化 event(包括能夠初始化棧上和靜態存儲區中的 event) // event_assign() 和 event_new() 除了 event 參數以外,使用了同樣的參數 // event 參數用於指定一個未初始化的且須要初始化的 event // 函數成功返回 0 失敗返回 -1 int event_assign(struct event *event, struct event_base *base,evutil_socket_t fd, short what,void (*callback)(evutil_socket_t, short, void *), void *arg); // 相似上面的函數,此函數被信號 event 使用 event_assign(event, base, signum, EV_SIGNAL|EV_PERSIST, callback, arg)
5. 信號事件
信號事件也能夠對信號進行事件的處理。用法和event_new相似。只不過處理的是信號而已
// base --- event_base // signum --- 信號,例如 SIGHUP // callback --- 信號出現時調用的回調函數 // arg --- 用戶自定義數據 evsignal_new(base, signum, cb, arg) //將信號 event 註冊到 event_base evsignal_add(ev, tv) // 清理信號 event evsignal_del(ev)
6. event細節
1. 每個事件event都須要經過event_new初始化生成。event_new生成的事件是在堆上分配的內存。
2. 當一個事件經過event_add被註冊到event_base上的時候,這個事件處於pending(等待狀態),當只有有事件進來的時候,event纔會被激活active狀態,相關的回調函數就會被調用。
3. persistent 若是event_new中的what參數選擇了EV_PERSIST,則是持久的類型。持久的類型調用玩回調函數後,會繼續轉爲pending狀態,就會繼續等待事件進來。大部分狀況下會選擇持久類型的事件。
4. 而非持久的類型的事件,調用玩一次以後,就會變成初始化的狀態。這個時候須要調用event_add 繼續將事件註冊到event_base上以後才能使用。
Socket實例
#include #include #include #include #include #include #include #include #include #include #include //讀取客戶端 void do_read(evutil_socket_t fd, short event, void *arg) { //繼續等待接收數據 char buf[1024]; //數據傳送的緩衝區 int len; if ((len = recv(fd, buf, 1024, 0)) > 0) { buf[len] = '\0'; printf("%s\n", buf); if (send(fd, buf, len, 0) < 0) { //將接受到的數據寫回客戶端 perror("write"); } } } //回調函數,用於監聽鏈接進來的客戶端socket void do_accept(evutil_socket_t fd, short event, void *arg) { int client_socketfd;//客戶端套接字 struct sockaddr_in client_addr; //客戶端網絡地址結構體 int in_size = sizeof(struct sockaddr_in); //客戶端socket client_socketfd = accept(fd, (struct sockaddr *) &client_addr, ∈_size); //等待接受請求,這邊是阻塞式的 if (client_socketfd < 0) { puts("accpet error"); exit(1); } //類型轉換 struct event_base *base_ev = (struct event_base *) arg; //socket發送歡迎信息 char * msg = "Welcome to My socket"; int size = send(client_socketfd, msg, strlen(msg), 0); //建立一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據 //持久類型,而且將base_ev傳遞到do_read回調函數中去 struct event *ev; ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev); event_add(ev, NULL); } //入口主函數 int main() { int server_socketfd; //服務端socket struct sockaddr_in server_addr; //服務器網絡地址結構體 memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零 server_addr.sin_family = AF_INET; //設置爲IP通訊 server_addr.sin_addr.s_addr = INADDR_ANY;//服務器IP地址--容許鏈接到全部本地地址上 server_addr.sin_port = htons(8001); //服務器端口號 //建立服務端套接字 server_socketfd = socket(PF_INET,SOCK_STREAM,0); if (server_socketfd < 0) { puts("socket error"); return 0; } evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用 evutil_make_socket_nonblocking(server_socketfd); //設置無阻賽 //綁定IP if (bind(server_socketfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr))<<span>0) { puts("bind error"); return 0; } //監聽,監聽隊列長度 5 listen(server_socketfd, 10); //建立event_base 事件的集合,多線程的話 每一個線程都要初始化一個event_base struct event_base *base_ev; base_ev = event_base_new(); const char *x = event_base_get_method(base_ev); //獲取IO多路複用的模型,linux通常爲epoll printf("METHOD:%s\n", x); //建立一個事件,類型爲持久性EV_PERSIST,回調函數爲do_accept(主要用於監聽鏈接進來的客戶端) //將base_ev傳遞到do_accept中的arg參數 struct event *ev; ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev); //註冊事件,使事件處於 pending的等待狀態 event_add(ev, NULL); //事件循環 event_base_dispatch(base_ev); //銷燬event_base event_base_free(base_ev); return 1; }
說明:
1. 必須設置socket爲非阻塞模式,不然就會阻塞在那邊,影響整個程序運行
evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用 vutil_make_socket_nonblocking(server_socketfd); //設置無阻塞
2. 咱們首選創建的事件主要用於監聽客戶端的連入。當客戶端有socket鏈接到服務器端的時候,回調函數do_accept就會去執行;當空閒的時候,這個事件就會是一個pending等待狀態,等待有新的鏈接進來,新的鏈接進來了以後又會繼續執行。
struct event *ev; ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev);
3. 在do_accept事件中咱們建立了一個新的事件,這個事件的回調函數是do_read。主要用來循環監聽客戶端上傳的數據。do_read這個方法會一直循環執行,接收到客戶端數據就會進行處理。
//建立一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據 //持久類型,而且將base_ev傳遞到do_read回調函數中去 struct event *ev; ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev); event_add(ev, NULL);
Bufferevent
上面的socket例子估計通過測試估計你們就會有不少疑問:
1. do_read方法做爲一個事件會一直被循環
2. 當客戶端鏈接斷開的時候,do_read方法仍是在循環,根本不知道客戶端已經斷開socket的鏈接。
3. 須要解決各類粘包和拆包(相關粘包拆包文章)問題
若是要解決這個問題,咱們可能要作大量的工做來維護這些socket的鏈接狀態,讀取狀態。而Libevent的Bufferevent幫咱們解決了這些問題。
Bufferevent主要是用來管理和調度IO事件;而Evbuffer(下面一節會講到)主要用來緩衝網絡IO數據。
Bufferevent目前支持TCP協議,而不知道UDP協議。咱們這邊也只講TCP協議下的Bufferevent的使用。
咱們先看下下面的接口(而後結合下面改進socket的例子,本身動手去實驗一下):
1. 建立Bufferevent API
//建立一個Bufferevent struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, enum bufferevent_options options);
參數:
base:即event_base
fd:文件描述符。若是是socket的方法,則socket須要設置爲非阻塞的模式。
options:行爲選項,下面是行爲選項內容
1. BEV_OPT_CLOSE_ON_FREE :當 bufferevent 被釋放同時關閉底層(socket 被關閉等) 通常用這個選項
2. BEV_OPT_THREADSAFE :爲 bufferevent 自動分配鎖,這樣可以在多線程環境中安全使用
3. BEV_OPT_DEFER_CALLBACKS : 當設置了此標誌,bufferevent 會延遲它的全部回調(參考前面說的延時回調)
4. BEV_OPT_UNLOCK_CALLBACKS : 若是 bufferevent 被設置爲線程安全的,用戶提供的回調被調用時 bufferevent 的鎖會被持有。若是設置了此選項,Libevent 將在調用你的回調時釋放 bufferevent 的鎖
2. 釋放Bufferevent
void bufferevent_free(struct bufferevent *bev);
若是設置了延時回調BEV_OPT_DEFER_CALLBACKS,則釋放會在延時回調調用了回調函數以後,纔會真正釋放。
3. 設置Bufferevent的回調函數和相關設置
前面咱們說過了,使用了Bufferevent以後,Libevent會幫咱們託管三種事件:1. 讀取事件 2. 寫入事件 3. 處理事件
咱們先看一下回調函數結構:
1. 讀取和寫入的回調函數結構,其中 ctx爲通用傳遞的參數
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
2. 事件回調,即鏈接斷開、錯誤處理等回調。其中ctx爲通用傳遞的參數。
events參數爲事件,用戶能夠在回調函數中拿到這個事件來進行事務處理的判斷:
1. BEV_EVENT_READING 在 bufferevent 上進行讀取操做時出現了一個事件
2. BEV_EVENT_WRITING 在 bufferevent 上進行寫入操做時出現了一個事件
3. BEV_EVENT_ERROR 進行 bufferevent 操做時出錯
4. BEV_EVENT_TIMEOUT 在 bufferevent 上出現了超時
5. BEV_EVENT_EOF 在 bufferevent 上遇到了文件結束符,鏈接斷開
6. BEV_EVENT_CONNECTED 在 bufferevent 上請求鏈接完成了
typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short events, void *ctx);
3. 在bufferevent上設置回調函數。
bufev:bufferevent_socket_new建立的bufferevent
readcb:讀取事件的回調函數,沒有則能夠爲NULL
writecb:寫入事件的回調函數,沒有則能夠爲NULL
eventcb:事件函數的回調函數,沒有則能夠爲NULL,通常咱們能夠在這裏面判斷鏈接斷開等。
cbarg:公用傳輸的傳遞
經過這個函數,咱們就能夠設置咱們須要的一些回調函數信息。
void bufferevent_setcb(struct bufferevent *bufev,bufferevent_data_cb readcb, bufferevent_data_cb writecb,bufferevent_event_cb eventcb,void *cbarg);
取回回調函數:
void bufferevent_getcb(struct bufferevent *bufev,bufferevent_data_cb *readcb_ptr,bufferevent_data_cb *writecb_ptr, bufferevent_event_cb *eventcb_ptr,void **cbarg_ptr)
4. 設置Bufferevent事件的類型
bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);
5. 水位設置。
水位設置能夠這麼理解,bufferevent至關於一個水位容器,其中參數:
events:EV_READ 則爲設置讀取事件;EV_WRITE 則爲寫入事件。EV_READ | EV_WRITE 爲設置二者的水位。
lowmark:最低水位,默認爲0。這個參數很是重要,例如lowmark設置爲10,則當bufferevent容器中有10個字符的時候纔會去調用readcb這個回調函數。
void bufferevent_setwatermark(struct bufferevent *bufev, short events,size_t lowmark, size_t highmark);
6. 下面能夠看一個設置和回調函數例子:
//建立一個bufferevent struct bufferevent *bev = bufferevent_socket_new(base_ev, client_socketfd, BEV_OPT_CLOSE_ON_FREE); //設置讀取方法和error時候的方法 bufferevent_setcb(bev, read_cb, NULL, error_cb, base_ev); //設置類型 bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST); //設置水位 bufferevent_setwatermark(bev, EV_READ, 0, 0); //讀取事件回調函數 void read_cb(struct bufferevent *bev, void *arg) { #define MAX_LINE 256 char line[MAX_LINE+1]; int n; evutil_socket_t fd = bufferevent_getfd(bev); while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) { line[n] = '\0'; printf("fd=%u, read line: %s\n", fd, line); bufferevent_write(bev, line, n); } puts("haha"); } //寫入事件回調函數 void write_cb(struct bufferevent *bev, void *arg) {} //事件回調 void error_cb(struct bufferevent *bev, short event, void *arg) { evutil_socket_t fd = bufferevent_getfd(bev); printf("fd = %u, ", fd); if (event & BEV_EVENT_TIMEOUT) { printf("Timed out\n"); } else if (event & BEV_EVENT_EOF) { printf("connection closed\n"); } else if (event & BEV_EVENT_ERROR) { printf("some other error\n"); } bufferevent_free(bev); }
4. 輸入輸出相關函數
1. 獲取buffer:
// 獲取到輸入 buffer struct evbuffer *bufferevent_get_input(struct bufferevent *bufev); // 獲取到輸出 buffer struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);
2. 寫入和輸出函數,成功返回0,失敗返回-1:
bufev:bufferevent
data:寫入的字符串數據
size:字符長度
//寫入 int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size); //輸出 size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
3. 寫入輸出函數2:
bufev:bufferevent
buf:buffer塊 下面會講到evbuffer的使用
int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf); int bufferevent_read_buffer(struct bufferevent *bufev,struct evbuffer *buf);
使用Bufferevent後的Socket例子:
上面咱們已經介紹完了Bufferevent的相關API,能夠看下具體例子。
#include #include #include #include #include #include #include #include #include #include #include void read_cb(struct bufferevent *bev, void *arg) { #define MAX_LINE 256 char line[MAX_LINE+1]; int n; evutil_socket_t fd = bufferevent_getfd(bev); while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) { line[n] = '\0'; printf("fd=%u, read line: %s\n", fd, line); bufferevent_write(bev, line, n); } puts("haha"); } void write_cb(struct bufferevent *bev, void *arg) {} void error_cb(struct bufferevent *bev, short event, void *arg) { evutil_socket_t fd = bufferevent_getfd(bev); printf("fd = %u, ", fd); if (event & BEV_EVENT_TIMEOUT) { printf("Timed out\n"); } else if (event & BEV_EVENT_EOF) { printf("connection closed\n"); } else if (event & BEV_EVENT_ERROR) { printf("some other error\n"); } bufferevent_free(bev); } //回調函數,用於監聽鏈接進來的客戶端socket void do_accept(evutil_socket_t fd, short event, void *arg) { int client_socketfd;//客戶端套接字 struct sockaddr_in client_addr; //客戶端網絡地址結構體 int in_size = sizeof(struct sockaddr_in); //客戶端socket client_socketfd = accept(fd, (struct sockaddr *) &client_addr, ∈_size); //等待接受請求,這邊是阻塞式的 if (client_socketfd < 0) { puts("accpet error"); exit(1); } //類型轉換 struct event_base *base_ev = (struct event_base *) arg; //socket發送歡迎信息 char * msg = "Welcome to My socket"; int size = send(client_socketfd, msg, strlen(msg), 0); //建立一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據 //持久類型,而且將base_ev傳遞到do_read回調函數中去 //struct event *ev; //ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev); //event_add(ev, NULL); //建立一個bufferevent struct bufferevent *bev = bufferevent_socket_new(base_ev, client_socketfd, BEV_OPT_CLOSE_ON_FREE); //設置讀取方法和error時候的方法 bufferevent_setcb(bev, read_cb, NULL, error_cb, base_ev); //設置類型 bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST); //設置水位,每次接受10個字符 bufferevent_setwatermark(bev, EV_READ, 0, 10); } //入口主函數 int main() { int server_socketfd; //服務端socket struct sockaddr_in server_addr; //服務器網絡地址結構體 memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零 server_addr.sin_family = AF_INET; //設置爲IP通訊 server_addr.sin_addr.s_addr = INADDR_ANY;//服務器IP地址--容許鏈接到全部本地地址上 server_addr.sin_port = htons(8001); //服務器端口號 //建立服務端套接字 server_socketfd = socket(PF_INET,SOCK_STREAM,0); if (server_socketfd < 0) { puts("socket error"); return 0; } evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用 evutil_make_socket_nonblocking(server_socketfd); //設置無阻賽 //綁定IP if (bind(server_socketfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr))<<span>0) { puts("bind error"); return 0; } //監聽,監聽隊列長度 5 listen(server_socketfd, 10); //建立event_base 事件的集合,多線程的話 每一個線程都要初始化一個event_base struct event_base *base_ev; base_ev = event_base_new(); const char *x = event_base_get_method(base_ev); //獲取IO多路複用的模型,linux通常爲epoll printf("METHOD:%s\n", x); //建立一個事件,類型爲持久性EV_PERSIST,回調函數爲do_accept(主要用於監聽鏈接進來的客戶端) //將base_ev傳遞到do_accept中的arg參數 struct event *ev; ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev); //註冊事件,使事件處於 pending的等待狀態 event_add(ev, NULL); //事件循環 event_base_dispatch(base_ev); //銷燬event_base event_base_free(base_ev); return 1; }
Evbuffer IO緩衝
上面講了Bufferevent主要用於事件的管理和調度IO。而Evbuffer給咱們提供了很是實用的IO緩存工具。
上一個例子中,雖然解決了斷開鏈接、讀取事件等IO管理的工做,可是也是存在缺陷的。
1. 由於TCP粘包拆包的緣由,咱們不知道一次接收到的數據是不是完整的。
2. 咱們沒法根據客戶端傳遞過來的數據來分析客戶端的請求信息。根據上面的問題,咱們可能會考慮設計一個緩衝容器,這個容器主要用來不停得接收客戶端傳遞過來的數據信息,而且要等到信息量接收到必定的程度的時候,咱們對客戶端的信息進行分析處理,最後才能知道客戶端的請求內容。若是本身作這個緩衝容器,恐怕是須要花費不少的時間,而Libevent已經給咱們設計了Evbuffer,咱們能夠直接使用Evbuffer緩衝容器來知足咱們的業務需求。
evbuffer結構:
struct evbuffer{ // 當前有效緩衝區的內存起始地址 u_char *buffer; // 整個分配(realloc)用來緩衝的內存起始地址 u_char *orig_buffer; // origin_buffer和buffer之間的字節數 size_t misalign; // 整個分配用來緩衝的內存字節數 size_t totallen; // 當前有效緩衝區的長度(字節數) size_t off; //回到函數,當緩衝區有變化的時候會被調用 void (*cb)(struct evbuffer *, size_t, size_t, void *); //回調函數的參數 void *cbarg; };
libevent的緩衝是一個連續的內存區域,其處理數據的方式(寫數據和讀數據)更像一個隊列操做方式:從後寫入,從前
讀出。evbuffer分別設置相關指針(一個指標)用於指示讀出位置和寫入位置。其大體結構如圖:
orig_buffer指向由realloc分配的連續內存區域,buffer指向有效數據的內存區域,totallen表示orig_buffer指向的內存
區域的大小,misalign表示buffer相對於orig_buffer的偏移,off表示有效數據的長度。
下面是一些基礎的和最經常使用的API,詳細的API設計,仍是請翻看官方網站:
1. 建立和銷燬Evbuffer
struct evbuffer *evbuffer_new(void); void evbuffer_free(struct evbuffer *buf);
2. 線程鎖
int evbuffer_enable_locking(struct evbuffer *buf, void *lock); void evbuffer_lock(struct evbuffer *buf); void evbuffer_unlock(struct evbuffer *buf);
3. 檢查buffer長度,比較經常使用
size_t evbuffer_get_length(const struct evbuffer *buf);
返回的是buffer中的字節數。
4. 向buffer中添加數據,經常使用
int evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen);
這個函數添加data處的datalen字節到buf的末尾,成功時返回0,失敗時返回-1。
int evbuffer_expand(struct evbuffer *buf, size_t datlen);
這個函數修改緩衝區的最後一塊,或者添加一個新的塊,使得緩衝區足以容納datlen字節,而不須要更多的內存分配。
int evbuffer_prepend(struct evbuffer *buf, const void *data, size_t size); int evbuffer_prepend_buffer(struct evbuffer *dst, struct evbuffer* src);
除了將數據移動到目標緩衝區前面以外,這兩個函數的行爲分別與evbuffer_add()和evbuffer_add_buffer()相同。
使用這些函數時要小心,永遠不要對與bufferevent共享的evbuffer使用。這些函數是2.0.1-alpha版本新添加的。
5. 刪除和移動buffer中的內容
int evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen);
evbuffer_remove()函數從buf前面複製和移除datlen字節到data處的內存中。若是可用字節少於datlen,函數複製全部字節。失敗時返回-1,不然返回複製了的字節數。
int evbuffer_add_buffer(struct evbuffer *dst, struct evbuffer *src); int evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst, size_t datlen);
evbuffer_add_buffer()將src中的全部數據移動到dst末尾,成功時返回0,失敗時返回-1。
evbuffer_remove_buffer()函數從src中移動datlen字節到dst末尾,儘可能少進行復制。若是字節數小於datlen,全部字節被移動。函數返回移動的字節數。
evbuffer_add_buffer()在0.8版本引入;evbuffer_remove_buffer()是2.0.1-alpha版本新增長的。
6. 搜索buffer中的內容,經常使用
struct evbuffer_ptr { ev_ssize_t pos; struct { } _internal; }; struct evbuffer_ptr evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start); struct evbuffer_ptr evbuffer_search_range(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start, const struct evbuffer_ptr *end); struct evbuffer_ptr evbuffer_search_eol(struct evbuffer *buffer, struct evbuffer_ptr *start, size_t *eol_len_out, enum evbuffer_eol_style eol_style);
結構evbuffer_ptr中的pos爲偏移量,若是爲-1則沒查詢到,大於-1,則搜索到了匹配的位置。
1. evbuffer_search()函數在緩衝區中查找含有len個字符的字符串what。函數返回包含字符串位置,或者在沒有找到字符串時包含-1的evbuffer_ptr結構體。若是提供了start參數,則從指定的位置開始搜索;不然,從開始處進行搜索。
2. evbuffer_search_range()函數和evbuffer_search行爲相同,只是它只考慮在end以前出現的what。
3. evbuffer_search_eol()函數像evbuffer_readln()同樣檢測行結束,可是不復制行,而是返回指向行結束符的evbuffer_ptr。若是eol_len_out非空,則它被設置爲EOL字符串長度。
7. 面向行的讀取
不少互聯網協議都是基於行的。evbuffer_readln()函數從evbuffer前面取出一行,用一個新分配的空字符結束的字符串返回這一行。若是n_read_out不是NULL,則它被設置爲返回的字符串的字節數。若是沒有整行供讀取,函數返回空。返回的字符串不包括行結束符。
enum evbuffer_eol_style { EVBUFFER_EOL_ANY, EVBUFFER_EOL_CRLF, EVBUFFER_EOL_CRLF_STRICT, EVBUFFER_EOL_LF, EVBUFFER_EOL_NUL }; char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, enum evbuffer_eol_style eol_style);
1. EVBUFFER_EOL_LF:行尾是單個換行符(也就是\n,ASCII值是0x0A)
2. EVBUFFER_EOL_CRLF_STRICT:行尾是一個回車符,後隨一個換行符(也就是\r\n,ASCII值是0x0D 0x0A)
3. EVBUFFER_EOL_CRLF:行尾是一個可選的回車,後隨一個換行符(也就是說,能夠是\r\n或者\n)。這種格式對於解析基於文本的互聯網協議頗有用,由於標準一般要求\r\n的行結束符,而不遵循標準的客戶端有時候只使用\n。
4. EVBUFFER_EOL_ANY:行尾是任意數量、任意次序的回車和換行符。這種格式不是特別有用。它的存在主要是爲了向後兼容。
例子:
//readline char * rline; size_t len; rline = evbuffer_readln(buf, &len, EVBUFFER_EOL_CRLF); puts("Hello"); if (rline != NULL) { bufferevent_write_buffer(bev, buf); //使用buffer的方式輸出結果 }
8. 複製數據
ev_ssize_t evbuffer_copyout(struct evbuffer *buf, void *data, size_t datlen); ev_ssize_t evbuffer_copyout_from(struct evbuffer *buf, const struct evbuffer_ptr *pos, void *data_out, size_t datlen);
evbuffer_copyout()的行爲與evbuffer_remove()相同,可是它不從緩衝區移除任何數據。也就是說,它從buf前面複製datlen字節到data處的內存中。若是可用字節少於datlen,函數會複製全部字節。失敗時返回-1,不然返回複製的字節數。
若是從緩衝區複製數據太慢,可使用evbuffer_peek()。
使用Evbuffer優化後的例子
#include #include #include #include #include #include #include #include #include #include #include #include #define MAX_LINE 256 void read_cb(struct bufferevent *bev, void *arg) { struct evbuffer *buf = (struct evbuffer *)arg; char line[MAX_LINE+1]; int n; evutil_socket_t fd = bufferevent_getfd(bev); while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) { line[n] = '\0'; //將讀取到的內容放進緩衝區 evbuffer_add(buf, line, n); //搜索匹配緩衝區中是否有==,==號來分隔每次客戶端的請求 const char *x = "=="; struct evbuffer_ptr ptr = evbuffer_search(buf, x, strlen(x), 0); if (ptr.pos != -1) { bufferevent_write_buffer(bev, buf); //使用buffer的方式輸出結果 } } } void write_cb(struct bufferevent *bev, void *arg) {} void error_cb(struct bufferevent *bev, short event, void *arg) { evutil_socket_t fd = bufferevent_getfd(bev); printf("fd = %u, ", fd); if (event & BEV_EVENT_TIMEOUT) { printf("Timed out\n"); } else if (event & BEV_EVENT_EOF) { printf("connection closed\n"); } else if (event & BEV_EVENT_ERROR) { printf("some other error\n"); } //清空緩衝區 struct evbuffer *buf = (struct evbuffer *)arg; evbuffer_free(buf); bufferevent_free(bev); } //回調函數,用於監聽鏈接進來的客戶端socket void do_accept(evutil_socket_t fd, short event, void *arg) { int client_socketfd;//客戶端套接字 struct sockaddr_in client_addr; //客戶端網絡地址結構體 int in_size = sizeof(struct sockaddr_in); //客戶端socket client_socketfd = accept(fd, (struct sockaddr *) &client_addr, ∈_size); //等待接受請求,這邊是阻塞式的 if (client_socketfd < 0) { puts("accpet error"); exit(1); } //類型轉換 struct event_base *base_ev = (struct event_base *) arg; //socket發送歡迎信息 char * msg = "Welcome to My socket"; int size = send(client_socketfd, msg, strlen(msg), 0); //建立一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據 //持久類型,而且將base_ev傳遞到do_read回調函數中去 //struct event *ev; //ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev); //event_add(ev, NULL); //建立一個evbuffer,用來緩衝客戶端傳遞過來的數據 struct evbuffer *buf = evbuffer_new(); //建立一個bufferevent struct bufferevent *bev = bufferevent_socket_new(base_ev, client_socketfd, BEV_OPT_CLOSE_ON_FREE); //設置讀取方法和error時候的方法,將buf緩衝區當參數傳遞 bufferevent_setcb(bev, read_cb, NULL, error_cb, buf); //設置類型 bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST); //設置水位 bufferevent_setwatermark(bev, EV_READ, 0, 0); } //入口主函數 int main() { int server_socketfd; //服務端socket struct sockaddr_in server_addr; //服務器網絡地址結構體 memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零 server_addr.sin_family = AF_INET; //設置爲IP通訊 server_addr.sin_addr.s_addr = INADDR_ANY;//服務器IP地址--容許鏈接到全部本地地址上 server_addr.sin_port = htons(8001); //服務器端口號 //建立服務端套接字 server_socketfd = socket(PF_INET,SOCK_STREAM,0); if (server_socketfd < 0) { puts("socket error"); return 0; } evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用 evutil_make_socket_nonblocking(server_socketfd); //設置無阻賽 //綁定IP if (bind(server_socketfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr))<<span>0) { puts("bind error"); return 0; } //監聽,監聽隊列長度 5 listen(server_socketfd, 10); //建立event_base 事件的集合,多線程的話 每一個線程都要初始化一個event_base struct event_base *base_ev; base_ev = event_base_new(); const char *x = event_base_get_method(base_ev); //獲取IO多路複用的模型,linux通常爲epoll printf("METHOD:%s\n", x); //建立一個事件,類型爲持久性EV_PERSIST,回調函數爲do_accept(主要用於監聽鏈接進來的客戶端) //將base_ev傳遞到do_accept中的arg參數 struct event *ev; ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev); //註冊事件,使事件處於 pending的等待狀態 event_add(ev, NULL); //事件循環 event_base_dispatch(base_ev); //銷燬event_base event_base_free(base_ev); return 1; }
Util工具
Libevent還提供一些工具方法。這些方法能夠簡化咱們的開發。
1. 時間處理函數
//建立一個事件,類型爲持久性EV_PERSIST,回調函數爲do_accept(主要用於監聽鏈接進來的客戶端) //將base_ev傳遞到do_accept中的arg參數 // 用於加或者減前兩個參數,結果被保存在第三個參數中 #define evutil_timeradd(tvp, uvp, vvp) #define evutil_timersub(tvp, uvp, vvp) // 清除 timeval 將其值設置爲 0 #define evutil_timerclear(tvp) // 判斷 timeval 是否爲 0,若是是 0 返回 false,不然返回 true #define evutil_timerisset(tvp) // 比較兩個 timeval // 使用的時候這樣用: // evutil_timercmp(t1, t2, <=) 含義爲判斷 t1 <= t2 是否成立 // cmp 爲全部的 C 關係操做符 #define evutil_timercmp(tvp, uvp, cmp) // 獲取當前時間並保存到 tv // tz 目前無用 int evutil_gettimeofday(struct timeval *tv, struct timezone *tz);
2. Socket API
// 用於關閉一個 socket int evutil_closesocket(evutil_socket_t s); #define EVUTIL_CLOSESOCKET(s) evutil_closesocket(s) // 返回當前線程的最後一次 socket 操做的錯誤碼 #define EVUTIL_SOCKET_ERROR() // 改變當前 socket 的錯誤碼 #define EVUTIL_SET_SOCKET_ERROR(errcode) // 返回特定的 sock 的錯誤碼 #define evutil_socket_geterror(sock) // 經過 socket 錯誤碼獲取到一個字符串描述 #define evutil_socket_error_to_string(errcode) // 設置 sock 爲非阻塞的 socket int evutil_make_socket_nonblocking(evutil_socket_t sock); // 設置 sock 的地址可重用 int evutil_make_listen_socket_reuseable(evutil_socket_t sock);
3. 字符串
// 它們對應於標準的 snprintf 和 vsnprintf int evutil_snprintf(char *buf, size_t buflen, const char *format, ...); int evutil_vsnprintf(char *buf, size_t buflen, const char *format, va_list ap);
4. 安全的隨機函數
// 此函數將使用隨機的數據填充 n 個字節的 buf void evutil_secure_rng_get_bytes(void *buf, size_t n);
附:一個客戶端例子
#include #include #include #include #include #include #include #include int main() { int client_fd; //定義一個客戶端的SOCKET struct sockaddr_in server_addr; //服務器端 memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零 server_addr.sin_family=AF_INET; //設置爲IP通訊 server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");//服務器IP地址 server_addr.sin_port = htons(8001); //服務器端口號 client_fd = socket(PF_INET, SOCK_STREAM, 0); if (client_fd < 1) { puts("client socket error"); return 0; } int ret = connect(client_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)); if (ret < 0) { puts("client connect error!"); return 0; } char buf[1024]; int len = recv(client_fd, buf, 1024, 0); //等待接收服務器端的數據 buf[len] = '\0'; puts(buf); char *x = "Hello World,saodsadoosadosa==sadsad=="; send(client_fd, x, strlen(x), 0); //發送數據 memset(buf, 0, 1024); int len2 = recv(client_fd, buf, 1024, 0); //繼續接收服務端返回的數據 buf[len2] = '\0'; puts(buf); shutdown(client_fd,2); //關閉socket }