windows環境libevent搭建和demo分析

libevent框架以前有作過度析,此次是談談如何將libevent搭建在vs工做環境下,linux

而且編寫一個demo進行測試。測試過程當中會再一次帶你們分析消息是怎麼傳遞windows

的。數組

個人libevent版本libevent-2.0.22-stable,用對應的vs命令工具進入該目錄網絡

個人是Visual Studio 2008版本的Command Promptapp

 

執行成功後在libevent目錄下生成三個lib框架

以後用vs建立控制檯項目socket

 

生成成功後在項目目錄裏建立Include和Lib兩個文件夾函數

 

分別進入libevent這兩個目錄裏邊工具

將內部的全部文件拷貝到Include文件夾裏,event內容重複能夠合併oop

咱們項目目錄Include文件夾下的內容爲

將libevent庫中的三個lib拷貝到項目的Lib文件夾裏

下一步配置項目屬性,完成編譯

一、配置頭文件包含路徑,C++/General/Additional Include Directories  配置爲相對路徑的Include(因配置的路徑不一樣而異)

二、配置代碼生成

C/C++ /Code Generation RuntimeLibrary 設置爲MTD,由於庫的生成是按照這個MTD模式生成的,因此要匹配

三、配置 C/C++ /Advanced/Compile As Compile as C++ Code (/TP) (由於個人工程用到C++的函數因此配置這個)

網上有人推薦配置成TC的也能夠,本身根據項目須要

 

四、配置庫目錄

Linker/General/Additional Library Directories   ..\Lib(根據本身的Lib文件夾和項目相對位置填寫)

 

5配置 Linker\Input\AdditionalLibraries    ws2_32.lib;wsock32.lib;libevent.lib;libevent_core.lib;libevent_extras.lib;

 

6 配置忽略項,能夠不配置

輸入\忽略特定默認庫 libc.lib;msvcrt.lib;libcd.lib;libcmtd.lib;msvcrtd.lib;%(IgnoreSpecificDefaultLibraries)

 

生成lib後,不帶調試信息,沒法單步進函數裏,因此要修改腳本:Makefile.nmake第二行

CFLAGS=$(CFLAGS) /Od /W3 /wd4996 /nologo /Zi

 

到此爲止項目配置好了,咱們來寫相關的demo代碼

主函數

 

int main(int argc, char **argv)
{
    struct event_base *base;
    struct evconnlistener *listener;
    struct event *signal_event;

    struct sockaddr_in sin;

#ifdef WIN32
    WSADATA wsa_data;
    WSAStartup(0x0201, &wsa_data);
#endif
    //建立event_base
    base = event_base_new();
    if (!base) 
    {
        fprintf(stderr, "Could not initialize libevent!\n");
        return 1;
    }

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(PORT);
    sin.sin_addr.s_addr = inet_addr("192.168.1.99");
    //std::string ipstr = inet_ntoa(sin.sin_addr);
    //std::cout << ipstr.c_str();
    
    //基於eventbase 生成listen描述符並綁定
    //設置了listener_cb回調函數,當有新的鏈接登陸的時候
    //觸發listener_cb
    listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
        LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
        (struct sockaddr*)&sin,
        sizeof(sin));

    if (!listener) 
    {
        fprintf(stderr, "Could not create a listener!\n");
        return 1;
    }
    
    //設置終端信號,當程序收到SIGINT後調用signal_cb
    signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);

    if (!signal_event || event_add(signal_event, NULL)<0) 
    {
        fprintf(stderr, "Could not create/add a signal event!\n");
        return 1;
    }
    //event_base消息派發
    event_base_dispatch(base);

    //釋放生成的evconnlistener
    evconnlistener_free(listener);
    //釋放生成的信號事件
    event_free(signal_event);
    //釋放event_base
    event_base_free(base);

    printf("done\n");
    return 0;
}

listener回調函數

static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
    struct event_base *base = (struct event_base *)user_data;
    struct bufferevent *bev;
    //生成一個bufferevent,用於讀或者寫
    bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    if (!bev) 
    {
        fprintf(stderr, "Error constructing bufferevent!");
        event_base_loopbreak(base);
        return;
    }
    //設置了寫回調函數和事件的回調函數
    bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL);
    //bufferevent設置寫事件回調
    bufferevent_enable(bev, EV_WRITE);
    //bufferevent關閉讀事件回調
    bufferevent_disable(bev, EV_READ);
    //將MESSAGE字符串拷貝到outbuffer裏
    bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
}

