A while ago I used the libevent networking library to build a game server engine, and here I want to record one problem I ran into along the way.
In the server design I chose to split logic and networking into separate threads, communicating through queues. But doing so raises a problem:
When the logic thread wants to send a packet on its own initiative, the network thread may still be blocked in a system call waiting for network IO (epoll, for example). Without special handling, the outgoing packet would just sit in the buffer until the next time the network thread happens to return from the blocking call (say, because an incoming packet arrived). So when the logic thread sends a packet (bufferevent_write), we need a wakeup mechanism that makes the network thread return from epoll (or whatever call it is blocked in) and run the send logic.
Since I was not familiar with libevent's API, I initially implemented this myself. The implementation was not complicated, but it went against my guiding principle: write only the simple, necessary code, to keep bugs to a minimum. It was only after a discussion (argument :P) with a colleague that I learned libevent actually supports this. Exactly how is not spelled out in the documentation, though, so my colleague could not explain it either. Given that, I decided to skim the relevant libevent source and sort out two things:
(1) Which APIs are involved in waking up an event wait from another thread, and how are they used?
(2) What do those APIs actually do behind the scenes?
The relevant API and how to use it
Unlike what I first assumed, the relevant libevent API is simple, and there is exactly one call:
/* call evthread_use_pthreads() instead if you use POSIX threads. */
evthread_use_windows_threads();
struct event_base* ev_base = event_base_new();
Note that evthread_use_windows_threads must be called before the event_base is created. Beyond that, nothing else is required: when the logic thread calls bufferevent_write, libevent automatically wakes up the network thread's event loop and sends the data.
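To make that concrete, here is a minimal sketch of how the two threads might be wired up. This is my own example, not from libevent's docs: the socketpair stands in for a real client connection, and BEV_OPT_THREADSAFE is passed so the bufferevent takes its own lock when touched from the logic thread.

/* Minimal sketch; link against libevent (event_core) and ws2_32. */
#include <winsock2.h>
#include <windows.h>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/bufferevent.h>
#include <event2/util.h>

static DWORD WINAPI logic_thread(LPVOID arg)
{
    struct bufferevent *bev = arg;
    const char msg[] = "hello";
    /* Called from a non-loop thread: libevent notices this and wakes
     * the network thread's event loop so the data is flushed at once. */
    bufferevent_write(bev, msg, sizeof(msg));
    return 0;
}

int main(void)
{
    WSADATA wsa;
    WSAStartup(MAKEWORD(2, 2), &wsa);

    evthread_use_windows_threads();   /* must precede event_base_new */
    struct event_base *base = event_base_new();

    /* A connected socket pair stands in for a real client connection. */
    evutil_socket_t pair[2];
    evutil_socketpair(AF_INET, SOCK_STREAM, 0, pair);

    struct bufferevent *bev = bufferevent_socket_new(base, pair[0],
        BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
    bufferevent_enable(bev, EV_READ | EV_WRITE);

    CreateThread(NULL, 0, logic_thread, bev, 0, NULL);
    event_base_dispatch(base);        /* this is the "network thread" loop */
    return 0;
}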
The logic hidden behind the API
First, what does evthread_use_windows_threads do?
int
evthread_use_windows_threads(void)
{
    ...
    evthread_set_lock_callbacks(&cbs);
    evthread_set_id_callback(evthread_win32_get_id);
    evthread_set_condition_callbacks(&cond_cbs);
    return 0;
}
Calling evthread_use_windows_threads installs a handful of callbacks, among them evthread_id_fn_, the callback libevent uses to obtain the current thread id.
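The id callback itself is trivial: on Windows it essentially just wraps GetCurrentThreadId. Sketched from memory here; see evthread_win32.c for the real definition:

static unsigned long __cdecl
evthread_win32_get_id(void)
{
    return (unsigned long) GetCurrentThreadId();
}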
Next, look at what event_base_new, the function that initializes the event loop, does:
// event.c
struct event_base *
event_base_new(void)
{
    ...
    base = event_base_new_with_config(cfg);
}

struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
    ...
#ifndef EVENT__DISABLE_THREAD_SUPPORT
    if (EVTHREAD_LOCKING_ENABLED() &&
        (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
        int r;
        EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0);
        EVTHREAD_ALLOC_COND(base->current_event_cond);
        r = evthread_make_base_notifiable(base);
        if (r < 0) {
            event_warnx("%s: Unable to make base notifiable.", __func__);
            event_base_free(base);
            return NULL;
        }
    }
#endif
    ...
}

int
evthread_make_base_notifiable(struct event_base *base)
{
    int r;
    if (!base)
        return -1;

    EVBASE_ACQUIRE_LOCK(base, th_base_lock);
    r = evthread_make_base_notifiable_nolock_(base);
    EVBASE_RELEASE_LOCK(base, th_base_lock);
    return r;
}

static int
evthread_make_base_notifiable_nolock_(struct event_base *base)
{
    ...
    if (evutil_make_internal_pipe_(base->th_notify_fd) == 0) {
        notify = evthread_notify_base_default;
        cb = evthread_notify_drain_default;
    } else {
        return -1;
    }

    base->th_notify_fn = notify;
}
It runs through the following call chain:
event_base_new-->event_base_new_with_config-->evthread_make_base_notifiable-->evthread_make_base_notifiable_nolock_
and at the end, evutil_make_internal_pipe_ creates a pair of interconnected sockets (on Windows, these are used to emulate a pipe):
/* Internal function: Set fd[0] and fd[1] to a pair of fds such that writes on
 * fd[1] get read from fd[0]. Make both fds nonblocking and close-on-exec.
 * Return 0 on success, -1 on failure.
 */
int
evutil_make_internal_pipe_(evutil_socket_t fd[2])
{
    /*
      Making the second socket nonblocking is a bit subtle, given that we
      ignore any EAGAIN returns when writing to it, and you don't usually
      do that for a nonblocking socket. But if the kernel gives us EAGAIN,
      then there's no need to add any more data to the buffer, since the
      main thread is already either about to wake up and drain it, or
      woken up and in the process of draining it.
    */
#if defined(EVENT__HAVE_PIPE2)
    if (pipe2(fd, O_NONBLOCK|O_CLOEXEC) == 0)
        return 0;
#endif
#if defined(EVENT__HAVE_PIPE)
    if (pipe(fd) == 0) {
        if (evutil_fast_socket_nonblocking(fd[0]) < 0 ||
            evutil_fast_socket_nonblocking(fd[1]) < 0 ||
            evutil_fast_socket_closeonexec(fd[0]) < 0 ||
            evutil_fast_socket_closeonexec(fd[1]) < 0) {
            close(fd[0]);
            close(fd[1]);
            fd[0] = fd[1] = -1;
            return -1;
        }
        return 0;
    } else {
        event_warn("%s: pipe", __func__);
    }
#endif

#ifdef _WIN32
#define LOCAL_SOCKETPAIR_AF AF_INET
#else
#define LOCAL_SOCKETPAIR_AF AF_UNIX
#endif
    if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, fd) == 0) {
        if (evutil_fast_socket_nonblocking(fd[0]) < 0 ||
            evutil_fast_socket_nonblocking(fd[1]) < 0 ||
            evutil_fast_socket_closeonexec(fd[0]) < 0 ||
            evutil_fast_socket_closeonexec(fd[1]) < 0) {
            evutil_closesocket(fd[0]);
            evutil_closesocket(fd[1]);
            fd[0] = fd[1] = -1;
            return -1;
        }
        return 0;
    }
    fd[0] = fd[1] = -1;
    return -1;
}
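The socketpair fallback matters on Windows: there is no pipe() there, and Winsock's select() can only wait on sockets, so a pair of connected AF_INET sockets is the natural stand-in for a pipe.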
After that, it installs the notify function used to perform the wakeup (evthread_notify_base_default):
static int
evthread_notify_base_default(struct event_base *base)
{
    char buf[1];
    int r;
    buf[0] = (char) 0;
#ifdef _WIN32
    r = send(base->th_notify_fd[1], buf, 1, 0);
#else
    r = write(base->th_notify_fd[1], buf, 1);
#endif
    return (r < 0 && ! EVUTIL_ERR_IS_EAGAIN(errno)) ? -1 : 0;
}
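On the loop side, the matching drain callback (evthread_notify_drain_default, registered in evthread_make_base_notifiable_nolock_ above) simply reads those notification bytes back out of th_notify_fd[0] once the loop wakes up, so the pipe never fills up.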
As you can see, on Windows libevent's wakeup mechanism is also the classic self-pipe trick; it merely emulates the pipe with a pair of connected sockets. Whenever a wakeup is needed, it writes a single byte into one of the sockets, which makes the other end readable and so wakes the event loop.
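To see the trick in isolation, here is a minimal standalone sketch (POSIX, for brevity; all names are mine, nothing here is libevent code): one thread blocks in select() on the read end of a pipe, and another thread wakes it by writing a single byte to the write end, which is exactly the pattern libevent applies internally.

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/select.h>

static int wakeup_fd[2];   /* [0] = read end, [1] = write end */

static void *event_loop_thread(void *arg)
{
    (void)arg;
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(wakeup_fd[0], &rfds);
    /* Blocks here until someone writes to wakeup_fd[1]. */
    select(wakeup_fd[0] + 1, &rfds, NULL, NULL, NULL);

    char buf[16];
    read(wakeup_fd[0], buf, sizeof(buf));   /* drain the notification */
    printf("event loop woken up\n");
    return NULL;
}

int main(void)
{
    pipe(wakeup_fd);
    pthread_t t;
    pthread_create(&t, NULL, event_loop_thread, NULL);

    sleep(1);                     /* let the loop block in select() */
    char b = 0;
    write(wakeup_fd[1], &b, 1);   /* the one-byte wakeup */
    pthread_join(t, NULL);
    return 0;
}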
Now look at bufferevent_write:
// bufferevent.c
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
    if (evbuffer_add(bufev->output, data, size) == -1)
        return (-1);
    return 0;
}

// buffer.c
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{
    ...
    evbuffer_invoke_callbacks_(buf);
}
It triggers a series of callbacks, and those callbacks were registered when the bufferevent was created:
// bufferevent_sock.c
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options)
{
    ...
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
}

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo, void *arg)
{
    ...
    if (cbinfo->n_added &&
        (bufev->enabled & EV_WRITE) &&
        !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
        !bufev_p->write_suspended) {
        /* Somebody added data to the buffer, and we would like to
         * write, and we were not writing. So, start writing. */
        if (bufferevent_add_event_(&bufev->ev_write, &bufev->timeout_write) == -1) {
            /* Should we log this? */
        }
    }
}

// bufferevent.c
int
bufferevent_add_event_(struct event *ev, const struct timeval *tv)
{
    if (!evutil_timerisset(tv))
        return event_add(ev, NULL);
    else
        return event_add(ev, tv);
}

// event.c
int
event_add(struct event *ev, const struct timeval *tv)
{
    EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
    res = event_add_nolock_(ev, tv, 0);
    EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}

int
event_add_nolock_(struct event *ev, const struct timeval *tv, int tv_is_absolute)
{
    ...
    /* if we are not in the right thread, we need to wake up the loop */
    // If evthread_use_windows_threads was called before the event_base was
    // created, EVBASE_NEED_NOTIFY returns true here; otherwise it is false.
    if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
        evthread_notify_base(base);
}
As the code shows, the callbacks that run after data is written into the bufferevent already contain the wakeup logic for the network thread (evthread_notify_base). So why does evthread_use_windows_threads still need to be called by hand?
Here is the relevant piece:
#define EVBASE_NEED_NOTIFY(base)             \
    ((base)->running_loop &&                 \
     ((base)->th_owner_id != evthreadimpl_get_id_()))

unsigned long
evthreadimpl_get_id_()
{
    return evthread_id_fn_ ? evthread_id_fn_() : 1;
}
As mentioned earlier, evthread_use_windows_threads installs evthread_id_fn_, the callback libevent uses to get the thread id. Without it, evthreadimpl_get_id_ returns 1 for every thread, so the logic thread is indistinguishable from the loop's owner thread and EVBASE_NEED_NOTIFY is never true. It is only because the callback is installed that execution goes on to evthread_notify_base:
static int
evthread_notify_base(struct event_base *base)
{
    EVENT_BASE_ASSERT_LOCKED(base);
    if (!base->th_notify_fn)
        return -1;
    if (base->is_notify_pending)
        return 0;
    base->is_notify_pending = 1;
    return base->th_notify_fn(base);
}
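Note the is_notify_pending flag here: once a notification byte has been queued but not yet drained by the loop, further calls return immediately, so a burst of bufferevent_write calls from the logic thread collapses into a single one-byte wakeup.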
So when we call bufferevent_write on the logic thread to send some data, the network thread gets notified through the following call chain:
bufferevent_write-->evbuffer_add-->evbuffer_invoke_callbacks_-->bufferevent_socket_outbuf_cb-->bufferevent_add_event_-->event_add-->event_add_nolock_-->evthread_notify_base
And that is how libevent's cross-thread wakeup works.