【操做系統—併發】條件變量與信號量

條件變量

以前咱們介紹了鎖,然而鎖並非併發程序設計中所需的惟一原語。在不少狀況下,線程須要檢查某一條件(condition)知足以後,纔會繼續運行。例如,父線程須要檢查子線程是否執行完畢。這種等待如何實現呢?安全

注:併發程序有兩大需求,一是互斥,二是等待。互斥是由於線程間存在共享數據,等待則是由於線程間存在依賴。服務器

咱們能夠嘗試用一個共享變量,如圖所示。這種解決方案通常能工做,可是效率低下,由於主線程會自旋檢查,浪費CPU時間。咱們但願有某種方式讓父線程休眠,直到等待的條件知足(即子線程完成執行)。網絡

1    volatile int done = 0;
2
3    void *child(void *arg) {
4        printf("child\n");
5        done = 1;
6        return NULL;
7    }
8
9    int main(int argc, char *argv[]) {
10       printf("parent: begin\n");
11       pthread_t c;
12       Pthread_create(&c, NULL, child, NULL); // create child
13       while (done == 0)
14           ; // spin
15       printf("parent: end\n");
16       return 0;
17   }

定義和使用

線程可使用條件變量(condition variable),來等待一個條件變成真。條件變量是一個顯式隊列,當某些執行狀態(即條件,condition)不知足時,線程能夠把本身加入隊列,等待該條件。當其餘線程改變了上述狀態時,就能夠經過在該條件上發送信號喚醒隊列中的等待線程,讓它們繼續執行。數據結構

在POSIX庫中,要聲明一個條件變量,只要像這樣寫:pthread_cond_t c(注意:還須要適當的初始化)。條件變量有兩種相關操做:wait()和signal()。線程要睡眠的時候,調用wait();當線程想喚醒等待在某個條件變量上的睡眠線程時,調用signal()。下面是一個典型示例:多線程

1    int done = 0;
2    pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
3    pthread_cond_t c = PTHREAD_COND_INITIALIZER;
4
5    void thr_exit() {
6        Pthread_mutex_lock(&m);
7        done = 1;
8        Pthread_cond_signal(&c);
9        Pthread_mutex_unlock(&m);
10    }
11
12    void *child(void *arg) {
13        printf("child\n");
14        thr_exit();
15        return NULL;
16   }
17
18   void thr_join() {
19       Pthread_mutex_lock(&m);
20       while (done == 0)
21           Pthread_cond_wait(&c, &m);
22       Pthread_mutex_unlock(&m);
23   }
24
25   int main(int argc, char *argv[]) {
26       printf("parent: begin\n");
27       pthread_t p;
28       Pthread_create(&p, NULL, child, NULL);
29       thr_join();
30       printf("parent: end\n");
31       return 0;
32   }

wait()調用除了條件變量外還有一個參數,它是一個互斥鎖。它假定在wait()調用時,這個互斥鎖是已上鎖狀態。wait()的職責是原子地釋放鎖,並讓調用線程休眠。當線程被喚醒時,它必須從新獲取鎖,再返回調用者。這樣複雜的步驟也是爲了不在線程陷入休眠時,產生一些競態條件。併發

有兩種狀況須要考慮。第一種狀況是父線程建立出子線程,但本身繼續運行,而後立刻調用thr_join()等待子線程。在這種狀況下,它會先獲取鎖,檢查子線程是否完成,而後調用wait(),讓本身休眠。子線程最終得以運行,打印出「child」,並調用thr_exit()函數喚醒父線程,這段代碼會在得到鎖後設置狀態變量done,而後向父線程發信號喚醒它。最後,父線程會運行(從wait()調用返回並持有鎖),釋放鎖,打印出「parent:end」。函數

第二種狀況是,子線程在建立後,馬上運行,設置變量done爲1,調用signal函數喚醒其餘線程(這裏沒有其餘線程),而後結束。父線程運行後,調用thr_join()時,發現done已是1了,就直接返回。高併發

須要注意的是,在上面的代碼中,狀態變量done和互斥鎖c都是必需的。假如咱們不使用狀態變量,代碼像下面這樣,會出現什麼問題?oop

