以前咱們介紹了鎖,然而鎖並非併發程序設計中所需的惟一原語。在不少狀況下,線程須要檢查某一條件(condition)知足以後,纔會繼續運行。例如,父線程須要檢查子線程是否執行完畢。這種等待如何實現呢?安全
注:併發程序有兩大需求,一是互斥,二是等待。互斥是由於線程間存在共享數據,等待則是由於線程間存在依賴。服務器
咱們能夠嘗試用一個共享變量,如圖所示。這種解決方案通常能工做,可是效率低下,由於主線程會自旋檢查,浪費CPU時間。咱們但願有某種方式讓父線程休眠,直到等待的條件知足(即子線程完成執行)。網絡
1 volatile int done = 0; 2 3 void *child(void *arg) { 4 printf("child\n"); 5 done = 1; 6 return NULL; 7 } 8 9 int main(int argc, char *argv[]) { 10 printf("parent: begin\n"); 11 pthread_t c; 12 Pthread_create(&c, NULL, child, NULL); // create child 13 while (done == 0) 14 ; // spin 15 printf("parent: end\n"); 16 return 0; 17 }
線程可使用條件變量(condition variable),來等待一個條件變成真。條件變量是一個顯式隊列,當某些執行狀態(即條件,condition)不知足時,線程能夠把本身加入隊列,等待該條件。當其餘線程改變了上述狀態時,就能夠經過在該條件上發送信號喚醒隊列中的等待線程,讓它們繼續執行。數據結構
在POSIX庫中,要聲明一個條件變量,只要像這樣寫:pthread_cond_t c(注意:還須要適當的初始化)。條件變量有兩種相關操做:wait()和signal()。線程要睡眠的時候,調用wait();當線程想喚醒等待在某個條件變量上的睡眠線程時,調用signal()。下面是一個典型示例:多線程
1 int done = 0; 2 pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; 3 pthread_cond_t c = PTHREAD_COND_INITIALIZER; 4 5 void thr_exit() { 6 Pthread_mutex_lock(&m); 7 done = 1; 8 Pthread_cond_signal(&c); 9 Pthread_mutex_unlock(&m); 10 } 11 12 void *child(void *arg) { 13 printf("child\n"); 14 thr_exit(); 15 return NULL; 16 } 17 18 void thr_join() { 19 Pthread_mutex_lock(&m); 20 while (done == 0) 21 Pthread_cond_wait(&c, &m); 22 Pthread_mutex_unlock(&m); 23 } 24 25 int main(int argc, char *argv[]) { 26 printf("parent: begin\n"); 27 pthread_t p; 28 Pthread_create(&p, NULL, child, NULL); 29 thr_join(); 30 printf("parent: end\n"); 31 return 0; 32 }
wait()調用除了條件變量外還有一個參數,它是一個互斥鎖。它假定在wait()調用時,這個互斥鎖是已上鎖狀態。wait()的職責是原子地釋放鎖,並讓調用線程休眠。當線程被喚醒時,它必須從新獲取鎖,再返回調用者。這樣複雜的步驟也是爲了不在線程陷入休眠時,產生一些競態條件。併發
有兩種狀況須要考慮。第一種狀況是父線程建立出子線程,但本身繼續運行,而後立刻調用thr_join()等待子線程。在這種狀況下,它會先獲取鎖,檢查子線程是否完成,而後調用wait(),讓本身休眠。子線程最終得以運行,打印出「child」,並調用thr_exit()函數喚醒父線程,這段代碼會在得到鎖後設置狀態變量done,而後向父線程發信號喚醒它。最後,父線程會運行(從wait()調用返回並持有鎖),釋放鎖,打印出「parent:end」。函數
第二種狀況是,子線程在建立後,馬上運行,設置變量done爲1,調用signal函數喚醒其餘線程(這裏沒有其餘線程),而後結束。父線程運行後,調用thr_join()時,發現done已是1了,就直接返回。高併發
須要注意的是,在上面的代碼中,狀態變量done和互斥鎖c都是必需的。假如咱們不使用狀態變量,代碼像下面這樣,會出現什麼問題?oop
1 void thr_exit() { 2 Pthread_mutex_lock(&m); 3 Pthread_cond_signal(&c); 4 Pthread_mutex_unlock(&m); 5 } 6 7 void thr_join() { 8 Pthread_mutex_lock(&m); 9 Pthread_cond_wait(&c, &m); 10 Pthread_mutex_unlock(&m); 11 }
假設子線程馬上運行,而且調用thr_exit()。在這種狀況下,子線程發送信號,但此時卻沒有在條件變量上睡眠等待的線程。父線程運行時,就會調用wait並卡在那裏,沒有其餘線程會喚醒它。經過這個例子,你應該認識到變量done的重要性,它記錄了線程感興趣的值。睡眠、喚醒和鎖都離不開它。post
在下面的例子中,咱們假設線程在發信號和等待時都不加鎖。又會發生什麼問題?
1 void thr_exit() { 2 done = 1; 3 Pthread_cond_signal(&c); 4 } 5 6 void thr_join() { 7 if (done == 0) 8 Pthread_cond_wait(&c); 9 }
這裏的問題是一個微妙的競態條件。具體來講,若是父進程調用thr_join(),檢查完done的值爲0,而後試圖睡眠。但在調用wait進入睡眠以前,父進程被中斷。隨後子線程修改變量done爲1,發出信號,此時一樣沒有等待線程。當父線程再次運行時,就會長眠不醒。
因此,咱們能夠堅持這樣一條原則:在使用條件變量時,調用signal和wait時要持有鎖。
假設有一個或多個生產者線程和一個或多個消費者線程。生產者把生成的數據項放入緩衝區,消費者從緩衝區取走數據項,以某種方式消費。不少實際的系統中都會有這種場景。例如,在多線程的網絡服務器中,一個生產者將HTTP請求放入工做隊列,消費線程從隊列中取走請求並處理。
由於有界緩衝區是共享資源,因此咱們必須經過同步機制來訪問它,以避免產生競態條件。爲了更好地理解這個問題,咱們來嘗試一些實際的代碼。
首先須要一個共享緩衝區,讓生產者放入數據,消費者取出數據。簡單起見,咱們就拿一個整數來作緩衝區,兩個內部函數將值放入緩衝區,從緩衝區取值。
1 int buffer; 2 int count = 0; // initially, empty 3 4 void put(int value) { 5 assert(count == 0); 6 count = 1; 7 buffer = value; 8 } 9 10 int get() { 11 assert(count == 1); 12 count = 0; 13 return buffer; 14 }
put()函數會假設緩衝區是空的,把一個值存在緩衝區,而後把count設置爲1表示緩衝區滿了。get()函數恰好相反,把緩衝區清空後,並返回該值。
如今咱們須要編寫一些函數,用於生產和消費數據。調用生產函數的咱們稱之爲生產者(producer)線程,調用消費函數的咱們稱之爲消費者(consumer)線程。下面展現了一對非線程安全的生產者和消費者的代碼,生產者將一個整數放入共享緩衝區loops次,消費者持續從該共享緩衝區中獲取數據,並打印出數據項。咱們的目標就是使用條件變量將其改形成線程安全的版本。
1 void *producer(void *arg) { 2 int i; 3 int loops = (int) arg; 4 for (i = 0; i < loops; i++) { 5 put(i); 6 } 7 } 8 9 void *consumer(void *arg) { 10 int i; 11 while (1) { 12 int tmp = get(); 13 printf("%d\n", tmp); 14 } 15 }
顯然,put()和get()函數之中會有臨界區,由於put()更新緩衝區,get()讀取緩衝區。咱們的首次嘗試以下:
1 cond_t cond; 2 mutex_t mutex; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 Pthread_mutex_lock(&mutex); // p1 8 if (count == 1) // p2 9 Pthread_cond_wait(&cond, &mutex); // p3 10 put(i); // p4 11 Pthread_cond_signal(&cond); // p5 12 Pthread_mutex_unlock(&mutex); // p6 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 Pthread_mutex_lock(&mutex); // c1 20 if (count == 0) // c2 21 Pthread_cond_wait(&cond, &mutex); // c3 22 int tmp = get(); // c4 23 Pthread_cond_signal(&cond); // c5 24 Pthread_mutex_unlock(&mutex); // c6 25 printf("%d\n", tmp); 26 } 27 }
當生產者想要填充緩衝區時,它等待緩衝區變空(p1~p3)。消費者具備徹底相同的邏輯,但等待不一樣的條件——變滿(c1~c3)。
當只有一個生產者和一個消費者時,上圖的代碼可以正常運行。但若是有超過一個線程,這個方案會有兩個嚴重的問題。
先來看第一個問題,它與等待以前的if語句有關。假設有兩個消費者(Tc1和Tc2),一個生產者(Tp)。首先,一個消費者(Tc1)先開始執行,它得到鎖(c1),檢查緩衝區是否能夠消費(c2),而後等待(c3)。
接着生產者(Tp)運行。它獲取鎖(p1),檢查緩衝區是否滿(p2),發現沒滿就給緩衝區加入一個數字(p4)。而後生產者發出信號,說緩衝區已滿(p5)。關鍵的是,這讓第一個消費者(Tc1)再也不睡在條件變量上,進入就緒隊列。生產者繼續執行,直到發現緩衝區滿後睡眠(p6,p1-p3)。
這時問題發生了:另外一個消費者(Tc2)搶先執行,消費了緩衝區中的值。如今假設Tc1運行,在從wait返回以前,它獲取了鎖,而後返回。而後它調用了get() (p4),但緩衝區已沒法消費。斷言觸發,代碼不能像預期那樣工做。
問題產生的緣由很簡單:在Tc1被生產者喚醒後,但在它運行以前,因爲Tc2搶先運行,緩衝區的狀態改變了。發信號給線程只是喚醒它們,暗示狀態發生了變化,但並不會保證在它運行以前狀態一直是指望的狀況。
修復這個問題很簡單:把if語句改成while。當消費者Tc1被喚醒後,馬上再次檢查共享變量(c2)。若是緩衝區此時爲空,消費者就會回去繼續睡眠(c3)。生產者中相應的if也改成while(p2)。
1 cond_t cond; 2 mutex_t mutex; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 Pthread_mutex_lock(&mutex); // p1 8 while (count == 1) // p2 9 Pthread_cond_wait(&cond, &mutex); // p3 10 put(i); // p4 11 Pthread_cond_signal(&cond); // p5 12 Pthread_mutex_unlock(&mutex); // p6 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 Pthread_mutex_lock(&mutex); // c1 20 while (count == 0) // c2 21 Pthread_cond_wait(&cond, &mutex); // c3 22 int tmp = get(); // c4 23 Pthread_cond_signal(&cond); // c5 24 Pthread_mutex_unlock(&mutex); // c6 25 printf("%d\n", tmp); 26 } 27 }
咱們要記住一條關於條件變量的簡單規則:老是使用while循環。
可是,這段代碼仍然有一個問題,也是上文提到的兩個問題之一,它和咱們只用了一個條件變量有關。
假設兩個消費者(Tc1和Tc2)先運行,都睡眠了(c3)。生產者開始運行,在緩衝區放入一個值,喚醒了一個消費者(假定是Tc1),並開始睡眠。如今是一個消費者立刻要運行(Tc1),兩個線程(Tc2和Tp)都等待在同一個條件變量上。
消費者Tc1醒過來並從wait()調用返回(c3),從新檢查條件(c2),發現緩衝區是滿的,消費了這個值(c4)。這個消費者而後在該條件上發信號(c5),喚醒一個在睡眠的線程。可是,應該喚醒哪一個線程呢?
由於消費者已經清空了緩衝區,很顯然,應該喚醒生產者。可是,若是它喚醒了Tc2,問題就出現了。消費者Tc2會醒過來,發現隊列爲空(c2),又繼續回去睡眠(c3)。生產者Tp剛纔在緩衝區中放了一個值,如今在睡眠。消費者Tc1繼續執行後也回去睡眠了。3個線程都在睡眠,顯然是一個大問題。
咱們能夠看出:信號顯然須要,但必須更有指向性。消費者不該該喚醒消費者,而應該只喚醒生產者,反之亦然。
這個問題的解決方案也很簡單:使用兩個而不是一個條件變量,以便在系統狀態改變時,能正確地發出信號喚醒哪類線程。下面展現了最終的代碼。
1 cond_t empty, fill; 2 mutex_t mutex; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 Pthread_mutex_lock(&mutex); 8 while (count == 1) 9 Pthread_cond_wait(&empty, &mutex); 10 put(i); 11 Pthread_cond_signal(&fill); 12 Pthread_mutex_unlock(&mutex); 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 Pthread_mutex_lock(&mutex); 20 while (count == 0) 21 Pthread_cond_wait(&fill, &mutex); 22 int tmp = get(); 23 Pthread_cond_signal(&empty); 24 Pthread_mutex_unlock(&mutex); 25 printf("%d\n", tmp); 26 } 27 }
咱們如今有了可用的生產者/消費者方案,但不太通用,咱們最後所作的修改是爲了提升併發和效率。具體來講就是增長更多緩衝區槽位,這樣在睡眠以前,生產者能夠生產多個值;一樣,消費者在睡眠以前能夠消費多個值。
單個生產者和消費者時,這種方案由於上下文切換少,提升了效率。多個生產者和消費者時,它能夠支持併發生產和消費。和現有方案相比,改動也很小。
第一處修改是緩衝區結構自己,以及對應的put()和get()方法:
1 int buffer[MAX]; 2 int fill = 0; 3 int use = 0; 4 int count = 0; 5 6 void put(int value) { 7 buffer[fill] = value; 8 fill = (fill + 1) % MAX; 9 count++; 10 } 11 12 int get() { 13 int tmp = buffer[use]; 14 use = (use + 1) % MAX; 15 count--; 16 return tmp; 17 }
下面展現了最終的代碼邏輯。至此,咱們解決了生產者/消費者問題。
1 cond_t empty, fill; 2 mutex_t mutex; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 Pthread_mutex_lock(&mutex); // p1 8 while (count == MAX) // p2 9 Pthread_cond_wait(&empty, &mutex); // p3 10 put(i); // p4 11 Pthread_cond_signal(&fill); // p5 12 Pthread_mutex_unlock(&mutex); // p6 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 Pthread_mutex_lock(&mutex); // c1 20 while (count == 0) // c2 21 Pthread_cond_wait(&fill, &mutex); // c3 22 int tmp = get(); // c4 23 Pthread_cond_signal(&empty); // c5 24 Pthread_mutex_unlock(&mutex); // c6 25 printf("%d\n", tmp); 26 } 27 }
如今再來看條件變量的一個例子。這段代碼是一個簡單的多線程內存分配庫中的問題片斷:
1 // how many bytes of the heap are free? 2 int bytesLeft = MAX_HEAP_SIZE; 3 4 // need lock and condition too 5 cond_t c; 6 mutex_t m; 7 8 void *allocate(int size) { 9 Pthread_mutex_lock(&m); 10 while (bytesLeft < size) 11 Pthread_cond_wait(&c, &m); 12 void *ptr = ...; // get mem from heap 13 bytesLeft -= size; 14 Pthread_mutex_unlock(&m); 15 return ptr; 16 } 17 18 void free(void *ptr, int size) { 19 Pthread_mutex_lock(&m); 20 bytesLeft += size; 21 Pthread_cond_signal(&c); // whom to signal?? 22 Pthread_mutex_unlock(&m); 23 }
從代碼中能夠看出,當線程調用進入內存分配代碼時,它可能會由於內存不足而等待。相應的,線程釋放內存時,會發信號說有更多內存空閒。可是,代碼中有一個問題:應該喚醒哪一個等待線程(可能有多個線程)?
解決方案也很直接:用pthread_cond_broadcast()代替上述代碼中的pthread_cond_signal(),喚醒全部的等待線程。這樣作,確保了全部應該喚醒的線程都被喚醒。固然,不利的一面是可能會影響性能,由於沒必要要地喚醒了其餘許多不應被喚醒的線程。這些線程被喚醒後,從新檢查條件,立刻再次睡眠。
這種條件變量叫做覆蓋條件(covering condition),由於它能覆蓋全部須要喚醒線程的場景(保守策略)。通常來講,若是你發現程序只有改爲廣播信號時才能工做,多是程序有缺陷。但在某些情景下,就像上述內存分配的例子中,廣播多是最直接有效的方案。
信號量是Dijkstra及其同事發明的,做爲與同步有關的全部工做的惟一原語,可使用信號量做爲鎖和條件變量。
信號量是有一個整數值的對象,能夠用兩個函數來操做它。在POSIX標準中,是sem_wait()和sem_post()。由於信號量的初始值可以決定其行爲,因此首先要初始化信號量,才能調用其餘函數與之交互。
#include <semaphore.h> sem_t s; sem_init(&s, 0, 1);
其中申明瞭一個信號量s,經過第三個參數,將它的值初始化爲1。sem_init()的第二個參數,在咱們的全部例子中都被設置爲0,表示信號量是在同一進程的多個線程共享的。信號量初始化以後,咱們能夠調用sem_wait()或sem_post()與之交互。
sem_wait()對信號量的值進行原子減一操做,當信號量的值大於等於1時馬上返回,不然會將調用線程放入信號量關聯的隊列中等待被喚醒。sem_post()對信號量的值進行原子加一操做,它不用等待某些條件知足,直接增長信號量的值,若是有等待線程,就喚醒其中一個。當信號量的值爲負數時,這個值就是等待線程的個數。
信號量的第一種用法是咱們已經熟悉的:用信號量做爲鎖。在下面的代碼片斷裏,咱們直接把臨界區用一對sem_wait()/sem_post()環繞。爲了使這段代碼正常工做,信號量m的初始值X是相當重要的。X應該是多少呢?
sem_t m; sem_init(&m, 0, X); // initialize semaphore to X; what should X be? sem_wait(&m); // critical section here sem_post(&m);
回顧sem_wait()和sem_post()函數的定義,咱們發現初值應該是1。
咱們假設有兩個線程的場景。第一個線程(線程1)調用了sem_wait(),它把信號量的值減爲0。由於值是0,線程1從函數返回並進入臨界區。若是沒有其餘線程嘗試獲取鎖,當它調用sem_post()時,會將信號量重置爲1(由於沒有等待線程,不會喚醒其餘線程)。
若是線程1持有鎖,另外一個線程(線程2)調用sem_wait()嘗試進入臨界區。這種狀況下,線程2把信號量減爲−1,而後等待。線程1再次運行,它最終調用sem_post(),將信號量的值增長到0,喚醒等待的線程,而後線程2就能夠獲取鎖。線程2執行結束時,再次增長信號量的值,將它恢復爲1。
由於鎖只有兩個狀態(持有和沒持有),因此這種用法有時也叫做二值信號量(binary semaphore)。
下面是一個簡單的例子。假設一個線程建立另外一個線程,而且等待它結束,那麼信號量的初始值X應該是多少?
1 sem_t s; 2 3 void * 4 child(void *arg) { 5 printf("child\n"); 6 sem_post(&s); // signal here: child is done 7 return NULL; 8 } 9 10 int 11 main(int argc, char *argv[]) { 12 sem_init(&s, 0, X); // what should X be? 13 printf("parent: begin\n"); 14 pthread_t c; 15 Pthread_create(c, NULL, child, NULL); 16 sem_wait(&s); // wait here for child 17 printf("parent: end\n"); 18 return 0; 19 }
有兩種狀況須要考慮。第一種,父線程建立了子線程,可是子線程並無運行。這種狀況下,父線程調用sem_wait()會先於子線程調用sem_post()。咱們但願父線程等待子線程運行,惟一的辦法是讓信號量的值不大於0。所以,初值值爲0。父線程運行,將信號量減爲−1,而後睡眠等待;子線程運行的時候,調用sem_post(),信號量增長爲0,喚醒父線程,父線程而後從sem_wait()返回,完成該程序。
第二種狀況是子線程在父線程調用sem_wait()以前就運行結束。在這種狀況下,子線程會先調用sem_post(),將信號量從0增長到1。而後當父線程有機會運行時,會調用sem_wait(),發現信號量的值爲1。因而父線程將信號量從1減爲0,沒有等待,直接從sem_wait()返回,也達到了預期效果。
在這裏,咱們討論如何使用信號量來解決上面提到的生產者/消費者,也即有界緩衝區問題。封裝的put()和get()函數以下:
1 int buffer[MAX]; 2 int fill = 0; 3 int use = 0; 4 5 void put(int value) { 6 buffer[fill] = value; // line f1 7 fill = (fill + 1) % MAX; // line f2 8 } 9 10 int get() { 11 int tmp = buffer[use]; // line g1 12 use = (use + 1) % MAX; // line g2 13 return tmp; 14 }
咱們用兩個信號量empty和full分別表示緩衝區空或者滿,下面是咱們嘗試解決生產者/消費者問題的代碼。
1 sem_t empty; 2 sem_t full; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 sem_wait(&empty); // line P1 8 put(i); // line P2 9 sem_post(&full); // line P3 10 } 11 } 12 13 void *consumer(void *arg) { 14 int i, tmp = 0; 15 while (tmp != -1) { 16 sem_wait(&full); // line C1 17 tmp = get(); // line C2 18 sem_post(&empty); // line C3 19 printf("%d\n", tmp); 20 } 21 } 22 23 int main(int argc, char *argv[]) { 24 // ... 25 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with... 26 sem_init(&full, 0, 0); // ... and 0 are full 27 // ... 28 }
咱們先假設MAX=1,驗證程序是否有效。假設有兩個線程,一個生產者和一個消費者。咱們來看在一個CPU上的具體場景。消費者先運行,執行到C1行,調用sem_wait(&full)。由於full初始值爲0,wait調用會將full減爲−1,致使消費者睡眠,等待另外一個線程調用sem_post(&full),符合預期。
假設生產者而後運行。執行到P1行,調用sem_wait(&empty)。生產者將繼續執行,由於empty被初始化爲MAX(在這裏是1)。所以,empty被減爲0,生產者向緩衝區中加入數據,而後執行P3行,調用sem_post(&full),把full從−1變成0,喚醒消費者。
在這種狀況下,可能會有兩種狀況。若是生產者繼續執行,再次循環到P1行,因爲empty值爲0,它會阻塞。若是生產者被中斷,而消費者開始執行,調用sem_wait(&full),發現緩衝區確實滿了,消費它。這兩種狀況都是符合預期的。
能夠繼續推導,在MAX=1時,即使有多個生產者和消費者的狀況下,本示例代碼仍然正常運行。
咱們如今假設MAX大於1,同時假定有多個生產者,多個消費者。那麼就有問題了:競態條件。假設兩個生產者(Pa和Pb)幾乎同時調用put()。當Pa先運行,在f1行先加入第一條數據(fill=0),假設Pa在將fill計數器更新爲1以前被中斷,Pb開始運行,也在f1行給緩衝區的0位置加入一條數據,這意味着那裏的數據被覆蓋,這也就意味着生產者的數據丟失。
能夠看到,向緩衝區加入元素和增長緩衝區的索引是臨界區,須要當心保護起來。因此,咱們使用二值信號量做爲鎖來進行互斥。下面是對應的代碼。
1 sem_t empty; 2 sem_t full; 3 sem_t mutex; 4 5 void *producer(void *arg) { 6 int i; 7 for (i = 0; i < loops; i++) { 8 sem_wait(&mutex); // line p0 (NEW LINE) 9 sem_wait(&empty); // line p1 10 put(i); // line p2 11 sem_post(&full); // line p3 12 sem_post(&mutex); // line p4 (NEW LINE) 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 sem_wait(&mutex); // line c0 (NEW LINE) 20 sem_wait(&full); // line c1 21 int tmp = get(); // line c2 22 sem_post(&empty); // line c3 23 sem_post(&mutex); // line c4 (NEW LINE) 24 printf("%d\n", tmp); 25 } 26 } 27 28 int main(int argc, char *argv[]) { 29 // ... 30 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with... 31 sem_init(&full, 0, 0); // ... and 0 are full 32 sem_init(&mutex, 0, 1); // mutex=1 because it is a lock (NEW LINE) 33 // ... 34 }
如今咱們給整個put()/get()部分都增長了鎖,就是註釋中有NEW LINE的幾行。這彷佛是正確的思路,但仍然有問題——死鎖。
假設有兩個線程,一個生產者和一個消費者。消費者首先運行,得到鎖,而後對full信號量執行sem_wait()。由於尚未數據,因此消費者阻塞,讓出CPU。可是,問題來了,此時消費者仍然持有鎖。而後生產者運行,它首先對二值互斥信號量調用sem_wait()。鎖已經被消費者持有,所以生產者也被卡住。
這裏出現了一個循環等待。消費者持有互斥量,等待在full信號量上。生產者能夠發送full信號,卻在等待互斥量。所以,生產者和消費者互相等待對方——典型的死鎖。
要解決這個問題,只需減小鎖的做用域,下面是最終的可行方案。能夠看到,咱們把獲取和釋放互斥量的操做調整爲緊挨着臨界區,把full、empty的喚醒和等待操做調整到鎖外面。就獲得了簡單而有效的有界緩衝區,多線程程序的經常使用模式。
1 sem_t empty; 2 sem_t full; 3 sem_t mutex; 4 5 void *producer(void *arg) { 6 int i; 7 for (i = 0; i < loops; i++) { 8 sem_wait(&empty); // line p1 9 sem_wait(&mutex); // line p1.5 (MOVED MUTEX HERE...) 10 put(i); // line p2 11 sem_post(&mutex); // line p2.5 (... AND HERE) 12 sem_post(&full); // line p3 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 sem_wait(&full); // line c1 20 sem_wait(&mutex); // line c1.5 (MOVED MUTEX HERE...) 21 int tmp = get(); // line c2 22 sem_post(&mutex); // line c2.5 (... AND HERE) 23 sem_post(&empty); // line c3 24 printf("%d\n", tmp); 25 } 26 } 27 28 int main(int argc, char *argv[]) { 29 // ... 30 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with... 31 sem_init(&full, 0, 0); // ... and 0 are full 32 sem_init(&mutex, 0, 1); // mutex=1 because it is a lock 33 // ... 34 }
另外一個經典問題源於對更加靈活的鎖定原語的渴望,它認可不一樣的數據結構訪問可能須要不一樣類型的鎖。例如,一個併發鏈表有不少插入和查找操做。插入操做會修改鏈表的狀態,而查找操做只是讀取該結構,只要沒有進行插入操做,咱們能夠併發的執行多個查找操做。讀者—寫者鎖(reader-writer lock)就是用來完成這種操做的。下面是這種鎖的代碼。
1 typedef struct _rwlock_t { 2 sem_t lock; // binary semaphore (basic lock) 3 sem_t writelock; // used to allow ONE writer or MANY readers 4 int readers; // count of readers reading in critical section 5 } rwlock_t; 6 7 void rwlock_init(rwlock_t *rw) { 8 rw->readers = 0; 9 sem_init(&rw->lock, 0, 1); 10 sem_init(&rw->writelock, 0, 1); 11 } 12 13 void rwlock_acquire_readlock(rwlock_t *rw) { 14 sem_wait(&rw->lock); 15 rw->readers++; 16 if (rw->readers == 1) 17 sem_wait(&rw->writelock); // first reader acquires writelock 18 sem_post(&rw->lock); 19 } 20 21 void rwlock_release_readlock(rwlock_t *rw) { 22 sem_wait(&rw->lock); 23 rw->readers--; 24 if (rw->readers == 0) 25 sem_post(&rw->writelock); // last reader releases writelock 26 sem_post(&rw->lock); 27 } 28 29 void rwlock_acquire_writelock(rwlock_t *rw) { 30 sem_wait(&rw->writelock); 31 } 32 33 void rwlock_release_writelock(rwlock_t *rw) { 34 sem_post(&rw->writelock); 35 }
若是某個線程要更新數據結構,須要調用rwlock_acquire_writelock()得到寫鎖,調用rwlock_release_writelock()釋放寫鎖。內部經過一個writelock的信號量保證只有一個寫者能得到鎖進入臨界區,從而更新數據結構。
獲取讀鎖時,讀者首先要獲取lock,而後增長reader變量,追蹤目前有多少個讀者在訪問該數據結構。當第一個讀者獲取讀鎖時,同時也會獲取寫鎖,即在writelock信號量上調用sem_wait(),最後調用sem_post()釋放lock。
一旦一個讀者得到了讀鎖,其餘的讀者也能夠獲取這個讀鎖。可是,想要獲取寫鎖的線程,就必須等到全部的讀者都結束。最後一個退出的讀者在writelock信號量上調用sem_post(),從而讓等待的寫者可以獲取該鎖。
這一方案可行,但有一些缺陷,尤爲是公平性,讀者很容易餓死寫者。存在複雜一些的解決方案,好比有寫者等待時,避免更多的讀者進入並持有鎖。最後,讀者-寫者鎖一般加入了更多鎖操做,所以和其餘一些簡單快速的鎖相比,讀者—寫者鎖在性能方面沒有優點。
最後,咱們用底層的同步原語鎖和條件變量,來實現本身的信號量,名字叫做Zemaphore。
1 typedef struct _Zem_t { 2 int value; 3 pthread_cond_t cond; 4 pthread_mutex_t lock; 5 } Zem_t; 6 7 // only one thread can call this 8 void Zem_init(Zem_t *s, int value) { 9 s->value = value; 10 Cond_init(&s->cond); 11 Mutex_init(&s->lock); 12 } 13 14 void Zem_wait(Zem_t *s) { 15 Mutex_lock(&s->lock); 16 while (s->value <= 0) 17 Cond_wait(&s->cond, &s->lock); 18 s->value--; 19 Mutex_unlock(&s->lock); 20 } 21 22 void Zem_post(Zem_t *s) { 23 Mutex_lock(&s->lock); 24 s->value++; 25 Cond_signal(&s->cond); 26 Mutex_unlock(&s->lock); 27 }
咱們實現的信號量和Dijkstra定義的信號量有一點細微區別,就是咱們沒有保持當信號量的值爲負數時,讓它反映出等待的線程數。事實上,該值永遠不會小於0。這一行爲更容易實現,並符合現有的Linux實現。