libevent(十)bufferevent 2 libevent(九)bufferevent

接上文 libevent(九)bufferevent

上文主要講了bufferevent如何監聽讀事件,那麼bufferevent如何監聽寫事件呢?

對於一個fd,只要它的寫緩衝區沒有滿,就會觸發寫事件。

通常情況下,如果不向這個fd發送大量的數據,它的寫緩衝區是不會滿的。

因此,如果一開始就監聽寫事件,寫事件會一直被觸發。

libevent的做法是:

    當我們確實要向fd寫入數據時,才監聽該fd的寫事件。

 

 

 

 

 

 

監聽寫事件

在用戶回調函數中,可以通過 bufferevent_write 向輸出緩衝output中寫數據。

/*
 * Append `size` bytes starting at `data` to the bufferevent's output
 * buffer.
 *
 * Returns 0 when evbuffer_add() succeeds, and -1 when it reports -1.
 */
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
    const int add_rc = evbuffer_add(bufev->output, data, size);

    return (add_rc == -1) ? -1 : 0;
}
/*
 * Adds data to an event buffer.
 *
 * Copies `datlen` bytes from `data_in` onto the end of `buf`, holding the
 * buffer lock for the whole operation.
 *
 * The data is placed, in order of preference:
 *   1. in the free space at the end of the last chain;
 *   2. in the last chain after realigning it (when the chain is not pinned
 *      and evbuffer_chain_should_realign() agrees);
 *   3. split between the free space left in the last chain and a newly
 *      allocated chain appended to the buffer.
 *
 * On success the buffer's callbacks are invoked and 0 is returned.
 * Returns -1, without adding anything, when the end of the buffer is
 * frozen, when `datlen` would overflow `buf->total_len`, or when a chain
 * allocation fails.
 */

int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{
    struct evbuffer_chain *chain, *tmp;
    const unsigned char *data = data_in;
    size_t remain, to_alloc;
    int result = -1;

    EVBUFFER_LOCK(buf);

    /* Appending is refused while the end of the buffer is frozen. */
    if (buf->freeze_end) {
        goto done;
    }
    /* Prevent buf->total_len overflow */
    if (datlen > EV_SIZE_MAX - buf->total_len) {
        goto done;
    }

    chain = buf->last;

    /* If there are no chains allocated for this buffer, allocate one
     * big enough to hold all the data. */
    if (chain == NULL) {
        chain = evbuffer_chain_new(datlen);
        if (!chain)
            goto done;
        evbuffer_chain_insert(buf, chain);
    }

    if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
        /* Always true for mutable buffers */
        EVUTIL_ASSERT(chain->misalign >= 0 &&
            (ev_uint64_t)chain->misalign <= EVBUFFER_CHAIN_MAX);
        /* Free bytes after the data already stored in this chain. */
        remain = chain->buffer_len - (size_t)chain->misalign - chain->off;
        if (remain >= datlen) {
            /* there's enough space to hold all the data in the
             * current last chain */
            memcpy(chain->buffer + chain->misalign + chain->off,
                data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        } else if (!CHAIN_PINNED(chain) &&
            evbuffer_chain_should_realign(chain, datlen)) {
            /* we can fit the data into the misalignment */
            evbuffer_chain_align(chain);

            /* The copy below starts at buffer + off, i.e. it relies on
             * evbuffer_chain_align() having moved the existing data to
             * the start of the chain's buffer. */
            memcpy(chain->buffer + chain->off, data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        }
    } else {
        /* we cannot write any data to the last chain */
        remain = 0;
    }

    /* we need to add another chain */
    /* Size the new chain at twice the last chain's buffer while that
     * buffer is no larger than half of EVBUFFER_CHAIN_MAX_AUTO_SIZE, and
     * never smaller than the data being added. */
    to_alloc = chain->buffer_len;
    if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE/2)
        to_alloc <<= 1;
    if (datlen > to_alloc)
        to_alloc = datlen;
    /* Allocate before touching the old chain, so that an allocation
     * failure leaves the buffer unchanged. */
    tmp = evbuffer_chain_new(to_alloc);
    if (tmp == NULL)
        goto done;

    /* Fill whatever space is left in the old last chain first. */
    if (remain) {
        memcpy(chain->buffer + chain->misalign + chain->off,
            data, remain);
        chain->off += remain;
        buf->total_len += remain;
        buf->n_add_for_cb += remain;
    }

    data += remain;
    datlen -= remain;

    /* The rest of the data goes at the start of the new chain. */
    memcpy(tmp->buffer, data, datlen);
    tmp->off = datlen;
    /* NOTE(review): buf->total_len is not increased here for the bytes
     * placed in tmp; presumably evbuffer_chain_insert() accounts for
     * tmp->off - confirm. */
    evbuffer_chain_insert(buf, tmp);
    buf->n_add_for_cb += datlen;

out:
    /* Success: let the registered callbacks see the newly added bytes. */
    evbuffer_invoke_callbacks(buf);
    result = 0;
done:
    EVBUFFER_UNLOCK(buf);
    return result;
}