1    void thr_exit() {
2        Pthread_mutex_lock(&m);
3        Pthread_cond_signal(&c);
4        Pthread_mutex_unlock(&m);
5    }
6
7    void thr_join() {
8        Pthread_mutex_lock(&m);
9        Pthread_cond_wait(&c, &m);
10       Pthread_mutex_unlock(&m);
11   }

假設子線程馬上運行,而且調用thr_exit()。在這種狀況下,子線程發送信號,但此時卻沒有在條件變量上睡眠等待的線程。父線程運行時,就會調用wait並卡在那裏,沒有其餘線程會喚醒它。經過這個例子,你應該認識到變量done的重要性,它記錄了線程感興趣的值。睡眠、喚醒和鎖都離不開它。post

在下面的例子中,咱們假設線程在發信號和等待時都不加鎖。又會發生什麼問題?

1    void thr_exit() {
2        done = 1;
3        Pthread_cond_signal(&c);
4    }
5
6    void thr_join() {
7        if (done == 0)
8            Pthread_cond_wait(&c);
9    }

這裏的問題是一個微妙的競態條件。具體來講,若是父進程調用thr_join(),檢查完done的值爲0,而後試圖睡眠。但在調用wait進入睡眠以前,父進程被中斷。隨後子線程修改變量done爲1,發出信號,此時一樣沒有等待線程。當父線程再次運行時,就會長眠不醒。

因此,咱們能夠堅持這樣一條原則:在使用條件變量時,調用signal和wait時要持有鎖

生產者/消費者問題

假設有一個或多個生產者線程和一個或多個消費者線程。生產者把生成的數據項放入緩衝區,消費者從緩衝區取走數據項,以某種方式消費。不少實際的系統中都會有這種場景。例如,在多線程的網絡服務器中,一個生產者將HTTP請求放入工做隊列,消費線程從隊列中取走請求並處理。

由於有界緩衝區是共享資源,因此咱們必須經過同步機制來訪問它,以避免產生競態條件。爲了更好地理解這個問題,咱們來嘗試一些實際的代碼。

首先須要一個共享緩衝區,讓生產者放入數據,消費者取出數據。簡單起見,咱們就拿一個整數來作緩衝區,兩個內部函數將值放入緩衝區,從緩衝區取值。

1    int buffer;
2    int count = 0; // initially, empty
3
4    void put(int value) {
5        assert(count == 0);
6        count = 1;
7        buffer = value;
8    }
9
10   int get() {
11       assert(count == 1);
12       count = 0;
13       return buffer;
14   }

put()函數會假設緩衝區是空的,把一個值存在緩衝區,而後把count設置爲1表示緩衝區滿了。get()函數恰好相反,把緩衝區清空後,並返回該值。

如今咱們須要編寫一些函數,用於生產和消費數據。調用生產函數的咱們稱之爲生產者(producer)線程,調用消費函數的咱們稱之爲消費者(consumer)線程。下面展現了一對非線程安全的生產者和消費者的代碼,生產者將一個整數放入共享緩衝區loops次,消費者持續從該共享緩衝區中獲取數據,並打印出數據項。咱們的目標就是使用條件變量將其改形成線程安全的版本。

1    void *producer(void *arg) {
2        int i;
3        int loops = (int) arg;
4        for (i = 0; i < loops; i++) {
5            put(i);
6        }
7    }
8
9    void *consumer(void *arg) {
10       int i;
11       while (1) {
12           int tmp = get();
13           printf("%d\n", tmp);
14       }
15   }
有問題的方案

顯然,put()和get()函數之中會有臨界區,由於put()更新緩衝區,get()讀取緩衝區。咱們的首次嘗試以下:

