As the name suggests, everything in libevent revolves around the event loop.
So first: what exactly is an event loop? In general, it is a manager for I/O events and timer events.
Let's start with how an event loop gets created.
1. Since libevent is cross-platform, the configuration differs from platform to platform, so the first step is reading the configuration:
struct event_config {
    TAILQ_HEAD(event_configq, event_config_entry) entries;
    int n_cpus_hint;
    enum event_method_feature require_features;
    enum event_base_config_flag flags;
};
There is an event_config structure whose sole purpose is to describe the configuration of the event_base about to be created.
#define TAILQ_HEAD(name, type)     \
struct name {                      \
    struct type *tqh_first;        \
    struct type **tqh_last;        \
}
TAILQ_HEAD is a macro that declares a structure.
To make this easier to read, here is the same code with the macros expanded:
struct event_config_entry {
    struct {
        struct event_config_entry *tqe_next;
        struct event_config_entry **tqe_prev;
    } next;
    const char *avoid_method;
};

struct event_config {
    struct event_configq {
        struct event_config_entry *tqh_first;
        struct event_config_entry **tqh_last;
    } entries;
    int n_cpus_hint;
    enum event_method_feature require_features;
    enum event_base_config_flag flags;
};
So now it is clear that event_config_new simply allocates this structure (a linked-list head) and initializes the list.
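For orientation, here is a minimal sketch of how this config object is meant to be used, via libevent 2.0's public API (the EV_FEATURE_O1 requirement is just an example):

#include <event2/event.h>

int main(void)
{
    struct event_config *cfg = event_config_new();
    if (!cfg)
        return 1;

    /* e.g. insist on a backend with O(1) readiness notification */
    event_config_require_features(cfg, EV_FEATURE_O1);

    struct event_base *base = event_base_new_with_config(cfg);
    event_config_free(cfg); /* the base copies what it needs */

    if (!base)
        return 1; /* no backend satisfied the requirements */
    event_base_free(base);
    return 0;
}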
The next step is to apply the configuration.
event_base_new_with_config does that work,
and sure enough, this is the function that allocates the event_base itself
and then initializes it.
1. Initialize the base's time:
gettime(base, &base->event_tv);
A look at the implementation makes it clear:
static int gettime(struct event_base *base, struct timeval *tp)
{
    EVENT_BASE_ASSERT_LOCKED(base);

    if (base->tv_cache.tv_sec) {
        *tp = base->tv_cache;
        return (0);
    }

    return (evutil_gettimeofday(tp, NULL));
}
If base->tv_cache already holds a time, that value is used directly, to avoid calling the system time function too often;
otherwise evutil_gettimeofday calls whatever the current platform provides for fetching the current time.
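This cache is also exposed to user code as event_base_gettimeofday_cached; a quick sketch of reading it from inside a callback (the callback name and wiring are mine):

#include <event2/event.h>
#include <stdio.h>

static void on_ready(evutil_socket_t fd, short what, void *arg)
{
    struct event_base *base = arg;
    struct timeval now;

    /* falls back to a real gettimeofday() if the cache is empty */
    if (event_base_gettimeofday_cached(base, &now) == 0)
        printf("loop time: %ld.%06ld\n",
            (long)now.tv_sec, (long)now.tv_usec);
}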
After that, these fields are set:
base->sig.ev_signal_pair[0] = -1;
base->sig.ev_signal_pair[1] = -1;
base->th_notify_fd[0] = -1;
base->th_notify_fd[1] = -1;
sig is signal-related and matters on Linux; on Windows it seems to do little. The field worth a closer look is th_notify_fd:
evutil_socket_t th_notify_fd[2];
It is a descriptor pair used by other threads to wake up the thread running the main loop.
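The underlying idea is the classic self-pipe trick. A standalone sketch of the pattern (not libevent's actual code; assumes a POSIX pipe whose read end is non-blocking and already registered with the poller):

#include <unistd.h>

static int notify_fd[2]; /* [0]: read end the loop watches; [1]: write end */

/* Called from any thread: makes notify_fd[0] readable, so select/poll
 * in the loop thread wakes up even if no real I/O is pending. */
static void wake_loop(void)
{
    char byte = 0;
    (void)write(notify_fd[1], &byte, 1);
}

/* Called by the loop thread once woken: drain the pipe so the next
 * wakeup is visible again (read end must be O_NONBLOCK). */
static void drain_notify(void)
{
    char buf[128];
    while (read(notify_fd[0], buf, sizeof(buf)) > 0)
        ;
}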
event_deferred_cb_queue_init(&base->defer_queue);
Next, the defer_queue is initialized. This queue holds deferred callbacks: callbacks that are not run on the spot but are queued to run later from inside the event loop, which avoids reentrancy problems (bufferevents rely on this).
Its internal structure looks like this:
struct deferred_cb_queue {
    void *lock;
    int active_count;
    void (*notify_fn)(struct deferred_cb_queue *, void *);
    void *notify_arg;
    TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
};
Inside it there is another linked list of entries:
struct deferred_cb {
    TAILQ_ENTRY (deferred_cb) cb_next;
    unsigned queued : 1;
    deferred_cb_fn cb;
    void *arg;
};
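The purpose becomes clearer from how the internal API in defer-internal.h is used; a rough sketch based on libevent 2.0's internal headers (treat the exact signatures as an assumption on my part):

#include "defer-internal.h" /* libevent-internal header */

static void run_later(struct deferred_cb *cb, void *arg)
{
    /* executes from the event loop, not from the scheduling call site,
     * which avoids unbounded recursion between callbacks */
}

static void schedule(struct deferred_cb_queue *queue)
{
    static struct deferred_cb cb;

    event_deferred_cb_init(&cb, run_later, NULL);
    event_deferred_cb_schedule(queue, &cb); /* sets queued = 1; idempotent */
}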
After that, the backend is chosen according to the configuration.
On Windows it can be select or IOCP; the select code lives in win32select.c:
struct eventop win32ops = {
    "win32",
    win32_init,
    win32_add,
    win32_del,
    win32_dispatch,
    win32_dealloc,
    0, /* need_reinit */
    0, /* features */
    sizeof(struct idx_info),
};
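This table is an instance of the backend interface that every dispatcher (select, epoll, kqueue, ...) fills in; abridged from libevent 2.0's event-internal.h:

struct eventop {
    const char *name;
    void *(*init)(struct event_base *);  /* result lands in base->evbase */
    int (*add)(struct event_base *, evutil_socket_t fd,
        short old, short events, void *fdinfo);
    int (*del)(struct event_base *, evutil_socket_t fd,
        short old, short events, void *fdinfo);
    int (*dispatch)(struct event_base *, struct timeval *);
    void (*dealloc)(struct event_base *);
    int need_reinit;                     /* reinit needed after fork()? */
    enum event_method_feature features;
    size_t fdinfo_len;                   /* per-fd bookkeeping bytes */
};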
At initialization time:
void *
win32_init(struct event_base *_base)
{
    struct win32op *winop;
    size_t size;

    if (!(winop = mm_calloc(1, sizeof(struct win32op))))
        return NULL;
    winop->num_fds_in_fd_sets = NEVENT;
    size = FD_SET_ALLOC_SIZE(NEVENT);
    if (!(winop->readset_in = mm_malloc(size)))
        goto err;
    if (!(winop->writeset_in = mm_malloc(size)))
        goto err;
    if (!(winop->readset_out = mm_malloc(size)))
        goto err;
    if (!(winop->writeset_out = mm_malloc(size)))
        goto err;
    if (!(winop->exset_out = mm_malloc(size)))
        goto err;
    winop->readset_in->fd_count = winop->writeset_in->fd_count = 0;
    winop->readset_out->fd_count = winop->writeset_out->fd_count
        = winop->exset_out->fd_count = 0;
    if (evsig_init(_base) < 0)
        winop->signals_are_broken = 1;
    return (winop);
err:
    XFREE(winop->readset_in);
    XFREE(winop->writeset_in);
    XFREE(winop->readset_out);
    XFREE(winop->writeset_out);
    XFREE(winop->exset_out);
    XFREE(winop);
    return (NULL);
}
NEVENT is 32, so each set initially gets room for 32 fds.
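The size comes from a macro over libevent's own growable replacement for a Windows fd_set; roughly, per win32select.c:

/* A Windows fd_set is a count plus an inline array of SOCKETs; libevent
 * defines its own so the array can be reallocated past FD_SETSIZE. */
struct win_fd_set {
    u_int fd_count;
    SOCKET fd_array[1]; /* over-allocated to hold num_fds_in_fd_sets */
};

#define FD_SET_ALLOC_SIZE(n) \
    ((sizeof(struct win_fd_set) + ((n) - 1) * sizeof(SOCKET)))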
OK, the event_base setup is finally done. That was a slog!
The key part is how event_base_dispatch runs the main loop; it is simply event_base_loop(base, 0):
int
event_base_loop(struct event_base *base, int flags)
{
    const struct eventop *evsel = base->evsel;
    struct timeval tv;
    struct timeval *tv_p;
    int res, done, retval = 0;

    EVBASE_ACQUIRE_LOCK(base, th_base_lock);

    if (base->running_loop) {
        event_warnx("%s: reentrant invocation. Only one event_base_loop"
            " can run on each event_base at once.", __func__);
        EVBASE_RELEASE_LOCK(base, th_base_lock);
        return -1;
    }

    base->running_loop = 1;

    clear_time_cache(base);

    if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
        evsig_set_base(base);

    done = 0;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    base->th_owner_id = EVTHREAD_GET_ID();
#endif

    base->event_gotterm = base->event_break = 0;

    while (!done) {
        base->event_continue = 0;

        if (base->event_gotterm) {
            break;
        }

        if (base->event_break) {
            break;
        }

        timeout_correct(base, &tv);

        tv_p = &tv;
        if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
            timeout_next(base, &tv_p);
        } else {
            evutil_timerclear(&tv);
        }

        if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
            event_debug(("%s: no events registered.", __func__));
            retval = 1;
            goto done;
        }

        gettime(base, &base->event_tv);

        clear_time_cache(base);

        res = evsel->dispatch(base, tv_p);

        if (res == -1) {
            event_debug(("%s: dispatch returned unsuccessfully.",
                __func__));
            retval = -1;
            goto done;
        }

        update_time_cache(base);

        timeout_process(base);

        if (N_ACTIVE_CALLBACKS(base)) {
            int n = event_process_active(base);
            if ((flags & EVLOOP_ONCE)
                && N_ACTIVE_CALLBACKS(base) == 0
                && n != 0)
                done = 1;
        } else if (flags & EVLOOP_NONBLOCK)
            done = 1;
    }
    event_debug(("%s: asked to terminate loop.", __func__));

done:
    clear_time_cache(base);
    base->running_loop = 0;

    EVBASE_RELEASE_LOCK(base, th_base_lock);

    return (retval);
}
It is cut from the same mold as libev, which I read earlier: if callbacks are already active, they are handled without blocking (the timeout is cleared); otherwise the next timeout is computed, backend->dispatch polls for I/O events, and then expired timers are processed. The loop keeps spinning until it is stopped from outside or there are no fds left.
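To connect the loop back to user code, a minimal driver sketch using the public libevent 2.0 API (the callback name and interval are mine):

#include <event2/event.h>
#include <stdio.h>

/* hypothetical one-shot timer callback */
static void on_tick(evutil_socket_t fd, short what, void *arg)
{
    puts("tick");
}

int main(void)
{
    struct event_base *base = event_base_new();
    struct event *tick = evtimer_new(base, on_tick, NULL);
    struct timeval one_sec = { 1, 0 };

    evtimer_add(tick, &one_sec);

    /* event_base_dispatch == event_base_loop(base, 0); it returns once
     * the timer has fired and no events remain (the retval = 1 path). */
    event_base_dispatch(base);

    event_free(tick);
    event_base_free(base);
    return 0;
}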
Now let's look at how the select backend detects events:
int
win32_dispatch(struct event_base *base, struct timeval *tv)
{
    struct win32op *win32op = base->evbase;
    int res = 0;
    unsigned j, i;
    int fd_count;
    SOCKET s;

    if (win32op->resize_out_sets) {
        size_t size = FD_SET_ALLOC_SIZE(win32op->num_fds_in_fd_sets);
        if (!(win32op->readset_out = mm_realloc(win32op->readset_out, size)))
            return (-1);
        if (!(win32op->exset_out = mm_realloc(win32op->exset_out, size)))
            return (-1);
        if (!(win32op->writeset_out = mm_realloc(win32op->writeset_out, size)))
            return (-1);
        win32op->resize_out_sets = 0;
    }

    fd_set_copy(win32op->readset_out, win32op->readset_in);
    fd_set_copy(win32op->exset_out, win32op->writeset_in);
    fd_set_copy(win32op->writeset_out, win32op->writeset_in);

    fd_count =
        (win32op->readset_out->fd_count > win32op->writeset_out->fd_count) ?
        win32op->readset_out->fd_count : win32op->writeset_out->fd_count;

    if (!fd_count) {
        long msec = tv ? evutil_tv_to_msec(tv) : LONG_MAX;
        if (msec < 0)
            msec = LONG_MAX;
        Sleep(msec);
        return (0);
    }

    EVBASE_RELEASE_LOCK(base, th_base_lock);

    res = select(fd_count,
        (struct fd_set*)win32op->readset_out,
        (struct fd_set*)win32op->writeset_out,
        (struct fd_set*)win32op->exset_out, tv);

    EVBASE_ACQUIRE_LOCK(base, th_base_lock);

    event_debug(("%s: select returned %d", __func__, res));

    if (res <= 0) {
        return res;
    }

    if (win32op->readset_out->fd_count) {
        i = rand() % win32op->readset_out->fd_count;
        for (j = 0; j < win32op->readset_out->fd_count; ++j) {
            if (++i >= win32op->readset_out->fd_count)
                i = 0;
            s = win32op->readset_out->fd_array[i];
            evmap_io_active(base, s, EV_READ);
        }
    }
    if (win32op->exset_out->fd_count) {
        i = rand() % win32op->exset_out->fd_count;
        for (j = 0; j < win32op->exset_out->fd_count; ++j) {
            if (++i >= win32op->exset_out->fd_count)
                i = 0;
            s = win32op->exset_out->fd_array[i];
            evmap_io_active(base, s, EV_WRITE);
        }
    }
    if (win32op->writeset_out->fd_count) {
        SOCKET s;
        i = rand() % win32op->writeset_out->fd_count;
        for (j = 0; j < win32op->writeset_out->fd_count; ++j) {
            if (++i >= win32op->writeset_out->fd_count)
                i = 0;
            s = win32op->writeset_out->fd_array[i];
            evmap_io_active(base, s, EV_WRITE);
        }
    }
    return (0);
}
Quite thorough: the lock is released before calling select, because select blocks here and the other threads should not be stalled while it waits; the lock is re-acquired right after select returns. The readable and writable fds are copied into the *_out sets first, and a simple random starting offset keeps the scan over the results fair.
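The fairness trick in isolation, as a generic sketch: start at a random offset and wrap around, so the fd at index 0 is not always serviced first (count must be nonzero, as it is at that point in win32_dispatch):

#include <stdlib.h>

/* Visit every element exactly once, starting at a random offset,
 * mirroring the walk over fd_array above. */
static void visit_fairly(int *items, unsigned count, void (*fn)(int))
{
    unsigned i = rand() % count;
    unsigned j;

    for (j = 0; j < count; ++j) {
        if (++i >= count)
            i = 0; /* wrap around */
        fn(items[i]);
    }
}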
After that the time cache is updated to the current time, and timer events are checked; whatever is due gets run:
static void
timeout_process(struct event_base *base)
{
    struct timeval now;
    struct event *ev;

    if (min_heap_empty(&base->timeheap)) {
        return;
    }

    gettime(base, &now);

    while ((ev = min_heap_top(&base->timeheap))) {
        if (evutil_timercmp(&ev->ev_timeout, &now, >))
            break;

        event_del_internal(ev);

        event_debug(("timeout_process: call %p",
            ev->ev_callback));
        event_active_nolock(ev, EV_TIMEOUT, 1);
    }
}
One look at the code shows that timers are managed with a min-heap, so the event with the nearest deadline naturally runs first. The flow is:
peek at the top of the heap; if its deadline is still in the future, stop. Otherwise remove the event from the heap first, then activate it, which runs the registered timer callback.
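The comparison macro takes the operator itself as a parameter; per libevent's event2/util.h it expands to roughly:

#define evutil_timercmp(tvp, uvp, cmp)          \
    (((tvp)->tv_sec == (uvp)->tv_sec) ?         \
     ((tvp)->tv_usec cmp (uvp)->tv_usec) :      \
     ((tvp)->tv_sec cmp (uvp)->tv_sec))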
All right, that's all for now.