一、數據結構linux
#define ev_at(w) ((WT)(w))->at
#define ev_active(w) ((W)(w))->active數組
typedef ev_watcher_time *WT;緩存
struct ev_loop
{
ev_tstamp mn_now
ANHE * timers
int timermax
int timercnt
ev_watcher * rfeeds
}數據結構
/* Heap Entry */ //是否緩存時間監控器中的at字段。
#if EV_HEAP_CACHE_AT
/* a heap element */
typedef struct {
ev_tstamp at;
WT w;
} ANHE;函數
#define ANHE_w(he) (he).w /* access watcher, read-write */
#define ANHE_at(he) (he).at /* access cached at, read-only */
#define ANHE_at_cache(he) (he).at = (he).w->at /* update at from watcher */
#else
/* a heap element */
typedef WT ANHE;oop
#define ANHE_w(he) (he)
#define ANHE_at(he) (he)->at
#define ANHE_at_cache(he)
#endifspa
二、ev_timer_start .net
void noinline
ev_timer_start (EV_P_ ev_timer *w) EV_THROW
{
if (expect_false (ev_is_active (w)))
return;blog
ev_at (w) += mn_now;隊列
++timercnt;
ev_start (EV_A_ (W)w, timercnt + HEAP0 - 1);//w->active = timercnt + HEAP0 - 1;
array_needsize (ANHE, timers, timermax, ev_active (w) + 1, EMPTY2);
ANHE_w (timers [ev_active (w)]) = (WT)w;
ANHE_at_cache (timers [ev_active (w)]);
upheap (timers, ev_active (w));
EV_FREQUENT_CHECK;
/*assert (("libev: internal timer heap corruption", timers [ev_active (w)] == (WT)w));*/
}
inline_speed void
upheap (ANHE *heap, int k)
{
ANHE he = heap [k];
for (;;)
{
int p = HPARENT (k);
if (UPHEAP_DONE (p, k) || ANHE_at (heap [p]) <= ANHE_at (he))
break;
heap [k] = heap [p];
ev_active (ANHE_w (heap [k])) = k;
k = p;
}
heap [k] = he;
ev_active (ANHE_w (he)) = k;
}
三、timers_reify
inline_size void
timers_reify (EV_P)
{
EV_FREQUENT_CHECK;
if (timercnt && ANHE_at (timers [HEAP0]) < mn_now)
{
do
{
ev_timer *w = (ev_timer *)ANHE_w (timers [HEAP0]);
/*assert (("libev: inactive timer on timer heap detected", ev_is_active (w)));*/
/* first reschedule or stop timer */
if (w->repeat)
{
ev_at (w) += w->repeat;
if (ev_at (w) < mn_now)
ev_at (w) = mn_now;
assert (("libev: negative ev_timer repeat value found while processing timers", w->repeat > 0.));
ANHE_at_cache (timers [HEAP0]);
downheap (timers, timercnt, HEAP0);
}
else
ev_timer_stop (EV_A_ w); /* nonrepeating: stop timer */
EV_FREQUENT_CHECK;
feed_reverse (EV_A_ (W)w);
}
while (timercnt && ANHE_at (timers [HEAP0]) < mn_now);
feed_reverse_done (EV_A_ EV_TIMER);
}
}
#if EV_USE_4HEAP
#define DHEAP 4
#define HEAP0 (DHEAP - 1) /* index of first element in heap */
#define HPARENT(k) ((((k) - HEAP0 - 1) / DHEAP) + HEAP0)
#define UPHEAP_DONE(p,k) ((p) == (k))
/* away from the root */
inline_speed void
downheap (ANHE *heap, int N, int k)
{
ANHE he = heap [k];
ANHE *E = heap + N + HEAP0;
for (;;)
{
ev_tstamp minat;
ANHE *minpos;
ANHE *pos = heap + DHEAP * (k - HEAP0) + HEAP0 + 1;
/* find minimum child */
if (expect_true (pos + DHEAP - 1 < E))
{
/* fast path */ (minpos = pos + 0), (minat = ANHE_at (*minpos));
if ( ANHE_at (pos [1]) < minat) (minpos = pos + 1), (minat = ANHE_at (*minpos));
if ( ANHE_at (pos [2]) < minat) (minpos = pos + 2), (minat = ANHE_at (*minpos));
if ( ANHE_at (pos [3]) < minat) (minpos = pos + 3), (minat = ANHE_at (*minpos));
}
else if (pos < E)
{
/* slow path */ (minpos = pos + 0), (minat = ANHE_at (*minpos));
if (pos + 1 < E && ANHE_at (pos [1]) < minat) (minpos = pos + 1), (minat = ANHE_at (*minpos));
if (pos + 2 < E && ANHE_at (pos [2]) < minat) (minpos = pos + 2), (minat = ANHE_at (*minpos));
if (pos + 3 < E && ANHE_at (pos [3]) < minat) (minpos = pos + 3), (minat = ANHE_at (*minpos));
}
else
break;
if (ANHE_at (he) <= minat)
break;
heap [k] = *minpos;
ev_active (ANHE_w (*minpos)) = k;
k = minpos - heap;
}
heap [k] = he;
ev_active (ANHE_w (he)) = k;
}
#else /* 4HEAP */
#define HEAP0 1
#define HPARENT(k) ((k) >> 1)
#define UPHEAP_DONE(p,k) (!(p))
/* away from the root */
inline_speed void
downheap (ANHE *heap, int N, int k)
{
ANHE he = heap [k];
for (;;)
{
int c = k << 1;
if (c >= N + HEAP0)
break;
c += c + 1 < N + HEAP0 && ANHE_at (heap [c]) > ANHE_at (heap [c + 1])
? 1 : 0;
if (ANHE_at (he) <= ANHE_at (heap [c]))
break;
heap [k] = heap [c];
ev_active (ANHE_w (heap [k])) = k;
k = c;
}
heap [k] = he;
ev_active (ANHE_w (he)) = k;
}
#endif
inline_speed void
feed_reverse (EV_P_ W w)
{
array_needsize (W, rfeeds, rfeedmax, rfeedcnt + 1, EMPTY2);
rfeeds [rfeedcnt++] = w;
}
inline_size void
feed_reverse_done (EV_P_ int revents)
{
do
ev_feed_event (EV_A_ rfeeds [--rfeedcnt], revents);
while (rfeedcnt);
}
void noinline
ev_feed_event (EV_P_ void *w, int revents) EV_THROW
{
W w_ = (W)w;
int pri = ABSPRI (w_);
if (expect_false (w_->pending))
pendings [pri][w_->pending - 1].events |= revents;
else
{
w_->pending = ++pendingcnt [pri];
array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);
pendings [pri][w_->pending - 1].w = w_;
pendings [pri][w_->pending - 1].events = revents;
}
pendingpri = NUMPRI - 1;
}
四、ev_run
int ev_run (EV_P_ int flags)
{
waittime = MAX_BLOCKTIME;
if (timercnt)
{
ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now;
if (waittime > to) waittime = to;
。。。。。
}
timers_reify(EV_A);
EV_INVOKE_PENDING;
}
Libev中在管理定時器時,使用了堆這種結構存儲ev_timer,除了最小2叉堆以外,還有4叉堆,可用經過宏定義來設置使用哪一個。
對於n叉堆來講,使用數組進行存儲時,下標爲x的元素,其孩子節點的下標範圍是[nx+1, nx+n]。好比2叉堆,下標爲x的元素,其孩子節點的下標爲2x+1和2x+2.
根據定理,對於4叉堆而言,下標爲x的元素,其孩子節點的下標範圍是[4x+1, 4x+4]。還能夠得出,其父節點的下標是(x-1)/4。然而在Libev的代碼中,使用數組a存儲堆時,4叉堆的第一個元素存放在a[3],2叉堆的第一個元素存放在a[1]。
因此,對於Libev中的4叉堆實現而言,下標爲k的元素(對應在正常實現中的下標是k-3),其孩子節點的下標範圍是[4(k-3)+1+3, 4(k-3)+4+3];其父節點的下標是((k-3-1)/4)+3。
對於Libev中的2叉堆實現而言,下標爲k的元素(對應在正常實現中,其下標是k-1),其孩子節點的下標範圍是[2(k-1)+1+1, 2(k-1)+2+1],也就是[2k, 2k+1];其父節點的下標是((k-1-1)/2)+1,也就是k/2。
downheap和upheap函數就是使用以上原則,不斷與子結點或者父結點比較,交換,最終造成堆。
首先,ev_timer_start 將時間監控器添加到timers中(經過upheap),loop->timer是一個數組形式的最小堆。根據timer->at作的比較,即堆頂爲時間最小的監控器,timer->active是數組的下標。
在ev_run中,先計算超時時間,使其不大於最小的時間。
最後,timers_reify 中取出到時的時間監聽器,添加到pendings隊列。若是是repeat的話,則更新下一次觸發時間,調用downheap操做將這個節點下移至合適的位置;不然直接刪除該watcher。
從角色上來看,這三個類型的watcher其實都是事件循環的一個擴展點。經過這三個watcher,開發者能夠在事件循環的一些特殊時刻得到回調的機會。
ev_prepare 在事件循環發生阻塞前會被觸發。
ev_check 在事件循環阻塞結束後會被觸發。ev_check的觸發是按優先級劃分的。能夠保證,ev_check是同一個優先級上阻塞結束後最早被觸發的watcher。因此,若是要保證ev_check是最早被執行的,能夠把它的優先級設成最高。
ev_idle 當沒有其餘watcher被觸發時被觸發。ev_idle也是按優先級劃分的。它的語義是,在當前優先級以及更高的優先級上沒有watcher被觸發,那麼它就會被觸發,不管以後在較低優先級上是否有其餘watcher被觸發。
這三類watcher給外部的開發者提供了很是便利的擴展機制,在這個基礎上,開發者能夠作不少有意思的事情,也對事件循環有了更多的控制權。具體到底能作些什麼,作到什麼程度,那就要看開發者們的想象力和創造力了:)
ev_signal 是信號監聽器,實現方式是經過signalfd、eventfd、pipe等方法將對信號的處理,轉化爲對文件描述符的處理。signalfd、eventfd是linux提供的同步信號處理的方式。
http://blog.csdn.net/gqtcgq/article/details/49716601