libev實現分析

libev是一個事件驅動庫,底層是基於select、epoll、kqueue等I/O複用接口。所謂事件驅動庫,就是用戶定義一個事件以及改事件發生時調用的函數,該庫會監聽該事件,並在事件發生時調用相應的函數。數組

libev提供了不少事件監聽器(watcher),最主要的有IO、時間以及信號監聽器。當某一個文件的讀事件或者寫事件發生時,週期時間到了時,進程接收到某個信號時,就會調用用戶定義的回調函數。less

下面以IO事件爲例,講述libev的工做原理:函數

一、實例oop

 1 #include<stdio.h>
 2 #include <ev.h>
 3 // every watcher type has its own typedef'd struct
 4 // with the name ev_TYPE
 5 ev_io stdin_watcher;
 6 ev_timer timeout_watcher;
 7 
 8 // all watcher callbacks have a similar signature
 9 // this callback is called when data is readable on stdin
10 static void
11 stdin_cb (EV_P_ ev_io *w, int revents)
12 {
13   puts ("stdin ready OK!");
14   // for one-shot events, one must manually stop the watcher
15   // with its corresponding stop function.
16   ev_io_stop (EV_A_ w);
17 
18   // this causes all nested ev_run's to stop iterating
19   ev_break (EV_A_ EVBREAK_ALL);
20 }
21 
22 // another callback, this time for a time-out
23 static void
24 timeout_cb (EV_P_ ev_timer *w, int revents)
25 {
26    puts ("timeout");
27    // this causes the innermost ev_run to stop iterating
28    ev_break (EV_A_ EVBREAK_ONE);
29 }
30 
31 int
32 main (void)
33 {
34     // use the default event loop unless you have special needs
35     struct ev_loop *loop = EV_DEFAULT;
36     // initialise an io watcher, then start it
37     // this one will watch for stdin to become readable
38     ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ);
39     ev_io_start (loop, &stdin_watcher);
40     
41     // initialise a timer watcher, then start it
42     // simple non-repeating 5.5 second timeout
43     ev_timer_init (&timeout_watcher, timeout_cb, 5.5, 0.);
44     ev_timer_start (loop, &timeout_watcher);
45     
46     // now wait for events to arrive
47     ev_run (loop, 0);
48     //
49     // break was called, so exit
50     return 0;
51 }

能夠看出,libev庫的使用簡單、方便。咱們只要定義個事件的監聽器對象,初始化,開始,最後調用ev_run。不一樣事件監聽器初始化的內容也不同,好比,IO事件監聽器須要初始化監聽的文件描述符,事件以及回調函數。ui

二、事件監聽器this

typedef ev_watcher *W;
typedef ev_watcher_list *WL;
typedef ev_watcher_time *WT;

typedef struct ev_watcher
{
  int active; 
  int pending;
  int priority;
  void *data;
  void (*cb)(EV_P_ struct type *w, int revents);
} ev_watcher;

typedef struct ev_watcher_list
{
  int active; 
  int pending;
  int priority;
  void *data;
  void (*cb)(EV_P_ struct ev_watcher_list *w, int revents);
  struct ev_watcher_list *next;
} ev_watcher_list;

typedef struct ev_io
{
  int active; 
  int pending;
  int priority;
  void *data;
  void (*cb)(EV_P_ struct ev_io *w, int revents);
  struct ev_watcher_list *next;

  int fd;     /* ro */
  int events; /* ro */
} ev_io;
ev_watcher是一個基礎監聽器,包括回調函數cd;監聽器列表(ev_watcher_list)是在監聽器的基礎上添加指向下一個監聽器的指針next;IO監聽器是在監聽器列表的基礎上加上了其特有的文件描述符和事件類型
ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ);該函數就是初始化stdin_watcher這個監聽器的回調函數,文件描述符,以及事假類型。
三、struct ev_loop
struct ev_loop;
# define EV_P  struct ev_loop *loop
# define EV_P_ struct ev_loop *loop,
# define EV_A  loop
# define EV_A_ loop,  //這4個宏用於形參

struct ev_loop{ //部分參數
    ANFD *anfds
    int anfdmax

    int *fdchanges
    int fdchangemax
    int fdchangecnt
    
    ANPENDING *pendings [NUMPRI]
    int pendingmax [NUMPRI]
    int pendingcnt [NUMPRI]

    int backend
    int backend_fd
    void (*backend_modify)(EV_P_ int fd, int oev, int nev)
    void (*backend_poll)(EV_P_ ev_tstamp timeout)
}

typedef struct
{
  WL head;           //ev_watcher_list *head;
  unsigned char events; /* the events watched for */
  unsigned char reify;  /* flag set when this ANFD needs reification (EV_ANFD_REIFY, EV__IOFDSET) */
  unsigned char emask;  /* the epoll backend stores the actual kernel mask in here */
  unsigned char unused;
  
  unsigned int egen;    /* generation counter to counter epoll bugs */
  SOCKET handle;
  OVERLAPPED or, ow;
} ANFD;