一些基本參數

static const char MESSAGE[] = "Hello, NewConnection!\n";

static const int PORT = 9995;

bufferevent的寫回調函數

static void conn_writecb(struct bufferevent *bev, void *user_data)
{
    //取出bufferevent 的output數據
    struct evbuffer *output = bufferevent_get_output(bev);
    //長度爲0,那麼寫完畢,釋放空間
    if (evbuffer_get_length(output) == 0) 
    {
        printf("flushed answer\n");
        bufferevent_free(bev);
    }
}

bufferevent的事件回調函數

//僅僅做爲事件回調函數,寫本身想要作的功能就行
//最後記得釋放buffevent空間
static void conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
    if (events & BEV_EVENT_EOF) 
    {
        printf("Connection closed.\n");
    } 
    else if (events & BEV_EVENT_ERROR) 
    {
        printf("Got an error on the connection: %s\n",
            strerror(errno));/*XXX win32*/
    }
    /* None of the other events can happen here, since we haven't enabled
     * timeouts */
    bufferevent_free(bev);
}

信號終止函數

//程序捕捉到信號後就讓baseloop終止
static void signal_cb(evutil_socket_t sig, short events, void *user_data)
{
    struct event_base *base = (struct event_base *)user_data;
    struct timeval delay = { 2, 0 };

    printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");

    event_base_loopexit(base, &delay);
}

整個demo完成了。

下面分析下libevent如何作的消息傳遞和回調註冊函數

 

從main函數中的evconnlistener_new_bind入手

struct evconnlistener *
evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb,
    void *ptr, unsigned flags, int backlog, const struct sockaddr *sa,
    int socklen)
{
    struct evconnlistener *listener;
    evutil_socket_t fd;
    int on = 1;
    int family = sa ? sa->sa_family : AF_UNSPEC;

    if (backlog == 0)
        return NULL;

    fd = socket(family, SOCK_STREAM, 0);
    if (fd == -1)
        return NULL;

    if (evutil_make_socket_nonblocking(fd) < 0) {
        evutil_closesocket(fd);
        return NULL;
    }

    if (flags & LEV_OPT_CLOSE_ON_EXEC) {
        if (evutil_make_socket_closeonexec(fd) < 0) {
            evutil_closesocket(fd);
            return NULL;
        }
    }

    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on, sizeof(on))<0) {
        evutil_closesocket(fd);
        return NULL;
    }
    if (flags & LEV_OPT_REUSEABLE) {
        if (evutil_make_listen_socket_reuseable(fd) < 0) {
            evutil_closesocket(fd);
            return NULL;
        }
    }

    if (sa) {
        if (bind(fd, sa, socklen)<0) {
            evutil_closesocket(fd);
            return NULL;
        }
    }
    //cb = listener_cb,  ptr = struct event_base *base;
    listener = evconnlistener_new(base, cb, ptr, flags, backlog, fd);
    if (!listener) {
        evutil_closesocket(fd);
        return NULL;
    }

    return listener;
}

 

 evconnlistener_new_bind 完成了socket生成和綁定,而且內部調用evconnlistener_new

生成了evconnlistener* listener,將listener和socket綁定在一塊兒。

 

struct evconnlistener *
evconnlistener_new(struct event_base *base,
    evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
    evutil_socket_t fd)
{
    struct evconnlistener_event *lev;
    if (backlog > 0) {
        if (listen(fd, backlog) < 0)
            return NULL;
    } else if (backlog < 0) {
        if (listen(fd, 128) < 0)
            return NULL;
    }
       //開闢evconnlistener_event大小區域
    lev = mm_calloc(1, sizeof(struct evconnlistener_event));
    if (!lev)
        return NULL;
    //lev -> base 表示  evconnlistener 
    //evconnlistener     evconnlistener_ops 基本回調參數和回調函數結構體賦值
    lev->base.ops = &evconnlistener_event_ops;
    //evconnlistener_cb 設置爲listener_cb
    lev->base.cb = cb;
    //ptr表示event_base 指針
    lev->base.user_data = ptr;
    lev->base.flags = flags;
    lev->base.refcnt = 1;

    if (flags & LEV_OPT_THREADSAFE) {
        EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
    }

    //  lev   is evconnlistener_event       
    //lev->listener is event
    //爲lev->listener設置讀回調函數和讀關注事件,僅進行設置並沒加入event隊列
    event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
        listener_read_cb, lev);
    //實際調用了event_add將事件加入event隊列
    evconnlistener_enable(&lev->base);

    return &lev->base;
}

 lev = mm_calloc(1, sizeof(struct evconnlistener_event));開闢了一個

