原文:Unlike network I/O, there are no platform-specific file I/O primitives libuv could rely on, so the current approach is to run blocking file I/O operations in a thread pool。 翻譯:不像網絡IO,libuv沒有特定平臺的異步IO原語能夠依賴,因此當前是在線程池中執行阻塞(同步)IO來實現異步的。
根據libuv官網對其架構的介紹,咱們能夠知道它並非單線程的,它有一個線程池,用來處理文件IO、DSN查詢等操做。在介紹線程池以前,先經過POSIX Threads介紹一下線程的基本操做,爲下一篇文章介紹線程池打下基礎。若是您對libuv的總體架構感興趣,能夠訪問下面連接瞭解,固然之後我也會寫文章介紹的。html
Design overview - libuv documentation相信你們對線程的概念都有或多或少的瞭解,這裏就不介紹了,下面將直接經過API和demo來學習。因爲不一樣平臺線程的規範和原語不同,而我對Linux比較熟悉,因此接下來將經過Linux中的POSIX Threads來說解。libuv線程池主要涉及到線程建立、互斥鎖和條件變量3個東西,下面將分別介紹它們。api
讓咱們首先了解一下如何建立線程,代碼和輸出以下:數組
#include <stdio.h>
#include <pthread.h>
#define NUM_THREADS 5
int sum = 0;
void * thread_task(void * args) {
int max = (int)args;
for (int i = 0; i <= max; ++i) {
sum += i;
}
printf("sum: %i\n", sum);
pthread_exit(NULL);
}
int main() {
pthread_t threads[NUM_THREADS];
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 8192);
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_t thread = threads[i];
int result = pthread_create(&thread, &attr, thread_task, (void *)10);
if (result) {
printf("線程建立失敗 errCode:%i", result);
return -1;
}
}
pthread_attr_destroy(&attr);
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_t thread = threads[i];
int result = pthread_join(thread, NULL);
if (result == 3) {
printf("線程%i已經結束了\n", i);
continue;
}
}
printf("main函數運行結束, sum: %i\n", sum);
return 0;
}複製代碼
上面代碼很簡單,建立5個線程,每一個線程將傳入數據做爲最大值max,而後從0,1,2,3,...,max加到sum上。接下來粗略講解一下每行代碼的含義:bash
#include <stdio.h>
#include <pthread.h>
#define NUM_THREADS 5
int sum = 0;複製代碼
前4行代碼引入了stdio.h、pthread.h兩個頭文件,函數printf在stdio.h中定義,線程相關的api在pthread.h中定義;定義了一個常量NUM_THREADS和一個變量sum,常量NUM_THREADS表示要建立的線程數,變量sum用來計算總和。微信
void * thread_task(void * args) {
int max = (int)args;
for (int i = 0; i <= max; ++i) {
sum += i;
}
printf("sum: %i\n", sum);
pthread_exit(NULL);
}複製代碼
接下來定義了一個函數thread_task,該函數會被每一個線程執行。函數很簡單,將輸入的int參數做爲max,將0,1,2,3,...,max依次加到sum上,並將當前sum輸出到控制檯。最後執行pthread_exist結束線程。網絡
最後讓咱們看下main函數裏面的內容,架構
int main() {
pthread_t threads[NUM_THREADS];
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 8192);
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_t thread = threads[i];
int result = pthread_create(&thread, &attr, thread_task, (void *)10);
if (result) {
printf("線程建立失敗 errCode:%i", result);
return -1;
}
}
pthread_attr_destroy(&attr);
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_t thread = threads[i];
int result = pthread_join(thread, NULL);
if (result == 3) {
printf("線程%i已經結束了\n", i);
}
}
printf("main func end\n");
return 0;
}複製代碼
先定義了5個表明線程的數組threads;接着定義線程屬性變量attr,將線程的棧設爲8192個字節;以後經過pthread_create建立線程,每一個線程將會執行thread_task函數,並經過第3個參數將10傳遞給thread_task;最後經過pthread_join告訴main函數等到全部線程執行完以後再繼續執行。app
若是足夠仔細,相信你可能已經發現上面的輸出不符合預期,按理說應該輸出275纔對,爲啥只輸出了249呢?異步
咱們再運行一下程序看看,結果又正常了。函數
讓咱們簡單經過2個線程來分析一下,假設此刻sum值爲120,線程1中i循環到3,線程2循環到6,下表展現了致使sum錯誤的可能狀況:
經過上表能夠發現之因此出現問題是由於將i加到sum這個操做不是原子的,若是從讀取sum、將i加到sum整個過程變成原子操做,就不會有問題了。解決該問題的經常使用方法之一就是互斥鎖,讓咱們簡單修改一下代碼:
...
pthread_mutex_t mutex;
void * thread_task(void * args) {
int max = (int)args;
for (int i = 0; i <= max; ++i) {
pthread_mutex_lock(&mutex);
sum += i;
pthread_mutex_unlock(&mutex);
}
printf("sum: %i\n", sum);
pthread_exit(NULL);
}
int main() {
pthread_mutex_init(&mutex, NULL);
...
pthread_mutex_destroy(&mutex);
}
複製代碼
從代碼的角度來看,修改後的代碼增長了一個全局互斥鎖mutex,並在main函數初始化。在sum += i;前面加了一句代碼pthread_mutex_lock(&mutex),它告訴線程嘗試獲取鎖,獲取失敗就掛起,等待其餘線程釋放鎖;獲取成功就繼續執行代碼,並經過pthread_mutex_unlock(&mutex)將獲取的鎖給釋放掉。
互斥鎖只解決了多個線程修改共享變量的問題,對於下面場景它是沒法辦法解決的。一個線程須要知足一個條件才能執行下去,而這個條件由另外一個線程知足的。好比如今有一個變量i和2個線程,當i爲0時第一個線程輸出一段內容,並將i變成1;當i爲1時,第二個線程輸出一段內容,並將i變成0;兩個線程依次交替執行。對於這個問題,咱們能夠經過條件變量來實現。下面是實現的代碼和輸出。
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int i = 0;
pthread_mutex_t mutex;
pthread_cond_t cond0;
pthread_cond_t cond1;
void * thread_task0(void * args) {
while(1) {
pthread_mutex_lock(&mutex);
while (i != 0) {
pthread_cond_wait(&cond0, &mutex);
}
sleep(1);
printf("**************thread_task0 i: %i\n", i);
i = 1;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond1);
}
}
void * thread_task1(void * args) {
while(1) {
pthread_mutex_lock(&mutex);
while (i != 1) {
pthread_cond_wait(&cond1, &mutex);
}
sleep(1);
printf("################thread_task1 i: %i\n", i);
i = 0;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond0);
}
}
int main() {
pthread_t thread0;
pthread_t thread1;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond0, NULL);
pthread_cond_init(&cond1, NULL);
pthread_create(&thread0, NULL, thread_task0, NULL);
pthread_create(&thread1, NULL, thread_task1, NULL);
pthread_join(thread0, NULL);
pthread_join(thread1, NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond0);
pthread_cond_destroy(&cond1);
return 0;
}複製代碼
讓咱們簡單分析一下代碼吧,前3行引入了3個頭文件,前2個已經介紹過了,第3個頭文件中有sleep函數的定義,後面會用到。
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>複製代碼
隨後定義了變量i,互斥鎖mutex和2個條件變量cond0、cond1,這裏須要注意一下條件變量是須要和互斥鎖一塊兒使用的。
int i = 0;
pthread_mutex_t mutex;
pthread_cond_t cond0;
pthread_cond_t cond1;複製代碼
緊接着咱們定義了2個函數,分別由2個線程執行,因爲這兩個函數文字解釋比較麻煩,下面經過表格來表示兩個線程的執行過程。這裏須要注意的是,pthread_cond_wait會放棄當前線程得到的鎖,並進入掛起狀態。當其餘線程經過pthread_cond_signal通知該線程時,該線程會被喚起,從新得到鎖。
void * thread_task0(void * args) {
while(1) {
pthread_mutex_lock(&mutex);
while (i != 0) {
pthread_cond_wait(&cond0, &mutex);
}
sleep(1);
printf("**************thread_task0 i: %i\n", i);
i = 1;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond1);
}
}
void * thread_task1(void * args) {
while(1) {
pthread_mutex_lock(&mutex);
while (i != 1) {
pthread_cond_wait(&cond1, &mutex);
}
sleep(1);
printf("################thread_task1 i: %i\n", i);
i = 0;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond0);
}
}複製代碼
main函數只是用來啓動上面介紹的兩個線程,因此這裏就不解釋了。
上面介紹了POSIX Threads,接下來讓咱們粗略的看下libuv的線程吧,libuv官網也給出了對應的API文檔,有興趣的同窗能夠看下:
Threading and synchronization utilities經過翻看源碼,咱們能夠在src/unix/thread.c和src/win/thread.c文件下看到libuv線程的實現,很簡單,就是對各個平臺原有線程API進行包裝,使得API統一化,下面經過src/unix/thread.c稍稍看下它的實現吧。
typedef pthread_t uv_thread_t;
int uv_thread_create_ex(uv_thread_t* tid,
const uv_thread_options_t* params,
void (*entry)(void *arg),
void *arg) {
int err;
pthread_attr_t* attr;
pthread_attr_t attr_storage;
size_t pagesize;
size_t stack_size;
/* Used to squelch a -Wcast-function-type warning. */
union {
void (*in)(void*);
void* (*out)(void*);
} f;
stack_size =
params->flags & UV_THREAD_HAS_STACK_SIZE ? params->stack_size : 0;
attr = NULL;
if (stack_size == 0) {
stack_size = thread_stack_size();
} else {
pagesize = (size_t)getpagesize();
/* Round up to the nearest page boundary. */
stack_size = (stack_size + pagesize - 1) &~ (pagesize - 1);
#ifdef PTHREAD_STACK_MIN
if (stack_size < PTHREAD_STACK_MIN)
stack_size = PTHREAD_STACK_MIN;
#endif
}
if (stack_size > 0) {
attr = &attr_storage;
if (pthread_attr_init(attr))
abort();
if (pthread_attr_setstacksize(attr, stack_size))
abort();
}
f.in = entry;
err = pthread_create(tid, attr, f.out, arg);
if (attr != NULL)
pthread_attr_destroy(attr);
return UV__ERR(err);
}複製代碼
能夠看到建立線程的方法和咱們在POSIX Threads中介紹的差很少,都是經過pthread_create來建立,只不過經過pthread_attr_t設置了一些線程屬性罷了,好比線程堆棧的大小。
typedef pthread_mutex_t uv_mutex_t;
int uv_mutex_init(uv_mutex_t* mutex) {
#if defined(NDEBUG) || !defined(PTHREAD_MUTEX_ERRORCHECK)
return UV__ERR(pthread_mutex_init(mutex, NULL));
#else
pthread_mutexattr_t attr;
int err;
if (pthread_mutexattr_init(&attr))
abort();
if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK))
abort();
err = pthread_mutex_init(mutex, &attr);
if (pthread_mutexattr_destroy(&attr))
abort();
return UV__ERR(err);
#endif
}
void uv_mutex_lock(uv_mutex_t* mutex) {
if (pthread_mutex_lock(mutex))
abort();
}
void uv_mutex_unlock(uv_mutex_t* mutex) {
if (pthread_mutex_unlock(mutex))
abort();
}複製代碼
互斥鎖的API也和咱們POSIX Threads裏介紹的差很少。
本文初步經過線程建立、互斥鎖和條件變量介紹了POSIX Threads以及libuv自己的線程API,這些是libuv實現線程池的核心,結合上篇《Libuv學習——隊列》,咱們已經爲下篇libuv線程池打好了基礎,全部您有興趣的話,能夠關注咱們微信公衆號《方凳雅集》,這樣您將會在第一時間看到咱們的文章。