經過前兩篇文章的學習,咱們已經解了Libuv中的隊列和線程,爲本文的學習打下基礎,沒有看過的同窗建議先看下。下面將從生產者消費者模型和源碼兩個角度學習Libuv的線程池,爲後面學習Libuv文件處理作鋪墊。安全
Node.js的文件操做支持同步調用和異步調用,根據Libuv官網的介紹,咱們知道它沒有跨平臺的異步文件IO可使用,因此它的異步文件IO是經過在線程池中執行同步文件IO實現的。那具體是怎麼實現的呢?答案就是生產者消費者模型。Libuv的線程包括2部分,一個是主線程,一個是線程池。主線程的一部分工做是描述任務並將其提交給線程池,線程池進行處理。拿異步文件操做爲例,主線程生成一個描述文件操做的對象,將其提交到任務隊列;線程池從任務隊列獲取該對象進行處理。其中主線程是生產者,線程池中的線程是消費者,任務隊列是生產者和消費者之間的橋樑,下面是一個簡單的示意圖:bash
Libuv在生產者消費者模型中多加了一步,線程池執行完任務後,將結果交給主線程,主線程拿到結果後,若是發現有回調函數須要執行,就執行。因此Libuv的線程模型以下:異步
Libuv線程池的代碼很容易找到,就在src目錄下的threadpool.c文件中。async
經過上面對生產者消費者模型的介紹,該代碼大體分爲4部分:任務隊列、主線程提交任務到任務隊列(提交任務)、線程池從任務隊列獲取任務並執行(消費任務)、線程池執行完任務通知主線程執行回調函數(回調處理)。函數
任務隊列就是一個隊列而已。因爲任務隊列會被多個線程(主線程、線程池)同時訪問,爲了保證線程安全,須要互斥鎖。另外任務隊列若是爲空,線程池中的線程須要掛起,等待主線程提交任務後喚起,因此還須要條件變量。任務隊列、條件變量、互斥量的定義以下所示:oop
1...
2static uv_cond_t cond; // 條件變量
3static uv_mutex_t mutex; // 互斥鎖
4...
5static QUEUE wq; // 任務隊列
6...複製代碼
主線程將任務提交到任務隊列是經過uv__work_submit來實現的,讓咱們來看下它的代碼:源碼分析
1struct uv__work {
2 void (*work)(struct uv__work *w);
3 void (*done)(struct uv__work *w, int status);
4 struct uv_loop_s* loop;
5 void* wq[2]; // 用於將其關聯到任務隊列中
6};
7
8void uv__work_submit(uv_loop_t* loop,
9 struct uv__work* w,
10 enum uv__work_kind kind,
11 void (*work)(struct uv__work* w),
12 void (*done)(struct uv__work* w, int status)) {
13 uv_once(&once, init_once); // 初始化線程,無亂調用多少次,init_once只會執行一次
14 w->loop = loop; // 事件循環
15 w->work = work; // 線程池要執行的函數
16 w->done = done; // 線程池執行結束後,通知主線程要執行的函數
17 post(&w->wq, kind); // 將任務提交任務隊列中
18}複製代碼
uv__work_submit有4個參數:第一個參數爲Libuv的事件循環,這裏咱們先忽略,之後會有專門的文章介紹;第二個參數是線程池執行任務的通用模型,類型爲uv__work,屬性work表示線程池中要執行的函數,屬性done表示線程池執行完,通知主線程要執行的函數;第3、四個參數分別對應work函數和done函數。該函數主要作了兩件事情:一件是經過uv_once調用init_once來初始化線程池;另外一件是對w進行賦值,而後經過post將其提交到任務隊列。這裏須要注意,經過nv_once能夠保證uv__work_submit在調用屢次的狀況,init_once只執行一次,nv_once底層是經過pthread_once實現的。init_once會在下一節介紹,讓咱們先來看下post。post
1static void post(QUEUE* q, enum uv__work_kind kind) {
2 // 獲取鎖
3 uv_mutex_lock(&mutex);
4 ...
5 // 將任務添加到任務隊列的最後
6 QUEUE_INSERT_TAIL(&wq, q);
7
8 // 若是線程池中有掛起的線程,就喚起掛起的線程,讓其工做
9 if (idle_threads > 0)
10 uv_cond_signal(&cond);
11 // 釋放鎖
12 uv_mutex_unlock(&mutex);
13}複製代碼
代碼很簡單,先獲取鎖mutex,而後將任務提交到任務隊列中。若是線程池中有掛起的線程,就經過條件變量cond喚起並放棄鎖mutex。學習
任務隊列中的任務是經過線程池進行消費的,而線程池的初始化是在uv__work_submit調用init_once實現的,先看下如何初始化線程池吧:spa
1static void init_once(void) {
2 ...
3 init_threads();
4}複製代碼
init_once調用了init_threads,那就看下init_threads。
1...
2#define MAX_THREADPOOL_SIZE 1024 // 線程池的最大數量
3...
4static uv_thread_t* threads; // 線程池
5static uv_thread_t default_threads[4]; // 默認的線程池,線程數量爲4
6...
7
8
9static void init_threads(void) {
10 unsigned int i;
11 const char* val;
12 ...
13
14 // 計算線程池中線程的數量,不能大於最大值MAX_THREADPOOL_SIZE
15 nthreads = ARRAY_SIZE(default_threads);
16
17 // 經過環境變量設置線程池的大小
18 val = getenv("UV_THREADPOOL_SIZE");
19 if (val != NULL)
20 nthreads = atoi(val);
21
22 // 保存線程池中最少有一個線程
23 if (nthreads == 0)
24 nthreads = 1;
25
26 // 線程池中線程數量不能超過MAX_THREADPOOL_SIZE
27 if (nthreads > MAX_THREADPOOL_SIZE)
28 nthreads = MAX_THREADPOOL_SIZE;
29
30 // 初始化線程池
31 threads = default_threads;
32 if (nthreads > ARRAY_SIZE(default_threads)) {
33 threads = uv__malloc(nthreads * sizeof(threads[0]));
34 if (threads == NULL) {
35 nthreads = ARRAY_SIZE(default_threads);
36 threads = default_threads;
37 }
38 }
39
40 // 建立條件變量
41 if (uv_cond_init(&cond))
42 abort();
43
44 // 建立互斥量
45 if (uv_mutex_init(&mutex))
46 abort();
47
48 // 初始化任務隊列
49 QUEUE_INIT(&wq);
50 ...
51
52 // 根據線程池的數量,初始化線程池中的每一個線程,並執行worker函數
53 for (i = 0; i < nthreads; i++)
54 if (uv_thread_create(threads + i, worker, &sem))
55 abort();
56
57 ...
58}複製代碼
經過上面的代碼能夠知道init_threads先獲取線程池的大小nthreads;而後初始化互斥量mutex、條件變量cond和任務隊列wq;最後建立nthreads個線程,每一個線程執行worker函數。worker函數的核心就是消費任務隊列中的任務,讓咱們詳細的看下它:
1static void worker(void* arg) {
2 struct uv__work* w;
3 QUEUE* q;
4
5 ...
6 arg = NULL;
7
8 // 獲取互斥鎖
9 uv_mutex_lock(&mutex);
10
11 // 經過無限循環,保證線程一直執行
12 for (;;) {
13
14 // 若是任務隊列爲空,經過等待條件變量cond掛起,並釋放鎖mutex
15 // 主線程提交任務經過uv_cond_signal喚起,並從新獲取鎖mutex
16 while (QUEUE_EMPTY(&wq) || ...) {
17 idle_threads += 1;
18 uv_cond_wait(&cond, &mutex);
19 idle_threads -= 1;
20 }
21
22 // 從任務隊列中獲取第一個任務
23 q = QUEUE_HEAD(&wq);
24 ...
25
26 // 將該任務從任務隊列中刪除
27 QUEUE_REMOVE(q);
28 QUEUE_INIT(q);
29
30 ...
31
32 // 操做完任務隊列,釋放鎖mutex
33 uv_mutex_unlock(&mutex);
34
35 // 獲取uv__work對象,並執行work
36 w = QUEUE_DATA(q, struct uv__work, wq);
37 w->work(w);
38
39 // 獲取loop的互斥鎖wq_mutex
40 uv_mutex_lock(&w->loop->wq_mutex);
41 w->work = NULL;
42
43 // 將執行完work函數的任務掛到loop->wq隊列中
44 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
45
46 // 經過uv_async_send通知主線程,固然有任務執行完了,主線程能夠執行任務中的done函數。
47 uv_async_send(&w->loop->wq_async);
48 uv_mutex_unlock(&w->loop->wq_mutex);
49
50 // 獲取鎖,執行任務隊列中的下一個任務
51 ...
52 uv_mutex_lock(&mutex);
53 ...
54 }
55}複製代碼
worker的本質就是從任務隊列中獲取任務,而後執行work函數。執行完後,將該任務提交到事件循環loop的wp隊列中,經過uv_async_send告知主線程執行任務中的done函數。
上面咱們介紹了worker函數在執行完任務後會經過uv_async_send告知主線程執行回調函數,那這塊是怎麼實現的呢?這裏涉及到了事件循環,這裏咱們就簡單的介紹一下,後面會有詳細的文章介紹它。事件循環loop在初始化的時候會調用uv_async_init,該函數的第三個參數是一個函數,當其餘線程調用uv_async_send時,該函數就會執行。具體代碼以下:
1uv_async_init(loop, &loop->wq_async, uv__work_done);
2
3void uv__work_done(uv_async_t* handle) {
4 struct uv__work* w;
5 uv_loop_t* loop;
6 QUEUE* q;
7 QUEUE wq;
8 int err;
9
10 loop = container_of(handle, uv_loop_t, wq_async);
11 uv_mutex_lock(&loop->wq_mutex);
12 QUEUE_MOVE(&loop->wq, &wq);
13 uv_mutex_unlock(&loop->wq_mutex);
14
15 while (!QUEUE_EMPTY(&wq)) {
16 q = QUEUE_HEAD(&wq);
17 QUEUE_REMOVE(q);
18
19 w = container_of(q, struct uv__work, wq);
20 err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
21 w->done(w, err);
22 }
23}複製代碼
uv__work_done很簡單,獲取loop中的wq隊列,獲取隊列中的每一個任務並調用done函數。
本文首先介紹了生產者消費者模型,而後經過任務隊列、提交任務、消費任務、回調處理講解了Libuv線程池,爲下一篇講解Libuv文件處理作鋪墊,若是你對Libuv系列感興趣的話,歡迎關注咱們。