evconnlistener_event* 空間,evconnlistener_event類型以下

struct evconnlistener_event {
    struct evconnlistener base;
    struct event listener;
};

該結構包含一個evconnlistener和event事件結構體

evconnlistener結構以下

struct evconnlistener {
    //listener基本操做封裝成一個結構體
    //結構體包含操做的函數指針
    const struct evconnlistener_ops *ops;
    void *lock;
    //listener回調函數,有新的鏈接到來會觸發
    evconnlistener_cb cb;
    //listener有錯誤會觸發這個函數
    evconnlistener_errorcb errorcb;
    //存儲一些回調函數用到的參數
    void *user_data;
    unsigned flags;
    short refcnt;
    unsigned enabled : 1;
};

struct evconnlistener_ops {
    int (*enable)(struct evconnlistener *);
    int (*disable)(struct evconnlistener *);
    void (*destroy)(struct evconnlistener *);
    void (*shutdown)(struct evconnlistener *);
    evutil_socket_t (*getfd)(struct evconnlistener *);
    struct event_base *(*getbase)(struct evconnlistener *);
};

 

 

 lev->base.ops = &evconnlistener_event_ops;這句就是對這個結構體指針

賦值,evconnlistener_event_ops是一個實例化的結構體對象,

裏面包含定義好的操做函數

static const struct evconnlistener_ops evconnlistener_event_ops = {
    event_listener_enable,
    event_listener_disable,
    event_listener_destroy,
    NULL, /* shutdown */
    event_listener_getfd,
    event_listener_getbase
};

對lev->base其他參數的賦值就不一一解釋了。

 接下來看一下event_assign函數內部實現

{
    if (!base)
        base = current_base;

    _event_debug_assert_not_added(ev);
       //屬於哪一個event_base
    ev->ev_base = base;
       //事件回調函數
    ev->ev_callback = callback;
    //回調函數的參數
    ev->ev_arg = arg;
    //event關注哪一個fd
    ev->ev_fd = fd;
    //event事件類型
    ev->ev_events = events;
    ev->ev_res = 0;
    ev->ev_flags = EVLIST_INIT;
    //被調用過幾回
    ev->ev_ncalls = 0;
    ev->ev_pncalls = NULL;

    if (events & EV_SIGNAL) {
        if ((events & (EV_READ|EV_WRITE)) != 0) {
            event_warnx("%s: EV_SIGNAL is not compatible with "
                "EV_READ or EV_WRITE", __func__);
            return -1;
        }
        ev->ev_closure = EV_CLOSURE_SIGNAL;
    } else {
        if (events & EV_PERSIST) {
            evutil_timerclear(&ev->ev_io_timeout);
            ev->ev_closure = EV_CLOSURE_PERSIST;
        } else {
            ev->ev_closure = EV_CLOSURE_NONE;
        }
    }

    min_heap_elem_init(ev);

    if (base != NULL) {
        /* by default, we put new events into the middle priority */
        //優先級的設置
        ev->ev_pri = base->nactivequeues / 2;
    }

    _event_debug_note_setup(ev);

    return 0;
}

event_assign內部實現能夠看出該函數僅僅對event的屬性進行設置

event_assign主要對event設置了listener_read_cb回調函數,這是

很重要的一個細節,咱們看下listener_read_cb內部實現

