Libuv學習——線程池

image.png


前言


經過前兩篇文章的學習,咱們已經解了Libuv中的隊列線程,爲本文的學習打下基礎,沒有看過的同窗建議先看下。下面將從生產者消費者模型和源碼兩個角度學習Libuv的線程池,爲後面學習Libuv文件處理作鋪墊。安全


生產者消費者模型


Node.js的文件操做支持同步調用和異步調用,根據Libuv官網的介紹,咱們知道它沒有跨平臺的異步文件IO可使用,因此它的異步文件IO是經過在線程池中執行同步文件IO實現的。那具體是怎麼實現的呢?答案就是生產者消費者模型。Libuv的線程包括2部分,一個是主線程,一個是線程池。主線程的一部分工做是描述任務並將其提交給線程池,線程池進行處理。拿異步文件操做爲例,主線程生成一個描述文件操做的對象,將其提交到任務隊列線程池從任務隊列獲取該對象進行處理。其中主線程是生產者,線程池中的線程是消費者,任務隊列是生產者和消費者之間的橋樑,下面是一個簡單的示意圖:bash

image.png

Libuv在生產者消費者模型中多加了一步,線程池執行完任務後,將結果交給主線程,主線程拿到結果後,若是發現有回調函數須要執行,就執行。因此Libuv的線程模型以下:異步

image.png


源碼分析


Libuv線程池的代碼很容易找到,就在src目錄下的threadpool.c文件中。async


image.png


經過上面對生產者消費者模型的介紹,該代碼大體分爲4部分:任務隊列、主線程提交任務到任務隊列(提交任務)、線程池從任務隊列獲取任務並執行(消費任務)、線程池執行完任務通知主線程執行回調函數(回調處理)。函數


任務隊列


任務隊列就是一個隊列而已。因爲任務隊列會被多個線程(主線程、線程池)同時訪問,爲了保證線程安全,須要互斥鎖。另外任務隊列若是爲空,線程池中的線程須要掛起,等待主線程提交任務後喚起,因此還須要條件變量。任務隊列、條件變量、互斥量的定義以下所示:oop


1...
2static uv_cond_t cond; // 條件變量
3static uv_mutex_t mutex; // 互斥鎖
4...
5static QUEUE wq; // 任務隊列
6...複製代碼


提交任務


主線程將任務提交到任務隊列是經過uv__work_submit來實現的,讓咱們來看下它的代碼:源碼分析


1struct uv__work {
 2  void (*work)(struct uv__work *w);
 3  void (*done)(struct uv__work *w, int status);
 4  struct uv_loop_s* loop;
 5  void* wq[2]; // 用於將其關聯到任務隊列中
 6};
 7
 8void uv__work_submit(uv_loop_t* loop,
 9                     struct uv__work* w,
10                     enum uv__work_kind kind,
11                     void (*work)(struct uv__work* w),
12                     void (*done)(struct uv__work* w, int status)) {
13  uv_once(&once, init_once); // 初始化線程,無亂調用多少次,init_once只會執行一次
14  w->loop = loop; // 事件循環
15  w->work = work; // 線程池要執行的函數
16  w->done = done; // 線程池執行結束後,通知主線程要執行的函數
17  post(&w->wq, kind); // 將任務提交任務隊列中
18}複製代碼


uv__work_submit有4個參數:第一個參數爲Libuv的事件循環,這裏咱們先忽略,之後會有專門的文章介紹;第二個參數是線程池執行任務的通用模型,類型爲uv__work,屬性work表示線程池中要執行的函數,屬性done表示線程池執行完,通知主線程要執行的函數;第3、四個參數分別對應work函數和done函數。該函數主要作了兩件事情:一件是經過uv_once調用init_once來初始化線程池;另外一件是對w進行賦值,而後經過post將其提交到任務隊列。這裏須要注意,經過nv_once能夠保證uv__work_submit在調用屢次的狀況,init_once只執行一次,nv_once底層是經過pthread_once實現的。init_once會在下一節介紹,讓咱們先來看下post。post


1static void post(QUEUE* q, enum uv__work_kind kind) {
 2  // 獲取鎖
 3  uv_mutex_lock(&mutex);
 4  ...
 5  // 將任務添加到任務隊列的最後
 6  QUEUE_INSERT_TAIL(&wq, q);
 7  
 8  // 若是線程池中有掛起的線程,就喚起掛起的線程,讓其工做
 9  if (idle_threads > 0)
10    uv_cond_signal(&cond);
11  // 釋放鎖
12  uv_mutex_unlock(&mutex);
13}複製代碼


代碼很簡單,先獲取鎖mutex,而後將任務提交到任務隊列中。若是線程池中有掛起的線程,就經過條件變量cond喚起並放棄鎖mutex。學習


消費任務


任務隊列中的任務是經過線程池進行消費的,而線程池的初始化是在uv__work_submit調用init_once實現的,先看下如何初始化線程池吧:spa


1static void init_once(void) {
2  ...
3  init_threads();
4}複製代碼


init_once調用了init_threads,那就看下init_threads。


