nodejs深刻學習系列之libuv基礎篇(二)

既前篇nodejs深刻學習系列之libuv基礎篇(一)學習的基本概念以後,咱們在第二篇將帶你們去學習爲何libuv的併發能力這麼優秀?這併發後面的實現機制是什麼?node

三、libuv的事件循環機制

好了,瞭解了上述的基本概念以後,咱們來扯一扯Libuv的事件循環機制,也就是event-loop。仍是以[譯文]libuv設計思想概述一文展現的兩張圖片,再結合代碼來學習整個Libuv的事件循環機制。linux

3.一、解密第一張圖片

首先是第一張圖片:git

細心的童鞋會發現這張圖片被我用紅框分割成了兩部分,爲何呢?由於Libuv處理fs I/O和網絡I/O用了兩套機制去實現,或者說更全面的講應該是fs I/O和 DNS等實現的方式和網絡 I/O是不同的。爲何這麼說呢?請看下圖,你就會明白了:github

上圖左側是libuv的兩大基石:event-loop線程和thread pool。而從圖的右側有兩條軌跡分別鏈接到這兩個基石,我特別用紅色加粗標記,能夠看到:segmentfault

  • Network I/O最後的調用都會歸結到uv__io_start這個函數,而該函數會將須要執行的I/O事件和回調塞到watcher隊列中,以後uv_run函數執行的Poll for I/O階段作的即是從watcher隊列中取出事件調用系統的接口,這是其中一條主線
  • Fs I/O和DNS的全部操做都會歸結到調用uv__work_sumit這個函數,而該函數就是執行線程池初始化並調度的終極函數。這是另一條主線。

3.二、解密第二張圖片

接着咱們來看第二張圖片,咱們依然將該圖片進行改造以下:bash

整個事件循環的執行主體是在uv_run中,每一次的循環經歷的階段對應的函數在上圖中已經標註出來,有幾個重點要說一下:網絡

  1. 循環是否退出(也就是進程是否結束)取決於如下幾個條件中的一個多線程

    1.一、loop->stop_flag變爲1而且uv__loop_alive返回不爲0,也就是調用uv_stop函數而且loop不存在活躍的和被引用的句柄、活躍的請求或正在關閉的句柄。併發

    1.二、事件循環運行模式等於UV_RUN_ONCE或者是UV_RUN_NOWAIT異步

  2. I/O循環的超時時間的肯定:

    2.一、若是時間循環運行模式是UV_RUN_NOWAIT,超時爲0。

    2.二、若是循環將要中止(代碼調用了uv_stop()),超時爲0。

    2.三、若是沒有活躍句柄或請求,超時爲0。

    2.四、若是有任何Idle句柄處於活躍狀態,超時爲0。

    2.五、若是有等待關閉的句柄,超時爲0。

    2.六、若是以上狀況都不匹配,則採用最近的計時器的超時時間-當前時間(handle->timeout-loop->time),或者若是沒有活動計時器,則爲無窮大(即返回-1)。

  3. I/O循環的實現主體uv__io_poll根據系統不一樣,使用方式不同,若是對linux系統熟悉的話,epoll方式應該也會了解。更多epoll的只是能夠參考該文章:Linux IO模式及 select、poll、epoll詳解

四、libuv的線程池

說完時間循環的主線程,接下去咱們繼續揭祕libuv的線程池。

libuv提供了一個threadpool,可用來運行用戶代碼並在事件循環線程(event-loop)中獲得通知。這個線程池在內部用於運行全部文件系統操做,以及getaddrinfo和getnameinfo請求。固然若是你想要將本身的代碼放在線程池中運行也是能夠的,libuv提供除了uv_queue_work的方法供開發者本身選擇。

它的默認大小爲4,可是能夠在啓動時經過將UV_THREADPOOL_SIZE環境變量設置爲任意值(最大值爲1024)來更改它。

threadpool是全局的,並在全部事件循環中共享。當一個特定的函數使用threadpool(即當使用uv_queue_work())時,libuv預先分配並初始化UV_THREADPOOL_SIZE所容許的最大線程數。這致使了相對較小的內存開銷(128個線程大約1MB),但在運行時提升了線程的性能。

關於線程的操做,demo中的文件是:傳送門

在實例中,咱們用了三種方式來實現和線程相關的一些操做:

  1. 從線程池中調度一個線程運行回調: uv_queue_work
  2. 使用uv_async_send來「喚醒」 event loop主線程並執行uv_async_init當初設置好的回調
  3. 使用uv_thread_create手動建立一個線程來執行

咱們在上一節中知道,想要建立線程池並讓他們工做,惟一繞不開的函數是uv__work_submit,你們能夠在libuv源碼中搜尋這個,能夠發現可以找到的也就這幾個文件:(以unix系統爲例)