static void
listener_read_cb(evutil_socket_t fd, short what, void *p)
{
    struct evconnlistener *lev = p;
    int err;
    evconnlistener_cb cb;
    evconnlistener_errorcb errorcb;
    void *user_data;
    LOCK(lev);
    while (1) {
        struct sockaddr_storage ss;
#ifdef WIN32
        int socklen = sizeof(ss);
#else
        socklen_t socklen = sizeof(ss);
#endif
        //調用accept生成新的fd
        evutil_socket_t new_fd = accept(fd, (struct sockaddr*)&ss, &socklen);
        if (new_fd < 0)
            break;
        if (socklen == 0) {
            /* This can happen with some older linux kernels in
             * response to nmap. */
            evutil_closesocket(new_fd);
            continue;
        }
        //設置非阻塞
        if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))
            evutil_make_socket_nonblocking(new_fd);

        if (lev->cb == NULL) {
            evutil_closesocket(new_fd);
            UNLOCK(lev);
            return;
        }
        ++lev->refcnt;
        //cb 就 是  listener_cb
        cb = lev->cb;
        user_data = lev->user_data;
        UNLOCK(lev);
        //觸發了listener_cb
        
        //完成了eventbuffer註冊寫和事件函數  
        cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
            user_data);
        
        LOCK(lev);
        if (lev->refcnt == 1) {
            int freed = listener_decref_and_unlock(lev);
            EVUTIL_ASSERT(freed);
            return;
        }
        --lev->refcnt;
    }
   。。。。。。。。。
}

在evconnlistener_new中

//evconnlistener_cb 設置爲listener_cb
lev->base.cb = cb;

lev->base就是evconnlistener對象

listener_read_cb內部回調用綁定在evconnlistener的listener_cb。

記得以前我所說的綁定麼?evconnlistener_new這個函數裏生成的lev,

以後對lev,這裏的lev就是 evconnlistener_event對象,lev->listener是

event對象,經過調用event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
listener_read_cb, lev);

lev->listener綁定的就是listener_read_cb。也就是是說

listener_read_cb調用後,從而調用了綁定在evconnlistener的listener_cb。

那麼咱們只要知道lev->listener(event對象)的讀事件是如何派發的就能夠梳理此

流程了。

以前咱們梳理過event_dispatch裏進行的事件派發,調用不一樣模型的dispatch,

稍後再梳理一遍。由於調用event_assign僅僅對event設置了屬性,尚未加到

事件隊列裏。

在evconnlistener_new函數裏調用完event_assign,以後調用的是

evconnlistener_enable,evconnlistener_enable這個函數完成了事件

添加到事件隊列的功能。

int
evconnlistener_enable(struct evconnlistener *lev)
{
    int r;
    LOCK(lev);
    lev->enabled = 1;
    if (lev->cb)
        //調用evconnlistener 的ops的enable函數
        //lev->ops 此時指向evconnlistener_event_ops
        //enable函數爲  event_listener_enable
        r = lev->ops->enable(lev);
    else
        r = 0;
    UNLOCK(lev);
    return r;
}

上面有evconnlistener_event_ops結構體,那幾個函數也列出來了。

咱們看下event_listener_enable函數

 

static int
event_listener_enable(struct evconnlistener *lev)
{
    //經過evconnlistener* 找到evconnlistener_event *
    struct evconnlistener_event *lev_e =
        EVUTIL_UPCAST(lev, struct evconnlistener_event, base);
    //
    return event_add(&lev_e->listener, NULL);
}

裏面調用了event_add

//添加事件操做
int
event_add(struct event *ev, const struct timeval *tv)
{
    int res;

    if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
        event_warnx("%s: event has no event_base set.", __func__);
        return -1;
    }

    EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
    //添加事件核心函數
    res = event_add_internal(ev, tv, 0);

    EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

    return (res);
}

對於event_add內部判斷是否枷鎖,進行加鎖,而後調用

event_add_internal完成事件添加

static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
    int tv_is_absolute)
{
    struct event_base *base = ev->ev_base;
    int res = 0;
    int notify = 0;

   
    //根據不一樣的事件類型將事件放到evmap裏,調用不一樣模型的add函數
    //將事件按照EV_READ或者EV_WRITE或者EV_SIGNAL放入evmap事件隊列裏
    //將ev按照EVLIST_INSERTED放入用戶的事件隊列裏
    if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
        !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
        if (ev->ev_events & (EV_READ|EV_WRITE))
            //將事件按照讀寫  IO方式加入evmap裏,而且調用不一樣網絡模型的add完成事件添加
            res = evmap_io_add(base, ev->ev_fd, ev);
        else if (ev->ev_events & EV_SIGNAL)
            //將事件按照信號方式添加
            res = evmap_signal_add(base, (int)ev->ev_fd, ev);
        //將事件插入輪詢的事件隊列裏
        if (res != -1)
            event_queue_insert(base, ev, EVLIST_INSERTED);
        if (res == 1) {
            /* evmap says we need to notify the main thread. */
            notify = 1;
            res = 0;
        }
    }
}

 因爲lev->listener(event)類型的事件是I/O 讀事件,因此

