libevent(九)bufferevent

bufferevent,即帶buffer的event。

/*
 * A bufferevent: read/write events on one fd paired with an input and an
 * output evbuffer, plus the user callbacks that are run on their behalf.
 */
struct bufferevent {
    struct event_base *ev_base;
    const struct bufferevent_ops *be_ops;   // backend
    
    struct event ev_read;               // read event
    struct event ev_write;              // write event
    
    struct evbuffer *input;             // input (read) buffer
    struct evbuffer *output;            // output (write) buffer
    
    bufferevent_data_cb readcb;         // user read callback
    bufferevent_data_cb writecb;        // user write callback
    bufferevent_event_cb errorcb;       // user error/event callback
    void *cbarg;                        // argument passed to the callbacks
    
    // Read watermarks: bufferevent_readcb uses .high to cap how much is
    // read and .low as the threshold for running readcb.
    struct event_watermark wm_read;
    // NOTE(review): presumably the write-side counterpart; not used in the
    // code visible here.
    struct event_watermark wm_write;
    
    // Timeouts handed to event_add when the read/write events are enabled;
    // an all-zero timeval means "no timeout" (see _bufferevent_add_event).
    struct timeval timeout_read;
    struct timeval timeout_write;
    
    // Bitmask of events turned on through bufferevent_enable.
    short enabled;
};

下面簡單分析bufferevent相關函數(示例DEMO)。

bufferevent_socket_new

/*
 * Create a socket-backed bufferevent for fd on the given event base.
 *
 * base:    event base the read/write events are assigned to.
 * fd:      socket the events watch.
 * options: passed through to bufferevent_init_common.
 *
 * Returns the new bufferevent, or NULL if allocation or common
 * initialization fails. On WIN32 builds, when the base has an IOCP
 * port, creation is delegated to bufferevent_async_new instead.
 */
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
    struct bufferevent_private *priv;
    struct bufferevent *bev;

#ifdef WIN32
    if (base && event_base_get_iocp(base))
        return bufferevent_async_new(base, fd, options);
#endif

    priv = mm_calloc(1, sizeof(*priv));
    if (priv == NULL)
        return NULL;

    /* Installs the socket backend (bufferevent_ops_socket). */
    if (bufferevent_init_common(priv, base, &bufferevent_ops_socket,
                    options) < 0) {
        mm_free(priv);
        return NULL;
    }

    bev = &priv->bev;
    evbuffer_set_flags(bev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

    /* Persistent read and write events on fd, both carrying the bufferevent. */
    event_assign(&bev->ev_read, bev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bev);
    event_assign(&bev->ev_write, bev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);

    /* Get notified whenever the output buffer changes. */
    evbuffer_add_cb(bev->output, bufferevent_socket_outbuf_cb, bev);

    /* Freeze input with flag 0 and output with flag 1. */
    evbuffer_freeze(bev->input, 0);
    evbuffer_freeze(bev->output, 1);

    return bev;
}

函數做了4件事:

  1. 設置backend: bufferevent_ops_socket
  2. 設置fd的讀事件回調: bufferevent_readcb
  3. 設置fd的寫事件回調: bufferevent_writecb
  4. 設置輸出緩衝區的回調: bufferevent_socket_outbuf_cb

backend結構如下:

/*
 * Backend operations table for socket-based bufferevents; passed to
 * bufferevent_init_common by bufferevent_socket_new.
 *
 * NOTE(review): the initializers are positional, so which slot each entry
 * fills depends on the field order of struct bufferevent_ops, which is not
 * shown here. bufferevent_enable dispatches through be_ops->enable, which
 * presumably is the slot holding be_socket_enable — confirm against the
 * struct definition.
 */
const struct bufferevent_ops bufferevent_ops_socket = {
    "socket",
    evutil_offsetof(struct bufferevent_private, bev),
    be_socket_enable,
    be_socket_disable,
    be_socket_destruct,
    be_socket_adj_timeouts,
    be_socket_flush,
    be_socket_ctrl,
};

bufferevent_setcb

