一般咱們在討論 Node.js 的時候都會涉及到異步這個特性。實際上 Node.js 在執行異步調用的時候,不一樣的場景下有着不一樣的處理方式。本文將經過 libuv 源碼來分析 Node.js 是如何經過 libuv 的線程池完成異步調用。本文描述的 Node.js 版本爲 v11.15.0,libuv 版本爲 1.24.0 。javascript
如下面的代碼爲例,它經過調用 fs.access 來異步地判斷文件是否存在並在回調中打印日誌,在 Node.js 中這是一個典型的異步調用。html
const fs = require('fs') const cb = function (err) { console.log(`Is myfile exists: ${!err}`) } fs.access('myfile', cb)
在分析上面這段代碼的調用過程以前,咱們先來了解一些 libuv 概念。java
主動經過 libuv 發起的操做被 libuv 稱爲[請求]請求,libuv 的線程池做用於如下 4 種枚舉的異步請求:node
UV_FS
: fs 模塊的異步函數(除了 uv_fs_req_cleanup ),fs.access、fs.stat 等。UV_GETADDRINFO
:dns 模塊的異步函數,dns.lookup 等。UV_GETNAMEINFO
:dns 模塊的異步函數,dns.lookupService 等。UV_WORK
:zlib 模塊的 zlib.unzip、zlib.gzip 等;在 Node.js 的 Addon(C/C++) 中經過 uv_queue_work 建立的多線程請求。其它的 UV_CONNECT
、UV_WRITE
、UDP_SEND
等則並不會經過線程池去執行。linux
這 4 種枚舉請求 libuv 內部把它們分爲 3 種[任務類型]任務類型:git
UV__WORK_CPU
:CPU 密集型,UV_WORK
類型的請求被定義爲這種類型。所以根據這個分類,不推薦在 uv_queue_work
中作 I/O 密集的操做。UV__WORK_FAST_IO
:快 IO 型,UV_FS
類型的請求被定義爲這種類型。UV__WORK_SLOW_IO
:慢 IO 型,UV_GETADDRINFO
和 UV_GETNAMEINFO
類型的請求被定義爲這種類型。UV__WORK_SLOW_IO
執行不一樣於 UV__WORK_CPU
與 UV__WORK_FAST_IO
,libuv 執行它的時候流程會有些差別,這個後面會提到。github
libuv 經過init_threads 函數初始化線程池,初始化時會根據一個名爲 UV_THREADPOOL_SIZE 的環境變量來初始化內部線程池的大小,線程最大數量爲 128 ,默認爲 4 。若是以單進程的架構去部署服務,能夠根據服務器 CPU 的核心數量及業務狀況來設置線程池大小,達到資源利用的最大化。uv loop 線程在建立 worker 線程時,會初始化如下變量:算法
UV__WORK_CPU
和 UV__WORK_FAST_IO
類型的請求後將其插到此隊列的尾部,並經過 uv_cond_signal 喚醒 worker 線程去處理,這是線程池請求的主隊列。UV__WORK_SLOW_IO
類型的請求後將其插到此隊列的尾部。worker 線程的入口函數均爲 worker 函數,這個咱們後面再說。 init_threads 實現以下:c#
static void init_threads(void) { unsigned int i; const char* val; uv_sem_t sem; // 6-23 行初始化線程池大小 nthreads = ARRAY_SIZE(default_threads); val = getenv("UV_THREADPOOL_SIZE"); // 根據環境變量設置線程池大小 if (val != NULL) nthreads = atoi(val); if (nthreads == 0) nthreads = 1; if (nthreads > MAX_THREADPOOL_SIZE) nthreads = MAX_THREADPOOL_SIZE; threads = default_threads; if (nthreads > ARRAY_SIZE(default_threads)) { threads = uv__malloc(nthreads * sizeof(threads[0])); if (threads == NULL) { nthreads = ARRAY_SIZE(default_threads); threads = default_threads; } } // 初始化條件變量 if (uv_cond_init(&cond)) abort(); // 初始化互斥量 if (uv_mutex_init(&mutex)) abort(); // 初始化隊列和節點 QUEUE_INIT(&wq); // 工做隊列 QUEUE_INIT(&slow_io_pending_wq); // 慢 I/O 隊列 QUEUE_INIT(&run_slow_work_message); // 若是有慢 I/O 請求,將此節點做爲標誌位插入到 wq 中 // 初始化信號量 if (uv_sem_init(&sem, 0)) abort(); // 後續線程同步須要依賴這個信號量,所以這個信號量建立失敗了則終止進程 // 建立 worker 線程 for (i = 0; i < nthreads; i++) if (uv_thread_create(threads + i, worker, &sem)) // 初始化 worker 線程 abort(); // woker 線程建立錯誤緣由爲 EAGAIN、EINVAL、EPERM 其中之一,具體請參考 man3 // 等待 worker 建立完成 for (i = 0; i < nthreads; i++) uv_sem_wait(&sem); // 等待 worker 線程建立完畢 // 回收信號量資源 uv_sem_destroy(&sem); }
libuv 有兩個函數能夠建立多線程請求:api
uv__work_submit 函數主要作 2 件事:
實現以下:
void uv__work_submit(uv_loop_t* loop, struct uv__work* w, enum uv__work_kind kind, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) { // 在收到請求後纔開始初始化線程池,可是隻會初始化一次 uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; post(&w->wq, kind); } static void init_once(void) { // fork 後子進程的 mutex 、condition variables 等 pthread 變量的狀態是父進程 fork 時的複製,因此子進程建立時須要重置狀態 // 具體請參考 http://man7.org/linux/man-pages/man2/fork.2.html if (pthread_atfork(NULL, NULL, &reset_once)) abort(); // 初始化線程池 init_threads(); } static void reset_once(void) { // 重置 once 變量 uv_once_t child_once = UV_ONCE_INIT; memcpy(&once, &child_once, sizeof(child_once)); }
post 函數主要作 2 件事:
判斷請求的請求類型是不是 UV__WORK_SLOW_IO
:
slow_io_pending_wq
的尾部,同時在請求隊列 wq
的尾部插入一個 run_slow_work_message
節點做爲標誌位,告知請求隊列 wq
當前存在慢 I/O 請求。wq
尾部。併發的慢 I/O 的請求數量不會超過線程池大小的一半,這樣作的好處是避免多個慢 I/O 的請求在某段時間內把全部線程都佔滿,致使其它可以快速執行的請求須要排隊。
post 函數實現以下:
static void post(QUEUE* q, enum uv__work_kind kind) { // 加鎖 uv_mutex_lock(&mutex); if (kind == UV__WORK_SLOW_IO) { /* 插入到慢 I/O 隊列中 */ QUEUE_INSERT_TAIL(&slow_io_pending_wq, q); /* 若是 run_slow_work_message 節點不爲空表明其已在 wq 隊列中,無需再次插入 */ if (!QUEUE_EMPTY(&run_slow_work_message)) { uv_mutex_unlock(&mutex); return; } // 不在 wq 隊列中則將 run_slow_work_message 做爲標誌位插到 wq 尾部 q = &run_slow_work_message; } // 將請求插到請求隊列尾部 QUEUE_INSERT_TAIL(&wq, q); // 若是有空閒的線程,喚醒某一個去執行請求 if (idle_threads > 0) uv_cond_signal(&cond); // 喚醒一個 worker 線程 uv_mutex_unlock(&mutex); }
worker 線程的入口函數 worker 在線程建立好並初始化完成後將按照下面的步驟不斷的循環:
static void worker(void* arg) { struct uv__work* w; QUEUE* q; int is_slow_work; // 通知 uv loop 線程此 worker 線程已建立完畢 uv_sem_post((uv_sem_t*) arg); arg = NULL; uv_mutex_lock(&mutex); // 經過這個死循環來不斷的執行請求 for (;;) { /* 這個 while 有2個判斷 1. 在多核處理器下,pthread_cond_signal 可能會激活多於一個線程,經過一個 while 來避免這種狀況致使的問題,具體請參考 https://linux.die.net/man/3/pthread_cond_signal 2. 限制慢 I/O 請求的數量小於線程數量的一半 */ while (QUEUE_EMPTY(&wq) || (QUEUE_HEAD(&wq) == &run_slow_work_message && QUEUE_NEXT(&run_slow_work_message) == &wq && slow_io_work_running >= slow_work_thread_threshold())) { idle_threads += 1; // worker 線程初始化完成或沒有請求執行時進入阻塞狀態,直到被新的請求喚醒 uv_cond_wait(&cond, &mutex); idle_threads -= 1; } // 喚醒而且達到執行請求的條件後取出隊列頭部的請求 q = QUEUE_HEAD(&wq); // 若是頭部請求是退出,則跳出循環,結束 worker 線程 if (q == &exit_message) { // 繼續喚醒其它 worker 去結束線程 uv_cond_signal(&cond); uv_mutex_unlock(&mutex); break; } // 將這個請求節點從請求隊列 wq 中移除 QUEUE_REMOVE(q); QUEUE_INIT(q); is_slow_work = 0; // 若是這個請求是慢 I/O 的標誌位 if (q == &run_slow_work_message) { /* 控制慢 I/O 請求數量,超過則插到隊列尾部,等待前面的請求執行完 */ if (slow_io_work_running >= slow_work_thread_threshold()) { QUEUE_INSERT_TAIL(&wq, q); continue; } /* 判斷慢 I/O 請求隊列中是否有請求,請求有可能被取消 */ if (QUEUE_EMPTY(&slow_io_pending_wq)) continue; is_slow_work = 1; slow_io_work_running++; // 取出慢 I/O 請求隊列中頭部的請求 q = QUEUE_HEAD(&slow_io_pending_wq); QUEUE_REMOVE(q); QUEUE_INIT(q); // 若是慢 I/O 請求隊列中還有請求,則將 run_slow_work_message 這個標誌位從新插到請求隊列 wq 的尾部 if (!QUEUE_EMPTY(&slow_io_pending_wq)) { QUEUE_INSERT_TAIL(&wq, &run_slow_work_message); if (idle_threads > 0) uv_cond_signal(&cond); // 喚醒一個線程繼續執行 } } uv_mutex_unlock(&mutex); w = QUEUE_DATA(q, struct uv__work, wq); // 上面處理了這多,終於在這裏開始執行請求的函數了 w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; // 爲保證線程安全,請求執行完後不會當即回調請求,而是將完成的請求插到已完成的請求隊列中,在uv loop 線程完成回調 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); // 經過 uv_async_send 同步 uv loop 線程:線程池完成了一個請求 uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); uv_mutex_lock(&mutex); if (is_slow_work) { slow_io_work_running--; } } }
在 uv_loop_init 時,線程池的 [wq_async]wq_async 句柄經過 uv_async_init 初始化並插入到 uv loop 的 async_handles 隊列中,而後在 uv loop 線程中遍歷 async_handles 隊列並完成回調。
worker 線程 和 uv loop 線程經過 uv_async_send 進行同步,而uv_async_send 只作了一件事:向 async_wfd 句柄寫了一個長度爲 1 個字節的字符串(只有 \0
這個字符)。
uv_async_send 實現以下:
int uv_async_send(uv_async_t* handle) { if (ACCESS_ONCE(int, handle->pending) != 0) return 0; // cmpxchgi 函數設置標誌位,若是已經設置過則不會重複調用 uv__async_send if (cmpxchgi(&handle->pending, 0, 1) == 0) uv__async_send(handle->loop); return 0; } static void uv__async_send(uv_loop_t* loop) { const void* buf; ssize_t len; int fd; int r; buf = ""; len = 1; fd = loop->async_wfd; #if defined(__linux__) if (fd == -1) { static const uint64_t val = 1; buf = &val; len = sizeof(val); fd = loop->async_io_watcher.fd; /* eventfd */ } #endif do r = write(fd, buf, len); // 向 fd 寫入內容 while (r == -1 && errno == EINTR); if (r == len) return; if (r == -1) if (errno == EAGAIN || errno == EWOULDBLOCK) return; abort(); }
對 async_wfd 寫內容爲何能作到同步呢?實際上在 worker 線程對 async_wfd 寫入時,uv loop 線程同時也在不斷的循環去接收處理各類各樣的事件或請求,其中就包括對 async_wfd 可讀事件的監聽。
uv loop 是在 uv_run 函數中執行的,它在 Node.js 啓動時 被調用, uv_run 實現以下:
int uv_run(uv_loop_t* loop, uv_run_mode mode) { int timeout; int r; int ran_pending; r = uv__loop_alive(loop); if (!r) uv__update_time(loop); while (r != 0 && loop->stop_flag == 0) { // 更新計時器時間 uv__update_time(loop); // 回調超時的計時器,setTimeout、setInterval 都是由這個函數回調 uv__run_timers(loop); // 處理某些沒有在 uv__io_poll 完成的回調 ran_pending = uv__run_pending(loop); // 官方解釋:Idle handle is needed only to stop the event loop from blocking in poll. // 實際上 napi 中某些函數好比 napi_call_threadsafe_function 會往 idle 隊列中插入回調,而後在這個階段執行 uv__run_idle(loop); // process._startProfilerIdleNotifier 的回調 uv__run_prepare(loop); timeout = 0; if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT) timeout = uv_backend_timeout(loop); // 計算 uv__io_poll 超時時間,算法請參考 https://github.com/libuv/libuv/blob/v1.24.0/src/unix/core.c#L318 // 對 async_wfd 可讀的監聽在 uv__io_poll 這個函數中 // 第二個參數 timeout 爲上面計算出來,用來設置 epoll_wait 等函數等待 I/O 事件的時間 uv__io_poll(loop, timeout); // setImmediate 的回調 // ps: 我的以爲從實現上講 setImmediate 和 nextTick 應該互換名字 :-) uv__run_check(loop); // 關閉句柄是個異步操做 // 通常結束 uv loop 時會先調用 uv_walk 遍歷全部句柄並關閉它們,而後再執行一次 uv loop 經過這個函數來完成關閉,最後再調用 uv_loop_close,不然的話會出現內存泄露 uv__run_closing_handles(loop); if (mode == UV_RUN_ONCE) { /* UV_RUN_ONCE implies forward progress: at least one callback must have * been invoked when it returns. uv__io_poll() can return without doing * I/O (meaning: no callbacks) when its timeout expires - which means we * have pending timers that satisfy the forward progress constraint. * * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from * the check. */ uv__update_time(loop); uv__run_timers(loop); } r = uv__loop_alive(loop); if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) break; } /* The if statement lets gcc compile it to a conditional store. Avoids * dirtying a cache line. */ if (loop->stop_flag != 0) loop->stop_flag = 0; return r; }
能夠看到 uv loop 裏面其實就是在不斷的循環去更新計時器、處理各類類型的回調、輪詢 I/O 事件,Node.js 的異步即是經過 uv loop 完成的。
libuv 的異步採用的是 Reactor 模型進行多路複用,在 uv__io_poll 中去處理 I/O 相關的事件, uv__io_poll 在不一樣的平臺下經過 epoll、kqueue 等不一樣的方式實現。因此當往 async_wfd 寫入內容時,在 uv__io_poll 中將會輪詢到 async_wfd 可讀的事件,這個事件僅僅是用來通知 uv loop 線程: 非 uv loop 線程有回調須要在 uv loop 線程執行。
當輪詢到 async_wfd 可讀後,uv__io_poll 會回調對應的函數 uv__async_io,它主要作了下面 2 件事:
實現以下:
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { char buf[1024]; ssize_t r; QUEUE queue; QUEUE* q; uv_async_t* h; assert(w == &loop->async_io_watcher); // 這個 for 循環用來確認是否有 uv_async_send 調用 for (;;) { r = read(w->fd, buf, sizeof(buf)); if (r == sizeof(buf)) continue; if (r != -1) break; if (errno == EAGAIN || errno == EWOULDBLOCK) break; if (errno == EINTR) continue; abort(); } // 交換 loop->async_handle 和 queue內容,避免在遍歷 loop->async_handles 時有新的 async_handle 插入到隊列 // loop->async_handles 隊列中除了線程池的句柄還有其它的 QUEUE_MOVE(&loop->async_handles, &queue); while (!QUEUE_EMPTY(&queue)) { q = QUEUE_HEAD(&queue); h = QUEUE_DATA(q, uv_async_t, queue); QUEUE_REMOVE(q); // 將 uv_async_t 從新插入到 loop->async_handles 中,uv_async_t 須要手動調用 uv__async_stop 纔會從隊列中移除 QUEUE_INSERT_TAIL(&loop->async_handles, q); // 確認這個 async_handle 是否須要回調 if (cmpxchgi(&h->pending, 1, 0) == 0) continue; if (h->async_cb == NULL) continue; // 調用經過 uv_async_init 初始化 uv_async_t 時綁定的回調函數 // 線程池的 uv_async_t 是在 uv_loop_init 時初始化的,它綁定的回調是 uv__work_done // 所以若是 h == loop->wq_async,這裏 h->async_cb 實際是調用了 uv__work_done(h); // 詳情請參考 https://github.com/libuv/libuv/blob/v1.24.0/src/unix/loop.c#L88 h->async_cb(h); } }
調用線程池的 h->async_cb
後會回到線程池的 uv__work_done 函數:
void uv__work_done(uv_async_t* handle) { struct uv__work* w; uv_loop_t* loop; QUEUE* q; QUEUE wq; int err; loop = container_of(handle, uv_loop_t, wq_async); uv_mutex_lock(&loop->wq_mutex); // 清空已完成的 loop->wq 隊列 QUEUE_MOVE(&loop->wq, &wq); uv_mutex_unlock(&loop->wq_mutex); while (!QUEUE_EMPTY(&wq)) { q = QUEUE_HEAD(&wq); QUEUE_REMOVE(q); w = container_of(q, struct uv__work, wq); // 若是在回調前調用了 uv_cancel 取消請求,則即便請求已經執行完,依舊算出錯 err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; w->done(w, err); } }
最後經過 w->done(w, err)
回調 uv__fs_done,並由 uv__fs_done 回調 JS 函數:
static void uv__fs_done(struct uv__work* w, int status) { uv_fs_t* req; req = container_of(w, uv_fs_t, work_req); uv__req_unregister(req->loop, req); // 若是取消了則拋出異常 if (status == UV_ECANCELED) { assert(req->result == 0); req->result = UV_ECANCELED; } // 回調 JS req->cb(req); }
以上就是 libuv 是線程池從建立到執行多線程請求的過程。
再回到文章開頭提到的代碼,咱們來分析它的調用過程。
const fs = require('fs') const cb = function (err) { console.log(`Is myfile exists: ${!err}`) } fs.access('myfile', cb)
假設線程池大小爲 2 ,下面描述了執行 fs.access 時 3 個線程的狀態(略過了 Node.js 啓動和 JavaScript 和 Native 函數調用過程),時間軸從上到下:
空白
表明處於阻塞狀態,-
表明線程還沒有啓動
uv loop thread | worker thread 1 | worker thread 2 |
---|---|---|
[fs.access]fs.access | - | - |
JavaScript 經過 v8 調用 Native 函數 | - | - |
uv_fs_access | - | - |
uv__work_submit | - | - |
init_threads | worker | worker |
uv_sem_wait | uv_sem_post | uv_sem_post |
uv_cond_wait | uv_cond_wait | |
uv_cond_signal | ||
uv__io_poll | access | |
uv__io_poll | ||
uv__io_poll | uv_async_send | |
uv__io_poll | uv_cond_wait | |
uv__io_poll | ||
uv__async_io | ||
uv__work_done | ||
uv__fs_done | ||
Native 經過 v8 回調 JavaScript 函數 | ||
cb | ||
console.log(`Is myfile exists: ${exists}`) |
能夠看到調用過程以下:
worker thread 1
去執行請求,同時 uv loop 線程不斷的輪詢是否完成了請求。worker thread 1
同步的調用 access 函數判斷目標文件是否可讀。worker thread 1
經過 uv_async_send 同步 uv loop 線程請求已完成,同時自身進入阻塞狀態,等待新的請求將其喚醒。整個過程因爲沒有新的請求進來, worker thread 2
始終處於阻塞狀態。
經過對 fs.access 的調用過程分析,咱們瞭解了 libuv 是如何經過線程池進行異步調用的。另外也能夠看到針對不一樣的平臺,libuv 對 uv__io_poll 的實現是不一樣的,後面咱們將介紹 uv__io_poll 實現異步 I/O 的方式。