現在回顧一下bufferevent_socket_new,我們在這個函數中,設置了輸出緩衝區的回調函數

/* Excerpt from bufferevent_socket_new: attach bufferevent_socket_outbuf_cb
 * to the output buffer, with bufev passed as the callback argument. */
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
/*
 * Callback attached to a socket bufferevent's output buffer.
 *
 * When data has just been added, writing is enabled, the write event is
 * not already pending and writing is not suspended, the write event is
 * added so that the queued data gets flushed.
 *
 * `buf` is unused; `cbinfo` describes the change to the buffer; `arg` is
 * the owning struct bufferevent.
 */
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
    struct bufferevent *self = arg;
    struct bufferevent_private *priv =
        EVUTIL_UPCAST(self, struct bufferevent_private, bev);

    /* No bytes were added by this change: nothing new to send. */
    if (!cbinfo->n_added)
        return;

    /* Writing is not enabled on this bufferevent. */
    if (!(self->enabled & EV_WRITE))
        return;

    /* The write event is already pending, so we are already writing. */
    if (event_pending(&self->ev_write, EV_WRITE, NULL))
        return;

    /* Writing is currently suspended. */
    if (priv->write_suspended)
        return;

    /* Somebody added data to the buffer, and we would like to write,
     * and we were not writing.  So, start writing.  A failure to add
     * the event is deliberately ignored here (should we log this?). */
    (void)be_socket_add(&self->ev_write, &self->timeout_write);
}

可以看出,我們在輸出緩衝區的回調函數中,將該fd的寫事件添加到了epoll中。

事件流程

上面我們監聽了fd的寫事件,而此時該fd的寫緩衝區沒有滿,因此寫事件被觸發,繼而調用我們在上文設置的寫事件回調函數 bufferevent_writecb。

/*
 * Write-event callback for a socket bufferevent.
 *
 * `fd` is the socket, `event` the flags that fired, and `arg` the
 * struct bufferevent.  The bufferevent is referenced and locked for the
 * duration of the call.
 *
 * In order, the function:
 *   - reports a pure timeout through the event callback;
 *   - finishes a pending connect, reporting BEV_EVENT_CONNECTED or
 *     BEV_EVENT_ERROR;
 *   - writes queued output to `fd`, limited by
 *     _bufferevent_get_write_max();
 *   - removes the write event once the output buffer is empty;
 *   - runs the user's write callback when the output length is at or
 *     below the low write watermark.
 *
 * A non-retriable write error or a zero-byte write disables EV_WRITE and
 * is reported through the event callback.
 */
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    /* Flags handed to the event callback on the error path. */
    short what = BEV_EVENT_WRITING;
    /* Set to 1 when this invocation completed a pending connect. */
    int connected = 0;
    ev_ssize_t atmost = -1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since the write event has fired as well */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
          bufev_p->connection_refused = 0;
          c = -1;
        }

        /* c == 0: presumably the connect is still in progress, so leave
         * everything as it is and wait for the next event - confirm
         * against evutil_socket_finished_connecting(). */
        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        if (c < 0) {
            /* Connect failed: stop both events and report the error. */
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
#ifdef WIN32
            /* Async bufferevents are handed over to the async code
             * once connected; no write is attempted here. */
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
            _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
            /* Connected, but writing is disabled or suspended: stop
             * listening for writability. */
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }

    /* Upper bound on the number of bytes to write in this call. */
    atmost = _bufferevent_get_write_max(bufev_p);

    if (bufev_p->write_suspended)
        goto done;

    if (evbuffer_get_length(bufev->output)) {
        /* The output buffer is unfrozen only around the write itself.
         * NOTE(review): the second argument (1) presumably selects the
         * front of the buffer - confirm against evbuffer_freeze(). */
        evbuffer_unfreeze(bufev->output, 1);
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            /* A retriable error is not reported; just try again later. */
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn't indicate
               an EOF. An ECONNRESET might be more typical.
             */
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;

        /* res bytes were written: charge them to the write buckets. */
        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    /* Nothing left to send: stop listening for writability. */
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
    /* Skipped when this call only completed a connect (connected set)
     * and wrote nothing (res == 0). */
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

 reschedule:
    /* Retriable write error: the write event is removed only if the
     * output buffer is empty. */
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

 error:
    /* Timeout, fatal write error or zero-byte write: stop writing and
     * report the flags collected in `what`. */
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

主要做了三件事:

  1. 通過evbuffer_write_atmost將輸出緩衝output中的數據寫入fd中。
  2. 如果output中的數據全部處理完畢,刪除寫事件
  3. 調用用戶定義的寫事件回調函數

 

參考資料:

Libevent源碼分析-----bufferevent工作流程探究

相關文章
相關標籤/搜索