typedef struct
{
  W w;       //ev_watcher *w;
  int events; /* the pending event set for the given watcher */
} ANPENDING;
ev_run函數主要是一個while循環,在這個while循環中不斷檢測各個事件是否發生,若是發生就調用其回調函數。而這個過程當中,主要用到的對象就是struct ev_loop結構體對象,檢測哪些事件,回調哪一個函數都存放在該對象中。
struct ev_loop結構體中的字段不少,以IO事件爲例介紹幾個主要的:
anfds是一個數組,數組元素是結構體ANFD,ANFD有一個成員是監聽器列表。數組下標是文件描述符,而列表成員是監聽該文件的事件監聽器。因此,anfds有點相似散列表,以文件描述符做爲鍵,以監聽器做爲值,採用開鏈法解決散列衝突。
該字段的初始化在ev_io_start函數中,主要目的是用戶定義的監聽器告訴ev_loop。
fdchanges是一個int數組,也是在ev_io_start中初始化。存放的是監聽了的文件描述符。這樣ev_run每次循環的時候,要先從fdchanges中取出已經監聽的文件描述符,再以該描述符爲下標,從anfds中取出監聽器對象。這樣就獲得文件描述符以及監聽
的事件。

pendings是一個二維數組,第一維是優先級,第二維是監聽器。這個數組是用於執行相應的回調函數,根據優先級,遍歷全部監聽器,調用監聽器的回調函數。

三、ev_io_start
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
  ev_start (EV_A_ (W)w, 1);   //w->active = active;
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero); //when fd + 1 > anfdmax : 從新分配數組大小
                                                                  //anfds = (type *)array_realloc(sizeof (ANFD), (anfds), &(anfdmax), (fd + 1));
  wlist_add (&anfds[fd].head, (WL)w);  // w->next = *head;*head = w;
  
  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY); //nfds [fd].reify |= flags;
                                                                 //++fdchangecnt;
                                                                 //array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2);
                                                                 //fdchanges [fdchangecnt - 1] = fd;
}

array_realloc (int elem, void *base, int *cur, int cnt)
{
  *cur = array_nextsize (elem, *cur, cnt);
  return ev_realloc (base, elem * *cur);
}

//分配當前數組大小的兩倍內存,若是大於4096,則爲4096的倍數
int array_nextsize (int elem, int cur, int cnt)
{
  int ncur = cur + 1;

  do
    ncur <<= 1;
  while (cnt > ncur);

  /* if size is large, round to MALLOC_ROUND - 4 * longs to accommodate malloc overhead */
  if (elem * ncur > MALLOC_ROUND - sizeof (void *) * 4) //#define MALLOC_ROUND 4096
    {
      ncur *= elem;
      ncur = (ncur + elem + (MALLOC_ROUND - 1) + sizeof (void *) * 4) & ~(MALLOC_ROUND - 1);
      ncur = ncur - sizeof (void *) * 4;
      ncur /= elem;
    }

  return ncur;
}

ev_io_start函數主要就是對ev_loop的anfds和fdchanges字段操做,上面已介紹。array_needsize函數實現當數組大小不夠時,要從新分配內存,分配方式與stl::vector有些相似,都是新分配的內存爲當前內存的2倍,而後移動原先數據到新內存,釋放舊內存。spa

四、ev_run
ev_run (EV_P_ int flags)
{
    ...
    do{
      fd_reify (EV_A);
      backend_poll (EV_A_ waittime);
      if (expect_false (checkcnt))
        queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);
      EV_INVOKE_PENDING;
    }
    while(...)
    ...
}
ev_run函數代碼比較多,以上是以IO事件爲例,進行的精簡下面是以epoll做爲IO多路複用的機制進行ev_run說明
一、ev_loop對象的初始化:
ev_loop對象初始化:
1、# define EV_DEFAULT  ev_default_loop (0)

2static struct ev_loop default_loop_struct;
EV_API_DECL struct ev_loop *ev_default_loop_ptr = 0;

ev_default_loop (unsigned int flags) EV_THROW
{
  if (!ev_default_loop_ptr)
    {
      EV_P = ev_default_loop_ptr = &default_loop_struct;
      loop_init (EV_A_ flags);
    }

  return ev_default_loop_ptr;
}

3static void noinline ecb_cold
loop_init (EV_P_ unsigned int flags) EV_THROW
{
 flags = atoi (getenv ("LIBEV_FLAGS"));
#if EV_USE_IOCP
      if (!backend && (flags & EVBACKEND_IOCP  )) backend = iocp_init   (EV_A_ flags);
#endif
#if EV_USE_PORT
      if (!backend && (flags & EVBACKEND_PORT  )) backend = port_init   (EV_A_ flags);
#endif
#if EV_USE_KQUEUE
      if (!backend && (flags & EVBACKEND_KQUEUE)) backend = kqueue_init (EV_A_ flags);
#endif
#if EV_USE_EPOLL
      if (!backend && (flags & EVBACKEND_EPOLL )) backend = epoll_init  (EV_A_ flags);
#endif
#if EV_USE_POLL
      if (!backend && (flags & EVBACKEND_POLL  )) backend = poll_init   (EV_A_ flags);
#endif
#if EV_USE_SELECT
      if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
}
EV_USE_EPOLL 、EV_USE_SELECT等宏是在調用./configure時,搜索sys/epoll.h sys/select.h等文件,若是文件存在,就將宏設置爲1.
__cplusplus宏是g++編譯器定義的