/*
 * Install the user callbacks on a bufferevent.
 *
 * readcb / writecb: data callbacks stored in bufev->readcb / bufev->writecb.
 * eventcb:          event callback, stored in bufev->errorcb.
 * cbarg:            opaque pointer stored alongside the callbacks.
 *
 * All four fields are updated while holding the bufferevent lock.
 */
void
bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
    BEV_LOCK(bufev);

    bufev->cbarg = cbarg;
    bufev->errorcb = eventcb;
    bufev->writecb = writecb;
    bufev->readcb = readcb;

    BEV_UNLOCK(bufev);
}

該函數主要設置用戶回調函數。

bufferevent_enable

/*
 * Enable the given events (EV_READ and/or EV_WRITE) on a bufferevent.
 *
 * The requested events are always recorded in bufev->enabled. Only the
 * directions that are not currently suspended are forwarded to the
 * backend's enable operation.
 *
 * Returns 0 on success, -1 if the backend's enable operation fails.
 */
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
    struct bufferevent_private *priv =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    short to_backend = event;
    int rc = 0;

    _bufferevent_incref_and_lock(bufev);

    bufev->enabled |= event;

    /* A suspended direction stays logically enabled but is not armed. */
    if (priv->read_suspended)
        to_backend &= ~EV_READ;
    if (priv->write_suspended)
        to_backend &= ~EV_WRITE;

    if (to_backend != 0) {
        if (bufev->be_ops->enable(bufev, to_backend) < 0)
            rc = -1;
    }

    _bufferevent_decref_and_unlock(bufev);
    return rc;
}

/*
 * Socket backend "enable": add the read and/or write event, each with its
 * configured timeout. The read event is handled first; a failure there
 * returns before the write event is touched.
 *
 * Returns 0 on success, -1 if adding either event fails.
 */
static int
be_socket_enable(struct bufferevent *bufev, short event)
{
    const int want_read = (event & EV_READ) != 0;
    const int want_write = (event & EV_WRITE) != 0;

    if (want_read &&
        be_socket_add(&bufev->ev_read, &bufev->timeout_read) == -1)
        return -1;

    if (want_write &&
        be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1)
        return -1;

    return 0;
}

/* Socket backend shorthand: add event `ev` with timeout `t`. */
#define be_socket_add(ev, t) _bufferevent_add_event((ev), (t))

/*
 * Add ev to its event base, treating an all-zero timeval as "no timeout".
 *
 * ev: event to add.
 * tv: timeout; must not be NULL. If both tv_sec and tv_usec are 0 the
 *     event is added with a NULL timeout.
 *
 * Returns whatever event_add returns.
 */
int
_bufferevent_add_event(struct event *ev, const struct timeval *tv)
{
    const int no_timeout = (tv->tv_sec == 0 && tv->tv_usec == 0);

    return event_add(ev, no_timeout ? NULL : tv);
}

該函數通過event_add將fd上的讀/寫事件註冊到event_base中(例如epoll後端)。

事件流程

當fd上有讀事件發生時,首先調用bufferevent_readcb。

