libuv的源碼分析(1)

libuv我在今年四月份的時候開始接觸,一開始也遇到了不少坑,但後來理解並遵照了它的設計思想,一切就變得很方便。這幾天開始着手精讀它的源碼,本着記錄本身的學習痕跡,也但願能增長別人搜索相關問題結果數的目的,所以就有了這些東西,這個系列至少會有四篇,後續再說吧。
html

那麼它是什麼,一個高效輕量的跨平臺異步io庫,在linux下,它整合了libevent,在windows下,它用iocp重寫了一套。它有那些功能,以下面這幅官網上的圖所示:linux

它的總體結構基於事件循環,簡單的說就是外部的接口實際上是對內層的一個個請求,並無作真正的事,這些請求都存儲在內部一個請求隊列中,在事件循環中,再從請求隊列中取出他們,而後作具體的事情,作完了利用回調函數通知調用者,這樣一來,全部的外部接口均可以變成異步的。windows

它的世界有三類元素:api

  1. uv_loop_t,表示事件循環,爲其餘兩類元素提供環境容器和統籌調度。
  2. uv_handle_t族,持續性請求,生命週期較長,且能屢次觸發事件。
  3. uv_req_t族,一次性請求.

它的實現基於一個假設,運行於單線程.異步

下面主要來分析它的主體和經常使用部分,循環,timer,tcp,udp,fs,特殊的uv_handle_t.此次分析是基於其windows部分的,由於我在windows上用它比較多,不過其不一樣平臺上的思路是一致的,只是具體實現上採用的系統api不一樣罷了.庫版本基於1.7.0版本.socket


 

uv_loop_tasync

先來看定義,在include/uv.h的1459行:tcp

struct uv_loop_s { /* User data - use this for whatever. */
  void* data; /* Loop reference counting. */ unsigned int active_handles; void* handle_queue[2]; void* active_reqs[2]; /* Internal flag to signal loop stop. */ unsigned int stop_flag; UV_LOOP_PRIVATE_FIELDS }; #define UV_LOOP_PRIVATE_FIELDS                                                \
    /* The loop's I/O completion port */ \ HANDLE iocp; \ /* The current time according to the event loop. in msecs. */ \ uint64_t time; \ /* Tail of a single-linked circular queue of pending reqs. If the queue */ \ /* is empty, tail_ is NULL. If there is only one item, */ \ /* tail_->next_req == tail_ */ \ uv_req_t* pending_reqs_tail; \ /* Head of a single-linked list of closed handles */ \ uv_handle_t* endgame_handles; \ /* The head of the timers tree */ \ struct uv_timer_tree_s timers; \ /* Lists of active loop (prepare / check / idle) watchers */ \ uv_prepare_t* prepare_handles; \ uv_check_t* check_handles; \ uv_idle_t* idle_handles; \ /* This pointer will refer to the prepare/check/idle handle whose */ \ /* callback is scheduled to be called next. This is needed to allow */ \ /* safe removal from one of the lists above while that list being */ \ /* iterated over. */ \ uv_prepare_t* next_prepare_handle; \ uv_check_t* next_check_handle; \ uv_idle_t* next_idle_handle; \ /* This handle holds the peer sockets for the fast variant of uv_poll_t */ \ SOCKET poll_peer_sockets[UV_MSAFD_PROVIDER_COUNT]; \ /* Counter to keep track of active tcp streams */ \ unsigned int active_tcp_streams; \ /* Counter to keep track of active udp streams */ \ unsigned int active_udp_streams; \ /* Counter to started timer */ \ uint64_t timer_counter; \ /* Threadpool */ \ void* wq[2]; \ uv_mutex_t wq_mutex; \ uv_async_t wq_async;
active_handles是uv_loop_t的引用計數,每投遞一個uv_handle_t或uv_req_t都會使其遞增,結束一個請求都會使其遞減。
handle_queue是uv_handle_t族的隊列。
active_queue是uv_req_t族的隊列。
data是用戶數據域。
uv_prepare_t,uv_check_t,uv_idle_t,uv_async_t是個特殊的uv_handle_t,隨後會說明。
pending_reqs_tail是用來裝已經處理過的請求的隊列。
endgame_handles是用來裝執行了關閉的uv_handle_t族鏈表。
timers是計時器結構,用最小堆實現。
iocp是完成端口的句柄。

