上一篇其實啥也沒講,不過node自己就是這麼複雜,走流程就要走全套。就像曾經看webpack源碼,讀了300行代碼最後就爲了取package.json裏面的main屬性,致使我直接棄坑了,垃圾源碼看完對腦子沒一點好處。回頭看了我以前那篇博客,同步那塊講的還像回事,異步就慘不忍睹了。不過講道理,異步中涉及鎖、底層操做系統API(iocp)的部分我到如今也不太懂,畢竟沒有實際的多線程開發經驗,只是純粹的技術愛好者。
這一篇再次進入libuv內部,從uv_fs_stat開始,操做系統以windows爲準,方法源碼以下。
int uv_fs_stat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) {
int err;
INIT(UV_FS_STAT);
err = fs__capture_path(req, path, NULL, cb != NULL);
if (err) {
return uv_translate_sys_error(err);
}
POST;
}複製代碼
int uv_fs_stat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) {
INIT(STAT);
PATH;
POST;
}複製代碼
前面兩步在那篇都有介紹,這裏就不重複了。大概就是根據操做類型初始化req對象,而後處理一下路徑,分配合理的空間給path字符串這些。
#define POST \ do { \ if (cb != NULL) { \ uv__req_register(loop, req); \
uv__work_submit(loop, \
&req->work_req, \
UV__WORK_FAST_IO, \
uv__fs_work, \
uv__fs_done); \
return 0; \
} else { \
uv__fs_work(&req->work_req); \
return req->result; \
} \
} \
while (0)複製代碼
因爲只關注異步操做,因此看if分支。參數已經在註釋中給出,還須要注意的一個點是方法名,register、submit,即註冊、提交。意思是,異步操做中,在這裏也不是執行I/O的地點,實際上還有更深刻的地方,繼續日後面看。
uv__req_register這個就不看了,簡單講是把loop的active_handle++,每一輪輪詢結束後會檢測當前loop是否還有活躍的handle須要處理,有就會繼續跑,判斷標準就是active_handle數量是否大於0。
// uv__word結構體
struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
void* wq[2];
};
// 參數參考上面 init_once是一個方法
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
enum uv__work_kind kind,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
post(&w->wq, kind);
}複製代碼
又是兩部曲,第一個uv_once如其名,這個方法只會執行一次,而後將loop對象和兩個方法掛在前面req的uv__work結構體上,最後調用post。
uv_once這個方法有點意思,自己跟stat操做自己毫無關係,只是對全部I/O操做作一個準備工做,全部的I/O操做都會預先調一下這個方法。windows、Unix系統的處理方式徹底不一樣,這裏貼一貼代碼,Unix不想看也看不懂,搞搞windows系統的。
void uv_once(uv_once_t* guard, void (*callback)(void)) {
// 調用過方法此處ran爲1 直接返回
if (guard->ran) {
return;
}
uv__once_inner(guard, callback);
}
static void uv__once_inner(uv_once_t* guard, void (*callback)(void)) {
DWORD result;
HANDLE existing_event, created_event;
// 建立或打開命名或未命名的事件對象
created_event = CreateEvent(NULL, 1, 0, NULL);
if (created_event == 0) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
// 對&guard->event與NULL作原子比較 若是相等則將created_event賦予&guard->event
// 返回第一個參數的初始值
existing_event = InterlockedCompareExchangePointer(&guard->event,
created_event,
NULL);
// 若是第一個參數初始值爲NULL 說明該線程搶到了方法第一次執行權利
if (existing_event == NULL) {
/* We won the race */
callback();
result = SetEvent(created_event);
assert(result);
guard->ran = 1;
} else {
// ...
}
}複製代碼
-
libuv這裏直接跟操做系統通訊,在windows上須要藉助其自身的event模塊來輔助異步操做。
-
提早劇透一下,全部的I/O操做是調用獨立線程進行處理,因此這個uv_once會被屢次調用,而多線程搶調用的時候有兩種狀況;第一種最簡單,第一名已經跑完全部流程,將ran設置爲1,其他線程直接被擋在了uv_once那裏直接返回了。第二種就較爲複雜,兩個線程同時接到了這個任務,而後都跑進了uv_once_inner中去了,如何保證參數callback只會被調用一次?這裏用上了windows內置的原子指針比較方法InterlockedCompareExchangePointer。何謂原子比較?這是隻有在多線程纔會出現的概念,原子性保證了每次讀取變量的值都是根據最新信息計算出來的,避免了多線程常常出現的競態問題,詳細文獻能夠參考wiki。
-
只有第一個搶到了調用權利的線程纔會進入if分支,以後調用callback方法,並設置event,那個SetEvent也是windowsAPI,有興趣本身研究去。
最後,全部的代碼流向都爲了執行callback,參數代表這是一個函數指針,無返回值無參數,叫init_once。
static void init_once(void) {
#ifndef _WIN32
#endif
init_threads();
}複製代碼
先代表,libuv中有一個很是關鍵的數據結構:隊列,在src/queue.h。不少地方(好比以前講輪詢的某一階段取對應的callback時)我雖說的是鏈表,但實際上用的是這個,因爲鏈表是隊列的超集,並且比較容易理解,總的來講也不算錯。說這麼多,實際上是初始化線程池會用到不少queue的宏,我不想講,後面會單獨開一篇說。
static void init_threads(void) {
unsigned int i;
const char* val;
uv_sem_t sem;
nthreads = ARRAY_SIZE(default_threads);
val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
nthreads = atoi(val);
if (nthreads == 0)
nthreads = 1;
if (nthreads > MAX_THREADPOOL_SIZE)
nthreads = MAX_THREADPOOL_SIZE;
threads = default_threads;
if (nthreads > ARRAY_SIZE(default_threads)) {
threads = uv__malloc(nthreads * sizeof(threads[0]));
if (threads == NULL) {
nthreads = ARRAY_SIZE(default_threads);
threads = default_threads;
}
}
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, &sem))
abort();
}複製代碼
除去一些不關心的代碼,剩下的就是判斷是否有手動設置線程池數量,而後初始化分配空間,最後循環給每個線程分配任務。
這個worker能夠先簡單看一下,大部份內容都是QUEUE相關,詳細內容所有寫在註釋裏面。
static void worker(void* arg) {
uv_mutex_lock(&mutex);
for (;;) {
q = QUEUE_HEAD(&wq);
if (q == &exit_message) {
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
break;
}
QUEUE_REMOVE(q);
QUEUE_INIT(q);
is_slow_work = 0;
if (q == &run_slow_work_message) {
}
uv_mutex_unlock(&mutex);
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL;
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
uv_mutex_lock(&mutex);
if (is_slow_work) {
slow_io_work_running--;
}
}
}複製代碼
注意是靜態方法,因此也須要處理多線程問題。註釋我寫的很是詳細了,能夠慢慢看,不懂C++也大概能明白流程。
還覺得這一篇能搞完,沒想到這個流程有點長,先這樣吧。