1    cond_t cond;
2    mutex_t mutex;
3
4    void *producer(void *arg) {
5        int i;
6        for (i = 0; i < loops; i++) {
7            Pthread_mutex_lock(&mutex);              // p1
8            if (count == 1)                          // p2
9                Pthread_cond_wait(&cond, &mutex);    // p3
10           put(i);                                  // p4
11           Pthread_cond_signal(&cond);              // p5
12           Pthread_mutex_unlock(&mutex);            // p6
13       }
14   }
15
16   void *consumer(void *arg) {
17       int i;
18       for (i = 0; i < loops; i++) {
19           Pthread_mutex_lock(&mutex);              // c1
20           if (count == 0)                          // c2
21               Pthread_cond_wait(&cond, &mutex);    // c3
22           int tmp = get();                         // c4
23           Pthread_cond_signal(&cond);              // c5
24           Pthread_mutex_unlock(&mutex);            // c6
25           printf("%d\n", tmp);
26       }
27   }

當生產者想要填充緩衝區時,它等待緩衝區變空(p1~p3)。消費者具備徹底相同的邏輯,但等待不一樣的條件——變滿(c1~c3)。

當只有一個生產者和一個消費者時,上圖的代碼可以正常運行。但若是有超過一個線程,這個方案會有兩個嚴重的問題。

先來看第一個問題,它與等待以前的if語句有關。假設有兩個消費者(Tc1和Tc2),一個生產者(Tp)。首先,一個消費者(Tc1)先開始執行,它得到鎖(c1),檢查緩衝區是否能夠消費(c2),而後等待(c3)。

接着生產者(Tp)運行。它獲取鎖(p1),檢查緩衝區是否滿(p2),發現沒滿就給緩衝區加入一個數字(p4)。而後生產者發出信號,說緩衝區已滿(p5)。關鍵的是,這讓第一個消費者(Tc1)再也不睡在條件變量上,進入就緒隊列。生產者繼續執行,直到發現緩衝區滿後睡眠(p6,p1-p3)。

這時問題發生了:另外一個消費者(Tc2)搶先執行,消費了緩衝區中的值。如今假設Tc1運行,在從wait返回以前,它獲取了鎖,而後返回。而後它調用了get() (p4),但緩衝區已沒法消費。斷言觸發,代碼不能像預期那樣工做。

問題產生的緣由很簡單:在Tc1被生產者喚醒後,但在它運行以前,因爲Tc2搶先運行,緩衝區的狀態改變了。發信號給線程只是喚醒它們,暗示狀態發生了變化,但並不會保證在它運行以前狀態一直是指望的狀況。

仍有缺陷的方案:使用While替代If

修復這個問題很簡單:把if語句改成while。當消費者Tc1被喚醒後,馬上再次檢查共享變量(c2)。若是緩衝區此時爲空,消費者就會回去繼續睡眠(c3)。生產者中相應的if也改成while(p2)。

1    cond_t cond;
2    mutex_t mutex;
3
4    void *producer(void *arg) {
5        int i;
6        for (i = 0; i < loops; i++) {
7            Pthread_mutex_lock(&mutex);               // p1
8            while (count == 1)                         // p2
9                Pthread_cond_wait(&cond, &mutex);      // p3
10           put(i);                                   // p4
11           Pthread_cond_signal(&cond);               // p5
12           Pthread_mutex_unlock(&mutex);             // p6
13       }
14   }
15
16   void *consumer(void *arg) {
17       int i;
18       for (i = 0; i < loops; i++) {
19           Pthread_mutex_lock(&mutex);                  // c1
20           while (count == 0)                           // c2
21               Pthread_cond_wait(&cond, &mutex);         // c3
22           int tmp = get();                             // c4
23           Pthread_cond_signal(&cond);                  // c5
24           Pthread_mutex_unlock(&mutex);             // c6
25           printf("%d\n", tmp);
26       }
27   }

咱們要記住一條關於條件變量的簡單規則:老是使用while循環。

可是,這段代碼仍然有一個問題,也是上文提到的兩個問題之一,它和咱們只用了一個條件變量有關。

假設兩個消費者(Tc1和Tc2)先運行,都睡眠了(c3)。生產者開始運行,在緩衝區放入一個值,喚醒了一個消費者(假定是Tc1),並開始睡眠。如今是一個消費者立刻要運行(Tc1),兩個線程(Tc2和Tp)都等待在同一個條件變量上。

消費者Tc1醒過來並從wait()調用返回(c3),從新檢查條件(c2),發現緩衝區是滿的,消費了這個值(c4)。這個消費者而後在該條件上發信號(c5),喚醒一個在睡眠的線程。可是,應該喚醒哪一個線程呢?