這個結構外部除了data外,其餘都不該該使用。與其直接有關的接口有兩個, uv_loop_init和uv_run。
先來看uv_loop_init,在/src/core.c的126行:
int uv_loop_init(uv_loop_t* loop) { int err; /* Initialize libuv itself first */ uv__once_init(); /* Create an I/O completion port */ loop->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1); if (loop->iocp == NULL) return uv_translate_sys_error(GetLastError()); /* To prevent uninitialized memory access, loop->time must be initialized * to zero before calling uv_update_time for the first time. */ loop->time = 0; uv_update_time(loop); QUEUE_INIT(&loop->wq); QUEUE_INIT(&loop->handle_queue); QUEUE_INIT(&loop->active_reqs); loop->active_handles = 0; loop->pending_reqs_tail = NULL; loop->endgame_handles = NULL; RB_INIT(&loop->timers); loop->check_handles = NULL; loop->prepare_handles = NULL; loop->idle_handles = NULL; loop->next_prepare_handle = NULL; loop->next_check_handle = NULL; loop->next_idle_handle = NULL; memset(&loop->poll_peer_sockets, 0, sizeof loop->poll_peer_sockets); loop->active_tcp_streams = 0; loop->active_udp_streams = 0; loop->timer_counter = 0; loop->stop_flag = 0; err = uv_mutex_init(&loop->wq_mutex); if (err) goto fail_mutex_init; err = uv_async_init(loop, &loop->wq_async, uv__work_done); if (err) goto fail_async_init; uv__handle_unref(&loop->wq_async); loop->wq_async.flags |= UV__HANDLE_INTERNAL; return 0; fail_async_init: uv_mutex_destroy(&loop->wq_mutex); fail_mutex_init: CloseHandle(loop->iocp); loop->iocp = INVALID_HANDLE_VALUE; return err; }

這裏初始化了本身各個字段,並用uv__once_init模擬了pthread_once_t初始化了庫的各個子系統。函數

再來看看uv_run,這是核心函數,庫的入口與發動機,在src/core.c的372行:oop

 1 int uv_run(uv_loop_t *loop, uv_run_mode mode) {  2  DWORD timeout;  3   int r;  4   int ran_pending;  5   void (*poll)(uv_loop_t* loop, DWORD timeout);  6 
 7   if (pGetQueuedCompletionStatusEx)  8     poll = &uv_poll_ex;  9   else
10     poll = &uv_poll; 11 
12   r = uv__loop_alive(loop); 13   if (!r) 14  uv_update_time(loop); 15 
16   while (r != 0 && loop->stop_flag == 0) { 17  uv_update_time(loop); 18  uv_process_timers(loop); 19 
20     ran_pending = uv_process_reqs(loop); 21  uv_idle_invoke(loop); 22  uv_prepare_invoke(loop); 23 
24     timeout = 0; 25     if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT) 26       timeout = uv_backend_timeout(loop); 27 
28     (*poll)(loop, timeout); 29 
30  uv_check_invoke(loop); 31  uv_process_endgames(loop); 32 
33     if (mode == UV_RUN_ONCE) { 34       /* UV_RUN_ONCE implies forward progress: at least one callback must have 35  * been invoked when it returns. uv__io_poll() can return without doing 36  * I/O (meaning: no callbacks) when its timeout expires - which means we 37  * have pending timers that satisfy the forward progress constraint. 38  * 39  * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from 40  * the check. 41        */
42  uv_process_timers(loop); 43  } 44 
45     r = uv__loop_alive(loop); 46     if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) 47       break; 48  } 49 
50   /* The if statement lets the compiler compile it to a conditional store. 51  * Avoids dirtying a cache line. 52    */
53   if (loop->stop_flag != 0) 54     loop->stop_flag = 0; 55 
56   return r; 57 }

 

循環的流程用官方一張圖來表示:

 

 這些具體的處理函數也基本是各個子系統的入口函數,在後面分析各個子系統時會詳細說明,這裏只分析它的三種運行模式,也就是mode參數指定的:

  1. UV_ONCE,阻塞模式運行,直處處理了一次請求。關鍵在於阻塞,這個阻塞點發生在26-28行,由uv_backend_timeout選定一個超時時間,讓poll函數進行超時阻塞,那麼來看看uv_backend_timeout的選定規則,在src/core.c的234行:
     1 int uv_backend_timeout(const uv_loop_t* loop) {  2   if (loop->stop_flag != 0)  3     return 0;  4 
     5   if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))  6     return 0;  7 
     8   if (loop->pending_reqs_tail)  9     return 0; 10 
    11   if (loop->endgame_handles) 12     return 0; 13 
    14   if (loop->idle_handles) 15     return 0; 16 
    17   return uv__next_timeout(loop);

    是這樣的,A有任何可處理的請求,返回0.B取最小計時器事件的時間與當前時間的差值。C沒有計時器事件返回INFINITE,直到有一個完成端口事件。

  2. UV_DEFAULT,阻塞運行到直到手動中止或沒有被任何請求引用。
  3. UV_NOWAIT,運行一次循環流程,跳過了uv_backend_timeout這一步。

最後來講一下uv_loopt_t是如何退出的,A.stop_flag爲1.B.沒有任何請求(也就是uv_loop_alive的判斷)。用uv_stop能夠設置stop_flag爲1。

未完待續,下一篇講四大特殊uv_handle_t。

相關文章
相關標籤/搜索