1...
 2#define MAX_THREADPOOL_SIZE 1024 // 線程池的最大數量
 3...
 4static uv_thread_t* threads; // 線程池
 5static uv_thread_t default_threads[4]; // 默認的線程池,線程數量爲4
 6...
 7
 8
 9static void init_threads(void) {
10  unsigned int i;
11  const char* val;
12  ...
13      
14  // 計算線程池中線程的數量,不能大於最大值MAX_THREADPOOL_SIZE
15  nthreads = ARRAY_SIZE(default_threads);
16    
17  // 經過環境變量設置線程池的大小
18  val = getenv("UV_THREADPOOL_SIZE");
19  if (val != NULL)
20    nthreads = atoi(val);
21  
22  // 保存線程池中最少有一個線程
23  if (nthreads == 0)
24    nthreads = 1;
25    
26  // 線程池中線程數量不能超過MAX_THREADPOOL_SIZE
27  if (nthreads > MAX_THREADPOOL_SIZE)
28    nthreads = MAX_THREADPOOL_SIZE;
29    
30  // 初始化線程池
31  threads = default_threads;
32  if (nthreads > ARRAY_SIZE(default_threads)) {
33    threads = uv__malloc(nthreads * sizeof(threads[0]));
34    if (threads == NULL) {
35      nthreads = ARRAY_SIZE(default_threads);
36      threads = default_threads;
37    }
38  }
39    
40  // 建立條件變量
41  if (uv_cond_init(&cond))
42    abort();
43  
44  // 建立互斥量
45  if (uv_mutex_init(&mutex))
46    abort();
47
48  // 初始化任務隊列
49  QUEUE_INIT(&wq);
50  ...
51      
52  // 根據線程池的數量,初始化線程池中的每一個線程,並執行worker函數
53  for (i = 0; i < nthreads; i++)
54    if (uv_thread_create(threads + i, worker, &sem))
55      abort();
56  
57  ...
58}複製代碼


經過上面的代碼能夠知道init_threads先獲取線程池的大小nthreads;而後初始化互斥量mutex、條件變量cond和任務隊列wq;最後建立nthreads個線程,每一個線程執行worker函數。worker函數的核心就是消費任務隊列中的任務,讓咱們詳細的看下它:


1static void worker(void* arg) {
 2  struct uv__work* w;
 3  QUEUE* q;
 4  
 5  ...
 6  arg = NULL;
 7    
 8  // 獲取互斥鎖
 9  uv_mutex_lock(&mutex);
10    
11  // 經過無限循環,保證線程一直執行
12  for (;;) {
13    
14    // 若是任務隊列爲空,經過等待條件變量cond掛起,並釋放鎖mutex
15    // 主線程提交任務經過uv_cond_signal喚起,並從新獲取鎖mutex
16    while (QUEUE_EMPTY(&wq) || ...) {
17      idle_threads += 1;
18      uv_cond_wait(&cond, &mutex);
19      idle_threads -= 1;
20    }
21      
22    // 從任務隊列中獲取第一個任務
23    q = QUEUE_HEAD(&wq);
24    ...
25        
26    // 將該任務從任務隊列中刪除
27    QUEUE_REMOVE(q);
28    QUEUE_INIT(q);
29      
30    ...
31        
32    // 操做完任務隊列,釋放鎖mutex
33    uv_mutex_unlock(&mutex);
34
35    // 獲取uv__work對象,並執行work
36    w = QUEUE_DATA(q, struct uv__work, wq);
37    w->work(w);
38
39    // 獲取loop的互斥鎖wq_mutex
40    uv_mutex_lock(&w->loop->wq_mutex);
41    w->work = NULL;
42    
43    // 將執行完work函數的任務掛到loop->wq隊列中
44    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
45      
46    // 經過uv_async_send通知主線程,固然有任務執行完了,主線程能夠執行任務中的done函數。
47    uv_async_send(&w->loop->wq_async);
48    uv_mutex_unlock(&w->loop->wq_mutex);
49
50    // 獲取鎖,執行任務隊列中的下一個任務
51    ...
52    uv_mutex_lock(&mutex);
53    ...
54  }
55}複製代碼


worker的本質就是從任務隊列中獲取任務,而後執行work函數。執行完後,將該任務提交到事件循環loop的wp隊列中,經過uv_async_send告知主線程執行任務中的done函數。


回調處理


上面咱們介紹了worker函數在執行完任務後會經過uv_async_send告知主線程執行回調函數,那這塊是怎麼實現的呢?這裏涉及到了事件循環,這裏咱們就簡單的介紹一下,後面會有詳細的文章介紹它。事件循環loop在初始化的時候會調用uv_async_init,該函數的第三個參數是一個函數,當其餘線程調用uv_async_send時,該函數就會執行。具體代碼以下:


1uv_async_init(loop, &loop->wq_async, uv__work_done);
 2
 3void uv__work_done(uv_async_t* handle) {
 4  struct uv__work* w;
 5  uv_loop_t* loop;
 6  QUEUE* q;
 7  QUEUE wq;
 8  int err;
 9
10  loop = container_of(handle, uv_loop_t, wq_async);
11  uv_mutex_lock(&loop->wq_mutex);
12  QUEUE_MOVE(&loop->wq, &wq);
13  uv_mutex_unlock(&loop->wq_mutex);
14
15  while (!QUEUE_EMPTY(&wq)) {
16    q = QUEUE_HEAD(&wq);
17    QUEUE_REMOVE(q);
18
19    w = container_of(q, struct uv__work, wq);
20    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
21    w->done(w, err);
22  }
23}複製代碼


uv__work_done很簡單,獲取loop中的wq隊列,獲取隊列中的每一個任務並調用done函數。


總結


本文首先介紹了生產者消費者模型,而後經過任務隊列、提交任務、消費任務、回調處理講解了Libuv線程池,爲下一篇講解Libuv文件處理作鋪墊,若是你對Libuv系列感興趣的話,歡迎關注咱們。

相關文章
相關標籤/搜索