由於消費者已經清空了緩衝區,很顯然,應該喚醒生產者。可是,若是它喚醒了Tc2,問題就出現了。消費者Tc2會醒過來,發現隊列爲空(c2),又繼續回去睡眠(c3)。生產者Tp剛纔在緩衝區中放了一個值,如今在睡眠。消費者Tc1繼續執行後也回去睡眠了。3個線程都在睡眠,顯然是一個大問題。

咱們能夠看出:信號顯然須要,但必須更有指向性。消費者不該該喚醒消費者,而應該只喚醒生產者,反之亦然。

單值緩衝區的正確方案

這個問題的解決方案也很簡單:使用兩個而不是一個條件變量,以便在系統狀態改變時,能正確地發出信號喚醒哪類線程。下面展現了最終的代碼。

1    cond_t empty, fill;
2    mutex_t mutex;
3
4    void *producer(void *arg) {
5        int i;
6        for (i = 0; i < loops; i++) {
7            Pthread_mutex_lock(&mutex);
8            while (count == 1)
9                Pthread_cond_wait(&empty,  &mutex);
10           put(i);
11           Pthread_cond_signal(&fill);
12           Pthread_mutex_unlock(&mutex);
13       }
14   }
15
16   void *consumer(void *arg) {
17       int i;
18       for (i = 0; i < loops; i++) {
19           Pthread_mutex_lock(&mutex);
20           while (count == 0)
21               Pthread_cond_wait(&fill, &mutex);
22           int tmp = get();
23           Pthread_cond_signal(&empty);
24           Pthread_mutex_unlock(&mutex);
25           printf("%d\n", tmp);
26       }
27   }
最終方案

咱們如今有了可用的生產者/消費者方案,但不太通用,咱們最後所作的修改是爲了提升併發和效率。具體來講就是增長更多緩衝區槽位,這樣在睡眠以前,生產者能夠生產多個值;一樣,消費者在睡眠以前能夠消費多個值

單個生產者和消費者時,這種方案由於上下文切換少,提升了效率。多個生產者和消費者時,它能夠支持併發生產和消費。和現有方案相比,改動也很小。

第一處修改是緩衝區結構自己,以及對應的put()和get()方法:

1    int buffer[MAX];
2    int fill = 0;
3    int use   = 0;
4    int count = 0;
5
6    void put(int value) {
7        buffer[fill] = value;
8        fill = (fill + 1) % MAX;
9        count++;
10   }
11
12   int get() {
13       int tmp = buffer[use];
14       use = (use + 1) % MAX;
15       count--;
16       return tmp;
17   }

下面展現了最終的代碼邏輯。至此,咱們解決了生產者/消費者問題。

1    cond_t empty, fill;
2    mutex_t mutex;
3
4    void *producer(void *arg) {
5        int i;
6        for (i = 0; i < loops; i++) {
7            Pthread_mutex_lock(&mutex);                 // p1
8            while (count == MAX)                        // p2
9                Pthread_cond_wait(&empty, &mutex);      // p3
10           put(i);                                     // p4
11           Pthread_cond_signal(&fill);                 // p5
12           Pthread_mutex_unlock(&mutex);               // p6
13       }
14   }
15
16   void *consumer(void *arg) {
17       int i;
18       for (i = 0; i < loops; i++) {
19           Pthread_mutex_lock(&mutex);               // c1
20           while (count == 0)                            // c2
21               Pthread_cond_wait(&fill, &mutex);     // c3
22           int tmp = get();                              // c4
23           Pthread_cond_signal(&empty);              // c5
24           Pthread_mutex_unlock(&mutex);             // c6
25           printf("%d\n", tmp);
26       }
27   }

覆蓋條件

如今再來看條件變量的一個例子。這段代碼是一個簡單的多線程內存分配庫中的問題片斷:

1    // how many bytes of the heap are free?
2    int bytesLeft = MAX_HEAP_SIZE;
3
4    // need lock and condition too
5    cond_t c;
6    mutex_t m;
7
8    void *allocate(int size) {
9        Pthread_mutex_lock(&m);
10       while (bytesLeft < size)
11           Pthread_cond_wait(&c, &m);
12       void *ptr = ...; // get mem from heap
13       bytesLeft -= size;
14       Pthread_mutex_unlock(&m);
15       return ptr;
16   }
17
18   void free(void *ptr, int size) {
19       Pthread_mutex_lock(&m);
20       bytesLeft += size;
21       Pthread_cond_signal(&c); // whom to signal??
22       Pthread_mutex_unlock(&m);
23   }

從代碼中能夠看出,當線程調用進入內存分配代碼時,它可能會由於內存不足而等待。相應的,線程釋放內存時,會發信號說有更多內存空閒。可是,代碼中有一個問題:應該喚醒哪一個等待線程(可能有多個線程)?

解決方案也很直接:用pthread_cond_broadcast()代替上述代碼中的pthread_cond_signal(),喚醒全部的等待線程。這樣作,確保了全部應該喚醒的線程都被喚醒。固然,不利的一面是可能會影響性能,由於沒必要要地喚醒了其餘許多不應被喚醒的線程。這些線程被喚醒後,從新檢查條件,立刻再次睡眠。

這種條件變量叫做覆蓋條件(covering condition),由於它能覆蓋全部須要喚醒線程的場景(保守策略)。通常來講,若是你發現程序只有改爲廣播信號時才能工做,多是程序有缺陷。但在某些情景下,就像上述內存分配的例子中,廣播多是最直接有效的方案。

信號量

信號量是Dijkstra及其同事發明的,做爲與同步有關的全部工做的惟一原語,可使用信號量做爲鎖和條件變量。

定義

信號量是有一個整數值的對象,能夠用兩個函數來操做它。在POSIX標準中,是sem_wait()和sem_post()。由於信號量的初始值可以決定其行爲,因此首先要初始化信號量,才能調用其餘函數與之交互。

#include <semaphore.h>
sem_t s;
sem_init(&s, 0, 1);

其中申明瞭一個信號量s,經過第三個參數,將它的值初始化爲1。sem_init()的第二個參數,在咱們的全部例子中都被設置爲0,表示信號量是在同一進程的多個線程共享的。信號量初始化以後,咱們能夠調用sem_wait()或sem_post()與之交互。

sem_wait()對信號量的值進行原子減一操做,當信號量的值大於等於1時馬上返回,不然會將調用線程放入信號量關聯的隊列中等待被喚醒。sem_post()對信號量的值進行原子加一操做,它不用等待某些條件知足,直接增長信號量的值,若是有等待線程,就喚醒其中一個。當信號量的值爲負數時,這個值就是等待線程的個數。

二值信號量(鎖)

信號量的第一種用法是咱們已經熟悉的:用信號量做爲鎖。在下面的代碼片斷裏,咱們直接把臨界區用一對sem_wait()/sem_post()環繞。爲了使這段代碼正常工做,信號量m的初始值X是相當重要的。X應該是多少呢?

sem_t m;
sem_init(&m, 0, X); // initialize semaphore to X; what should X be?

sem_wait(&m);
// critical section here
sem_post(&m);

回顧sem_wait()和sem_post()函數的定義,咱們發現初值應該是1。

咱們假設有兩個線程的場景。第一個線程(線程1)調用了sem_wait(),它把信號量的值減爲0。由於值是0,線程1從函數返回並進入臨界區。若是沒有其餘線程嘗試獲取鎖,當它調用sem_post()時,會將信號量重置爲1(由於沒有等待線程,不會喚醒其餘線程)。

若是線程1持有鎖,另外一個線程(線程2)調用sem_wait()嘗試進入臨界區。這種狀況下,線程2把信號量減爲−1,而後等待。線程1再次運行,它最終調用sem_post(),將信號量的值增長到0,喚醒等待的線程,而後線程2就能夠獲取鎖。線程2執行結束時,再次增長信號量的值,將它恢復爲1。

由於鎖只有兩個狀態(持有和沒持有),因此這種用法有時也叫做二值信號量(binary semaphore)。

信號量用做條件變量

