libevent粘包分包解決方案:bufferevent + evbuffer

轉自:http://blog.sina.com.cn/s/blog_9f1496990102vshz.htmlhtml

 

原文:http://www.lvtao.net/c/631.htmllinux

Libevent介紹windows

libevent是一個事件觸發的網絡庫,適用於windows、linux、bsd等多種平臺,內部使用select、epoll、kqueue等系統調用管理事件機制。著名分佈式緩存軟件memcached也是libevent based,並且libevent在使用上能夠作到跨平臺,並且根據libevent官方網站上公佈的數據統計,彷佛也有着非凡的性能。緩存

libevent官方網站 http://libevent.org/安全

英文文檔 http://www.wangafu.net/~nickm/libevent-book/服務器

中文文檔 http://www.cppblog.com/mysileng/category/20374.html網絡

Libevent是基於事件的網絡庫。說的通俗點,例如個人客戶端鏈接到服務端屬於一個鏈接的事件,當這個事件觸發的時候就會去處理。多線程

該文章閱讀過程當中,請結合下面的socket例子,可能會更加清晰的理解每個接口的用法。socket

event_base分佈式

1. 建立event_base

event_base是event(事件,後面會講event)的一個集合。event_base中存放你是監聽是否就緒的event。通常狀況下一個線程一個event_base,多個線程的狀況下須要開多個event_base。

event_base主要是用來管理和實現事件的監聽循環。

通常狀況下直接new一個event_base就能夠知足大部分需求了,若是須要配置參數的,能夠參見libevent官網。

建立方法:

struct event_base *event_base_new(void);

銷燬方法:

void event_base_free(struct event_base *base);

從新初始化:

int event_reinit(struct event_base *base);

2. 查看IO模型

IO多路複用模型中,有多種方法能夠供咱們選擇,可是這些模型是在不一樣的平臺下面的: select  poll  epoll  kqueue  devpoll  evport  win32

當咱們建立一個event_base的時候,libevent會自動爲咱們選擇最快的IO多路複用模型,Linux下通常會用epoll模型。

下面這個方法主要是用來獲取IO模型的名稱。

const char *event_base_get_method(const struct event_base *base);

3. 銷燬event_base

void event_base_free(struct event_base *base);

4. 事件循環 event loop

咱們上面說到 event_base是一組event的集合,咱們也能夠將event事件註冊到這個集合中。當須要事件監聽的時候,咱們就須要對這個event_base進行循環。

下面這個函數很是重要,會在內部不斷的循環監聽註冊上來的事件。

int event_base_dispatch(struct event_base *base);

返回值:0 表示成功退出  -1 表示存在錯誤信息。

還能夠用這個方法:

#define EVLOOP_ONCE             0x01  #define EVLOOP_NONBLOCK         0x02  #define EVLOOP_NO_EXIT_ON_EMPTY 0x04  
  int event_base_loop(struct event_base *base, int flags);

event_base_loop這個方法會比event_base_dispatch這個方法更加靈活一些。

EVLOOP_ONCE: 阻塞直到有一個活躍的event,而後執行完活躍事件的回調就退出。

EVLOOP_NONBLOCK : 不阻塞,檢查哪一個事件準備好,調用優先級最高的那一個,而後退出。

0:若是參數填了0,則只有事件進來的時候纔會調用一次事件的回調函數,比較經常使用

事件循環中止的狀況:

1. event_base中沒有事件event

2. 調用event_base_loopbreak(),那麼事件循環將中止

3. 調用event_base_loopexit(),那麼事件循環將中止

4. 程序錯誤,異常退出

兩個退出的方法:

// 這兩個函數成功返回 0 失敗返回 -1  // 指定在 tv 時間後中止事件循環  // 若是 tv == NULL 那麼將無延時的中止事件循環  int event_base_loopexit(struct event_base *base,const struct timeval *tv);  
// 當即中止事件循環(而不是無延時的中止)  int event_base_loopbreak(struct event_base *base);

兩個方法區別:

1. event_base_loopexit(base, NULL) 若是當前正在爲多個活躍事件調用回調函數,那麼不會當即退出,而是等到全部的活躍事件的回調函數都執行完成後才退出事件循環

2. event_base_loopbreak(base) 若是當前正在爲多個活躍事件調用回調函數,那麼當前正在調用的回調函數會被執行,而後立刻退出事件循環,而並不處理其餘的活躍事件了

5. event_base的例子:

#include <<span>stdio.h>  #include <<span>stdlib.h>  #include <<span>unistd.h>  #include <<span>sys/types.h>   #include <<span>sys/socket.h>          #include <<span>netinet/in.h>          #include <<span>arpa/inet.h>  #include <<span>string.h>  #include <<span>fcntl.h>   #include <<span>event2/event.h>  #include <<span>event2/bufferevent.h>  int main() {  
puts("init a event_base!");  
struct event_base *base; //定義一個event_base  
base = event_base_new(); //初始化一個event_base  
const char *x =  event_base_get_method(base); //查看用了哪一個IO多路複用模型,linux一下用epoll  
printf("METHOD:%s\n", x);  
int y = event_base_dispatch(base); //事件循環。由於咱們這邊沒有註冊事件,因此會直接退出  
event_base_free(base);  //銷燬libevent  
return 1;  
}

返回:

event 事件

event_base是事件的集合,負責事件的循環,以及集合的銷燬。而event就是event_base中的基本單元:事件。