會進入evmap_io_add完成讀事件的添加

int
evmap_io_add(struct event_base *base, evutil_socket_t fd, struct event *ev)
{
    const struct eventop *evsel = base->evsel;
    struct event_io_map *io = &base->io;
    struct evmap_io *ctx = NULL;
    int nread, nwrite, retval = 0;
    short res = 0, old = 0;
    struct event *old_ev;

    EVUTIL_ASSERT(fd == ev->ev_fd);

    if (fd < 0)
        return 0;
//windows狀況下use a hashtable instead of an array
#ifndef EVMAP_USE_HT
    if (fd >= io->nentries) {
        if (evmap_make_space(io, fd, sizeof(struct evmap_io *)) == -1)
            return (-1);
    }
#endif

    //從io中找到下標爲fd的結構體數據 evmap_io * 賦值給ctx
    //若是沒有找到就調用evmap_io_init初始化
    GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init,
                         evsel->fdinfo_len);

    nread = ctx->nread;
    nwrite = ctx->nwrite;

    if (nread)
        old |= EV_READ;
    if (nwrite)
        old |= EV_WRITE;

    if (ev->ev_events & EV_READ) {
        if (++nread == 1)
            res |= EV_READ;
    }
    if (ev->ev_events & EV_WRITE) {
        if (++nwrite == 1)
            res |= EV_WRITE;
    }
   ......if (res) {
        void *extra = ((char*)ctx) + sizeof(struct evmap_io);
        /* XXX(niels): we cannot mix edge-triggered and
         * level-triggered, we should probably assert on
         * this. */
         //這裏就是調用了不一樣模型的demultiplexer的添加操做
         //調用不一樣的網絡模型add接口
        if (evsel->add(base, ev->ev_fd,
            old, (ev->ev_events & EV_ET) | res, extra) == -1)
            return (-1);
        retval = 1;
    }

    ctx->nread = (ev_uint16_t) nread;
    ctx->nwrite = (ev_uint16_t) nwrite;
    //哈希表對應的event事件隊列加入ev
    TAILQ_INSERT_TAIL(&ctx->events, ev, ev_io_next);

    return (retval);
}

該函數內部的一些函數就不展開,挺好理解的。

到此爲止咱們瞭解了listen描述符的回調函數和讀事件的綁定。

回到main函數,看下event_base_dispatch就知道綁定在

lev->listener(event)類型的讀事件listener_read_cb

是如何派發的,進而在讀事件裏完成了evconnlistener的listener_cb

的調用。

int
event_base_dispatch(struct event_base *event_base)
{
    return (event_base_loop(event_base, 0));
}

 

int
event_base_loop(struct event_base *base, int flags)這個函數內部

咱們只看關鍵代碼

int
event_base_loop(struct event_base *base, int flags)
{
  
  const struct eventop *evsel = base->evsel;

    //不一樣模型的派發函數
    //evsel就是指向ops數組的某一種模型


  res = evsel->dispatch(base, tv_p);

  

  if (N_ACTIVE_CALLBACKS(base)) {

    //處理激活隊列中的事件
    int n = event_process_active(base);
    if ((flags & EVLOOP_ONCE)
      && N_ACTIVE_CALLBACKS(base) == 0
        && n != 0)
      done = 1;
    } else if (flags & EVLOOP_NONBLOCK)
      done = 1;



}

 

 以前有說過evsel初始化和模型選擇的代碼,這裏從新梳理下

const struct eventop *evsel;

event_base_new或者event_init內部調用了

event_base_new_with_config,

event_base_new_with_config函數調用了

base->evsel = eventops[i];

eventops屬主封裝了幾種模型結構體的指針

