既前篇nodejs深刻學習系列之libuv基礎篇(一)學習的基本概念以後,咱們在第二篇將帶你們去學習爲何libuv的併發能力這麼優秀?這併發後面的實現機制是什麼?node
好了,瞭解了上述的基本概念以後,咱們來扯一扯Libuv的事件循環機制,也就是event-loop。仍是以[譯文]libuv設計思想概述一文展現的兩張圖片,再結合代碼來學習整個Libuv的事件循環機制。linux
首先是第一張圖片:git
細心的童鞋會發現這張圖片被我用紅框分割成了兩部分,爲何呢?由於Libuv處理fs I/O和網絡I/O用了兩套機制去實現,或者說更全面的講應該是fs I/O和 DNS等實現的方式和網絡 I/O是不同的。爲何這麼說呢?請看下圖,你就會明白了:github
上圖左側是libuv的兩大基石:event-loop
線程和thread pool
。而從圖的右側有兩條軌跡分別鏈接到這兩個基石,我特別用紅色加粗標記,能夠看到:segmentfault
uv__io_start
這個函數,而該函數會將須要執行的I/O事件和回調塞到watcher
隊列中,以後uv_run
函數執行的Poll for I/O
階段作的即是從watcher隊列中取出事件調用系統的接口,這是其中一條主線uv__work_sumit
這個函數,而該函數就是執行線程池初始化並調度的終極函數。這是另一條主線。接着咱們來看第二張圖片,咱們依然將該圖片進行改造以下:bash
整個事件循環的執行主體是在uv_run
中,每一次的循環經歷的階段對應的函數在上圖中已經標註出來,有幾個重點要說一下:網絡
循環是否退出(也就是進程是否結束)取決於如下幾個條件中的一個:多線程
1.一、loop->stop_flag變爲1而且uv__loop_alive返回不爲0,也就是調用uv_stop
函數而且loop不存在活躍的和被引用的句柄、活躍的請求或正在關閉的句柄。併發
1.二、事件循環運行模式等於UV_RUN_ONCE
或者是UV_RUN_NOWAIT
異步
I/O循環的超時時間的肯定:
2.一、若是時間循環運行模式是UV_RUN_NOWAIT
,超時爲0。
2.二、若是循環將要中止(代碼調用了uv_stop()
),超時爲0。
2.三、若是沒有活躍句柄或請求,超時爲0。
2.四、若是有任何Idle句柄處於活躍狀態,超時爲0。
2.五、若是有等待關閉的句柄,超時爲0。
2.六、若是以上狀況都不匹配,則採用最近的計時器的超時時間-當前時間(handle->timeout-loop->time),或者若是沒有活動計時器,則爲無窮大(即返回-1)。
I/O循環的實現主體uv__io_poll
根據系統不一樣,使用方式不同,若是對linux系統熟悉的話,epoll方式應該也會了解。更多epoll的只是能夠參考該文章:Linux IO模式及 select、poll、epoll詳解
說完時間循環的主線程,接下去咱們繼續揭祕libuv的線程池。
libuv提供了一個threadpool,可用來運行用戶代碼並在事件循環線程(event-loop)中獲得通知。這個線程池在內部用於運行全部文件系統操做,以及getaddrinfo和getnameinfo請求。固然若是你想要將本身的代碼放在線程池中運行也是能夠的,libuv提供除了uv_queue_work
的方法供開發者本身選擇。
它的默認大小爲4,可是能夠在啓動時經過將UV_THREADPOOL_SIZE
環境變量設置爲任意值(最大值爲1024)來更改它。
threadpool是全局的,並在全部事件循環中共享。當一個特定的函數使用threadpool(即當使用uv_queue_work())時,libuv預先分配並初始化UV_THREADPOOL_SIZE所容許的最大線程數。這致使了相對較小的內存開銷(128個線程大約1MB),但在運行時提升了線程的性能。
關於線程的操做,demo中的文件是:傳送門
在實例中,咱們用了三種方式來實現和線程相關的一些操做:
uv_async_send
來「喚醒」 event loop主線程並執行uv_async_init
當初設置好的回調咱們在上一節中知道,想要建立線程池並讓他們工做,惟一繞不開的函數是uv__work_submit
,你們能夠在libuv源碼中搜尋這個,能夠發現可以找到的也就這幾個文件:(以unix系統爲例)
threadpool.c
1. uv__work_submit實現地方
2. uv_queue_work調用
fs.c
1. 宏定義POST調用,全部的fs操做都會調用POST這個宏
getaddrinfo.c
1. uv_getaddrinfo調用
getnameinfo.c
1. uv_getnameinfo調用
複製代碼
細心的童鞋發現,每一處調用的地方都會傳一個叫作enum uv__work_kind kind
的操做,根據上面的調用,能夠看出分爲了3種任務類型:
學習線程池初始化以前,咱們先得普及一下線程間的同步原語。這樣後面看的代碼纔不會糊里糊塗
libuv提供了mutex鎖
、讀寫鎖
、信號量(Semaphores)
、條件量(Conditions)
、屏障(Barriers)
五種手段來實現線程間資源競爭互斥同步等操做。接下去會簡單地介紹,以便待會的初始化流程能夠讀懂。
互斥鎖用於對資源的互斥訪問,當你訪問的內存資源可能被別的線程訪問到,這個時候你就能夠考慮使用互斥鎖,在訪問的時候鎖住。對應的使用流程多是這樣的:
在線程初始化的過程當中,咱們會初始化一個全局的互斥鎖:
static void init_threads(void) {
...
if (uv_mutex_init(&mutex))
abort()
...
}
複製代碼
然後在每一個線程的執行實體worker
函數中,就使用互斥鎖對下面幾個公共資源進行鎖住與解鎖:
static void worker(void* arg) {
...
uv_mutex_lock(&mutex);
...
uv_mutex_unlock(&mutex);
}
複製代碼
讀寫鎖沒有用在線程的啓動過程當中,咱們在demo中用來實踐對某個全局變量的訪問。具體使用步驟參考代碼,這裏就再也不贅述。
信號量是一種專門用於提供不一樣進程間或線程間同步手段的原語。信號量本質上是一個非負整數計數器,表明共享資源的數目,一般是用來控制對共享資源的訪問。通常使用步驟是這樣的:
在線程池初始化過程當中,咱們利用信號量來等待全部的線程初始化結束,以下代碼:
static void init_threads(void) {
...
for (i = 0; i < nthreads; i++)
uv_sem_wait(&sem);
uv_sem_destroy(&sem);
}
// 而每一個線程的執行實體都會去將信號量-1:
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;
int is_slow_work;
uv_sem_post((uv_sem_t*) arg);
...
}
複製代碼
這樣只要全部的線程沒有初始化完成,uv_sem_destroy
這個函數是不會執行到的,整個初始化函數也不會返回,此時的主線程也就阻塞在這裏了。
而條件變量經過容許線程阻塞和等待另外一個線程發送信號的方法彌補了互斥鎖的不足。條件變量的內部實質上是一個等待隊列,放置等待(阻塞)的線程,線程在條件變量上等待和通知,互斥鎖用來保護等待隊列(由於全部的線程均可以放入等待隊列,因此等待隊列成爲了一個共享的資源,須要被上鎖保護),所以條件變量一般和互斥鎖一塊兒使用。通常使用步驟是這樣的:
libuv使用條件變量來阻塞線程池和喚醒線程池,使用代碼以下:
static void init_threads(void) {
if (uv_cond_init(&cond))
abort();
}
static void worker(void* arg) {
...
for (;;) {
/* `mutex` should always be locked at this point. */
/* Keep waiting while either no work is present or only slow I/O
and we're at the threshold for that. */ while (QUEUE_EMPTY(&wq) || (QUEUE_HEAD(&wq) == &run_slow_work_message && QUEUE_NEXT(&run_slow_work_message) == &wq && slow_io_work_running >= slow_work_thread_threshold())) { idle_threads += 1; uv_cond_wait(&cond, &mutex); idle_threads -= 1; } ... } } static void post(QUEUE* q, enum uv__work_kind kind) { ... if (idle_threads > 0) uv_cond_signal(&cond) ... } 複製代碼
從上面三處代碼能夠看到線程啓動以後就進入阻塞狀態,直到有I/O請求調用uv_cond_signal來喚醒,按照uv_cond_wait
調用的順序造成一個等待隊列,循環調用。
在多線程的時候,咱們總會碰到一個需求,就是須要等待一組進程所有執行完畢後再執行某些事,因爲多線程是亂序的,沒法預估線程都執行到哪裏了,這就要求咱們有一個屏障做爲同步點,在全部有屏障的地方都會阻塞等待,直到全部的線程都的代碼都執行到同步點,再繼續執行後續代碼。使用步驟通常是:
只有當初始化計數的值爲0,主線程纔會繼續執行,具體使用方法能夠參考demo。
至此藉助於線程間同步原語,咱們就嘩啦啦地把線程的初始化以及大概的工做機制講完了,總結出了下面一張圖:
線程池的工做利用的是主線程post
函數和各個線程的worker
函數,post
函數的工做內容以下:
併發的慢 I/O 的請求數量不會超過線程池大小的一半,這樣作的好處是避免多個慢 I/O 的請求在某段時間內把全部線程都佔滿,致使其它可以快速執行的請求須要排隊。
static unsigned int slow_work_thread_threshold(void) {
return (nthreads + 1) / 2;
}
複製代碼
而各個線程的工做內容以下:
w->work(w);
uv_async_send
上一小節清晰地描述了libuv的主線程是如何將請求分給各個線程以及線程是如何處理請求的,那麼上述過程當中還有一個步驟:線程池裏面的線程完成工做以後是如何通知主線程的?主線程收到通知以後又繼續作了些什麼?
這個過程咱們稱之爲線程間的通訊。上一小節中或者咱們的demo中已經知道,完成這個事情的主要函數是uv_async_send
,那麼這個函數是如何實現的呢?請看下圖:
從圖中咱們能夠看到,藉助於io poll與管道,線程池的線程寫入數據,被主線程輪詢出來,知道有消息過來,就開始執行對應的回調函數。整個流程就是這麼easy~