咱們舉一個簡單的例子來理解事件。例如咱們的socket來進行網絡開發的時候,都會使用accept這個方法來阻塞監聽是否有客戶端socket鏈接上來,若是客戶端鏈接上來,則會建立一個線程用於服務端與客戶端進行數據的交互操做,而服務端會繼續阻塞等待下一個客戶端socket鏈接上來。客戶端鏈接到服務端實際就是一種事件。

1. 建立一個事件event

struct event *event_new(struct event_base *base, evutil_socket_t fd,short what, event_callback_fn cb,void *arg);

參數:

1. base:即event_base

2. fd:文件描述符。

3. what:event關心的各類條件。

4. cb:回調函數。

5. arg:用戶自定義的數據,能夠傳遞到回調函數中去。

libevent是基於事件的,也就是說只有在事件到來的這種條件下才會觸發當前的事件。例如:

1. fd文件描述符已準備好可寫或者可讀

2. fd立刻就準備好可寫和可讀。

3. 超時的狀況 timeout

4. 信號中斷

5. 用戶觸發的事件

what參數 event各類條件:

// 超時  #define EV_TIMEOUT 0x01  // event 相關的文件描述符能夠讀了  #define EV_READ 0x02  // event 相關的文件描述符能夠寫了  #define EV_WRITE 0x04  // 被用於信號檢測(詳見下文)  #define EV_SIGNAL 0x08  // 用於指定 event 爲 persistent 持久類型。當事件執行完畢後,不會被刪除,繼續保持pending等待狀態;  // 若是是非持久類型,則回調函數執行完畢後,事件就會被刪除,想要從新使用這個事件,就必須將這個事件繼續添加event_add   #define EV_PERSIST 0x10  // 用於指定 event 會被邊緣觸發  #define EV_ET 0x20

2. 釋放event_free

真正的釋放event的內存。

void event_free(struct event *event);

event_del 清理event的內存。這個方法並非真正意義上的釋放內存。

當函數會將事件轉爲 非pending和非activing的狀態。

int event_del(struct event *event);

3. 註冊event

該方法將用於向event_base註冊事件。

參數:ev 爲事件指針;tv 爲時間指針。當tv = NULL的時候則無超時時間。

函數返回:0表示成功 -1 表示失敗。

int event_add(struct event *ev, const struct timeval *tv);

tv時間結構例子:

struct timeval five_seconds = {5, 0};  
event_add(ev1, &five_seconds);

4.event_assign

event_new每次都會在堆上分配內存。有些場景下並非每次都須要在堆上分配內存的,這個時候咱們就能夠用到event_assign方法。

已經初始化或者處於 pending 的 event,首先須要調用 event_del() 後再調用 event_assign()。這個時候就能夠重用這個event了。

// 此函數用於初始化 event(包括能夠初始化棧上和靜態存儲區中的 event)  // event_assign() 和 event_new() 除了 event 參數以外,使用了同樣的參數  // event 參數用於指定一個未初始化的且須要初始化的 event  // 函數成功返回 0 失敗返回 -1  int event_assign(struct event *event, struct event_base *base,evutil_socket_t fd, short what,void (*callback)(evutil_socket_t, short, void *), void *arg);  
       
// 相似上面的函數,此函數被信號 event 使用  event_assign(event, base, signum, EV_SIGNAL|EV_PERSIST, callback, arg)

5. 信號事件

信號事件也能夠對信號進行事件的處理。用法和event_new相似。只不過處理的是信號而已

// base --- event_base  // signum --- 信號,例如 SIGHUP  // callback --- 信號出現時調用的回調函數  // arg --- 用戶自定義數據  evsignal_new(base, signum, cb, arg)  
       
//將信號 event 註冊到 event_base  evsignal_add(ev, tv)   
       
// 清理信號 event  evsignal_del(ev)

6. event細節

1. 每個事件event都須要經過event_new初始化生成。event_new生成的事件是在堆上分配的內存。

2. 當一個事件經過event_add被註冊到event_base上的時候,這個事件處於pending(等待狀態),當只有有事件進來的時候,event纔會被激活active狀態,相關的回調函數就會被調用。

3. persistent 若是event_new中的what參數選擇了EV_PERSIST,則是持久的類型。持久的類型調用玩回調函數後,會繼續轉爲pending狀態,就會繼續等待事件進來。大部分狀況下會選擇持久類型的事件。

4. 而非持久的類型的事件,調用玩一次以後,就會變成初始化的狀態。這個時候須要調用event_add 繼續將事件註冊到event_base上以後才能使用。

Socket實例