static const struct eventop *eventops[] = {
#ifdef _EVENT_HAVE_EVENT_PORTS
    &evportops,
#endif
#ifdef _EVENT_HAVE_WORKING_KQUEUE
    &kqops,
#endif
#ifdef _EVENT_HAVE_EPOLL
    &epollops,
#endif
#ifdef _EVENT_HAVE_DEVPOLL
    &devpollops,
#endif
#ifdef _EVENT_HAVE_POLL
    &pollops,
#endif
#ifdef _EVENT_HAVE_SELECT
    &selectops,
#endif
#ifdef WIN32
    &win32ops,
#endif
    NULL
};

 

 舉個例子,看下epollops

const struct eventop epollops = {
    "epoll",
    epoll_init,
    epoll_nochangelist_add,
    epoll_nochangelist_del,
    epoll_dispatch,
    epoll_dealloc,
    1, /* need reinit */
    EV_FEATURE_ET|EV_FEATURE_O1,
    0
};
eventop 類型
struct eventop {
    /** The name of this backend. */
    const char *name;
   
    void *(*init)(struct event_base *);

    int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
    
    int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
   
    int (*dispatch)(struct event_base *, struct timeval *);
    /** Function to clean up and free our data from the event_base. */
    void (*dealloc)(struct event_base *);
    
    int need_reinit;
   
    enum event_method_feature features;
 
    size_t fdinfo_len;
};

epollops是eventop類型的變量,實現了增長刪除,初始化,銷燬,

派發等功能。

因此當模型選擇epoll時

res = evsel->dispatch(base, tv_p);實際調用的是

epoll的派發函數

static int epoll_dispatch(struct event_base *base, struct timeval *tv)
{

  struct epollop *epollop = base->evbase;

    struct epoll_event *events = epollop->events;

  res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);

  for (i = 0; i < res; i++) {

      int what = events[i].events;
      short ev = 0;

      if (what & (EPOLLHUP|EPOLLERR)) {
        ev = EV_READ | EV_WRITE;
      } else {
          if (what & EPOLLIN)
            ev |= EV_READ;
          if (what & EPOLLOUT)
          ev |= EV_WRITE;
        }

        if (!ev)
          continue;

        //更新evmap,而且將事件放入active隊列
        evmap_io_active(base, events[i].data.fd, ev | EV_ET);
      }

}

 

evmap_io_active函數內部調用event_active_nolock,event_active_nolock中調用

event_queue_insert(base, ev, EVLIST_ACTIVE);負責將event放入激活隊列裏,

而且更新event在evmap中的 標記狀態

到目前爲止咱們瞭解了事件派發的流程,event_base_loop循環執行網絡模型的dispatch,

內核返回就緒事件,dispatch內部調用evmap_io_active將就緒事件放入激活隊列裏。

在event_base_loop中調用event_process_active處理就緒隊列中的event。

好比內核返回listen描述符讀就緒事件,那麼就會將listen的event放入就緒隊列中,

在event_process_active處理event的讀事件,調用了以前綁定的listener_read_cb

回調函數。


下面看下

 

static int
event_process_active(struct event_base *base)
{
    /* Caller must hold th_base_lock */
    struct event_list *activeq = NULL;
    int i, c = 0;
    //循環處理就緒隊列中的每個就緒事件
    for (i = 0; i < base->nactivequeues; ++i) {
        if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
            base->event_running_priority = i;
            activeq = &base->activequeues[i];
            c = event_process_active_single_queue(base, activeq);
            if (c < 0) {
                base->event_running_priority = -1;
                return -1;
            } else if (c > 0)
                break; 
        }
    }
    //調用延時回調函數
    event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
    base->event_running_priority = -1;
    return c;
}

 

循環調用了

 

event_process_active_single_queue
switch (ev->ev_closure) {
        case EV_CLOSURE_SIGNAL:
            event_signal_closure(base, ev);
            break;
        case EV_CLOSURE_PERSIST:
            event_persist_closure(base, ev);
            break;
        default:
        case EV_CLOSURE_NONE:
            EVBASE_RELEASE_LOCK(base, th_base_lock);
            (*ev->ev_callback)(
               ev->ev_fd, ev->ev_res, ev->ev_arg); break;
        }

(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);

就是調用綁定在event上的回調函數。好比綁定在

lev->listener(event)類型的讀事件listener_read_cb;

從而調用了綁定在evconnlistener的listener_cb。

這樣整個流程就跑通了。

最上面有listener_cb函數的實現,整個消息傳遞流程不跟蹤了,