threadpool.c
  1. uv__work_submit實現地方
  2. uv_queue_work調用
fs.c
  1. 宏定義POST調用,全部的fs操做都會調用POST這個宏
getaddrinfo.c
  1. uv_getaddrinfo調用
getnameinfo.c
  1. uv_getnameinfo調用
複製代碼

細心的童鞋發現,每一處調用的地方都會傳一個叫作enum uv__work_kind kind的操做,根據上面的調用,能夠看出分爲了3種任務類型:

  • UV__WORK_CPU:CPU 密集型,UV_WORK 類型的請求被定義爲這種類型。所以根據這個分類,不推薦在 uv_queue_work 中作 I/O 密集的操做。
  • UV__WORK_FAST_IO:快 IO 型,UV_FS 類型的請求被定義爲這種類型。
  • UV__WORK_SLOW_IO:慢 IO 型,UV_GETADDRINFO 和 UV_GETNAMEINFO 類型的請求被定義爲這種類型

4.二、線程池的初始化

學習線程池初始化以前,咱們先得普及一下線程間的同步原語。這樣後面看的代碼纔不會糊里糊塗

libuv提供了mutex鎖讀寫鎖信號量(Semaphores)條件量(Conditions)屏障(Barriers)五種手段來實現線程間資源競爭互斥同步等操做。接下去會簡單地介紹,以便待會的初始化流程能夠讀懂。

4.2.一、Mutex鎖

互斥鎖用於對資源的互斥訪問,當你訪問的內存資源可能被別的線程訪問到,這個時候你就能夠考慮使用互斥鎖,在訪問的時候鎖住。對應的使用流程多是這樣的:

  • 初始化互斥鎖:uv_mutex_init(uv_mutex_t* handle)
  • 鎖住互斥資源:uv_mutex_lock(uv_mutex_t* handle)
  • 解鎖互斥資源:uv_mutex_unlock(uv_mutex_t* handle)

在線程初始化的過程當中,咱們會初始化一個全局的互斥鎖:

static void init_threads(void) {
  ...
  if (uv_mutex_init(&mutex))
    abort()
  ...
}
複製代碼

然後在每一個線程的執行實體worker函數中,就使用互斥鎖對下面幾個公共資源進行鎖住與解鎖:

  • 請求隊列 wq:線程池收到 UV__WORK_CPU 和 UV__WORK_FAST_IO 類型的請求後將其插到此隊列的尾部,並經過 uv_cond_signal 喚醒 worker 線程去處理,這是線程池請求的主隊列。
  • 慢 I/O 隊列 slow_io_pending_wq:線程池收到 UV__WORK_SLOW_IO 類型的請求後將其插到此隊列的尾部。
  • 慢 I/O 標誌位節點 run_slow_work_message:當存在慢 I/O 請求時,用來做爲一個標誌位放在請求隊列 wq 中,表示當前有慢 I/O 請求,worker 線程處理請求時須要關注慢 I/O 隊列的請求;當慢 I/O 隊列的請求都處理完畢後這個標誌位將從請求隊列 wq 中移除。
static void worker(void* arg) {
  ...
  uv_mutex_lock(&mutex);

  ...
  uv_mutex_unlock(&mutex);
}
複製代碼

4.2.二、讀寫鎖

讀寫鎖沒有用在線程的啓動過程當中,咱們在demo中用來實踐對某個全局變量的訪問。具體使用步驟參考代碼,這裏就再也不贅述。

4.2.三、信號量

信號量是一種專門用於提供不一樣進程間或線程間同步手段的原語。信號量本質上是一個非負整數計數器,表明共享資源的數目,一般是用來控制對共享資源的訪問。通常使用步驟是這樣的:

  • 初始化信號量:int uv_sem_init(uv_sem_t* sem, unsigned int value)
  • 信號量加1:void uv_sem_wait(uv_sem_t* sem)
  • 信號量減1:void uv_sem_post(uv_sem_t* sem)
  • 信號量銷燬:void uv_sem_wait(uv_sem_t* sem)

在線程池初始化過程當中,咱們利用信號量來等待全部的線程初始化結束,以下代碼:

static void init_threads(void) {
  ...
  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}

// 而每一個線程的執行實體都會去將信號量-1:
static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  ...
}
複製代碼

這樣只要全部的線程沒有初始化完成,uv_sem_destroy這個函數是不會執行到的,整個初始化函數也不會返回,此時的主線程也就阻塞在這裏了。

4.2.四、條件變量