#include   #include   #include   #include       #include       #include       #include      #include   #include    
  #include   #include   
  //讀取客戶端  void do_read(evutil_socket_t fd, short event, void *arg) {  
    //繼續等待接收數據    
    char buf[1024];  //數據傳送的緩衝區      
    int len;    
    if ((len = recv(fd, buf, 1024, 0)) > 0)  {    
        buf[len] = '\0';      
        printf("%s\n", buf);      
        if (send(fd, buf, len, 0) < 0) {    //將接受到的數據寫回客戶端  
            perror("write");      
        }  
    }   
}  
  
  
//回調函數,用於監聽鏈接進來的客戶端socket  void do_accept(evutil_socket_t fd, short event, void *arg) {  
    int client_socketfd;//客戶端套接字      
    struct sockaddr_in client_addr; //客戶端網絡地址結構體     
    int in_size = sizeof(struct sockaddr_in);    
    //客戶端socket    
    client_socketfd = accept(fd, (struct sockaddr *) &client_addr, ∈_size); //等待接受請求,這邊是阻塞式的    
    if (client_socketfd < 0) {    
        puts("accpet error");    
        exit(1);  
    }    
  
    //類型轉換  
    struct event_base *base_ev = (struct event_base *) arg;  
  
    //socket發送歡迎信息    
    char * msg = "Welcome to My socket";    
    int size = send(client_socketfd, msg, strlen(msg), 0);    
  
    //建立一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據  
    //持久類型,而且將base_ev傳遞到do_read回調函數中去  
    struct event *ev;  
    ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev);  
    event_add(ev, NULL);  
}  
  
  
//入口主函數  int main() {  
  
    int server_socketfd; //服務端socket    
    struct sockaddr_in server_addr;   //服務器網絡地址結構體      
    memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零      
    server_addr.sin_family = AF_INET; //設置爲IP通訊      
    server_addr.sin_addr.s_addr = INADDR_ANY;//服務器IP地址--容許鏈接到全部本地地址上      
    server_addr.sin_port = htons(8001); //服務器端口號      
    
    //建立服務端套接字    
    server_socketfd = socket(PF_INET,SOCK_STREAM,0);    
    if (server_socketfd < 0) {    
        puts("socket error");    
        return 0;    
    }    
  
    evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用  
    evutil_make_socket_nonblocking(server_socketfd); //設置無阻賽  
    
    //綁定IP    
    if (bind(server_socketfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr))<<span>0) {    
        puts("bind error");    
        return 0;    
    }    
  
    //監聽,監聽隊列長度 5    
    listen(server_socketfd, 10);    
      
    //建立event_base 事件的集合,多線程的話 每一個線程都要初始化一個event_base  
    struct event_base *base_ev;  
    base_ev = event_base_new();   
    const char *x =  event_base_get_method(base_ev); //獲取IO多路複用的模型,linux通常爲epoll  
    printf("METHOD:%s\n", x);  
  
    //建立一個事件,類型爲持久性EV_PERSIST,回調函數爲do_accept(主要用於監聽鏈接進來的客戶端)  
    //將base_ev傳遞到do_accept中的arg參數  
    struct event *ev;  
    ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev);  
  
    //註冊事件,使事件處於 pending的等待狀態  
    event_add(ev, NULL);  
  
    //事件循環  
    event_base_dispatch(base_ev);  
  
    //銷燬event_base  
    event_base_free(base_ev);    
    return 1;  
}

說明:

1. 必須設置socket爲非阻塞模式,不然就會阻塞在那邊,影響整個程序運行

evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用  vutil_make_socket_nonblocking(server_socketfd); //設置無阻塞

2. 咱們首選創建的事件主要用於監聽客戶端的連入。當客戶端有socket鏈接到服務器端的時候,回調函數do_accept就會去執行;當空閒的時候,這個事件就會是一個pending等待狀態,等待有新的鏈接進來,新的鏈接進來了以後又會繼續執行。

struct event *ev;  
ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev);

3. 在do_accept事件中咱們建立了一個新的事件,這個事件的回調函數是do_read。主要用來循環監聽客戶端上傳的數據。do_read這個方法會一直循環執行,接收到客戶端數據就會進行處理。

//建立一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據  //持久類型,而且將base_ev傳遞到do_read回調函數中去  struct event *ev;  
ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev);  
event_add(ev, NULL);

Bufferevent

上面的socket例子估計通過測試估計你們就會有不少疑問:

1. do_read方法做爲一個事件會一直被循環

2. 當客戶端鏈接斷開的時候,do_read方法仍是在循環,根本不知道客戶端已經斷開socket的鏈接。

3. 須要解決各類粘包和拆包(相關粘包拆包文章)問題

若是要解決這個問題,咱們可能要作大量的工做來維護這些socket的鏈接狀態,讀取狀態。而Libevent的Bufferevent幫咱們解決了這些問題。

Bufferevent主要是用來管理和調度IO事件;而Evbuffer(下面一節會講到)主要用來緩衝網絡IO數據。

Bufferevent目前支持TCP協議,而不知道UDP協議。咱們這邊也只講TCP協議下的Bufferevent的使用。

咱們先看下下面的接口(而後結合下面改進socket的例子,本身動手去實驗一下):

1. 建立Bufferevent API

//建立一個Bufferevent  struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, enum bufferevent_options options);

參數:

base:即event_base

fd:文件描述符。若是是socket的方法,則socket須要設置爲非阻塞的模式。

options:行爲選項,下面是行爲選項內容

1. BEV_OPT_CLOSE_ON_FREE :當 bufferevent 被釋放同時關閉底層(socket 被關閉等) 通常用這個選項

2. BEV_OPT_THREADSAFE :爲 bufferevent 自動分配鎖,這樣可以在多線程環境中安全使用

3. BEV_OPT_DEFER_CALLBACKS : 當設置了此標誌,bufferevent 會延遲它的全部回調(參考前面說的延時回調)

4. BEV_OPT_UNLOCK_CALLBACKS : 若是 bufferevent 被設置爲線程安全的,用戶提供的回調被調用時 bufferevent 的鎖會被持有。若是設置了此選項,Libevent 將在調用你的回調時釋放 bufferevent 的鎖