下面是一個簡單的例子。假設一個線程建立另外一個線程,而且等待它結束,那麼信號量的初始值X應該是多少?

1    sem_t s;
2
3    void *
4    child(void *arg) {
5        printf("child\n");
6        sem_post(&s); // signal here: child is done
7        return NULL;
8    }
9
10   int
11   main(int argc, char *argv[]) {
12       sem_init(&s, 0, X); // what should X be?
13       printf("parent: begin\n");
14       pthread_t c;
15       Pthread_create(c, NULL, child, NULL);
16       sem_wait(&s); // wait here for child
17       printf("parent: end\n");
18       return 0;
19   }

有兩種狀況須要考慮。第一種,父線程建立了子線程,可是子線程並無運行。這種狀況下,父線程調用sem_wait()會先於子線程調用sem_post()。咱們但願父線程等待子線程運行,惟一的辦法是讓信號量的值不大於0。所以,初值值爲0。父線程運行,將信號量減爲−1,而後睡眠等待;子線程運行的時候,調用sem_post(),信號量增長爲0,喚醒父線程,父線程而後從sem_wait()返回,完成該程序。

第二種狀況是子線程在父線程調用sem_wait()以前就運行結束。在這種狀況下,子線程會先調用sem_post(),將信號量從0增長到1。而後當父線程有機會運行時,會調用sem_wait(),發現信號量的值爲1。因而父線程將信號量從1減爲0,沒有等待,直接從sem_wait()返回,也達到了預期效果。

生產者/消費者問題

在這裏,咱們討論如何使用信號量來解決上面提到的生產者/消費者,也即有界緩衝區問題。封裝的put()和get()函數以下:

1    int buffer[MAX];
2    int fill = 0;
3    int use = 0;
4
5    void put(int value) {
6        buffer[fill] = value;    // line f1
7        fill = (fill + 1) % MAX; // line f2
8    }
9
10   int get() {
11       int tmp = buffer[use];    // line g1
12       use = (use + 1) % MAX;    // line g2
13       return tmp;
14   }
第一次嘗試

咱們用兩個信號量empty和full分別表示緩衝區空或者滿,下面是咱們嘗試解決生產者/消費者問題的代碼。

1    sem_t empty;
2    sem_t full;
3
4    void *producer(void *arg) {
5        int i;
6        for (i = 0; i < loops; i++) {
7            sem_wait(&empty);             // line P1
8            put(i);                       // line P2
9            sem_post(&full);              // line P3
10       }
11   }
12
13   void *consumer(void *arg) {
14       int i, tmp = 0;
15       while (tmp != -1) {
16           sem_wait(&full);            // line C1
17           tmp = get();                // line C2
18           sem_post(&empty);            // line C3
19           printf("%d\n", tmp);
20       }
21   }
22
23   int main(int argc, char *argv[]) {
24       // ...
25       sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
26       sem_init(&full, 0, 0);    // ... and 0 are full
27       // ...
28   }

咱們先假設MAX=1,驗證程序是否有效。假設有兩個線程,一個生產者和一個消費者。咱們來看在一個CPU上的具體場景。消費者先運行,執行到C1行,調用sem_wait(&full)。由於full初始值爲0,wait調用會將full減爲−1,致使消費者睡眠,等待另外一個線程調用sem_post(&full),符合預期。

假設生產者而後運行。執行到P1行,調用sem_wait(&empty)。生產者將繼續執行,由於empty被初始化爲MAX(在這裏是1)。所以,empty被減爲0,生產者向緩衝區中加入數據,而後執行P3行,調用sem_post(&full),把full從−1變成0,喚醒消費者。

在這種狀況下,可能會有兩種狀況。若是生產者繼續執行,再次循環到P1行,因爲empty值爲0,它會阻塞。若是生產者被中斷,而消費者開始執行,調用sem_wait(&full),發現緩衝區確實滿了,消費它。這兩種狀況都是符合預期的。

能夠繼續推導,在MAX=1時,即使有多個生產者和消費者的狀況下,本示例代碼仍然正常運行。

