簡介: 本文是 POSIX 線程三部曲系列的最後一部分,Daniel 將詳細討論如何使用條件變量。條件變量是 POSIX 線程結構,可讓您在遇到某些條件時「喚醒」線程。能夠將它們看做是一種線程安全的信號發送。Daniel 使用目前您所學到的知識實現了一個多線程工做組應用程序,本文將圍繞着這一示例而進行討論。html
條件變量詳解node
在 上一篇文章結束時,我描述了一個比較特殊的難題:若是線程正在等待某個特定條件發生,它應該如何處理這種狀況?它能夠重複對互斥對象鎖定和解鎖,每次都會檢查共享數據結構,以查找某個值。但這是在浪費時間和資源,並且這種繁忙查詢的效率很是低。解決這個問題的最佳方法是使用 pthread_cond_wait() 調用來等待特殊條件發生。 linux
瞭解 pthread_cond_wait() 的做用很是重要 -- 它是 POSIX 線程信號發送系統的核心,也是最難以理解的部分。安全
首先,讓咱們考慮如下狀況:線程爲查看已連接列表而鎖定了互斥對象,然而該列表恰巧是空的。這一特定線程什麼也幹不了 -- 其設計意圖是從列表中除去節點,可是如今卻沒有節點。所以,它只能:數據結構
鎖定互斥對象時,線程將調用 pthread_cond_wait(&mycond,&mymutex)。pthread_cond_wait() 調用至關複雜,所以咱們每次只執行它的一個操做。多線程
pthread_cond_wait() 所作的第一件事就是同時對互斥對象解鎖(因而其它線程能夠修改已連接列表),並等待條件 mycond 發生(這樣當 pthread_cond_wait() 接收到另外一個線程的「信號」時,它將甦醒)。如今互斥對象已被解鎖,其它線程能夠訪問和修改已連接列表,可能還會添加項。app
此時,pthread_cond_wait() 調用還未返回。對互斥對象解鎖會當即發生,但等待條件 mycond 一般是一個阻塞操做,這意味着線程將睡眠,在它甦醒以前不會消耗 CPU 週期。這正是咱們期待發生的狀況。線程將一直睡眠,直到特定條件發生,在這期間不會發生任何浪費 CPU 時間的繁忙查詢。從線程的角度來看,它只是在等待 pthread_cond_wait() 調用返回。ide
如今繼續說明,假設另外一個線程(稱做「2 號線程」)鎖定了 mymutex 並對已連接列表添加了一項。在對互斥對象解鎖以後,2 號線程會當即調用函數 pthread_cond_broadcast(&mycond)。此操做以後,2 號線程將使全部等待 mycond 條件變量的線程當即甦醒。這意味着第一個線程(仍處於 pthread_cond_wait() 調用中)如今將甦醒。函數
如今,看一下第一個線程發生了什麼。您可能會認爲在 2 號線程調用 pthread_cond_broadcast(&mymutex) 以後,1 號線程的 pthread_cond_wait() 會當即返回。不是那樣!實際上,pthread_cond_wait() 將執行最後一個操做:從新鎖定 mymutex。一旦 pthread_cond_wait() 鎖定了互斥對象,那麼它將返回並容許 1 號線程繼續執行。那時,它能夠立刻檢查列表,查看它所感興趣的更改。學習
那個過程很是複雜,所以讓咱們先來回顧一下。第一個線程首先調用:
pthread_mutex_lock(&mymutex); |
而後,它檢查了列表。沒有找到感興趣的東西,因而它調用:
pthread_cond_wait(&mycond, &mymutex); |
而後,pthread_cond_wait() 調用在返回前執行許多操做:
pthread_mutex_unlock(&mymutex); |
它對 mymutex 解鎖,而後進入睡眠狀態,等待 mycond 以接收 POSIX 線程「信號」。一旦接收到「信號」(加引號是由於咱們並非在討論傳統的 UNIX 信號,而是來自 pthread_cond_signal() 或 pthread_cond_broadcast() 調用的信號),它就會甦醒。但 pthread_cond_wait() 沒有當即返回 -- 它還要作一件事:從新鎖定 mutex:
pthread_mutex_lock(&mymutex); |
pthread_cond_wait() 知道咱們在查找 mymutex 「背後」的變化,所以它繼續操做,爲咱們鎖定互斥對象,而後才返回。
如今已回顧了 pthread_cond_wait() 調用,您應該瞭解了它的工做方式。應該可以敘述 pthread_cond_wait() 依次執行的全部操做。嘗試一下。若是理解了 pthread_cond_wait(),其他部分就至關容易,所以請從新閱讀以上部分,直到記住爲止。好,讀完以後,可否告訴我在調用 pthread_cond_wait() 之 前,互斥對象必須處於什麼狀態?pthread_cond_wait() 調用返回以後,互斥對象處於什麼狀態?這兩個問題的答案都是「鎖定」。既然已經徹底理解了 pthread_cond_wait() 調用,如今來繼續研究更簡單的東西 -- 初始化和真正的發送信號和廣播進程。到那時,咱們將會對包含了多線程工做隊列的 C 代碼瞭如指掌。
條件變量是一個須要初始化的真實數據結構。如下就初始化的方法。首先,定義或分配一個條件變量,以下所示:
pthread_cond_t mycond; |
而後,調用如下函數進行初始化:
pthread_cond_init(&mycond,NULL); |
瞧,初始化完成了!在釋放或廢棄條件變量以前,須要毀壞它,以下所示:
pthread_cond_destroy(&mycond); |
很簡單吧。接着討論 pthread_cond_wait() 調用。
一旦初始化了互斥對象和條件變量,就能夠等待某個條件,以下所示:
pthread_cond_wait(&mycond, &mymutex); |
請注意,代碼在邏輯上應該包含 mycond 和 mymutex。一個特定條件只能有一個互斥對象,並且條件變量應該表示互斥數據「內部」的一種特殊的條件更改。一個互斥對象能夠用許多條件變量(例如,cond_empty、cond_full、cond_cleanup),但每一個條件變量只能有一個互斥對象。
對於發送信號和廣播,須要注意一點。若是線程更改某些共享數據,並且它想要喚醒全部正在等待的線程,則應使用 pthread_cond_broadcast 調用,以下所示:
pthread_cond_broadcast(&mycond); |
在某些狀況下,活動線程只須要喚醒第一個正在睡眠的線程。假設您只對隊列添加了一個工做做業。那麼只須要喚醒一個工做程序線程(再喚醒其它線程是不禮貌的!):
pthread_cond_signal(&mycond); |
此函數只喚醒一個線程。若是 POSIX 線程標準容許指定一個整數,可讓您喚醒必定數量的正在睡眠的線程,那就更完美了。可是很惋惜,我沒有被邀請參加會議。
我將演示如何建立多線程工做組。在這個方案中,咱們建立了許多工做程序線程。每一個線程都會檢查 wq(「工做隊列」),查看是否有須要完成的工做。若是有須要完成的工做,那麼線程將從隊列中除去一個節點,執行這些特定工做,而後等待新的工做到達。
與此同時,主線程負責建立這些工做程序線程、將工做添加到隊列,而後在它退出時收集全部工做程序線程。您將會遇到許多 C 代碼,好好準備吧!
須要隊列是出於兩個緣由。首先,須要隊列來保存工做做業。還須要可用於跟蹤已終止線程的數據結構。還記得前幾篇文章(請參閱本文結尾處的 參考資料)中,我曾提到過須要使用帶有特定進程標識的 pthread_join 嗎?使用「清除隊列」(稱做 "cq")能夠解決沒法等待 任何已終止線程的問題(稍後將詳細討論這個問題)。如下是標準隊列代碼。將此代碼保存到文件 queue.h 和 queue.c:
/* queue.h ** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc. ** Author: Daniel Robbins ** Date: 16 Jun 2000 */ typedef struct node { struct node *next; } node; typedef struct queue { node *head, *tail; } queue; void queue_init(queue *myroot); void queue_put(queue *myroot, node *mynode); node *queue_get(queue *myroot); |
/* queue.c ** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc. ** Author: Daniel Robbins ** Date: 16 Jun 2000 ** ** This set of queue functions was originally thread-aware. I ** redesigned the code to make this set of queue routines ** thread-ignorant (just a generic, boring yet very fast set of queue ** routines). Why the change? Because it makes more sense to have ** the thread support as an optional add-on. Consider a situation ** where you want to add 5 nodes to the queue. With the ** thread-enabled version, each call to queue_put() would ** automatically lock and unlock the queue mutex 5 times -- that's a ** lot of unnecessary overhead. However, by moving the thread stuff ** out of the queue routines, the caller can lock the mutex once at ** the beginning, then insert 5 items, and then unlock at the end. ** Moving the lock/unlock code out of the queue functions allows for ** optimizations that aren't possible otherwise. It also makes this ** code useful for non-threaded applications. ** ** We can easily thread-enable this data structure by using the ** data_control type defined in control.c and control.h. */ #include <stdio.h> #include "queue.h" void queue_init(queue *myroot) { myroot->head=NULL; myroot->tail=NULL; } void queue_put(queue *myroot,node *mynode) { mynode->next=NULL; if (myroot->tail!=NULL) myroot->tail->next=mynode; myroot->tail=mynode; if (myroot->:head==NULL) myroot->head=mynode; } node *queue_get(queue *myroot) { //get from root node *mynode; mynode=myroot->head; if (myroot->head!=NULL) myroot->head=myroot->head->next; return mynode; } |
我編寫的並非線程安全的隊列例程,事實上我建立了一個「數據包裝」或「控制」結構,它能夠是任何線程支持的數據結構。看一下 control.h:
#include typedef struct data_control { pthread_mutex_t mutex; pthread_cond_t cond; int active; } data_control; |
如今您看到了 data_control 結構定義,如下是它的視覺表示:
圖像中的鎖表明互斥對象,它容許對數據結構進行互斥訪問。黃色的星表明條件變量,它能夠睡眠,直到所討論的數據結構改變爲止。on/off 開關表示整數 "active",它告訴線程此數據是不是活動的。在代碼中,我使用整數 active 做爲標誌,告訴工做隊列什麼時候應該關閉。如下是 control.c:
/* control.c ** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc. ** Author: Daniel Robbins ** Date: 16 Jun 2000 ** ** These routines provide an easy way to make any type of ** data-structure thread-aware. Simply associate a data_control ** structure with the data structure (by creating a new struct, for ** example). Then, simply lock and unlock the mutex, or ** wait/signal/broadcast on the condition variable in the data_control ** structure as needed. ** ** data_control structs contain an int called "active". This int is ** intended to be used for a specific kind of multithreaded design, ** where each thread checks the state of "active" every time it locks ** the mutex. If active is 0, the thread knows that instead of doing ** its normal routine, it should stop itself. If active is 1, it ** should continue as normal. So, by setting active to 0, a ** controlling thread can easily inform a thread work crew to shut ** down instead of processing new jobs. Use the control_activate() ** and control_deactivate() functions, which will also broadcast on ** the data_control struct's condition variable, so that all threads ** stuck in pthread_cond_wait() will wake up, have an opportunity to ** notice the change, and then terminate. */ #include "control.h" int control_init(data_control *mycontrol) { int mystatus; if (pthread_mutex_init(&(mycontrol->mutex),NULL)) return 1; if (pthread_cond_init(&(mycontrol->cond),NULL)) return 1; mycontrol->active=0; return 0; } int control_destroy(data_control *mycontrol) { int mystatus; if (pthread_cond_destroy(&(mycontrol->cond))) return 1; if (pthread_cond_destroy(&(mycontrol->cond))) return 1; mycontrol->active=0; return 0; } int control_activate(data_control *mycontrol) { int mystatus; if (pthread_mutex_lock(&(mycontrol->mutex))) return 0; mycontrol->active=1; pthread_mutex_unlock(&(mycontrol->mutex)); pthread_cond_broadcast(&(mycontrol->cond)); return 1; } int control_deactivate(data_control *mycontrol) { int mystatus; if (pthread_mutex_lock(&(mycontrol->mutex))) return 0; mycontrol->active=0; pthread_mutex_unlock(&(mycontrol->mutex)); pthread_cond_broadcast(&(mycontrol->cond)); return 1; } |
在開始調試以前,還須要一個文件。如下是 dbug.h:
#define dabort() \ { printf("Aborting at line %d in source file %s\n",__LINE__,__FILE__); abort(); } |
此代碼用於處理工做組代碼中的不可糾正錯誤。
說到工做組代碼,如下就是:
#include <stdio.h> #include <stdlib.h> #include "control.h" #include "queue.h" #include "dbug.h" /* the work_queue holds tasks for the various threads to complete. */ struct work_queue { data_control control; queue work; } wq; /* I added a job number to the work node. Normally, the work node would contain additional data that needed to be processed. */ typedef struct work_node { struct node *next; int jobnum; } wnode; /* the cleanup queue holds stopped threads. Before a thread terminates, it adds itself to this list. Since the main thread is waiting for changes in this list, it will then wake up and clean up the newly terminated thread. */ struct cleanup_queue { data_control control; queue cleanup; } cq; /* I added a thread number (for debugging/instructional purposes) and a thread id to the cleanup node. The cleanup node gets passed to the new thread on startup, and just before the thread stops, it attaches the cleanup node to the cleanup queue. The main thread monitors the cleanup queue and is the one that performs the necessary cleanup. */ typedef struct cleanup_node { struct node *next; int threadnum; pthread_t tid; } cnode; void *threadfunc(void *myarg) { wnode *mywork; cnode *mynode; mynode=(cnode *) myarg; pthread_mutex_lock(&wq.control.mutex); while (wq.control.active) { while (wq.work.head==NULL && wq.control.active) { pthread_cond_wait(&wq.control.cond, &wq.control.mutex); } if (!wq.control.active) break; //we got something! mywork=(wnode *) queue_get(&wq.work); pthread_mutex_unlock(&wq.control.mutex); //perform processing... printf("Thread number %d processing job %d\n",mynode->threadnum,mywork->jobnum); free(mywork); pthread_mutex_lock(&wq.control.mutex); } pthread_mutex_unlock(&wq.control.mutex); pthread_mutex_lock(&cq.control.mutex); queue_put(&cq.cleanup,(node *) mynode); pthread_mutex_unlock(&cq.control.mutex); pthread_cond_signal(&cq.control.cond); printf("thread %d shutting down...\n",mynode->threadnum); return NULL; } #define NUM_WORKERS 4 int numthreads; void join_threads(void) { cnode *curnode; printf("joining threads...\n"); while (numthreads) { pthread_mutex_lock(&cq.control.mutex); /* below, we sleep until there really is a new cleanup node. This takes care of any false wakeups... even if we break out of pthread_cond_wait(), we don't make any assumptions that the condition we were waiting for is true. */ while (cq.cleanup.head==NULL) { pthread_cond_wait(&cq.control.cond,&cq.control.mutex); } /* at this point, we hold the mutex and there is an item in the list that we need to process. First, we remove the node from the queue. Then, we call pthread_join() on the tid stored in the node. When pthread_join() returns, we have cleaned up after a thread. Only then do we free() the node, decrement the number of additional threads we need to wait for and repeat the entire process, if necessary */ curnode = (cnode *) queue_get(&cq.cleanup); pthread_mutex_unlock(&cq.control.mutex); pthread_join(curnode->tid,NULL); printf("joined with thread %d\n",curnode->threadnum); free(curnode); numthreads--; } } int create_threads(void) { int x; cnode *curnode; for (x=0; x<NUM_WORKERS; x++) { curnode=malloc(sizeof(cnode)); if (!curnode) return 1; curnode->threadnum=x; if (pthread_create(&curnode->tid, NULL, threadfunc, (void *) curnode)) return 1; printf("created thread %d\n",x); numthreads++; } return 0; } void initialize_structs(void) { numthreads=0; if (control_init(&wq.control)) dabort(); queue_init(&wq.work); if (control_init(&cq.control)) { control_destroy(&wq.control); dabort(); } queue_init(&wq.work); control_activate(&wq.control); } void cleanup_structs(void) { control_destroy(&cq.control); control_destroy(&wq.control); } int main(void) { int x; wnode *mywork; initialize_structs(); /* CREATION */ if (create_threads()) { printf("Error starting threads... cleaning up.\n"); join_threads(); dabort(); } pthread_mutex_lock(&wq.control.mutex); for (x=0; x<16000; x++) { mywork=malloc(sizeof(wnode)); if (!mywork) { printf("ouch! can't malloc!\n"); break; } mywork->jobnum=x; queue_put(&wq.work,(node *) mywork); } pthread_mutex_unlock(&wq.control.mutex); pthread_cond_broadcast(&wq.control.cond); printf("sleeping...\n"); sleep(2); printf("deactivating work queue...\n"); control_deactivate(&wq.control); /* CLEANUP */ join_threads(); cleanup_structs(); } |
如今來快速初排代碼。定義的第一個結構稱做 "wq",它包含了 data_control 和隊列頭。data_control 結構用於仲裁對整個隊列的訪問,包括隊列中的節點。下一步工做是定義實際的工做節點。要使代碼符合本文中的示例,此處所包含的都是做業號。
接着,建立清除隊列。註釋說明了它的工做方式。好,如今讓咱們跳過 threadfunc()、join_threads()、create_threads() 和 initialize_structs() 調用,直接跳到 main()。所作的第一件事就是初始化結構 -- 這包括初始化 data_controls 和隊列,以及激活工做隊列。
如今初始化線程。若是看一下 create_threads() 調用,彷佛一切正常 -- 除了一件事。請注意,咱們正在分配清除節點,以及初始化它的線程號和 TID 組件。咱們還將清除節點做爲初始自變量傳遞給每個新的工做程序線程。爲何這樣作?
由於當某個工做程序線程退出時,它會將其清除節點鏈接到清除隊列,而後終止。那時,主線程會在清除隊列中檢測到這個節點(利用條件變量),並將這個節點移出隊列。由於 TID(線程標識)存儲在清除節點中,因此主線程能夠確切知道哪一個線程已終止了。而後,主線程將調用 pthread_join(tid),並聯接適當的工做程序線程。若是沒有作記錄,那麼主線程就須要按任意順序聯接工做程序線程,多是按它們的建立順序。因爲線程不必定按此順序終止,那麼主線程可能會在已經聯接了十個線程時,等待聯接另外一個線程。您能理解這種設計決策是如何使關閉代碼加速的嗎(尤爲在使用幾百個工做程序線程的狀況下)?
咱們已啓動了工做程序線程(它們已經完成了執行 threadfunc(),稍後將討論此函數),如今主線程開始將工做節點插入工做隊列。首先,它鎖定 wq 的控制互斥對象,而後分配 16000 個工做包,將它們逐個插入隊列。完成以後,將調用 pthread_cond_broadcast(),因而全部正在睡眠的線程會被喚醒,並開始執行工做。此時,主線程將睡眠兩秒鐘,而後釋放工做隊列,並通知工做程序線程終止活動。接着,主線程會調用 join_threads() 函數來清除全部工做程序線程。
如今來討論 threadfunc(),這是全部工做程序線程都要執行的代碼。當工做程序線程啓動時,它會當即鎖定工做隊列互斥對象,獲取一個工做節點(若是有的話),而後對它進行處理。若是沒有工做,則調用 pthread_cond_wait()。您會注意到這個調用在一個很是緊湊的 while() 循環中,這是很是重要的。當從 pthread_cond_wait() 調用中甦醒時,決不能認爲條件確定發生了 -- 它 可能發生了,也可能沒有發生。若是發生了這種狀況,即錯誤地喚醒了線程,而列表是空的,那麼 while 循環將再次調用 pthread_cond_wait()。
若是有一個工做節點,那麼咱們只打印它的做業號,釋放它並退出。然而,實際代碼會執行一些更實質性的操做。在 while() 循環結尾,咱們鎖定了互斥對象,以便檢查 active 變量,以及在循環頂部檢查新的工做節點。若是執行完此代碼,就會發現若是 wq.control.active 是 0,while 循環就會終止,並會執行 threadfunc() 結尾處的清除代碼。
工做程序線程的清除代碼部件很是有趣。首先,因爲 pthread_cond_wait() 返回了鎖定的互斥對象,它會對 work_queue 解鎖。而後,它鎖定清除隊列,添加清除代碼(包含了 TID,主線程將使用此 TID 來調用 pthread_join()),而後再對清除隊列解鎖。此後,它發信號給全部 cq 等待者 (pthread_cond_signal(&cq.control.cond)),因而主線程就知道有一個待處理的新節點。咱們不使用 pthread_cond_broadcast(),由於沒有這個必要 -- 只有一個線程(主線程)在等待清除隊列中的新節點。當它調用 join_threads() 時,工做程序線程將打印關閉消息,而後終止,等待主線程發出的 pthread_join() 調用。
若是要查看關於如何使用條件變量的簡單示例,請參考 join_threads() 函數。若是還有工做程序線程,join_threads() 會一直執行,等待清除隊列中新的清除節點。若是有新節點,咱們會將此節點移出隊列、對清除隊列解鎖(從而使工做程序能夠添加清除節點)、聯接新的工做程序線程(使用存儲在清除節點中的 TID)、釋放清除節點、減小「現有」線程的數量,而後繼續。
如今已經到了「POSIX 線程詳解」系列的尾聲,但願您已經準備好開始將多線程代碼添加到您本身的應用程序中。有關詳細信息,請參閱 參考資料部分,這部份內容還包含了本文中使用的全部源碼的 tar 文件。下一個系列中再見!
Daniel Robbins 居住在新墨西哥州的 Albuquerque。他是 Gentoo Technologies, Inc. 的總裁兼 CEO,Gentoo 項目的總設計師,MacMillan 出版書籍的撰稿做者,他的著做有: Caldera OpenLinux Unleashed, SuSE Linux Unleashed, 和 Samba Unleashed。Daniel 自二年級起就與計算機某些領域結下不解之緣,那時他首先接觸的是 Logo 程序語言,並沉溺於 Pac-Man 遊戲中。這也許就是他至今仍擔任 SONY Electronic Publishing/Psygnosis 的首席圖形設計師的緣由所在。Daniel 喜歡與妻子 Mary 和新出生的女兒 Hadassah 一塊兒共度時光。可經過 drobbins@gentoo.org與 Daniel 聯繫。
原文地址:http://www.ibm.com/developerworks/cn/linux/thread/posix_thread3/index.html#ibm-pcon