4int inline_size
epoll_init (EV_P_ int flags)
{
  backend_fd = epoll_create (256);

  if (backend_fd < 0)
    return 0;

  fcntl (backend_fd, F_SETFD, FD_CLOEXEC);

  backend_mintime = 1e-3; /* epoll does sometimes return early, this is just to avoid the worst */
  backend_modify  = epoll_modify;
  backend_poll    = epoll_poll;

  epoll_eventmax = 64; /* initial number of events receivable per poll */
  epoll_events = (struct epoll_event *)ev_malloc (sizeof (struct epoll_event) * epoll_eventmax);

  return EVBACKEND_EPOLL;//即 EVBACKEND_EPOLL   = 0x00000004U
}
epoll_init返回值爲非0,因此不會調用後面的poll_init、select_init。

   因此,在初始化時,調用了epoll_create初始化backend_fd。指針

 二、fd_reifycode

fd_reify (EV_P)
{
    for (i = 0; i < fdchangecnt; ++i)
    {
      int fd = fdchanges [i];
      ANFD *anfd = anfds + fd;

      if (o_reify & EV__IOFDSET)
        backend_modify (EV_A_ fd, o_events, anfd->events); //即poll_modify
    }
  fdchangecnt = 0;
}
epoll_modify (EV_P_ int fd, int oev, int nev)
{
  struct epoll_event ev;

  ev.data.u64 = (uint64_t)(uint32_t)fd
              | ((uint64_t)(uint32_t)++anfds [fd].egen << 32);
  ev.events   = (nev & EV_READ  ? EPOLLIN  : 0)
              | (nev & EV_WRITE ? EPOLLOUT : 0);

  if (expect_true (!epoll_ctl (backend_fd, oev && oldmask != nev ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev)))
    return;
}

  在fd_reify中將andfs中監聽的事件添加到backend_fd中。對象

  三、backend_poll

backend_poll (EV_A_ waittime); // 即epoll_poll
epoll_poll (EV_P_ ev_tstamp timeout)
{
  eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, timeout * 1e3);
  for (i = 0; i < eventcnt; ++i)
    {
      struct epoll_event *ev = epoll_events + i;

      int fd = (uint32_t)ev->data.u64; /* mask out the lower 32 bits */
      int want = anfds [fd].events;
      int got  = (ev->events & (EPOLLOUT | EPOLLERR | EPOLLHUP) ? EV_WRITE : 0)
               | (ev->events & (EPOLLIN  | EPOLLERR | EPOLLHUP) ? EV_READ  : 0);
      fd_event (EV_A_ fd, got);  //將watcher設置到loop的pending數組中
      }
}
fd_event (EV_P_ int fd, int revents)
{
  ANFD *anfd = anfds + fd;
  if (expect_true (!anfd->reify))
    fd_event_nocheck (EV_A_ fd, revents);
}
fd_event_nocheck (EV_P_ int fd, int revents)
{
  ANFD *anfd = anfds + fd;
  ev_io *w;

  for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)
    {
      int ev = w->events & revents;

      if (ev)
        ev_feed_event (EV_A_ (W)w, ev);
    }
}
ev_feed_event (EV_P_ void *w, int revents) EV_THROW
{
  W w_ = (W)w;
  int pri = ABSPRI (w_);

  if (expect_false (w_->pending))
    pendings [pri][w_->pending - 1].events |= revents;
  else
    {
      w_->pending = ++pendingcnt [pri];
      array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);
      pendings [pri][w_->pending - 1].w      = w_;
      pendings [pri][w_->pending - 1].events = revents;
    }
}

在backend_poll中會調用epoll_wait,經過fd_event函數,將就緒的文件描述符對應的監聽器添加到ev_loop對象的pendings字段中

  四、EV_INVOKE_PENDING

void noinline
ev_invoke_pending (EV_P)
{
  pendingpri = NUMPRI;

  while (pendingpri) /* pendingpri possibly gets modified in the inner loop */
    {
      --pendingpri;

      while (pendingcnt [pendingpri])
        {
          ANPENDING *p = pendings [pendingpri] + --pendingcnt [pendingpri];

          p->w->pending = 0;
          EV_CB_INVOKE (p->w, p->events);
        }
    }
}
# define EV_CB_INVOKE(watcher,revents) (watcher)->cb (EV_A_ (watcher), (revents))

EV_INVOKE_PENDING會一次調用pendings中的監聽器的回調函數。

至此,ev_run大致介紹完畢。

小結:

總的來講,對於IO事件驅動,libev是先將監聽器存放在一個數組,每次遍歷都將監聽器監聽的文件描述符添加到epoll_wait進行監聽,而後將eopll_wait返回的就緒描述符對應的監聽器添加到pendings,最後調用pendings中監聽器的回調函數。

相關文章
相關標籤/搜索