咱們如今假設MAX大於1,同時假定有多個生產者,多個消費者。那麼就有問題了:競態條件。假設兩個生產者(Pa和Pb)幾乎同時調用put()。當Pa先運行,在f1行先加入第一條數據(fill=0),假設Pa在將fill計數器更新爲1以前被中斷,Pb開始運行,也在f1行給緩衝區的0位置加入一條數據,這意味着那裏的數據被覆蓋,這也就意味着生產者的數據丟失。

增長互斥

能夠看到,向緩衝區加入元素和增長緩衝區的索引是臨界區,須要當心保護起來。因此,咱們使用二值信號量做爲鎖來進行互斥。下面是對應的代碼。

1    sem_t empty;
2    sem_t full;
3    sem_t mutex;
4
5    void *producer(void *arg) {
6        int i;
7        for (i = 0; i < loops; i++) {
8            sem_wait(&mutex);           // line p0 (NEW LINE)
9            sem_wait(&empty);           // line p1
10           put(i);                     // line p2
11           sem_post(&full);            // line p3
12           sem_post(&mutex);           // line p4 (NEW LINE)
13       }
14   }
15
16   void *consumer(void *arg) {
17       int i;
18       for (i = 0; i < loops; i++) {
19           sem_wait(&mutex);           // line c0 (NEW LINE)
20           sem_wait(&full);            // line c1
21           int tmp = get();            // line c2
22           sem_post(&empty);           // line c3
23           sem_post(&mutex);           // line c4 (NEW LINE)
24           printf("%d\n", tmp);
25       }
26   }
27
28   int main(int argc, char *argv[]) {
29       // ...
30       sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
31       sem_init(&full, 0, 0);    // ... and 0 are full
32       sem_init(&mutex, 0, 1);   // mutex=1 because it is a lock (NEW LINE)
33       // ...
34   }

如今咱們給整個put()/get()部分都增長了鎖,就是註釋中有NEW LINE的幾行。這彷佛是正確的思路,但仍然有問題——死鎖。

假設有兩個線程,一個生產者和一個消費者。消費者首先運行,得到鎖,而後對full信號量執行sem_wait()。由於尚未數據,因此消費者阻塞,讓出CPU。可是,問題來了,此時消費者仍然持有鎖。而後生產者運行,它首先對二值互斥信號量調用sem_wait()。鎖已經被消費者持有,所以生產者也被卡住。

這裏出現了一個循環等待。消費者持有互斥量,等待在full信號量上。生產者能夠發送full信號,卻在等待互斥量。所以,生產者和消費者互相等待對方——典型的死鎖。

最終方案

要解決這個問題,只需減小鎖的做用域,下面是最終的可行方案。能夠看到,咱們把獲取和釋放互斥量的操做調整爲緊挨着臨界區,把full、empty的喚醒和等待操做調整到鎖外面。就獲得了簡單而有效的有界緩衝區,多線程程序的經常使用模式。

1    sem_t empty;
2    sem_t full;
3    sem_t mutex;
4
5    void *producer(void *arg) {
6        int i;
7        for (i = 0; i < loops; i++) {
8            sem_wait(&empty);            // line p1
9            sem_wait(&mutex);            // line p1.5 (MOVED MUTEX HERE...)
10           put(i);                      // line p2
11           sem_post(&mutex);            // line p2.5 (... AND HERE)
12           sem_post(&full);             // line p3
13       }
14   }
15
16   void *consumer(void *arg) {
17       int i;
18       for (i = 0; i < loops; i++) {
19           sem_wait(&full);             // line c1
20           sem_wait(&mutex);            // line c1.5 (MOVED MUTEX HERE...)
21           int tmp = get();             // line c2
22           sem_post(&mutex);            // line c2.5 (... AND HERE)
23           sem_post(&empty);            // line c3
24           printf("%d\n", tmp);
25       }
26   }
27
28   int main(int argc, char *argv[]) {
29       // ...
30       sem_init(&empty, 0, MAX);  // MAX buffers are empty to begin with...
31       sem_init(&full, 0, 0);     // ... and 0 are full
32       sem_init(&mutex, 0, 1);    // mutex=1 because it is a lock
33       // ...
34   }

讀者—寫者鎖