而條件變量經過容許線程阻塞和等待另外一個線程發送信號的方法彌補了互斥鎖的不足。條件變量的內部實質上是一個等待隊列,放置等待(阻塞)的線程,線程在條件變量上等待和通知,互斥鎖用來保護等待隊列(由於全部的線程均可以放入等待隊列,因此等待隊列成爲了一個共享的資源,須要被上鎖保護),所以條件變量一般和互斥鎖一塊兒使用。通常使用步驟是這樣的:

  • 初始化條件變量:int uv_cond_init(uv_cond_t* cond)
  • 線程阻塞等待被喚醒:void uv_cond_wait(uv_cond_t* cond, uv_mutex_t* mutex)
  • 別的線程喚醒阻塞的線程:void uv_cond_signal(uv_cond_t* cond)

libuv使用條件變量來阻塞線程池和喚醒線程池,使用代碼以下:

static void init_threads(void) {
  if (uv_cond_init(&cond))
    abort();
}

static void worker(void* arg) {
  ...
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */ while (QUEUE_EMPTY(&wq) || (QUEUE_HEAD(&wq) == &run_slow_work_message && QUEUE_NEXT(&run_slow_work_message) == &wq && slow_io_work_running >= slow_work_thread_threshold())) { idle_threads += 1; uv_cond_wait(&cond, &mutex); idle_threads -= 1; } ... } } static void post(QUEUE* q, enum uv__work_kind kind) { ... if (idle_threads > 0) uv_cond_signal(&cond) ... } 複製代碼

從上面三處代碼能夠看到線程啓動以後就進入阻塞狀態,直到有I/O請求調用uv_cond_signal來喚醒,按照uv_cond_wait調用的順序造成一個等待隊列,循環調用。

4.2.五、屏障

在多線程的時候,咱們總會碰到一個需求,就是須要等待一組進程所有執行完畢後再執行某些事,因爲多線程是亂序的,沒法預估線程都執行到哪裏了,這就要求咱們有一個屏障做爲同步點,在全部有屏障的地方都會阻塞等待,直到全部的線程都的代碼都執行到同步點,再繼續執行後續代碼。使用步驟通常是:

  • 初始化屏障須要達到的個數:int uv_barrier_init(uv_barrier_t* barrier, unsigned int count)
  • 每當達到條件便將計數+1:int uv_barrier_wait(uv_barrier_t* barrier)
  • 銷燬屏障:void uv_barrier_destroy(uv_barrier_t* barrier)

只有當初始化計數的值爲0,主線程纔會繼續執行,具體使用方法能夠參考demo。

至此藉助於線程間同步原語,咱們就嘩啦啦地把線程的初始化以及大概的工做機制講完了,總結出了下面一張圖:

4.一、線程池工做調度

線程池的工做利用的是主線程post函數和各個線程的worker函數,post函數的工做內容以下:

  • 判斷請求的請求類型是不是 UV__WORK_SLOW_IO:
    • 若是是,將這個請求插到慢 I/O 請求隊列 slow_io_pending_wq 的尾部,同時在請求隊列 wq 的尾部插入一個 run_slow_work_message 節點做爲標誌位,告知請求隊列 wq 當前存在慢 I/O 請求。
    • 若是不是,將請求插到請求隊列 wq 尾部。
  • 若是有空閒的線程,喚醒某一個去執行請求。

併發的慢 I/O 的請求數量不會超過線程池大小的一半,這樣作的好處是避免多個慢 I/O 的請求在某段時間內把全部線程都佔滿,致使其它可以快速執行的請求須要排隊。

static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}
複製代碼

而各個線程的工做內容以下:

  • 等待喚醒。
  • 取出請求隊列 wq 或者慢 I/O 請求隊列的頭部請求去執行。 => w->work(w);
  • 通知 uv loop 線程完成了一個請求的處理。=> uv_async_send
  • 回到最開始循環的位置。

4.二、線程間的通訊

上一小節清晰地描述了libuv的主線程是如何將請求分給各個線程以及線程是如何處理請求的,那麼上述過程當中還有一個步驟:線程池裏面的線程完成工做以後是如何通知主線程的?主線程收到通知以後又繼續作了些什麼?

這個過程咱們稱之爲線程間的通訊。上一小節中或者咱們的demo中已經知道,完成這個事情的主要函數是uv_async_send,那麼這個函數是如何實現的呢?請看下圖:

從圖中咱們能夠看到,藉助於io poll與管道,線程池的線程寫入數據,被主線程輪詢出來,知道有消息過來,就開始執行對應的回調函數。整個流程就是這麼easy~

參考

  1. Linux IO模式及 select、poll、epoll詳解
  2. Node.js 異步:文件 I/O
相關文章
相關標籤/搜索