讀者能夠模仿上面的方式去跟蹤消息。

這裏簡單表述下

在bufferevent建立的時候調用了這個函數

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
    struct bufferevent_private *bufev_p;
    struct bufferevent *bufev;

...

    //設置bufferevent中 ev_read(event類型)回調函數
    event_assign(&bufev->ev_read, bufev->ev_base, fd,
    EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    //設置bufferevent中 ev_write(event類型)回調函數
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
    EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

 
 

    //爲bufev->output(evbuffer類型)設置回調函數,回調函數內部將ev_write事件加入事件隊列
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

   ...

    return bufev;
}

函數內部爲ev_read和ev_write 設置默認的回調函數bufferevent_readcb和

bufferevent_writecb。

接着爲輸出的 evbuffer綁定bufferevent_socket_outbuf_cb函數。

bufferevent_socket_outbuf_cb函數若是發現咱們有可寫的東西,而且

沒開始寫,那麼 將ev_write事件加入event隊列,

跟上面的輪詢同樣,有可寫就緒事件就會觸發綁定在ev_write上的

bufferevent_writecb函數。若是沒有添加寫的數據,就跳出函數。

以後調用。因爲這時處於bufferevent剛建立狀態,那麼說明沒有數據

寫入bufferevent,因此這時是不會將ev_write加入event隊列的。

回到listener_cb函數。

接着調用bufferevent_setcb 函數設置bufferevent的讀,寫,事件,

回調函數。

調用bufferevent_enable使寫事件生效,

內部調用

bufev->be_ops->enable(bufev, impl_events);

bufferevent註冊好的回調函數以下

const struct bufferevent_ops bufferevent_ops_socket = {
    "socket",
    evutil_offsetof(struct bufferevent_private, bev),
    be_socket_enable,
    be_socket_disable,
    be_socket_destruct,
    be_socket_adj_timeouts,
    be_socket_flush,
    be_socket_ctrl,
};

 

static int
be_socket_enable(struct bufferevent *bufev, short event)
{
    if (event & EV_READ) {
        if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
            return -1;
    }
    if (event & EV_WRITE) {
        if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
            return -1;
    }
    return 0;
}

eventbuffer讀寫事件加入到event隊列裏,此處爲添加ev_write寫事件,當寫事件就緒,

輪詢能夠出發綁定的bufferevent_writecb回調函數。

 

當調用bufferevent_writecb這個函數時,咱們把內部代碼簡化分析

 
 

static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)

{

//計算bufferevent能寫的最大數量
    atmost = _bufferevent_get_write_max(bufev_p);

    if (bufev_p->write_suspended)
        goto done;

    if (evbuffer_get_length(bufev->output)) {
        evbuffer_unfreeze(bufev->output, 1);
        //bufferevent調用寫操做,將outbuffer中的內容發送出去
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn't indicate
               an EOF. An ECONNRESET might be more typical.
             */
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;
        //bufferevent減小發送的大小,留下未發送的,下次再發送
        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    //計算是否將outbuf中的內容發送完,發完了就刪除寫事件
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
     //將buffer中的內容發完,或者低於low 水位,那麼調用用戶註冊的寫回調函數
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

}

_bufferevent_run_writecb內部調用了

bufev->writecb(bufev, bufev->cbarg);

也就是說咱們本身實現的

static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
    struct evbuffer *output = bufferevent_get_output(bev);
    if (evbuffer_get_length(output) == 0) {
        printf("flushed answer\n");
        bufferevent_free(bev);
    }
}

到此爲止整個bufferevent消息走向梳理出來。

最後有一點須要陳述,在listener_cb中最後調用

bufferevent_write(bev, MESSAGE, strlen(MESSAGE));

起內部調用evbuffer_add,該函數內部

out:
evbuffer_invoke_callbacks(buf);會調用

bufferevent_socket_outbuf_cb,進而調用

bufferevent_write。

因此我認爲是調用evbuffer_add向outbuf中添加數據後,

調用了evbuffer_invoke_callbacks,觸發bufferevent_write,

或者ev_write先檢測到寫就緒事件,而後調用buffervent_write.

這二者前後並不清楚。

 

整個流程就是這樣,須要繼續研究而後梳理。

個人公衆號:

相關文章
相關標籤/搜索