2. 釋放Bufferevent

void bufferevent_free(struct bufferevent *bev);

若是設置了延時回調BEV_OPT_DEFER_CALLBACKS,則釋放會在延時回調調用了回調函數以後,纔會真正釋放。

3. 設置Bufferevent的回調函數和相關設置

前面咱們說過了,使用了Bufferevent以後,Libevent會幫咱們託管三種事件:1. 讀取事件  2. 寫入事件  3. 處理事件

咱們先看一下回調函數結構:

1. 讀取和寫入的回調函數結構,其中 ctx爲通用傳遞的參數

typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);

2. 事件回調,即鏈接斷開、錯誤處理等回調。其中ctx爲通用傳遞的參數。

events參數爲事件,用戶能夠在回調函數中拿到這個事件來進行事務處理的判斷:

1. BEV_EVENT_READING   在 bufferevent 上進行讀取操做時出現了一個事件

2. BEV_EVENT_WRITING  在 bufferevent 上進行寫入操做時出現了一個事件

3. BEV_EVENT_ERROR  進行 bufferevent 操做時出錯

4. BEV_EVENT_TIMEOUT  在 bufferevent 上出現了超時

5. BEV_EVENT_EOF  在 bufferevent 上遇到了文件結束符,鏈接斷開

6. BEV_EVENT_CONNECTED 在 bufferevent 上請求鏈接完成了

typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short events, void *ctx);

3. 在bufferevent上設置回調函數。

bufev:bufferevent_socket_new建立的bufferevent

readcb:讀取事件的回調函數,沒有則能夠爲NULL

writecb:寫入事件的回調函數,沒有則能夠爲NULL

eventcb:事件函數的回調函數,沒有則能夠爲NULL,通常咱們能夠在這裏面判斷鏈接斷開等。

cbarg:公用傳輸的傳遞

經過這個函數,咱們就能夠設置咱們須要的一些回調函數信息。

void bufferevent_setcb(struct bufferevent *bufev,bufferevent_data_cb readcb,  
bufferevent_data_cb writecb,bufferevent_event_cb eventcb,void *cbarg);

取回回調函數:

void bufferevent_getcb(struct bufferevent *bufev,bufferevent_data_cb *readcb_ptr,bufferevent_data_cb *writecb_ptr,  
bufferevent_event_cb *eventcb_ptr,void **cbarg_ptr)

4. 設置Bufferevent事件的類型

bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);

5. 水位設置。

水位設置能夠這麼理解,bufferevent至關於一個水位容器,其中參數:

events:EV_READ 則爲設置讀取事件;EV_WRITE 則爲寫入事件。EV_READ |  EV_WRITE 爲設置二者的水位。

lowmark:最低水位,默認爲0。這個參數很是重要,例如lowmark設置爲10,則當bufferevent容器中有10個字符的時候纔會去調用readcb這個回調函數。

void bufferevent_setwatermark(struct bufferevent *bufev, short events,size_t lowmark, size_t highmark);

6. 下面能夠看一個設置和回調函數例子:

//建立一個bufferevent  struct bufferevent *bev = bufferevent_socket_new(base_ev, client_socketfd, BEV_OPT_CLOSE_ON_FREE);  
//設置讀取方法和error時候的方法  bufferevent_setcb(bev, read_cb, NULL, error_cb, base_ev);     
//設置類型  bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);  
//設置水位  bufferevent_setwatermark(bev, EV_READ, 0, 0);  
//讀取事件回調函數  void read_cb(struct bufferevent *bev, void *arg) {  
#define MAX_LINE     256  char line[MAX_LINE+1];  
int n;  
evutil_socket_t fd = bufferevent_getfd(bev);  
while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {  
line[n] = '\0';  
printf("fd=%u, read line: %s\n", fd, line);  
bufferevent_write(bev, line, n);  
}  
puts("haha");  
}  
//寫入事件回調函數  void write_cb(struct bufferevent *bev, void *arg) {}  
//事件回調  void error_cb(struct bufferevent *bev, short event, void *arg) {  
evutil_socket_t fd = bufferevent_getfd(bev);  
printf("fd = %u, ", fd);  
if (event & BEV_EVENT_TIMEOUT) {  
printf("Timed out\n");  
} else if (event & BEV_EVENT_EOF) {  
printf("connection closed\n");  
} else if (event & BEV_EVENT_ERROR) {  
printf("some other error\n");  
}  
bufferevent_free(bev);  
}

4. 輸入輸出相關函數

1. 獲取buffer:

// 獲取到輸入 buffer  struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);  
// 獲取到輸出 buffer  struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);

2. 寫入和輸出函數,成功返回0,失敗返回-1:

bufev:bufferevent

data:寫入的字符串數據

size:字符長度

//寫入  int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);  
//輸出  size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);

3. 寫入輸出函數2:

bufev:bufferevent

buf:buffer塊  下面會講到evbuffer的使用

int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);  
int bufferevent_read_buffer(struct bufferevent *bufev,struct evbuffer *buf);

使用Bufferevent後的Socket例子:

上面咱們已經介紹完了Bufferevent的相關API,能夠看下具體例子。