/*
 * Read-event handler for a socket bufferevent (assigned to ev_read by
 * bufferevent_socket_new).
 *
 * fd:    the socket that became readable or timed out.
 * event: the event flags that fired.
 * arg:   the struct bufferevent this event belongs to.
 *
 * Reads from fd into the input buffer, limited by the high read watermark
 * and by _bufferevent_get_read_max, then runs the user read callback once
 * the buffered length reaches the low watermark. A pure timeout, EOF, or a
 * non-retriable read error disables reading and runs the event callback
 * with the matching BEV_EVENT_* flags. The bufferevent is referenced and
 * locked for the whole duration of the handler.
 */
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    struct evbuffer *input;
    int res = 0;
    short what = BEV_EVENT_READING;
    ev_ssize_t howmuch = -1, readmax=-1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_READ, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }

    input = bufev->input;

    /*
     * If we have a high watermark configured then we don't want to
     * read more data than would make us reach the watermark.
     */
    if (bufev->wm_read.high != 0) {
        howmuch = bufev->wm_read.high - evbuffer_get_length(input);
        /* we somehow lowered the watermark, stop reading */
        if (howmuch <= 0) {
            bufferevent_wm_suspend_read(bufev);
            goto done;
        }
    }
    /* Clamp to the per-call maximum; howmuch < 0 here means no watermark limit. */
    readmax = _bufferevent_get_read_max(bufev_p);
    if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
                           * uglifies this code. XXXX */
        howmuch = readmax;
    if (bufev_p->read_suspended)
        goto done;

    /* The input buffer is unfrozen only for the duration of the read. */
    evbuffer_unfreeze(input, 0);
    res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
    evbuffer_freeze(input, 0);

    if (res == -1) {
        int err = evutil_socket_geterror(fd);
        /* Retriable errors are not reported; just return and wait for the next event. */
        if (EVUTIL_ERR_RW_RETRIABLE(err))
            goto reschedule;
        /* error case */
        what |= BEV_EVENT_ERROR;
    } else if (res == 0) {
        /* eof case */
        what |= BEV_EVENT_EOF;
    }

    if (res <= 0)
        goto error;

    /* res bytes were read: account for them. */
    _bufferevent_decrement_read_buckets(bufev_p, res);

    /* Invoke the user callback - must always be called last */
    if (evbuffer_get_length(input) >= bufev->wm_read.low)
        _bufferevent_run_readcb(bufev);

    goto done;

 reschedule:
    goto done;

 error:
    /* Stop reading and hand the accumulated BEV_EVENT_* flags to the event callback. */
    bufferevent_disable(bufev, EV_READ);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

通過evbuffer_read(內部調用recv),將fd上的數據讀入輸入緩衝input中。

通過_bufferevent_run_readcb調用用戶回調函數。

(如果出錯,通過_bufferevent_run_eventcb調用errorcb)

/*
 * Run (or schedule) the user's read callback for bufev.
 *
 * The caller must hold the bufferevent lock and a reference. Does nothing
 * when no read callback is set. With BEV_OPT_DEFER_CALLBACKS the callback
 * is marked pending and the deferred callback is scheduled if it is not
 * already queued; otherwise the callback is invoked immediately.
 */
void
_bufferevent_run_readcb(struct bufferevent *bufev)
{
    struct bufferevent_private *priv =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

    if (bufev->readcb == NULL)
        return;

    if (!(priv->options & BEV_OPT_DEFER_CALLBACKS)) {
        /* Immediate mode: call straight into user code. */
        bufev->readcb(bufev, bufev->cbarg);
        return;
    }

    /* Deferred mode: flag the callback and make sure a run is queued. */
    priv->readcb_pending = 1;
    if (!priv->deferred.queued) {
        SCHEDULE_DEFERRED(priv);
    }
}

 

在用戶回調函數中,可以通過 bufferevent_read 取出輸入緩衝input中的數據。

/*
 * Remove up to size bytes from the bufferevent's input buffer into data.
 *
 * bufev: bufferevent whose input buffer is read.
 * data:  destination buffer, at least size bytes long.
 * size:  maximum number of bytes to remove.
 *
 * Returns the number of bytes copied into data, or 0 if nothing could be
 * removed (empty buffer or evbuffer_remove failure).
 */
size_t
bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
{
    int removed = evbuffer_remove(bufev->input, data, size);

    /*
     * evbuffer_remove reports failure as -1. Returning that through a
     * size_t would yield SIZE_MAX, which a caller would take for a huge
     * byte count, so map failure to "0 bytes read".
     */
    if (removed < 0)
        return 0;

    return (size_t)removed;
}

/*
 * Copy up to datlen bytes from the front of buf into data_out and drain
 * the copied bytes from the buffer, all under the buffer lock.
 *
 * Returns the result of evbuffer_copyout cast to int, or -1 if bytes were
 * copied but draining them failed.
 */
int
evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen)
{
    ev_ssize_t copied;

    EVBUFFER_LOCK(buf);

    copied = evbuffer_copyout(buf, data_out, datlen);
    /* Only drain what was actually copied; a failed drain is an error. */
    if (copied > 0 && evbuffer_drain(buf, copied) < 0)
        copied = -1;

    EVBUFFER_UNLOCK(buf);

    return (int)copied;
}

 

參考資料:

Libevent源碼分析-----evbuffer結構與基本操作

Libevent源碼分析-----更多evbuffer操做函數

相關文章
相關標籤/搜索