libevent中提供了一個Hello-world.c 的例子,從這個例子能夠學習libevent是如何使用bufferevent的。linux
這個例子在Sample中windows
這個例子以前講解過,此次主要看下bufferevent的使用。安全
第一步找到main函數多線程
main函數socket
int main(){
//...
listener = evconnlistener_new_bind(base, listener_cb, (void *)base, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin));
//...
event_base_dispatch(base);async
evconnlistener_free(listener);
event_free(signal_event);
event_base_free(base);函數
printf("done\n");
return 0;oop
}
main函數中調用evconnlistener_new_bind()建立了一個evconnlistener 類型的listener,學習
而後拍發消息,以後釋放各類資源。spa
第二步在evconnlistener_new_bind()中調用evconnlistener_new()完成listener屬性設置。
這個函數裏對evconnlistener_event中base進行回調函數的綁定和參數設置,經過event_assign將evconnlistener_event的
listener設置讀事件的回調函數,而且經過evconnlistener_enable讓讀回調函數觸發,也就是觸發listener_read_cb。
這裏evconnlister_enable調用的也是結構體註冊的enable具體看代碼吧,調用的是r = lev->ops->enable(lev);
等同於調用event_listener_enable,該函數內部完成event_add。
struct evconnlistener_event {
struct evconnlistener base;
struct event listener;
};
struct evconnlistener * evconnlistener_new(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, evutil_socket_t fd) { struct evconnlistener_event *lev;//開闢evconnlistener_event大小區域 lev = mm_calloc(1, sizeof(struct evconnlistener_event)); if (!lev) return NULL; //lev -> base 表示 evconnlistener //evconnlistener evconnlistener_ops 基本回調參數和回調函數結構體賦值 lev->base.ops = &evconnlistener_event_ops; //evconnlistener_cb 設置爲listener_cb lev->base.cb = cb; //ptr表示event_base 指針 lev->base.user_data = ptr; lev->base.flags = flags; lev->base.refcnt = 1;// lev is evconnlistener_event //lev->listener is event //爲lev->listener設置讀回調函數和讀關注事件,僅進行設置並沒加入event隊列 event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST, listener_read_cb, lev); //實際調用了event_add將事件加入event隊列 evconnlistener_enable(&lev->base); return &lev->base; }
第三步listener_read_cb內部調用accept生成新的socket處理鏈接,調用listener_cb
新的socket做爲參數傳遞給evconnlistener_event中base的回調函數listener_cb
static void listener_read_cb(evutil_socket_t fd, short what, void *p) { struct evconnlistener *lev = p; int err; evconnlistener_cb cb; evconnlistener_errorcb errorcb; void *user_data; LOCK(lev); while (1) { //...//cb 就 是 listener_cb cb = lev->cb; user_data = lev->user_data; UNLOCK(lev); //觸發了listener_cb //完成了eventbuffer註冊寫和事件函數 cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen, user_data); LOCK(lev); if (lev->refcnt == 1) { int freed = listener_decref_and_unlock(lev); EVUTIL_ASSERT(freed); return; } --lev->refcnt; } //... }
第四步listener_cb 調用bufferevent_socket_new 生成bufferevent, 而後bufferevent_setcb設置讀寫水位觸發的回調函數,
bufferevent_enable將bufferevent的寫事件加入監聽,即開始檢測寫事件。關閉讀事件,而且向outbuf中寫入MSG
bufferevent_socket_new內部綁定bufferevent的讀寫事件回調函數,讀事件爲bufev->ev_read,綁定了bufferevent_readcb回調函數,
寫事件爲bufev->ev_write,綁定了bufferevent_writecb回調函數。這兩個回調函數和bufferevent的readcb和writecb是不同的,
這兩個函數在對應的讀寫事件激活時才觸發。而readcb和writecb是基於水位線達到閾值纔會觸發。作好區分。bufferevent_socket_new
內部還對bufev->output添加了對調函數bufferevent_socket_outbuf_cb,bufferevent_socket_outbuf_cb內部檢測是否開啓寫事件,
以及是否可寫,若是可寫,一樣將寫事件加入監聽隊列,也就是調用了event_add。bufferevent_socket_new內部解釋完畢了。
bufferevent_setcb設置的是讀寫水位達到閾值後的回調函數,bufferevent_enable內部也是調用了event_add,將讀事件加入監聽隊列。
bufferevent_enable內部調用bufev->be_ops->enable(bufev, impl_events),等同於be_socket_enable,另外bufferevent_write
函數內部調用evbuffer_add,evbuffer_add內部調用了evbuffer_invoke_callbacks,
就會調用綁定在output buffer上的回調函數bufferevent_socket_outbuf_cb。
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) { struct event_base *base = user_data; struct bufferevent *bev; bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); if (!bev) { fprintf(stderr, "Error constructing bufferevent!"); event_base_loopbreak(base); return; } //設置寫回調和事件回調 bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL); bufferevent_enable(bev, EV_WRITE); bufferevent_disable(bev, EV_READ); //將要發送的內容寫入evbuffer結構 bufferevent_write(bev, MESSAGE, strlen(MESSAGE)); }
struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options) { struct bufferevent_private *bufev_p; struct bufferevent *bufev; //...//設置bufferevent中 ev_read(event類型)回調函數 event_assign(&bufev->ev_read, bufev->ev_base, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev); //設置bufferevent中 ev_write(event類型)回調函數 event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); //爲bufev->output(evbuffer類型)設置回調函數,插入bufferevent->output的callback隊列 //bufferevent_socket_outbuf_cb回調函數內部將ev_write事件加入事件隊列 evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); evbuffer_freeze(bufev->input, 0); evbuffer_freeze(bufev->output, 1);
//... return bufev; }
static int be_socket_enable(struct bufferevent *bufev, short event) { if (event & EV_READ) { if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1) return -1; } if (event & EV_WRITE) { if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1) return -1; } return 0; }
第五步 bufferevent的output中寫入MSG, 而且以前也已經將EV_WRITE事件加入監聽,因此內核檢測到socket可寫,
會通知bufferevent的ev_write,調用綁定在ev_write上的函數bufferevent_writecb。
這是bufferevent內部的寫操做,咱們能夠詳細看一下。以前也有講過bufferevent會將接收到的數據放到input buffer中,
將output buffer中的數據發送。因此以前講過的接口bufferevent_write讓咱們將要發送的數據放到output中,
bufferevent_read能夠從input中讀出
bufferevent接收到的數據。
static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) { struct bufferevent *bufev = arg; struct bufferevent_private *bufev_p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); int res = 0; short what = BEV_EVENT_WRITING; int connected = 0; ev_ssize_t atmost = -1; //對 bufferevent加鎖,支持多線程安全模式 _bufferevent_incref_and_lock(bufev); //檢測是否帶有超時事件 if (event == EV_TIMEOUT) { /* Note that we only check for event==EV_TIMEOUT. If * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the * timeout, since a read has occurred */ what |= BEV_EVENT_TIMEOUT; goto error; } //判斷是不是鏈接事件 if (bufev_p->connecting) { int c = evutil_socket_finished_connecting(fd); /* we need to fake the error if the connection was refused * immediately - usually connection to localhost on BSD */ if (bufev_p->connection_refused) { bufev_p->connection_refused = 0; c = -1; } if (c == 0) goto done; bufev_p->connecting = 0; //鏈接失敗刪除該事件 if (c < 0) { event_del(&bufev->ev_write); event_del(&bufev->ev_read); _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR); goto done; } else { connected = 1; //windows狀況下直接運行事件回調函數,而後go done #ifdef WIN32 if (BEV_IS_ASYNC(bufev)) { event_del(&bufev->ev_write); bufferevent_async_set_connected(bufev); _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED); goto done; } #endif //linux 下 運行事件回調函數 _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED); //檢測是否可寫,不可寫刪除該事件 if (!(bufev->enabled & EV_WRITE) || bufev_p->write_suspended) { event_del(&bufev->ev_write); goto done; } } } //計算bufferevent能寫的最大數量 atmost = _bufferevent_get_write_max(bufev_p); //寫事件掛起了,跳過。 if (bufev_p->write_suspended) goto done; //output非空 if (evbuffer_get_length(bufev->output)) { //將output的頭打開,從頭部發送 evbuffer_unfreeze(bufev->output, 1); //bufferevent調用寫操做,將outbuffer中的內容發送出去 res = evbuffer_write_atmost(bufev->output, fd, atmost); //將output的頭部關閉 evbuffer_freeze(bufev->output, 1); if (res == -1) { int err = evutil_socket_geterror(fd); if (EVUTIL_ERR_RW_RETRIABLE(err)) goto reschedule; what |= BEV_EVENT_ERROR; } else if (res == 0) { /* eof case XXXX Actually, a 0 on write doesn't indicate an EOF. An ECONNRESET might be more typical. */ //寫完了 what |= BEV_EVENT_EOF; } if (res <= 0) goto error; //bufferevent減小發送的大小,留下未發送的,下次再發送,由於是PERSIST|WRITE //因此會在下次檢測到可寫時候繼續寫 _bufferevent_decrement_write_buckets(bufev_p, res); } //計算是否將outbuf中的內容發送完,發完了就刪除寫事件 if (evbuffer_get_length(bufev->output) == 0) { event_del(&bufev->ev_write); } /* * Invoke the user callback if our buffer is drained or below the * low watermark. */ //將buffer中的內容發完,或者低於low 水位,那麼調用用戶註冊的寫回調函數 //以前註冊在bufev->writecb中的回調函數 if ((res || !connected) && evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { _bufferevent_run_writecb(bufev); } goto done; reschedule: if (evbuffer_get_length(bufev->output) == 0) { event_del(&bufev->ev_write); } goto done; error: bufferevent_disable(bufev, EV_WRITE); _bufferevent_run_eventcb(bufev, what); done: _bufferevent_decref_and_unlock(bufev); }
第六步:這個函數內部每次儘量多的發送數據,沒有發送完就下次輪詢繼續發送,直到水位低於或等於
寫數據的低水位,那麼就會觸發bufferevent 低水位寫回調函數。也就是conn_writecb,
在conn_writecb內部檢測output buffer中數據爲空,就釋放該bufferevent。
static void conn_writecb(struct bufferevent *bev, void *user_data) { struct evbuffer *output = bufferevent_get_output(bev); if (evbuffer_get_length(output) == 0) { printf("flushed answer\n"); bufferevent_free(bev); } }
這就是總體流程,bufferevent內部的流暢看懂便可,咱們只須要使用libevent提供的接口便可。
個人公衆號,謝謝關注