#include   #include   #include   #include       #include       #include       #include      #include   #include    
  #include   #include   
  void read_cb(struct bufferevent *bev, void *arg) {  
    #define MAX_LINE    256  
    char line[MAX_LINE+1];  
    int n;  
    evutil_socket_t fd = bufferevent_getfd(bev);  
    while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {  
        line[n] = '\0';  
        printf("fd=%u, read line: %s\n", fd, line);  
        bufferevent_write(bev, line, n);  
    }  
    puts("haha");  
}  
void write_cb(struct bufferevent *bev, void *arg) {}  
void error_cb(struct bufferevent *bev, short event, void *arg) {  
    evutil_socket_t fd = bufferevent_getfd(bev);  
    printf("fd = %u, ", fd);  
    if (event & BEV_EVENT_TIMEOUT) {  
        printf("Timed out\n");  
    } else if (event & BEV_EVENT_EOF) {  
        printf("connection closed\n");  
    } else if (event & BEV_EVENT_ERROR) {  
        printf("some other error\n");  
    }  
    bufferevent_free(bev);  
}  
  
//回調函數,用於監聽鏈接進來的客戶端socket  void do_accept(evutil_socket_t fd, short event, void *arg) {  
    int client_socketfd;//客戶端套接字      
    struct sockaddr_in client_addr; //客戶端網絡地址結構體     
    int in_size = sizeof(struct sockaddr_in);    
    //客戶端socket    
    client_socketfd = accept(fd, (struct sockaddr *) &client_addr, ∈_size); //等待接受請求,這邊是阻塞式的    
    if (client_socketfd < 0) {    
        puts("accpet error");    
        exit(1);  
    }    
  
    //類型轉換  
    struct event_base *base_ev = (struct event_base *) arg;  
  
    //socket發送歡迎信息    
    char * msg = "Welcome to My socket";    
    int size = send(client_socketfd, msg, strlen(msg), 0);    
  
    //建立一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據  
    //持久類型,而且將base_ev傳遞到do_read回調函數中去  
    //struct event *ev;  
    //ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev);  
    //event_add(ev, NULL);  
  
    //建立一個bufferevent  
    struct bufferevent *bev = bufferevent_socket_new(base_ev, client_socketfd, BEV_OPT_CLOSE_ON_FREE);  
    //設置讀取方法和error時候的方法  
    bufferevent_setcb(bev, read_cb, NULL, error_cb, base_ev);    
    //設置類型  
    bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);  
    //設置水位,每次接受10個字符  
    bufferevent_setwatermark(bev, EV_READ, 0, 10);  
}  
  
  
//入口主函數  int main() {  
  
    int server_socketfd; //服務端socket    
    struct sockaddr_in server_addr;   //服務器網絡地址結構體      
    memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零      
    server_addr.sin_family = AF_INET; //設置爲IP通訊      
    server_addr.sin_addr.s_addr = INADDR_ANY;//服務器IP地址--容許鏈接到全部本地地址上      
    server_addr.sin_port = htons(8001); //服務器端口號      
    
    //建立服務端套接字    
    server_socketfd = socket(PF_INET,SOCK_STREAM,0);    
    if (server_socketfd < 0) {    
        puts("socket error");    
        return 0;    
    }    
  
    evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用  
    evutil_make_socket_nonblocking(server_socketfd); //設置無阻賽  
    
    //綁定IP    
    if (bind(server_socketfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr))<<span>0) {    
        puts("bind error");    
        return 0;    
    }    
  
    //監聽,監聽隊列長度 5    
    listen(server_socketfd, 10);    
      
    //建立event_base 事件的集合,多線程的話 每一個線程都要初始化一個event_base  
    struct event_base *base_ev;  
    base_ev = event_base_new();   
    const char *x =  event_base_get_method(base_ev); //獲取IO多路複用的模型,linux通常爲epoll  
    printf("METHOD:%s\n", x);  
  
    //建立一個事件,類型爲持久性EV_PERSIST,回調函數爲do_accept(主要用於監聽鏈接進來的客戶端)  
    //將base_ev傳遞到do_accept中的arg參數  
    struct event *ev;  
    ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev);  
  
    //註冊事件,使事件處於 pending的等待狀態  
    event_add(ev, NULL);  
  
    //事件循環  
    event_base_dispatch(base_ev);  
  
    //銷燬event_base  
    event_base_free(base_ev);    
    return 1;  
}

Evbuffer IO緩衝

上面講了Bufferevent主要用於事件的管理和調度IO。而Evbuffer給咱們提供了很是實用的IO緩存工具。

上一個例子中,雖然解決了斷開鏈接、讀取事件等IO管理的工做,可是也是存在缺陷的。

1. 由於TCP粘包拆包的緣由,咱們不知道一次接收到的數據是不是完整的。

2. 咱們沒法根據客戶端傳遞過來的數據來分析客戶端的請求信息。根據上面的問題,咱們可能會考慮設計一個緩衝容器,這個容器主要用來不停得接收客戶端傳遞過來的數據信息,而且要等到信息量接收到必定的程度的時候,咱們對客戶端的信息進行分析處理,最後才能知道客戶端的請求內容。若是本身作這個緩衝容器,恐怕是須要花費不少的時間,而Libevent已經給咱們設計了Evbuffer,咱們能夠直接使用Evbuffer緩衝容器來知足咱們的業務需求。

evbuffer結構:

struct evbuffer{  
  // 當前有效緩衝區的內存起始地址  
 u_char *buffer;   
  // 整個分配(realloc)用來緩衝的內存起始地址  
  u_char *orig_buffer;   
  // origin_buffer和buffer之間的字節數  
 size_t misalign;   
  // 整個分配用來緩衝的內存字節數  
 size_t totallen;   
  // 當前有效緩衝區的長度(字節數)  
 size_t off;   
  //回到函數,當緩衝區有變化的時候會被調用  
 void (*cb)(struct evbuffer *, size_t, size_t, void *);  
  //回調函數的參數  
 void *cbarg;   
};

libevent的緩衝是一個連續的內存區域,其處理數據的方式(寫數據和讀數據)更像一個隊列操做方式:從後寫入,從前

讀出。evbuffer分別設置相關指針(一個指標)用於指示讀出位置和寫入位置。其大體結構如圖:

orig_buffer指向由realloc分配的連續內存區域,buffer指向有效數據的內存區域,totallen表示orig_buffer指向的內存

區域的大小,misalign表示buffer相對於orig_buffer的偏移,off表示有效數據的長度。

下面是一些基礎的和最經常使用的API,詳細的API設計,仍是請翻看官方網站:

1.  建立和銷燬Evbuffer

struct evbuffer *evbuffer_new(void);  
void evbuffer_free(struct evbuffer *buf);

2. 線程鎖

int evbuffer_enable_locking(struct evbuffer *buf, void *lock);  
void evbuffer_lock(struct evbuffer *buf);  
void evbuffer_unlock(struct evbuffer *buf);

3. 檢查buffer長度,比較經常使用

size_t evbuffer_get_length(const struct evbuffer *buf);

返回的是buffer中的字節數。

4. 向buffer中添加數據,經常使用

int evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen);

這個函數添加data處的datalen字節到buf的末尾,成功時返回0,失敗時返回-1。

int evbuffer_expand(struct evbuffer *buf, size_t datlen);

這個函數修改緩衝區的最後一塊,或者添加一個新的塊,使得緩衝區足以容納datlen字節,而不須要更多的內存分配。

int evbuffer_prepend(struct evbuffer *buf, const void *data, size_t size);  
int evbuffer_prepend_buffer(struct evbuffer *dst, struct evbuffer* src);

除了將數據移動到目標緩衝區前面以外,這兩個函數的行爲分別與evbuffer_add()和evbuffer_add_buffer()相同。

使用這些函數時要小心,永遠不要對與bufferevent共享的evbuffer使用。這些函數是2.0.1-alpha版本新添加的。

5. 刪除和移動buffer中的內容

int evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen);

evbuffer_remove()函數從buf前面複製和移除datlen字節到data處的內存中。若是可用字節少於datlen,函數複製全部字節。失敗時返回-1,不然返回複製了的字節數。

int evbuffer_add_buffer(struct evbuffer *dst, struct evbuffer *src);  
int evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst,  
    size_t datlen);

evbuffer_add_buffer()將src中的全部數據移動到dst末尾,成功時返回0,失敗時返回-1。

evbuffer_remove_buffer()函數從src中移動datlen字節到dst末尾,儘可能少進行復制。若是字節數小於datlen,全部字節被移動。函數返回移動的字節數。

evbuffer_add_buffer()在0.8版本引入;evbuffer_remove_buffer()是2.0.1-alpha版本新增長的。

6. 搜索buffer中的內容,經常使用

struct evbuffer_ptr {  
ev_ssize_t pos;  
struct {  
  } _internal;  
};  
struct evbuffer_ptr evbuffer_search(struct evbuffer *buffer,  
const char *what, size_t len, const struct evbuffer_ptr *start);  
struct evbuffer_ptr evbuffer_search_range(struct evbuffer *buffer,  
const char *what, size_t len, const struct evbuffer_ptr *start,  
const struct evbuffer_ptr *end);  
struct evbuffer_ptr evbuffer_search_eol(struct evbuffer *buffer,  
struct evbuffer_ptr *start, size_t *eol_len_out,  
enum evbuffer_eol_style eol_style);

結構evbuffer_ptr中的pos爲偏移量,若是爲-1則沒查詢到,大於-1,則搜索到了匹配的位置。

1. evbuffer_search()函數在緩衝區中查找含有len個字符的字符串what。函數返回包含字符串位置,或者在沒有找到字符串時包含-1的evbuffer_ptr結構體。若是提供了start參數,則從指定的位置開始搜索;不然,從開始處進行搜索。

2. evbuffer_search_range()函數和evbuffer_search行爲相同,只是它只考慮在end以前出現的what。

3. evbuffer_search_eol()函數像evbuffer_readln()同樣檢測行結束,可是不復制行,而是返回指向行結束符的evbuffer_ptr。若是eol_len_out非空,則它被設置爲EOL字符串長度。

7. 面向行的讀取

不少互聯網協議都是基於行的。evbuffer_readln()函數從evbuffer前面取出一行,用一個新分配的空字符結束的字符串返回這一行。若是n_read_out不是NULL,則它被設置爲返回的字符串的字節數。若是沒有整行供讀取,函數返回空。返回的字符串不包括行結束符。

enum evbuffer_eol_style {  
EVBUFFER_EOL_ANY,  
EVBUFFER_EOL_CRLF,  
EVBUFFER_EOL_CRLF_STRICT,  
EVBUFFER_EOL_LF,  
EVBUFFER_EOL_NUL  
};  
char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,  
    enum evbuffer_eol_style eol_style);

