Libevent學習筆記(五) 根據例子學習bufferevent

libevent中提供了一個Hello-world.c 的例子,從這個例子能夠學習libevent是如何使用bufferevent的。linux

這個例子在Sample中windows

 這個例子以前講解過,此次主要看下bufferevent的使用。安全

第一步找到main函數多線程

main函數socket

int main(){
//...
listener = evconnlistener_new_bind(base, listener_cb, (void *)base, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin));

//...

    event_base_dispatch(base);async

    evconnlistener_free(listener);
    event_free(signal_event);
    event_base_free(base);函數

    printf("done\n");
    return 0;oop


}

 

 

main函數中調用evconnlistener_new_bind()建立了一個evconnlistener 類型的listener,學習

而後拍發消息,以後釋放各類資源。spa

 

第二步在evconnlistener_new_bind()中調用evconnlistener_new()完成listener屬性設置。

 這個函數裏對evconnlistener_event中base進行回調函數的綁定和參數設置,經過event_assign將evconnlistener_event的 

listener設置讀事件的回調函數,而且經過evconnlistener_enable讓讀回調函數觸發,也就是觸發listener_read_cb。

這裏evconnlister_enable調用的也是結構體註冊的enable具體看代碼吧,調用的是r = lev->ops->enable(lev);

等同於調用event_listener_enable,該函數內部完成event_add。

struct evconnlistener_event {
    struct evconnlistener base;
    struct event listener;
};


struct
evconnlistener * evconnlistener_new(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, evutil_socket_t fd) { struct evconnlistener_event *lev;//開闢evconnlistener_event大小區域 lev = mm_calloc(1, sizeof(struct evconnlistener_event)); if (!lev) return NULL; //lev -> base 表示 evconnlistener //evconnlistener evconnlistener_ops 基本回調參數和回調函數結構體賦值 lev->base.ops = &evconnlistener_event_ops; //evconnlistener_cb 設置爲listener_cb lev->base.cb = cb; //ptr表示event_base 指針 lev->base.user_data = ptr; lev->base.flags = flags; lev->base.refcnt = 1;// lev is evconnlistener_event //lev->listener is event //爲lev->listener設置讀回調函數和讀關注事件,僅進行設置並沒加入event隊列 event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST, listener_read_cb, lev); //實際調用了event_add將事件加入event隊列 evconnlistener_enable(&lev->base); return &lev->base; }

 

 第三步listener_read_cb內部調用accept生成新的socket處理鏈接,調用listener_cb

新的socket做爲參數傳遞給evconnlistener_event中base的回調函數listener_cb

 

static void
listener_read_cb(evutil_socket_t fd, short what, void *p)
{
    struct evconnlistener *lev = p;
    int err;
    evconnlistener_cb cb;
    evconnlistener_errorcb errorcb;
    void *user_data;
    LOCK(lev);
    while (1) {
        //...//cb 就 是  listener_cb
        cb = lev->cb;
        user_data = lev->user_data;
        UNLOCK(lev);
        //觸發了listener_cb
        
        //完成了eventbuffer註冊寫和事件函數  
        cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
            user_data);
        
        LOCK(lev);
        if (lev->refcnt == 1) {
            int freed = listener_decref_and_unlock(lev);
            EVUTIL_ASSERT(freed);
            return;
        }
        --lev->refcnt;
    }
  //...
}

 第四步listener_cb 調用bufferevent_socket_new 生成bufferevent, 而後bufferevent_setcb設置讀寫水位觸發的回調函數,

bufferevent_enable將bufferevent的寫事件加入監聽,即開始檢測寫事件。關閉讀事件,而且向outbuf中寫入MSG

bufferevent_socket_new內部綁定bufferevent的讀寫事件回調函數,讀事件爲bufev->ev_read,綁定了bufferevent_readcb回調函數,

寫事件爲bufev->ev_write,綁定了bufferevent_writecb回調函數。這兩個回調函數和bufferevent的readcb和writecb是不同的,

這兩個函數在對應的讀寫事件激活時才觸發。而readcb和writecb是基於水位線達到閾值纔會觸發。作好區分。bufferevent_socket_new

內部還對bufev->output添加了對調函數bufferevent_socket_outbuf_cb,bufferevent_socket_outbuf_cb內部檢測是否開啓寫事件,

以及是否可寫,若是可寫,一樣將寫事件加入監聽隊列,也就是調用了event_add。bufferevent_socket_new內部解釋完畢了。

bufferevent_setcb設置的是讀寫水位達到閾值後的回調函數,bufferevent_enable內部也是調用了event_add,將讀事件加入監聽隊列。