另外一個經典問題源於對更加靈活的鎖定原語的渴望,它認可不一樣的數據結構訪問可能須要不一樣類型的鎖。例如,一個併發鏈表有不少插入和查找操做。插入操做會修改鏈表的狀態,而查找操做只是讀取該結構,只要沒有進行插入操做,咱們能夠併發的執行多個查找操做。讀者—寫者鎖(reader-writer lock)就是用來完成這種操做的。下面是這種鎖的代碼。

1    typedef struct _rwlock_t {
2      sem_t lock;      // binary semaphore (basic lock)
3      sem_t writelock; // used to allow ONE writer or MANY readers
4      int    readers;  // count of readers reading in critical section
5    } rwlock_t;
6
7    void rwlock_init(rwlock_t *rw) {
8      rw->readers = 0;
9      sem_init(&rw->lock, 0, 1);
10     sem_init(&rw->writelock, 0, 1);
11   }
12
13   void rwlock_acquire_readlock(rwlock_t *rw) {
14     sem_wait(&rw->lock);
15     rw->readers++;
16     if (rw->readers == 1)
17       sem_wait(&rw->writelock); // first reader acquires writelock
18     sem_post(&rw->lock);
19   }
20
21   void rwlock_release_readlock(rwlock_t *rw) {
22     sem_wait(&rw->lock);
23     rw->readers--;
24     if (rw->readers == 0)
25       sem_post(&rw->writelock); // last reader releases writelock
26     sem_post(&rw->lock);
27   }
28
29   void rwlock_acquire_writelock(rwlock_t *rw) {
30     sem_wait(&rw->writelock);
31   }
32
33   void rwlock_release_writelock(rwlock_t *rw) {
34     sem_post(&rw->writelock);
35   }

若是某個線程要更新數據結構,須要調用rwlock_acquire_writelock()得到寫鎖,調用rwlock_release_writelock()釋放寫鎖。內部經過一個writelock的信號量保證只有一個寫者能得到鎖進入臨界區,從而更新數據結構。

獲取讀鎖時,讀者首先要獲取lock,而後增長reader變量,追蹤目前有多少個讀者在訪問該數據結構。當第一個讀者獲取讀鎖時,同時也會獲取寫鎖,即在writelock信號量上調用sem_wait(),最後調用sem_post()釋放lock。

一旦一個讀者得到了讀鎖,其餘的讀者也能夠獲取這個讀鎖。可是,想要獲取寫鎖的線程,就必須等到全部的讀者都結束。最後一個退出的讀者在writelock信號量上調用sem_post(),從而讓等待的寫者可以獲取該鎖。

這一方案可行,但有一些缺陷,尤爲是公平性,讀者很容易餓死寫者。存在複雜一些的解決方案,好比有寫者等待時,避免更多的讀者進入並持有鎖。最後,讀者-寫者鎖一般加入了更多鎖操做,所以和其餘一些簡單快速的鎖相比,讀者—寫者鎖在性能方面沒有優點。

如何實現信號量

最後,咱們用底層的同步原語鎖和條件變量,來實現本身的信號量,名字叫做Zemaphore。

1    typedef struct  _Zem_t {
2        int value;
3        pthread_cond_t cond;
4        pthread_mutex_t lock;
5    } Zem_t;
6
7    // only one thread can call this
8    void Zem_init(Zem_t *s, int value) {
9        s->value = value;
10       Cond_init(&s->cond);
11       Mutex_init(&s->lock);
12   }
13
14   void Zem_wait(Zem_t *s) {
15       Mutex_lock(&s->lock);
16       while (s->value <= 0)
17           Cond_wait(&s->cond, &s->lock);
18       s->value--;
19       Mutex_unlock(&s->lock);
20   }
21
22   void Zem_post(Zem_t *s) {
23       Mutex_lock(&s->lock);
24       s->value++;
25       Cond_signal(&s->cond);
26       Mutex_unlock(&s->lock);
27   }

咱們實現的信號量和Dijkstra定義的信號量有一點細微區別,就是咱們沒有保持當信號量的值爲負數時,讓它反映出等待的線程數。事實上,該值永遠不會小於0。這一行爲更容易實現,並符合現有的Linux實現。

相關文章
相關標籤/搜索