1. EVBUFFER_EOL_LF:行尾是單個換行符(也就是\n,ASCII值是0x0A)

2. EVBUFFER_EOL_CRLF_STRICT:行尾是一個回車符,後隨一個換行符(也就是\r\n,ASCII值是0x0D 0x0A)

3. EVBUFFER_EOL_CRLF:行尾是一個可選的回車,後隨一個換行符(也就是說,能夠是\r\n或者\n)。這種格式對於解析基於文本的互聯網協議頗有用,由於標準一般要求\r\n的行結束符,而不遵循標準的客戶端有時候只使用\n。

4. EVBUFFER_EOL_ANY:行尾是任意數量、任意次序的回車和換行符。這種格式不是特別有用。它的存在主要是爲了向後兼容。

例子:

//readline  char * rline;  
size_t len;  
rline = evbuffer_readln(buf, &len, EVBUFFER_EOL_CRLF);  
puts("Hello");  
if (rline != NULL) {  
    bufferevent_write_buffer(bev, buf); //使用buffer的方式輸出結果  }

8. 複製數據

ev_ssize_t evbuffer_copyout(struct evbuffer *buf, void *data, size_t datlen);  
ev_ssize_t evbuffer_copyout_from(struct evbuffer *buf,  
     const struct evbuffer_ptr *pos,  
     void *data_out, size_t datlen);

evbuffer_copyout()的行爲與evbuffer_remove()相同,可是它不從緩衝區移除任何數據。也就是說,它從buf前面複製datlen字節到data處的內存中。若是可用字節少於datlen,函數會複製全部字節。失敗時返回-1,不然返回複製的字節數。

若是從緩衝區複製數據太慢,可使用evbuffer_peek()。

使用Evbuffer優化後的例子

#include   #include   #include   #include       #include       #include       #include      #include   #include    
  #include   #include   #include   #define MAX_LINE    256  
  void read_cb(struct bufferevent *bev, void *arg) {  
    struct evbuffer *buf = (struct evbuffer *)arg;  
    char line[MAX_LINE+1];  
    int n;  
    evutil_socket_t fd = bufferevent_getfd(bev);  
    while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {  
        line[n] = '\0';  
          
        //將讀取到的內容放進緩衝區  
        evbuffer_add(buf, line, n);  
  
        //搜索匹配緩衝區中是否有==,==號來分隔每次客戶端的請求  
        const char *x = "==";  
        struct evbuffer_ptr ptr = evbuffer_search(buf, x, strlen(x), 0);     
        if (ptr.pos != -1) {  
            bufferevent_write_buffer(bev, buf); //使用buffer的方式輸出結果  
        }  
    }  
}  
void write_cb(struct bufferevent *bev, void *arg) {}  
void error_cb(struct bufferevent *bev, short event, void *arg) {  
    evutil_socket_t fd = bufferevent_getfd(bev);  
    printf("fd = %u, ", fd);  
    if (event & BEV_EVENT_TIMEOUT) {  
        printf("Timed out\n");  
    } else if (event & BEV_EVENT_EOF) {  
        printf("connection closed\n");  
    } else if (event & BEV_EVENT_ERROR) {  
        printf("some other error\n");  
    }  
    //清空緩衝區  
    struct evbuffer *buf = (struct evbuffer *)arg;  
    evbuffer_free(buf);  
    bufferevent_free(bev);  
}  
  
//回調函數,用於監聽鏈接進來的客戶端socket  void do_accept(evutil_socket_t fd, short event, void *arg) {  
    int client_socketfd;//客戶端套接字      
    struct sockaddr_in client_addr; //客戶端網絡地址結構體     
    int in_size = sizeof(struct sockaddr_in);    
    //客戶端socket    
    client_socketfd = accept(fd, (struct sockaddr *) &client_addr, ∈_size); //等待接受請求,這邊是阻塞式的    
    if (client_socketfd < 0) {    
        puts("accpet error");    
        exit(1);  
    }    
  
    //類型轉換  
    struct event_base *base_ev = (struct event_base *) arg;  
  
    //socket發送歡迎信息    
    char * msg = "Welcome to My socket";    
    int size = send(client_socketfd, msg, strlen(msg), 0);    
  
    //建立一個事件,這個事件主要用於監聽和讀取客戶端傳遞過來的數據  
    //持久類型,而且將base_ev傳遞到do_read回調函數中去  
    //struct event *ev;  
    //ev = event_new(base_ev, client_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_read, base_ev);  
    //event_add(ev, NULL);  
  
    //建立一個evbuffer,用來緩衝客戶端傳遞過來的數據  
    struct evbuffer *buf = evbuffer_new();  
    //建立一個bufferevent  
    struct bufferevent *bev = bufferevent_socket_new(base_ev, client_socketfd, BEV_OPT_CLOSE_ON_FREE);  
    //設置讀取方法和error時候的方法,將buf緩衝區當參數傳遞  
    bufferevent_setcb(bev, read_cb, NULL, error_cb, buf);    
    //設置類型  
    bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);  
    //設置水位  
    bufferevent_setwatermark(bev, EV_READ, 0, 0);  
}  
  
  
//入口主函數  int main() {  
  
    int server_socketfd; //服務端socket    
    struct sockaddr_in server_addr;   //服務器網絡地址結構體      
    memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零      
    server_addr.sin_family = AF_INET; //設置爲IP通訊      
    server_addr.sin_addr.s_addr = INADDR_ANY;//服務器IP地址--容許鏈接到全部本地地址上      
    server_addr.sin_port = htons(8001); //服務器端口號      
    
    //建立服務端套接字    
    server_socketfd = socket(PF_INET,SOCK_STREAM,0);    
    if (server_socketfd < 0) {    
        puts("socket error");    
        return 0;    
    }    
  
    evutil_make_listen_socket_reuseable(server_socketfd); //設置端口重用  
    evutil_make_socket_nonblocking(server_socketfd); //設置無阻賽  
    
    //綁定IP    
    if (bind(server_socketfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr))<<span>0) {    
        puts("bind error");    
        return 0;    
    }    
  
    //監聽,監聽隊列長度 5    
    listen(server_socketfd, 10);    
      
    //建立event_base 事件的集合,多線程的話 每一個線程都要初始化一個event_base  
    struct event_base *base_ev;  
    base_ev = event_base_new();   
    const char *x =  event_base_get_method(base_ev); //獲取IO多路複用的模型,linux通常爲epoll  
    printf("METHOD:%s\n", x);  
  
    //建立一個事件,類型爲持久性EV_PERSIST,回調函數爲do_accept(主要用於監聽鏈接進來的客戶端)  
    //將base_ev傳遞到do_accept中的arg參數  
    struct event *ev;  
    ev = event_new(base_ev, server_socketfd, EV_TIMEOUT|EV_READ|EV_PERSIST, do_accept, base_ev);  
  
    //註冊事件,使事件處於 pending的等待狀態  
    event_add(ev, NULL);  
  
    //事件循環  
    event_base_dispatch(base_ev);  
  
    //銷燬event_base  
    event_base_free(base_ev);    
    return 1;  
}

