圖片來源:libuvhtml
本文做者:肖思元前端
對 Node.js 的學習,不管如何都繞不開 Libuv。本文選擇沿着 Libuv 的 Linux 實現的脈絡對其內部一探究竟node
As an asynchronous event-driven JavaScript runtime, Node.js is designed to build scalable network applicationslinux
Node.js 做爲前端同窗探索服務端業務的利器,自身是立志能夠構建一個具備伸縮性的網絡應用程序。目前的服務端環境主要仍是 Linux,對於另外一個主要的服務端環境 Unix,則在 API 上和 Linux 具備很高類似性,因此選擇 Linux 做爲起始點,說不定能夠有雙倍收穫和雙倍快樂github
下面是 libuv 官網的架構圖:c#
單以 Linux 平臺來看,libuv 主要工做能夠簡單劃爲兩部分:數組
爲了追本溯源,咱們將從 epoll 開始markdown
簡單來講,epoll 是由 Linux 內核提供的一個系統調用(system call),咱們的應用程序能夠經過它:網絡
咱們經過一小段僞代碼來演示使用 epoll 時的核心步驟:
// 建立 epoll 實例
int epfd = epoll_create(MAX_EVENTS);
// 向 epoll 實例中添加須要監聽的文件描述符,這裏是 `listen_sock`
epoll_ctl_add(epfd, listen_sock, EPOLLIN | EPOLLOUT | EPOLLET);
while(1) {
// 等待來自 epoll 的通知,通知會在其中的文件描述符狀態改變時
// 由系統通知應用。通知的形式以下:
//
// epoll_wait 調用不會當即返回,系統會在其中的文件描述符狀態發生
// 變化時返回
//
// epoll_wait 調用返回後:
// nfds 表示發生變化的文件描述符數量
// events 會保存當前的事件,它的數量就是 nfds
int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
// 遍歷 events,對事件做出符合應用預期的響應
for (int i = 0; i < nfds; i++) {
// consume events[i]
}
}
複製代碼
完整例子見 epoll-echo-server
上面的代碼中已經包含了註釋,能夠大體歸納爲下圖:
因此處於 libuv 底層的 epoll 也是有「事件循環」的概念,可見事件循環並非 libuv 首創
提到 epoll,不得不提它的兩種觸發模式:水平觸發(Level-triggered)、邊緣觸發(Edge-triggered)。不得不提是由於它們關係到 epoll 的事件觸發機制,加上名字取得又有些晦澀
這兩個術語都源自電子學領域,咱們從它們的原始含義開始理解
首先是水平觸發:
上圖是表示電壓變化的時序圖,VH 表示電壓的峯值,VL 表示電話的谷值。水平觸發的含義是,隨着時間的變化,只要電壓處於峯值,系統就會激活對應的電路(觸發)
上圖依然是表示電壓變化的時序圖,不過激活電路(觸發)的條件是電壓的改變,即電壓由 VH -> VL、VL -> VH 的狀態變化,在圖中經過邊來表示這個變化,即 Rising edge 和 Falling edge,因此稱爲 Edge-triggered 即邊緣觸發
咱們能夠大體理解它們的形式與差異,繼續結合下面的 epoll 中的表現進行理解
回到 epoll 中,水平觸發和邊緣觸發做爲原始含義的衍生,固然仍是具備相似電子學領域中的含義
咱們經過一個例子來理解,好比咱們有一個 fd(File descriptor) 表示剛創建的客戶端鏈接,隨後客戶端給咱們發送了 5 bytes 的內容,
若是是水平觸發:
若是是邊緣觸發:
咱們很難將水平觸發、邊緣觸發的字面意思與上面的行爲聯繫起來,好在咱們已經預先了解過它們在電子學領域的含義
水平觸發,由於已是可讀狀態,因此它會一直觸發,直到咱們讀完緩衝區,且系統緩衝區沒有新的客戶端發送的內容;邊緣觸發對應的是狀態的變化,每次有新的客戶端發送內容,都會設置可讀狀態,所以只會在這個時機觸發
水平觸發是 epoll 默認的觸發模式,而且 libuv 中使用的也是水平觸發。在瞭解了水平觸發和邊緣觸發的區別後,咱們其實就能夠猜想 libuv 使用水平觸發而不是邊緣觸發背後的考量:
若是是邊緣觸發,在 epoll 的客觀能力上,咱們不被要求一次讀取完緩衝區的內容(能夠等到下一次客戶端發送內容時繼續讀取)。可是實際業務中,客戶端此時極可能在等待咱們的響應(能夠結合 HTTP 協議理解),而咱們還在等待客戶端的下一次寫入,所以會陷入死鎖的邏輯。由此一來,一次讀取完緩衝區的內容幾乎就成了邊緣觸發模式下的必選方式,這就不可避免的形成其餘回調的等待時間變長,讓 CPU 時間分配在各個回調之間顯得不夠均勻
epoll 並不可以做用在全部的 IO 操做上,好比文件的讀寫操做,就沒法享受到 epoll 的便利性
因此 libuv 的工做能夠大體歸納爲:
回到 libuv,咱們將以 event-loop 爲主要脈絡,結合上文提到的 epoll,以及下面將會介紹到的線程池,繼續 libuv 在 Linux 上的實現細節一探究竟
咱們將結合源碼來回顧一下 event-loop 基本概念
下面這幅圖也取自 libuv 官網,它描述了 event-loop 內部的工做:
單看流程圖可能太抽象,下面是對應的 libuv 內部的實現 完整內容:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r) uv__update_time(loop);
// 是循環,沒錯了
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
// 處理 timer 隊列
uv__run_timers(loop);
// 處理 pending 隊列
ran_pending = uv__run_pending(loop);
// 處理 idle 隊列
uv__run_idle(loop);
// 處理 prepare 隊列
uv__run_prepare(loop);
// 執行 io_poll
uv__io_poll(loop, timeout);
uv__metrics_update_idle_time(loop);
// 執行 check 隊列
uv__run_check(loop);
// 執行 closing 隊列
uv__run_closing_handles(loop);
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) break;
}
return r;
}
複製代碼
之因此各類形式的回調(好比 setTimeout
)在優先級上會有差異,就在於它們使用的是不一樣的隊列,而不一樣的隊列在每次事件循環的迭代中的執行順序不一樣
按照官網的描述,它們是對 event-loop 中執行的操做的抽象,前者表示須要長期存在的操做,後者表示短暫的操做。單看文字描述可能不太好理解,咱們看一下它們的使用方式有何不一樣
對於 Handle 表示的長期存在的操做來講,它們的 API 具備相似下面的形式:
// IO 操做
int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, uv_os_sock_t socket);
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb);
int uv_poll_stop(uv_poll_t* poll);
// timer
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle);
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat);
int uv_timer_stop(uv_timer_t* handle);
複製代碼
大體都有這三個步驟(並非所有):初始化 -> 開始 -> 中止
。很好理解吧,由於是長期存在的操做,它開始了就會持續被處理,因此須要安排一個「中止」的 API
而對於 Request 表示的短暫操做來講,好比域名解析操做:
int uv_getaddrinfo(uv_loop_t* loop, uv_getaddrinfo_t* req, uv_getaddrinfo_cb getaddrinfo_cb, /* ... */);
複製代碼
域名解析操做的交互形式是,咱們提交須要解析的地址,方法會返回解析的結果(這樣的感受彷佛有點 HTTP 1.0 請求的樣子),因此按「請求 - Request」來命名這樣的操做的緣由就變得有畫面感了
不過 Handle 和 Request 二者不是互斥的概念,Handle 內部實現可能也用到了 Request。由於一些宏觀來看的長期操做,在每一個時間切片內是能夠當作是 Request 的,好比咱們處理一個請求,能夠當作是一個 Handle,而在當次的請求中,咱們極可能會作一些讀取和寫入的操做,這些操做就能夠當作是 Request
咱們經過 timer 開放出來的 API 爲線索,來分析它的內部實現:
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle);
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat);
int uv_timer_stop(uv_timer_t* handle);
複製代碼
uv_timer_init
沒有什麼特殊的地方,只是初始化一下 handle
的狀態,並將其添加到 loop->handle_queue
中
uv_timer_start
內部作了這些工做:
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat) {
uint64_t clamped_timeout;
// loop->time 表示 loop 當前的時間。loop 每次迭代開始時,會用當次時間更新該值
// clamped_timeout 就是該 timer 將來超時的時間點,這裏直接計算好,這樣將來就不須要
// 計算了,直接從 timers 中取符合條件的便可
if (clamped_timeout < timeout)
clamped_timeout = (uint64_t) -1;
handle->timer_cb = cb;
handle->timeout = clamped_timeout;
handle->repeat = repeat;
// 除了預先計算好的 clamped_timeout 之外,將來當 clamped_timeout 相同時,使用這裏的
// 自增 start_id 做爲比較條件來以爲 handle 的執行前後順序
handle->start_id = handle->loop->timer_counter++;
// 將 handle 插入到 timer_heap 中,這裏的 heap 是 binary min heap,因此根節點就是
// clamped_timeout 值(或者 start_id)最小的 handle
heap_insert(timer_heap(handle->loop),
(struct heap_node*) &handle->heap_node,
timer_less_than);
// 設置 handle 的開始狀態
uv__handle_start(handle);
return 0;
}
複製代碼
uv_timer_stop
內部作了這些工做:
int uv_timer_stop(uv_timer_t* handle) {
if (!uv__is_active(handle))
return 0;
// 將 handle 移出 timer_heap,和 heap_insert 操做同樣,除了移出以外
// 還會維護 timer_heap 以保障其始終是 binary min heap
heap_remove(timer_heap(handle->loop),
(struct heap_node*) &handle->heap_node,
timer_less_than);
// 設置 handle 的狀態爲中止
uv__handle_stop(handle);
return 0;
}
複製代碼
到目前爲止,咱們已經知道所謂的 start
和 stop
其實能夠粗略地歸納爲,往屬性 loop->timer_heap
中插入或者移出 handle,而且這個屬性使用一個名爲 binary min heap 的數據結構
而後咱們再回顧上文的 uv_run
:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
// ...
while (r != 0 && loop->stop_flag == 0) {
// ...
uv__update_time(loop);
uv__run_timers(loop);
// ...
}
// ...
}
複製代碼
uv__update_time
咱們已經見過了,做用就是在循環開頭階段、使用當前時間設置屬性 loop->time
咱們只須要最後看一下 uv__run_timers
的內容,就能夠串聯整個流程:
void uv__run_timers(uv_loop_t* loop) {
struct heap_node* heap_node;
uv_timer_t* handle;
for (;;) {
// 取根節點,該值保證始終是全部待執行的 handle
// 中,最早超時的那一個
heap_node = heap_min(timer_heap(loop));
if (heap_node == NULL)
break;
handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout > loop->time)
break;
// 中止、移出 handle、順便維護 timer_heap
uv_timer_stop(handle);
// 若是是須要 repeat 的 handle,則從新加入到 timer_heap 中
// 會在下一次事件循環中、由本方法繼續執行
uv_timer_again(handle);
// 執行超時 handle 其對應的回調
handle->timer_cb(handle);
}
}
複製代碼
以上,就是 timer 在 Libuv 中的大體實現方式
後面咱們會看到,除了 timer 以外的 handle 都存放在名爲 queue 的數據結構中,而存放 timer handle 的數據結構則爲 min heap。那麼咱們就來看看這樣的差異選擇有何深意
所謂 min heap 實際上是(如需更全面的介紹,能夠參考 Binary Tree):
先看 binary tree(二元樹的定義是):
進一步看 complete binary tree 的定義則是:
下面是幾個例子:
complete binary tree 的例子:
18
/ \
15 30
/ \ / \
40 50 100 40
/ \ /
8 7 9
下面不是 complete binary tree,由於最後一層沒有優先放滿左邊
18
/ \
40 30
/ \
100 40
min heap 的例子,根節點是最小值、父節點始終小於其子節點:
18
/ \
40 30
/ \
100 40
複製代碼
在 libuv 中對 timer handle 所需的操做是:
clamped_timeout
最小的 timer handle而 min heap 兼顧了上面的需求:
heap 的實如今文件是 heap-inl.h,我加入了一些註釋,有興趣的同窗能夠繼續一探究竟
上面,咱們已經瞭解了每次事件循環迭代中、處於第一順位的 timer 的處理,接下來咱們來看處在第二順位的 pending 隊列的處理:
static int uv__run_pending(uv_loop_t* loop) {
QUEUE* q;
QUEUE pq;
uv__io_t* w;
if (QUEUE_EMPTY(&loop->pending_queue))
return 0;
QUEUE_MOVE(&loop->pending_queue, &pq);
// 不斷從隊列中彈出元素進行操做
while (!QUEUE_EMPTY(&pq)) {
q = QUEUE_HEAD(&pq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, pending_queue);
w->cb(loop, w, POLLOUT);
}
return 1;
}
複製代碼
從源碼來看,僅僅是從隊列 loop->pending_queue
中不斷彈出元素而後執行,而且彈出的元素是 uv__io_t
結構體的屬性,從名字來看大體應該是 IO 相關的操做
另外,對 loop->pending_queue
進行插入操做的只有函數 uv__io_feed,該函數的被調用點基本是執行一些 IO 相關的收尾工做
和上文出現的 min heap 同樣,queue 也是主要用到的數據結構,因此咱們在第一次見到它的時候、順便介紹一下
min heap 的實現相對更深一些,因此提供了基於源碼的註釋 heap-inl.h 讓感興趣的讀者深刻了解一下,而 queue 則相對就簡單一些,加上源碼中隨處會出現操做 queue 的宏,瞭解這些宏到底作了什麼、會讓閱讀源碼時更加安心
接下來咱們就一塊兒看看 queue 和一些經常使用的操做它的宏,首先是起始狀態:
queue 在 libuv 中被設計成一個環形結構,因此起始狀態就是 next
和 prev
都指向自身
接下來咱們來看一下往 queue 插入一個新的元素是怎樣的形式:
上圖分兩部分,上半部分是已有的 queue、h 表示其當前的 head,q 是待插入的元素。下半部分是插入後的結果,圖中的紅色表示 prev
的通路,紫色表示 next
的通路,順着通路咱們能夠發現它們始終是一個環形結構
上圖演示的 QUEUE_INSERT_TAIL
顧名思義是插入到隊尾,而由於是環形結構,咱們須要修改頭、尾、待插入元素三者的引用關係
再看一下移除某個元素的形式:
移除某個元素就比較簡單了,就是將該元素的 prev
和 next
鏈接起來便可,這樣鏈接後,就跳過了該元素,使得該元素呈現出被移除的狀態(沒法在通路中訪問到)
繼續看下鏈接兩個隊列的操做:
看上去貌似很複雜,其實就是把兩個環先解開,而後首尾相連成爲一個新的環便可。這裏經過意識流的做圖方式,使用 1
和 2
標註了代碼和鏈接動做的對應關係
最後看一下將隊列一分爲二的操做:
上圖一樣經過意識流的做圖方式,使用 1
和 2
標註了代碼和鏈接動做的對應關係;將本來以 h
開頭的 queue,在 q
處剪開,h
和 q
以前的元素相鏈接成爲一個新的 queue;n
做爲另外一個 queue 的開頭,鏈接 q
和斷開前的隊列的末尾,構成另外一個 queue
上面演示了一些具備有表明性的 queue 操做,感興趣的同窗能夠繼續查看 queue.h 來一探究竟
你們或許會奇怪,爲何沒有按照它們在事件循環中的順序進行介紹,並且還把它們三個放在了一塊兒
若是你們在源碼中搜索 uv__run_idle
或者 uv__run_check
會更加奇怪,由於咱們只能找到它們的聲明,甚至找不到它們的定義
其實它們都是在 loop-watcher.c 中經過宏生成的,由於它們的操做都是同樣的 - 從各自的隊列中取出 handle 而後執行便可
須要說明的是,你們不要被 idle 的名字迷惑了,它並非事件循環閒置的時候纔會執行的隊列,而是在每次時間循環迭代中,都會執行的,徹底沒有 idle 之意
不過要說徹底沒有 idle 之意彷佛也不是特別合適,好比 idle 和 prepare 隊列在內部實現上,無非是前後執行的隊列而已:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
// ...
while (r != 0 && loop->stop_flag == 0) {
// ...
uv__run_idle(loop);
uv__run_prepare(loop);
uv__io_poll(loop, timeout);
// ...
}
// ...
}
複製代碼
那麼如今有一個 handle,咱們但願它在 uv__io_poll
以前執行,是添加到 idle 仍是 prepare 隊列中呢?
我以爲 prepare 是取「爲了下面的 uv__io_poll
作準備」之意,因此若是是爲了 io_poll 作準備的 handle,那麼能夠添加到 prepare 隊列中,其他則能夠添加到 idle 之中。一樣的設定我以爲也適用於 check,它運行在 io_poll 以後,可讓用戶作一些檢驗 IO 執行結果的工做,讓任務隊列更加語義化
對於 io_poll 咱們仍是從事件循環開始分析
下面是上文已經介紹過的事件循環的片斷:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
// ...
while (r != 0 && loop->stop_flag == 0) {
// ...
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
// ...
}
// ...
}
複製代碼
上面的代碼計算了一個 timeout
用於調用 uv__io_poll(loop, timeout)
uv__io_poll
定義在 linux-core.c 中,雖然這是一個包含註釋在內接近 300 行的函數,但想必你們也發現了,其中的核心邏輯就是開頭演示的 epoll 的用法:
void uv__io_poll(uv_loop_t* loop, int timeout) {
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
// ...
// `loop->backend_fd` 是使用 `epoll_create` 建立的 epoll 實例
epoll_ctl(loop->backend_fd, op, w->fd, &e)
// ...
}
// ...
for (;;) {
// ...
if (/* ... */) {
// ...
} else {
// ...
// `epoll_wait` 和 `epoll_pwait` 只有細微的差異,因此這裏只考慮前者
nfds = epoll_wait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout);
// ...
}
}
// ...
for (i = 0; i < nfds; i++) {
// ...
w = loop->watchers[fd];
// ...
w->cb(loop, w, pe->events);
}
}
複製代碼
epoll_wait
的 timeout
參數的含義是:
-1
表示一直等到有事件產生0
則當即返回,包含調用時產生的事件milliseconds
爲單位,規約到將來某個系統時間片內結合上面這些,咱們看下 uv_backend_timeout
是如何計算 timeout
的:
int uv_backend_timeout(const uv_loop_t* loop) {
// 時間循環被外部中止了,因此讓 `uv__io_poll` 理解返回
// 以便儘快結束事件循環
if (loop->stop_flag != 0)
return 0;
// 沒有待處理的 handle 和 request,則也不須要等待了,一樣讓 `uv__io_poll`
// 儘快返回
if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
return 0;
// idle 隊列不爲空,也要求 `uv__io_poll` 儘快返回,這樣儘快進入下一個時間循環
// 不然會致使 idle 產生太高的延遲
if (!QUEUE_EMPTY(&loop->idle_handles))
return 0;
// 和上一步目的同樣,不過這裏是換成了 pending 隊列
if (!QUEUE_EMPTY(&loop->pending_queue))
return 0;
// 和上一步目的同樣,不過這裏換成,待關閉的 handles,都是爲了不目標隊列產生
// 太高的延遲
if (loop->closing_handles)
return 0;
return uv__next_timeout(loop);
}
int uv__next_timeout(const uv_loop_t* loop) {
const struct heap_node* heap_node;
const uv_timer_t* handle;
uint64_t diff;
heap_node = heap_min(timer_heap(loop));
// 若是沒有 timer 待處理,則能夠放心的 block 住,等待事件到達
if (heap_node == NULL)
return -1; /* block indefinitely */
handle = container_of(heap_node, uv_timer_t, heap_node);
// 有 timer,且 timer 已經到了要被執行的時間內,則需讓 `uv__io_poll`
// 儘快返回,以在下一個事件循環迭代內處理超時的 timer
if (handle->timeout <= loop->time)
return 0;
// 沒有 timer 超時,用最小超時間減去、當前的循環時間的差值,做爲超時時間
// 由於在爲了這個差值時間內是沒有 timer 超時的,因此能夠放心 block 以等待
// epoll 事件
diff = handle->timeout - loop->time;
if (diff > INT_MAX)
diff = INT_MAX;
return (int) diff;
}
複製代碼
上面的 uv__next_timeout
實現主要分爲三部分:
-1
,結合本節開頭對 epoll_wait
的 timeout
參數的解釋,-1
會讓後續的 uv__io_poll
進入 block 狀態、徹底等待事件的到達handle->timeout <= loop->time
,則返回 0
,這樣 uv__io_poll
不會 block 住事件循環,目的是爲了快速進入下一次事件循環、以執行超時的 timerdiff
來做爲 uv__io_poll
的阻塞時間不知道你們發現沒有,timeout 的計算,其核心指導思想就是要儘量的讓 CPU 時間可以在事件循環的屢次迭代的、多個不一樣任務隊列的執行、中儘量的分配均勻,避免某個類型的任務產生很高的延遲
瞭解了 io_poll 隊列是如何執行以後,咱們經過一個 echo server 的小栗子,來對 io_poll 有個總體的認識:
uv_loop_t *loop;
void echo_write(uv_write_t *req, int status) {
// ...
// 一些無所謂有,但有所謂無的收尾工做
}
void echo_read(uv_stream_t *client, ssize_t nread, uv_buf_t buf) {
// ...
// 建立一個寫入請求(上文已經介紹過 Request 和 Handle 的區別),
// 將讀取的客戶端內容寫回給客戶端,寫入完成後進入回調 `echo_write`
uv_write_t *write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
uv_write(write_req, client, &buf, 1, echo_write);
}
void on_new_connection(uv_stream_t *server, int status) {
// ...
// 建立 client 實例並關聯到事件循環
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
// 與創建客戶端鏈接,並讀取客戶端輸入,讀取完成後進入 `echo_read` 回調
if (uv_accept(server, (uv_stream_t*) client) == 0) {
uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}
// ...
}
int main() {
// 建立事件循環
loop = uv_default_loop();
// 建立 server 實例並關聯事件循環
uv_tcp_t server;
uv_tcp_init(loop, &server);
// ...
// 綁定 server 到某個端口,並接受請求
uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 7000));
// 新的客戶端請求到達後,會進去到 `on_new_connection` 回調
uv_listen((uv_stream_t*) &server, 128, on_new_connection);
// ...
// 啓動事件循環
return uv_run(loop, UV_RUN_DEFAULT);
}
複製代碼
到目前爲止,咱們已經確認過 io_poll 內部實現確實是使用的 epoll。在本文的開頭,咱們也提到 epoll 目前並不能處理全部的 IO 操做,對於那些 epoll 不支持的 IO 操做,libuv 統一使用其內部的線程池來模擬出異步 IO。接下來咱們看看線程池的大體工做形式
由於咱們已經知道讀寫文件的操做是沒法使用 epoll 的,那麼就順着這個線索,經過 uv_fs_read 的內部實現,找到 uv__work_submit 方法,發現是在其中初始化的線程池:
void uv__work_submit(uv_loop_t* loop, struct uv__work* w, enum uv__work_kind kind, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
// ...
post(&w->wq, kind);
}
複製代碼
因此線程池的建立、是一個延遲建立的單例。init_once
內部會調用 init_threads 來完成線程池初始化工做:
static uv_thread_t default_threads[4];
static void init_threads(void) {
// ...
nthreads = ARRAY_SIZE(default_threads);
val = getenv("UV_THREADPOOL_SIZE");
// ...
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, &sem))
abort();
// ...
}
複製代碼
經過上面的實現,咱們知道默認的線程池中線程的數量是 4
,而且能夠經過 UV_THREADPOOL_SIZE
環境變量從新指定該數值
除了對線程池進行單例延遲建立,uv__work_submit
固然仍是會提交任務的,這部分工做是由 post(&w->wq, kind)
完成的,咱們來看下 post 方法的實現細節:
static void post(QUEUE* q, enum uv__work_kind kind) {
uv_mutex_lock(&mutex);
// ...
// 將任務插入到 `wq` 這個線程共享的隊列中
QUEUE_INSERT_TAIL(&wq, q);
// 若是有空閒線程,則通知它們開始工做
if (idle_threads > 0)
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
}
複製代碼
能夠發現對於提交任務,其實就是將任務插入到線程共享隊列 wq
,而且有空閒線程時纔會通知它們工做。那麼,若是此時沒有空閒線程的話,是否是任務就被忽略了呢?答案是否,由於工做線程會在完成當前工做後,主動檢查 wq
隊列是否還有待完成的工做,有的話會繼續完成,沒有的話,則進入睡眠,等待下次被喚醒(後面會繼續介紹這部分細節)
上面在建立線程的時候 uv_thread_create(threads + i, worker, &sem)
中的 worker
就是線程執行的內容,咱們來看下 worker 的大體內容:
// 線程池的 wq,提交的任務都先鏈到其中
static QUEUE wq;
static void worker(void* arg) {
// ...
// `idle_threads` 和 `run_slow_work_message` 這些是線程共享的,因此要加個鎖
uv_mutex_lock(&mutex);
for (;;) {
// 這裏的條件判斷,能夠大體當作是「沒有任務」爲 true
while (QUEUE_EMPTY(&wq) ||
(QUEUE_HEAD(&wq) == &run_slow_work_message &&
QUEUE_NEXT(&run_slow_work_message) == &wq &&
slow_io_work_running >= slow_work_thread_threshold())) {
// 輪轉到當前進程時由於沒有任務,則無事可作
// 空閒線程數 +1
idle_threads += 1;
// `uv_cond_wait` 內部是使用 `pthread_cond_wait` 調用後會:
// - 讓線程進入等待狀態,等待條件變量 `cond` 發生變動
// - 對 `mutex` 解鎖
//
// 此後,其餘線程中都可使用 `uv_cond_signal` 內部是 `pthread_cond_signal`
// 來廣播一個條件變量 `cond` 變動的事件,操做系統內部會隨機喚醒一個等待 `cond`
// 變動的線程,並在被喚醒線程的 uv_cond_wait 調用返回以前,對以前傳入的 `mutex`
// 參數上鎖
//
// 所以循環跳出(有任務)後,`mutex` 必定是上鎖的
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
// ...
// 由於上鎖了,因此放心進行隊列的彈出操做
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
// ...
// 由於已經完成了彈出,能夠解鎖,讓其餘線程能夠繼續操做隊列
uv_mutex_unlock(&mutex);
// 利用 c 結構體的小特性,作字段偏移,拿到 `q` 所屬的 `uv__work` 實例
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
// 下面要操做 `w->loop->wq` 因此要上鎖
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL;
// 須要看仔細,和開頭部分線程池中的 wq 區別開
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
// 喚醒主線程的事件循環
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
// 這一步上鎖是必須的,由於下次迭代的開頭又須要
// 操做共享內存,不過沒必要擔憂死鎖,由於它和下一次迭代
// 中的 `uv_cond_wait` 解鎖操做是對應的
uv_mutex_lock(&mutex);
// ...
}
}
複製代碼
上面咱們保留了相對重要的內容,並加以註釋。能夠大體地歸納爲:
uv_cond_wait
來等待被喚醒wq
中主動找一個任務作,完成任務就喚醒主線程,由於回調須要在主線程被執行uv_cond_wait
再次進入睡眠狀態uv_cond_signal
來通知操做系統作調度當線程池完成任務後,須要通知主線程執行對應的回調。通知的方式頗有意思,咱們先來看下事件循環初始化操做 uv_loop_init:
int uv_loop_init(uv_loop_t* loop) {
// ...
// 初始化 min heap 和各類隊列,用於存放各式的 handles
heap_init((struct heap*) &loop->timer_heap);
QUEUE_INIT(&loop->wq);
QUEUE_INIT(&loop->idle_handles);
QUEUE_INIT(&loop->async_handles);
QUEUE_INIT(&loop->check_handles);
QUEUE_INIT(&loop->prepare_handles);
QUEUE_INIT(&loop->handle_queue);
// ...
// 調用 `epoll_create` 建立 epoll 實例
err = uv__platform_loop_init(loop);
if (err)
goto fail_platform_init;
// ...
// 用於線程池通知的初始化
err = uv_async_init(loop, &loop->wq_async, uv__work_done);
// ...
}
複製代碼
上面的代碼中 uv_async_init 是用於初始化線程池通知相關的工做,下面是它的函數簽名:
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb);
複製代碼
因此第三個實參 uv__work_done 實際上是一個回調函數,咱們能夠看下它的內容:
void uv__work_done(uv_async_t* handle) {
struct uv__work* w;
uv_loop_t* loop;
QUEUE* q;
QUEUE wq;
int err;
loop = container_of(handle, uv_loop_t, wq_async);
uv_mutex_lock(&loop->wq_mutex);
// 將目前的 `loop->wq` 所有移動到局部變量 `wq` 中,
//
// `loop->wq` 中的內容是在上文 worker 中任務完成後使用
// `QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq)` 添加的
//
// 這樣儘快釋放鎖,讓其餘任務可儘快接入
QUEUE_MOVE(&loop->wq, &wq);
uv_mutex_unlock(&loop->wq_mutex);
// 遍歷 `wq` 執行其中每一個任務的完成回調
while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
}
複製代碼
知道了 uv__work_done
就是負責執行任務完成回調的工做後,繼續看一下 uv_async_init
的內容,看看其內部是如何使用 uv__work_done
的:
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
// ...
// 待調查
err = uv__async_start(loop);
// ...
// 建立了一個 async handle
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
// 在目前的脈絡中 `async_cb` 就是 `uv__work_done` 了
handle->async_cb = async_cb;
handle->pending = 0;
// 把 async handle 加入到隊列 `loop->async_handles` 中
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
// ...
}
複製代碼
咱們繼續看一下以前待調查的 uv__async_start 的內容:
static int uv__async_start(uv_loop_t* loop) {
// ...
// `eventfd` 能夠建立一個 epoll 內部維護的 fd,該 fd 能夠和其餘真實的 fd(好比 socket fd)同樣
// 添加到 epoll 實例中,能夠監聽它的可讀事件,也能夠對其進行寫入操做,所以就用戶代碼就能夠藉助這個
// 看似虛擬的 fd 來實現的事件訂閱了
err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (err < 0)
return UV__ERR(errno);
pipefd[0] = err;
pipefd[1] = -1;
// ...
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
return 0;
}
複製代碼
咱們知道 epoll 是支持 socket fd 的,對於支持的 fd,epoll 的事件調度將很是的高效。而對於不支持的 IO 操做,libuv 則使用 eventfd
建立一個虛擬的 fd,繼續利用 fd 的事件調度功能
咱們繼續看下上面出現的 uv__io_start 的細節,來確認一下事件訂閱的步驟:
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
// ...
// 你們能夠翻到上面 `uv__io_poll` 的部分,會發現其中有遍歷 `loop->watcher_queue`
// 將其中的 fd 都加入到 epoll 實例中,以訂閱它們的事件的動做
if (QUEUE_EMPTY(&w->watcher_queue))
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
// 將 fd 和對應的任務關聯的操做,一樣能夠翻看上面的 `uv__io_poll`,當接收到事件
// 通知後,會有從 `loop->watchers` 中根據 fd 取出任務並執行其完成回調的動做
// 另外,根據 fd 確保 watcher 不會被重複添加
if (loop->watchers[w->fd] == NULL) {
loop->watchers[w->fd] = w;
loop->nfds++;
}
}
複製代碼
確認了事件訂閱步驟之後,咱們來看下事件回調的內容。上面的形參 w
在咱們目前的脈絡中,對應的實參是 loop->async_io_watcher
,而它是經過 uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0])
初始化的,咱們看一下 uv__io_init
的函數簽名:
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);
複製代碼
因此 uv__async_io 是接收到虛擬 fd 事件的回調函數,繼續看下它的內容:
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
// ...
// 確保 `w` 一定是 `loop->async_io_watcher`
assert(w == &loop->async_io_watcher);
for (;;) {
// 從中讀一些內容,`w->fd` 就是上面使用 `eventfd` 建立的虛擬 fd
// 不出意外的話,通知那端的方式、必定是往這個 fd 裏面寫入一些內容,咱們能夠後面繼續確認
// 從中讀取一些內容的目的是避免緩衝區被通知所用的不含實際意義的字節佔滿
r = read(w->fd, buf, sizeof(buf));
// ...
}
// 執行 `loop->async_handles` 隊列,任務實際的回調
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);
QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->async_handles, q);
// ...
h->async_cb(h);
}
}
複製代碼
咱們已經知道了事件的訂閱,以及事件響應的方式
接着繼續確認一下事件通知是如何在線程池中觸發的。uv_async_send 是喚醒主線程的開放 API,它實際上是調用的內部 API uv__async_send:
static void uv__async_send(uv_loop_t* loop) {
const void* buf;
ssize_t len;
int fd;
// ...
fd = loop->async_io_watcher.fd;
do
// 果真事件通知這一端就是往 `eventfd` 建立的虛擬 fd 寫入數據
// 剩下的就是交給 epoll 高效的事件調度機制喚醒事件訂閱方就能夠了
r = write(fd, buf, len);
while (r == -1 && errno == EINTR);
// ...
}
複製代碼
咱們最後經過一副意識流的圖,對上面的線程池的流程進行小結:
上圖中咱們的任務是在 uv__run_idle(loop);
執行的回調中經過 uv__work_submit
完成的,可是實際上,對於使用事件循環的應用而言,整個應用的時間片都劃分在了各個不一樣的隊列回調中,因此實際上、從其他的隊列中提交任務也是可能的
咱們開頭已經介紹過,只有 Handle 才配備了關閉的 API,由於 Request 是一個短暫任務。Handle 的關閉須要使用 uv_close:
void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
assert(!uv__is_closing(handle));
handle->flags |= UV_HANDLE_CLOSING;
handle->close_cb = close_cb;
switch (handle->type) {
// 根據不一樣的 handle 類型,執行各自的資源回收工做
case UV_NAMED_PIPE:
uv__pipe_close((uv_pipe_t*)handle);
break;
case UV_TTY:
uv__stream_close((uv_stream_t*)handle);
break;
case UV_TCP:
uv__tcp_close((uv_tcp_t*)handle);
break;
// ...
default:
assert(0);
}
// 添加到 `loop->closing_handles`
uv__make_close_pending(handle);
}
void uv__make_close_pending(uv_handle_t* handle) {
assert(handle->flags & UV_HANDLE_CLOSING);
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->next_closing = handle->loop->closing_handles;
handle->loop->closing_handles = handle;
}
複製代碼
調用 uv_close
關閉 Handle 後,libuv 會先釋放 Handle 佔用的資源(好比關閉 fd),隨後經過調用 uv__make_close_pending
把 handle 鏈接到 closing_handles
隊列中,該隊列會在事件循環中被 uv__run_closing_handles(loop)
調用所執行
使用了事件循環後,業務代碼的執行時機都在回調中,因爲 closing_handles
是最後一個被執行的隊列,因此在其他隊列的回調中、那些執行 uv_close
時傳遞的回調,都會在當次迭代中被執行
本文沿着 Libuv 的 Linux 實現的脈絡對其內部實現進行了簡單的探索、嘗試解開 libuv 的神祕面紗。很顯然,只看這篇是不夠的,希望有幸能夠做爲想深刻了解 libuv 的起始讀物。後續咱們會結合 Node.js 來探究它們內部是如何銜接的
本文發佈自 網易雲音樂大前端團隊,文章未經受權禁止任何形式的轉載。咱們常年招收前端、iOS、Android,若是你準備換工做,又剛好喜歡雲音樂,那就加入咱們 grp.music-fe (at) corp.netease.com!