bufferevent_enable內部調用bufev->be_ops->enable(bufev, impl_events),等同於be_socket_enable,另外bufferevent_write

函數內部調用evbuffer_add,evbuffer_add內部調用了evbuffer_invoke_callbacks,

就會調用綁定在output buffer上的回調函數bufferevent_socket_outbuf_cb。

static void
listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
    struct event_base *base = user_data;
    struct bufferevent *bev;

    bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    if (!bev) {
        fprintf(stderr, "Error constructing bufferevent!");
        event_base_loopbreak(base);
        return;
    }
    //設置寫回調和事件回調
    bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL);
    bufferevent_enable(bev, EV_WRITE);
    bufferevent_disable(bev, EV_READ);
    //將要發送的內容寫入evbuffer結構
    bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
}

 

 

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
    struct bufferevent_private *bufev_p;
    struct bufferevent *bufev;

  //...//設置bufferevent中   ev_read(event類型)回調函數
    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    //設置bufferevent中   ev_write(event類型)回調函數
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    //爲bufev->output(evbuffer類型)設置回調函數,插入bufferevent->output的callback隊列
    //bufferevent_socket_outbuf_cb回調函數內部將ev_write事件加入事件隊列
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

    evbuffer_freeze(bufev->input, 0);
    evbuffer_freeze(bufev->output, 1);
  //...
return bufev; }

 

 

static int
be_socket_enable(struct bufferevent *bufev, short event)
{
    if (event & EV_READ) {
        if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
            return -1;
    }
    if (event & EV_WRITE) {
        if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
            return -1;
    }
    return 0;
}

 

 

第五步 bufferevent的output中寫入MSG, 而且以前也已經將EV_WRITE事件加入監聽,因此內核檢測到socket可寫,

會通知bufferevent的ev_write,調用綁定在ev_write上的函數bufferevent_writecb。

這是bufferevent內部的寫操做,咱們能夠詳細看一下。以前也有講過bufferevent會將接收到的數據放到input buffer中,

將output buffer中的數據發送。因此以前講過的接口bufferevent_write讓咱們將要發送的數據放到output中,

bufferevent_read能夠從input中讀出

bufferevent接收到的數據。

static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    short what = BEV_EVENT_WRITING;
    int connected = 0;
    ev_ssize_t atmost = -1;
    //對 bufferevent加鎖,支持多線程安全模式
    _bufferevent_incref_and_lock(bufev);
    //檢測是否帶有超時事件
    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    //判斷是不是鏈接事件
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
          bufev_p->connection_refused = 0;
          c = -1;
        }

        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        //鏈接失敗刪除該事件
        if (c < 0) {
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
            //windows狀況下直接運行事件回調函數,而後go done
#ifdef WIN32
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
    //linux 下 運行事件回調函數
            _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
        //檢測是否可寫,不可寫刪除該事件
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }
    //計算bufferevent能寫的最大數量
    atmost = _bufferevent_get_write_max(bufev_p);
    //寫事件掛起了,跳過。
    if (bufev_p->write_suspended)
        goto done;
    
    //output非空
    if (evbuffer_get_length(bufev->output)) {
        //將output的頭打開,從頭部發送
        evbuffer_unfreeze(bufev->output, 1);
        //bufferevent調用寫操做,將outbuffer中的內容發送出去
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        //將output的頭部關閉
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn't indicate
               an EOF. An ECONNRESET might be more typical.
             */
             //寫完了
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;
        //bufferevent減小發送的大小,留下未發送的,下次再發送,由於是PERSIST|WRITE
        //因此會在下次檢測到可寫時候繼續寫
        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    //計算是否將outbuf中的內容發送完,發完了就刪除寫事件
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
     //將buffer中的內容發完,或者低於low 水位,那麼調用用戶註冊的寫回調函數
    //以前註冊在bufev->writecb中的回調函數
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

 reschedule:
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

 error:
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

 

第六步:這個函數內部每次儘量多的發送數據,沒有發送完就下次輪詢繼續發送,直到水位低於或等於

寫數據的低水位,那麼就會觸發bufferevent 低水位寫回調函數。也就是conn_writecb,

在conn_writecb內部檢測output buffer中數據爲空,就釋放該bufferevent。

static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
    struct evbuffer *output = bufferevent_get_output(bev);
    if (evbuffer_get_length(output) == 0) {
        printf("flushed answer\n");
        bufferevent_free(bev);
    }
}

 

 

這就是總體流程,bufferevent內部的流暢看懂便可,咱們只須要使用libevent提供的接口便可。

 

個人公衆號,謝謝關注

相關文章
相關標籤/搜索