Util工具

Libevent還提供一些工具方法。這些方法能夠簡化咱們的開發。

1. 時間處理函數

//建立一個事件,類型爲持久性EV_PERSIST,回調函數爲do_accept(主要用於監聽鏈接進來的客戶端)  //將base_ev傳遞到do_accept中的arg參數  // 用於加或者減前兩個參數,結果被保存在第三個參數中  #define evutil_timeradd(tvp, uvp, vvp)   #define evutil_timersub(tvp, uvp, vvp)   // 清除 timeval 將其值設置爲 0  #define evutil_timerclear(tvp)   // 判斷 timeval 是否爲 0,若是是 0 返回 false,不然返回 true  #define evutil_timerisset(tvp)   // 比較兩個 timeval  // 使用的時候這樣用:  // evutil_timercmp(t1, t2, <=) 含義爲判斷 t1 <= t2 是否成立  // cmp 爲全部的 C 關係操做符  #define evutil_timercmp(tvp, uvp, cmp)  // 獲取當前時間並保存到 tv  // tz 目前無用  int evutil_gettimeofday(struct timeval *tv, struct timezone *tz);

2. Socket API

// 用於關閉一個 socket  int evutil_closesocket(evutil_socket_t s);  
#define EVUTIL_CLOSESOCKET(s) evutil_closesocket(s)  // 返回當前線程的最後一次 socket 操做的錯誤碼  #define EVUTIL_SOCKET_ERROR()  // 改變當前 socket 的錯誤碼  #define EVUTIL_SET_SOCKET_ERROR(errcode)  // 返回特定的 sock 的錯誤碼  #define evutil_socket_geterror(sock)  // 經過 socket 錯誤碼獲取到一個字符串描述  #define evutil_socket_error_to_string(errcode)  // 設置 sock 爲非阻塞的 socket  int evutil_make_socket_nonblocking(evutil_socket_t sock);  
// 設置 sock 的地址可重用  int evutil_make_listen_socket_reuseable(evutil_socket_t sock);

3. 字符串

// 它們對應於標準的 snprintf 和 vsnprintf  int evutil_snprintf(char *buf, size_t buflen, const char *format, ...);  
int evutil_vsnprintf(char *buf, size_t buflen, const char *format, va_list ap);

4. 安全的隨機函數

// 此函數將使用隨機的數據填充 n 個字節的 buf  void evutil_secure_rng_get_bytes(void *buf, size_t n);

附:一個客戶端例子

#include     #include         #include         #include        #include     #include     #include     #include         int main() {      
int client_fd; //定義一個客戶端的SOCKET      struct sockaddr_in server_addr; //服務器端        memset(&server_addr,0,sizeof(server_addr)); //數據初始化--清零    server_addr.sin_family=AF_INET; //設置爲IP通訊          server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");//服務器IP地址        server_addr.sin_port = htons(8001); //服務器端口號          client_fd = socket(PF_INET, SOCK_STREAM, 0);      
if (client_fd < 1) {  
puts("client socket error");       
return 0;     
}       
     int ret = connect(client_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));   
if (ret < 0) {        
puts("client connect error!");     
return 0;     
}       
char buf[1024];       
int len = recv(client_fd, buf, 1024, 0); //等待接收服務器端的數據     buf[len] = '\0';    
puts(buf);      
char *x = "Hello World,saodsadoosadosa==sadsad==";    
send(client_fd, x, strlen(x), 0); //發送數據     memset(buf, 0, 1024);  
int len2 = recv(client_fd, buf, 1024, 0); //繼續接收服務端返回的數據   buf[len2] = '\0';   
puts(buf);      
shutdown(client_fd,2); //關閉socket    }
相關文章
相關標籤/搜索