Linux線程編程之生產者消費者問題

 

前言

     本文基於順序循環隊列,給出Linux生產者/消費者問題的多線程示例,並討論編程時須要注意的事項。文中涉及的代碼運行環境以下:html

     本文假定讀者已具有線程同步的基礎知識。編程

 

 

一  順序表循環隊列

1.1 順序循環隊列定義

     隊列是一種運算受限的先進先出線性表,僅容許在隊尾插入(入隊),在隊首刪除(出隊)。新元素入隊後成爲新的隊尾元素,元素出隊後其後繼元素就成爲隊首元素。數組

     隊列的順序存儲結構使用一個數組和兩個整型變量實現,其結構以下:安全

1 struct Queue{
2     ElemType elem[MaxSize];
3     int head, tail;
4 };

     即利用數組elem順序存儲隊列中的全部元素,利用整型變量head和tail分別存儲隊首元素和隊尾(或隊尾下一個)元素的下標位置,分別稱爲隊首指針和隊尾指針。MaxSize指定順序存儲隊列的最大長度,即隊列中最多可以存儲的元素個數。當隊列的順序存儲空間靜態分配時,MaxSize一般爲宏定義;若動態分配時,還可爲全局或局部變量。性能優化

     單向順序隊列存在「假溢出」問題,即屢次入隊和出隊操做後,隊首空餘許多位置而隊尾指針指向隊列中最後一個元素位置,從而沒法使新的數據元素入隊。假溢出本質在於隊首和隊尾指針值不能由所定義數組下界值自動轉爲數組上界值,解決辦法是將順序隊列構形成一個邏輯上首尾相連的循環表。當隊列的第MaxSize-1個位置被佔用後,只要隊首還有可用空間,則把新的數據元素插入隊列第0號位置。所以,順序隊列一般都採用順序循環隊列結構。多線程

     下圖所示爲MaxSize=4的循環隊列三種狀態圖:併發

 

     其中,爲區分隊空與隊滿,在入隊時少用一個數據(圖中下標爲3的位置)。本文約定隊首指針head指向隊首元素,隊尾指針tail指向隊尾元素的下一個元素,以隊尾指針加1等於隊首指針判斷隊滿,即:函數

  • 初始化:head = tail = 0;
  • 隊列空:head == tail;
  • 隊列滿:(tail + 1) % MaxSize == head;
  • 元素數:num = (tail - head + MaxSize) % MaxSize

     %爲取模運算符,循環隊列利用數學上的取模運算將首尾巧妙相連。高併發

     當約定head指向隊首前一個元素,tail指向隊尾元素時,元素數目仍知足上面的公式。當約定head指向隊首元素,tail指向隊尾元素時,元素數目爲(tail - head + 1 + MaxSize) % MaxSize。oop

     此外,也可另設一個標誌以區別隊空和隊滿。該標誌可爲布爾型空滿標誌位,也可爲隊列中元素個數。

     經過隊首元素位置和隊列中元素個數,可計算出隊尾元素所在位置。亦即,可用隊列中元素個數代替隊尾指針(或隊首指針)。此時,隊列知足:

  • 隊列空:num == 0;
  • 隊列滿:num == MaxSize;
  • 隊尾位置:tail = (head + num + MaxSize) % MaxSize

1.2 順序循環隊列實現

     本節將採用C語言實現一個簡單但卻標準的順序循環隊列函數集。

     首先定義循環隊列結構以下:

1 #define QUEUE_SIZE   5 //隊列最大容納QUEUE_SIZE-1個元素
2 typedef struct{
3     int aData[QUEUE_SIZE];  //隊列元素
4     int dwHead;  //指向隊首元素
5     int dwTail;  //指向隊尾元素的下一個元素
6 }T_QUEUE, *PT_QUEUE;

     爲求簡單性,元素類型假定爲整型。 

     基於上述結構,定義初始化、入隊出隊和顯示函數:

 1 //初始化循環隊列
 2 void InitQue(PT_QUEUE ptQue)
 3 {
 4     memset(ptQue, 0, sizeof(*ptQue));
 5 }
 6 
 7 //向循環隊列中插入元素
 8 void EnterQue(PT_QUEUE ptQue, int dwElem)
 9 {
10     if(IsQueFull(ptQue))
11     {
12         printf("Elem %d cannot enter Queue %p(Full)!\n", dwElem, ptQue);
13         return;
14     }
15     ptQue->aData[ptQue->dwTail]= dwElem;
16     ptQue->dwTail = (ptQue->dwTail + 1) % QUEUE_SIZE;
17 }
18 
19 //從循環隊列中取出元素
20 int LeaveQue(PT_QUEUE ptQue)
21 {
22     if(IsQueEmpty(ptQue))
23     {
24         printf("Queue %p is Empty!\n", ptQue);
25         return -1;
26     }
27     int dwElem = ptQue->aData[ptQue->dwHead];
28     ptQue->dwHead = (ptQue->dwHead + 1) % QUEUE_SIZE;
29     return dwElem;
30 }
31 
32 //從隊首至隊尾依次顯示隊中元素值
33 void DisplayQue(PT_QUEUE ptQue)
34 {
35     if(IsQueEmpty(ptQue))
36     {
37         printf("Queue %p is Empty!\n", ptQue);
38         return;
39     }
40 
41     printf("Queue Element: ");
42     int dwIdx = ptQue->dwHead;
43     while((dwIdx % QUEUE_SIZE) != ptQue->dwTail)
44         printf("%d ", ptQue->aData[(dwIdx++) % QUEUE_SIZE]);
45 
46     printf("\n");
47 }

     而後提供判空、判滿、查詢等輔助函數:

 1 //判斷循環隊列是否爲空
 2 int IsQueEmpty(PT_QUEUE ptQue)
 3 {
 4     return ptQue->dwHead == ptQue->dwTail;
 5 }
 6 
 7 //判斷循環隊列是否爲滿
 8 int IsQueFull(PT_QUEUE ptQue)
 9 {
10     return (ptQue->dwTail + 1) % QUEUE_SIZE == ptQue->dwHead;
11 }
12 
13 //獲取循環隊列元素數目
14 int QueDataNum(PT_QUEUE ptQue)
15 {
16     return (ptQue->dwTail - ptQue->dwHead + QUEUE_SIZE) % QUEUE_SIZE;
17 }
18 
19 //獲取循環隊列隊首位置
20 int GetQueHead(PT_QUEUE ptQue)
21 {
22     return ptQue->dwHead;
23 }
24 //獲取循環隊列隊首元素
25 int GetQueHeadData(PT_QUEUE ptQue)
26 {
27     return ptQue->aData[ptQue->dwHead];
28 }
29 //獲取循環隊列隊尾位置
30 int GetQueTail(PT_QUEUE ptQue)
31 {
32     return ptQue->dwTail;
33 }

     最後,經過QueueTest()函數來測試隊列函數集:

 1 static T_QUEUE gtQueue;
 2 void QueueTest(void)
 3 {
 4     InitQue(&gtQueue);
 5     printf("Enter Queue 1,2,3,4,5...\n");
 6     EnterQue(&gtQueue, 1);
 7     EnterQue(&gtQueue, 2);
 8     EnterQue(&gtQueue, 3);
 9     EnterQue(&gtQueue, 4);
10     EnterQue(&gtQueue, 5);
11     printf("Queue Status: Empty(%d), Full(%d)\n", IsQueEmpty(&gtQueue), IsQueFull(&gtQueue));
12     printf("Queue Elem Num: %u\n", QueDataNum(&gtQueue));
13     printf("Head: %u(%d)\n", GetQueHead(&gtQueue), GetQueHeadData(&gtQueue));
14     printf("Tail: %u\n", GetQueTail(&gtQueue));
15     DisplayQue(&gtQueue);
16 
17     printf("\nLeave Queue...\n");
18     printf("Leave %d\n", LeaveQue(&gtQueue));
19     printf("Leave %d\n", LeaveQue(&gtQueue));
20     printf("Leave %d\n", LeaveQue(&gtQueue));
21     DisplayQue(&gtQueue);
22 
23     printf("\nEnter Queue 6,7...\n");
24     EnterQue(&gtQueue, 6);
25     EnterQue(&gtQueue, 7);
26     DisplayQue(&gtQueue);
27 
28     printf("\nLeave Queue...\n");
29     printf("Leave %d\n", LeaveQue(&gtQueue));
30     printf("Leave %d\n", LeaveQue(&gtQueue));
31     printf("Leave %d\n", LeaveQue(&gtQueue));
32     DisplayQue(&gtQueue);
33 }

     編譯連接後,運行結果以下:

 1 Enter Queue 1,2,3,4,5...
 2 Elem 5 cannot enter Queue 0x8053f9c(Full)!
 3 Queue Status: Empty(0), Full(1)
 4 Queue Elem Num: 4
 5 Head: 0(1)
 6 Tail: 4
 7 Queue Element: 1 2 3 4 
 8 
 9 Leave Queue...
10 Leave 1
11 Leave 2
12 Leave 3
13 Queue Element: 4 
14 
15 Enter Queue 6,7...
16 Queue Element: 4 6 7 
17 
18 Leave Queue...
19 Leave 4
20 Leave 6
21 Leave 7
22 Queue 0x8053f9c is Empty!

     可見,隊列行爲徹底符合指望。

 

 

二  生產者消費者問題

2.1 問題概述

     本節將討論Linux併發編程中經典的生產者/消費者(producer-consumer)問題。該問題涉及一個大小限定的有界緩衝區(bounded buffer)和兩類線程或進程(生產者和消費者)。

     在緩衝區中有可用空間時,一個或一組生產者(線程或進程)將建立的產品(數據條目)放入緩衝區,而後由一個或一組消費者(線程或進程)提取這些產品。產品在生產者和消費者之間經過某種類型的IPC傳遞。

     在多個進程間共享一個公共數據緩衝區須要某種形式的共享內存區(如存儲映射或共享內存),而多線程自然地共享存儲空間。爲簡便起見,本節的討論僅限於多線程。

多個生產者和消費者線程的場景以下圖所示:

 

     在生產者/消費者問題中,生產者線程必須在緩衝區中有可用空間後才能向其中放置內容,不然將阻塞(進入休眠狀態)直到出現下一個可用的空位置。生產者線程可以使用互斥量原子性地檢查緩衝區,而不受其餘線程干擾。當發現緩衝區已滿後,生產者阻塞本身並在緩衝區變爲非滿時被喚醒,這可由條件變量實現。

     消費者線程必須在生產者向緩衝區中寫入以後才能從中提取內容。同理,可用互斥量和條件變量以無競爭的方式等待緩衝區由空變爲非空。

2.2 多線程實現

     本節將採用隊列模擬任務,給出生產者/消費者問題的多線程示例。

     爲簡單起見,生產者將隊列元素下標做爲元素值入隊,消費者使元素出隊並驗證元素值正確性。

     首先定義一組全局數據:

 1 #define PRODUCER_NUM   5  //生產者數
 2 #define CONSUMER_NUM   3  //消費者數
 3 #define PRD_NUM        20 //最多生產的產品數
 4 #define DELAY_TIME     3  //生產(或消費)任務之間的最大時間間隔
 5 
 6 #define QUEUE_SIZE     (PRD_NUM+1) //隊列最大容納QUEUE_SIZE-1個元素
 7 
 8 T_QUEUE gtQueue;
 9 pthread_mutex_t gtQueLock = PTHREAD_MUTEX_INITIALIZER;
10 pthread_cond_t gtPrdCond = PTHREAD_COND_INITIALIZER; //Full->Not Full
11 pthread_cond_t gtCsmCond = PTHREAD_COND_INITIALIZER; //Empty->Not Empty

     此處,QUEUE_SIZE按照產品數從新定義。互斥量gtQueLock用於保護全局隊列gtQueue和兩個條件變量。條件變量gtPrdCond用於隊列由滿變爲非滿時通知(喚醒)生產者線程,而gtCsmCond用於隊列由空變爲非空時通知消費者線程。

     若首先建立並啓動生產者線程,再當即或稍候建立消費者線程,則不須要條件變量gtCsmCond(隊列中始終有產品)。本節爲全面展示線程間的同步,約定消費者線程建立和啓動完畢以後,再建立生產者線程。這樣,全部消費者線程將會阻塞,在條件變量gtCsmCond的線程列表裏等待條件狀態的改變。生產者線程啓動並開始「產出」後,廣播通知全部消費者線程。反之,由於消費者線程不會所有阻塞,可單播喚醒某個消費者。

     初始化隊列和建立線程的主函數以下:

 1 int main(void)
 2 {
 3     InitQue(&gtQueue);
 4     srand(getpid());  //初始化隨機函數發生器
 5 
 6     pthread_t aThrd[CONSUMER_NUM+PRODUCER_NUM];
 7     int dwThrdIdx;
 8     for(dwThrdIdx = 0; dwThrdIdx < CONSUMER_NUM; dwThrdIdx++)
 9     {  //建立CONSUMER_NUM個消費者線程,傳入(void*)dwThrdIdx做爲ConsumerThread的參數
10         pthread_create(&aThrd[dwThrdIdx], NULL, ConsumerThread, (void*)dwThrdIdx);
11     }
12     sleep(2);
13     for(dwThrdIdx = 0; dwThrdIdx < PRODUCER_NUM; dwThrdIdx++)
14     {
15         pthread_create(&aThrd[dwThrdIdx+CONSUMER_NUM], NULL, ProducerThread, (void*)dwThrdIdx);
16     }
17     while(1);
18     return 0 ;
19 }

     生產者線程啓動例程ProducerThread()實現以下:

 1 void *ProducerThread(void *pvArg)
 2 {
 3     pthread_detach(pthread_self());
 4     int dwThrdNo = (int)pvArg;
 5     while(1)
 6     {
 7         pthread_mutex_lock(&gtQueLock);
 8         while(IsQueFull(&gtQueue))  //隊列由滿變爲非滿時,生產者線程被喚醒
 9             pthread_cond_wait(&gtPrdCond, &gtQueLock);
10 
11         EnterQue(&gtQueue, GetQueTail(&gtQueue)); //將隊列元素下標做爲元素值入隊
12         if(QueDataNum(&gtQueue) == 1) //當生產者開始產出後,通知(喚醒)消費者線程
13             pthread_cond_broadcast(&gtCsmCond);
14         printf("[Producer %2u]Current Product Num: %u\n", dwThrdNo, QueDataNum(&gtQueue));
15 
16         pthread_mutex_unlock(&gtQueLock);
17         sleep(rand()%DELAY_TIME + 1);
18     }
19 }

     隊列變滿時,生產者線程進入休眠狀態。消費者線程取出產品,將隊列由滿變爲非滿時,生產者線程再次被喚醒。

     消費者線程啓動例程ConsumerThread()實現以下:

 1 void *ConsumerThread(void *pvArg)
 2 {
 3     pthread_detach(pthread_self());
 4     int dwThrdNo = (int)pvArg;
 5     while(1)
 6     {
 7         pthread_mutex_lock(&gtQueLock);
 8         while(IsQueEmpty(&gtQueue)) //隊列由空變爲非空時,消費者線程將被喚醒
 9             pthread_cond_wait(&gtCsmCond, &gtQueLock);
10 
11         if(GetQueHead(&gtQueue) != GetQueHeadData(&gtQueue))
12         {
13             printf("[Consumer %2u]Product: %d, Expect: %d\n", dwThrdNo,
14                    GetQueHead(&gtQueue), GetQueHeadData(&gtQueue));
15             exit(0);
16         }  
17         LeaveQue(&gtQueue);
18         if(QueDataNum(&gtQueue) == (PRD_NUM-1)) //當隊列由滿變爲非滿時,通知(喚醒)生產者線程
19             pthread_cond_broadcast(&gtPrdCond);
20         printf("[Consumer %2u]Current Product Num: %u\n", dwThrdNo, QueDataNum(&gtQueue));
21 
22         pthread_mutex_unlock(&gtQueLock);
23         sleep(rand()%DELAY_TIME + 1);
24     }
25 }

     當隊列元素值不符合指望時,打印出錯提示並調用exit()終止進程。

     編譯連接後,截取部分運行結果以下(因每次執行時線程延時隨機值,故執行順序可能不一樣):

[wangxiaoyuan_@localhost Thread]$ gcc -Wall -o procon procon.c -pthread
[wangxiaoyuan_@localhost Thread]$ ./procon
[Producer  4]Current Product Num: 1
[Consumer  1]Current Product Num: 0
[Producer  3]Current Product Num: 1
[Consumer  0]Current Product Num: 0
[Producer  2]Current Product Num: 1
[Consumer  2]Current Product Num: 0
[Producer  1]Current Product Num: 1
[Producer  0]Current Product Num: 2
//... ...
[Consumer  0]Current Product Num: 17
[Producer  3]Current Product Num: 18
[Producer  1]Current Product Num: 19
[Consumer  2]Current Product Num: 18
[Producer  4]Current Product Num: 19
[Producer  2]Current Product Num: 20
[Consumer  1]Current Product Num: 19
[Consumer  2]Current Product Num: 18
[Producer  0]Current Product Num: 19
[Producer  4]Current Product Num: 20
//Ctrl + C

     此處,編譯連接時使用-pthread選項,該選項編譯時會在傳統-lpthread選項基礎上附加一個宏定義-D_REENTRANT。具體細節可經過verbose模式(-v選項)對比。

2.3 注意事項

     本節基於生產者/消費者問題的多線程實現,討論一些編程注意事項。爲簡化書寫,省略線程相關函數名的前綴"pthread_"。

     1. 互斥量實際上保護的是臨界區中被多個線程或進程共享的數據(shared data)。此外,互斥量是協做性(cooperative)鎖,沒法禁止繞過這種機制的訪問。例如,若共享數據爲一個鏈表,則操做該鏈表的全部線程都應該在實際操做前獲取該互斥量(但沒法防止某個線程不首先獲取該互斥量就操做鏈表)。

     對本文實現而言,互斥量gtQueLock保護全局隊列gtQueue。但由於各生產者(或消費者)線程啓動例程相同,內部對該隊列的操做邏輯和順序相同,故gtQueLock看起來在對隊列函數加鎖。進而,在生產者與消費者線程之間,均在加鎖期間操做隊列,所以對隊列函數加鎖近似等效於對隊列數據加鎖。但仍需認識到,這種協做對於頻繁調用的基礎性函數(如庫函數)而言並不現實。當某些調用未經加鎖直接操做時,對於其餘使用互斥量的調用而言,互斥保護就失去意義。本文也可考慮在隊列函數集內加鎖(互斥量或讀寫鎖),但由於互斥量gtQueLock同時還保護條件變量,隊列內再加鎖就稍顯複雜,並且線程內互斥量同名時容易產生死鎖。

     所以,設計時最好約定全部線程遵照相同的數據訪問規則。

     2. 應儘可能減小由一個互斥量鎖住的代碼量,以加強併發性。

     例如,生產者線程啓動例程ProducerThread()中,對pvArg進行賦值操做時並不須要互斥量保護(不屬於臨界區)。再者,若各生產者線程之間與生產者/消費者線程之間共享不一樣的數據,則可以使用兩個互斥量,但複雜度也隨之上升。

     多線程軟件設計常常要考慮加鎖粒度的折中。若使用粗粒度鎖定(coarse-grained locking),即長期持有鎖定以便儘量下降獲取和釋放鎖的開銷,則可能有不少線程阻塞等待相同的鎖,從而下降併發性;若使用細粒度鎖定(fine-grained locking),即僅在必要時鎖定並在沒必要要時儘快解鎖以便儘量提升併發性,則較多的鎖開銷可能會下降系統性能,並且代碼變得至關複雜。所以,設計時應劃分適當數目的鎖來保護數據,以在代碼複雜性和性能優化之間找好平衡點。

     3. 條件自己是由互斥量保護的。當線程調用cond_wait()函數基於條件變量阻塞以前,應先鎖定互斥量以免線程間的競爭和飢餓狀況(不然將致使未定義的行爲)。

     調用cond_wait()時,線程將鎖定的互斥量傳遞給該函數,對條件進行保護。該函數將調用線程放到等待條件的線程列表上,而後對互斥量解鎖。這兩個操做是原子性的,即關閉了條件檢查和線程進入休眠狀態等待條件改變這兩個操做之間的時間通道,這樣線程就不會錯過條件的任何變化。該函數對互斥量解鎖後,其它線程能夠得到加鎖的權利,並訪問和修改臨界資源(條件狀態)。一旦其它某個線程改變了條件使其爲真,該線程將通知相應的條件變量喚醒一個或多個正被該條件變量阻塞的線程。cond_wait()返回時,互斥量再次被鎖定並被調用線程擁有,即便返回錯誤時也是如此。

     可見,cond_wait()函數解鎖和阻塞調用線程的「原子性」針對其餘線程訪問互斥量和條件變量而言。若線程B在即將阻塞的線程A解鎖以後得到互斥量,則線程B隨後調用cond_ signal/broadcast()函數通告條件狀態變化時,該信號必然在線程A阻塞以後發出(不然將遺漏該信號)。

     使用多個互斥量保護基於相同條件變量的cond_wait()調用時,其效果未定義。當線程基於條件變量阻塞時,該條件變量綁定到惟一的互斥量上,且這種(動態)綁定將在cond_wait()調用返回時結束。

     4. 函數cond_signal()喚醒被阻塞在條件變量上的至少一個線程。在多處理器上,該函數在不一樣處理器上同時單播信號時可能喚醒多個線程。當多個線程阻塞在條件變量上時,哪一個或哪些線程被喚醒由線程的調度策略(scheduling policy)所決定。

     cond_broadcast()會喚醒阻塞在條件變量上的全部線程。這些線程被喚醒後將再次競爭相應的互斥量。喚醒多個線程的典型場景是讀出者與寫入者問題。當一個寫入者完成訪問並釋放相應的鎖後,它但願喚醒全部正在排隊的讀出者,由於同時容許多個讀出者訪問。

     若已肯定只有一個等待者須要喚醒,且喚醒哪一個等待者可有可無,則可以使用單播發送;全部其餘狀況下都應使用廣播發送(更爲安全)。

     若當前沒有任何線程基於條件變量阻塞,則調用cond_signal/broadcast()不起做用。

     5. 虛假喚醒(spurious wakeup)指沒有線程明確調用cond_signal/broadcast()時,cond_wait()偶爾也會返回;或者條件狀態尚不知足時就調用cond_signal/broadcast()。此時,線程雖被喚醒但條件並不成立,若再也不次檢查條件而往下執行,極可能致使後續的處理出現錯誤。所以,當從cond_wait()返回時,線程應從新測試條件成立與否。該過程通常用while循環實現,即:

1 pthread_mutex_lock();
2 while(IsConditionFalse)
3     pthread_cond_wait();
4 //Do something
5 pthread_mutex_unlock();

     使用while循環不只能避免虛假喚醒形成的錯誤,還能避免喚醒線程間競爭致使的「驚羣效應」。

     例如,ProducerThread()內,調用cond_wait()對互斥量自動解鎖後,在容許消費者線程修改條件的同時,也容許其餘生產者線程調用該函數依次阻塞。當這些線程被喚醒時(如隊列由滿變爲非滿),會再次競爭相應的互斥量。得到互斥量的那個線程進入臨界區處理,這可能改變測試條件(如產出一件使得隊列再次變滿)。該線程釋放互斥量後,其餘某個處於等待狀態的線程得到該互斥量時,雖然cond_wait()成功返回但極可能條件已不成立。所以,調用cond_wait()成功返回時,線程須要從新測試條件是否知足。

     調用cond_timedwait()的狀況與之相似。因爲等待超時與條件改變之間存在沒法避免的競爭,當該函數返回超時錯誤時條件仍可能成立。

     6. 若線程未持有與條件相關聯的互斥量,則調用cond_signal/broadcast()可能會產生喚醒丟失問題。

     當知足如下全部條件時,即會出現喚醒丟失問題:

     1) 一個線程調用cond_signal/broadcast()函數;

     2) 另外一個線程已測試該條件,但還沒有調用cond_wait()函數;

     3) 沒有正處於阻塞等待狀態的線程。

     只要僅在持有關聯互斥量的同時修改所測試的條件,便可調用cond_signal/broadcast(),而不管這些函數是否持有關聯的互斥量。

     可見,調用cond_signal/broadcast()時若無線程等待條件變量,則該信號將被丟失。所以,發送信號前最好檢查下是否有正在等待的線程。例如,可維護一個等待線程計數器,在觸發條件變量前檢查該計數器。本文線程代碼內對隊列元素數目的檢查與之相似。

     對於等待條件的線程而言,若錯失信號(如啓動過遲),則會一直阻塞到其它線程再次發送信號到該條件變量。

     7. ProducerThread()中,cond_broadcast()函數由當前鎖住互斥量的生產者線程調用,本函數將發送信號給該互斥量所關聯的條件變量。當該條件變量被髮送信號後,系統當即調度等待在其上的消費者線程;該線程開始運行後試圖獲取互斥量,但該互斥量仍由生產者線程所持有。所以被喚醒的消費者線程被迫進入休眠狀態,直至生產者線程釋放互斥量後再次被喚醒。這種沒必要要的上下文切換可能會嚴重影響性能。

     爲避免這種加鎖衝突(以提升效率),可將判斷隊列元素數目的語句值賦給一個局部變量bHasOnePrd,直到釋放互斥量gtQueLock後才判斷bHasOnePrd並向與之關聯的條件變量gtCsmCond發送信號。

     Posix標準規定,調用cond_signal/broadcast()的線程沒必要是與之關聯的互斥量的當前擁有者,即容許在釋放互斥量後纔給與之關聯的條件變量發送信號。若程序不關心線程可預知的調度行爲,最好在鎖定區域之外調用cond_signal/broadcast()。

     固然,本文所示的生產者和消費者線程中,即便不判斷隊列元素數目而直接發送信號也能夠(經過修改隊列指針間接地改變條件狀態,雖然會發送一些無效信號)。

     8. 當只有一個生產者和一個消費者時,經過謹慎操做隊列可避免線程間的原子性同步。例如,生產者線程僅更新隊尾指針,消費者線程僅更新隊首指針。簡化的示例以下:

 1 #define QUEUE_SIZE  20
 2 volatile unsigned int gdwPrdNum = 0, gdwCsmNum = 0;
 3 int gQueue[QUEUE_SIZE] = {0};
 4  
 5 void *Producer(void *pvArg) {
 6     while(1) {
 7         while(gdwPrdNum - gdwCsmNum == QUEUE_SIZE)
 8             ; //Full
 9  
10         gQueue[gdwPrdNum % QUEUE_SIZE]++;
11         gdwPrdNum++;
12     }
13     pthread_exit(0);
14 }
15  
16 void *Consumer(void *pvArg) {
17     while(1) {
18         while(gdwPrdNum - gdwCsmNum == 0)
19            ; //Empty
20  
21         gQueue[gdwCsmNum % QUEUE_SIZE]--;
22         gdwCsmNum++;
23     }
24     pthread_exit(0);
25 }

     該例中使用輪詢(polling)方式,可在不依賴互斥量和條件變量的狀況下高效地共享數據。判空和判滿的循環保證兩個線程不可能同時操做同一個隊列元素。當線程被取消時,可觀察到隊列中元素值爲全零(若無循環則爲隨機值)。雖然輪詢方式一般比較低效,但在多處理器環境中線程休眠和喚醒的頻率較小,故繞開數據原子性操做是有利的。

     另外一則實例則可參考《守護進程接收終端輸入的一種變通性方法》一文。

     9. 使用Posix信號量可模擬互斥量和條件變量,並且一般更有優點。

     當函數sem_wait()和sem_post()用於線程內時,兩個調用間的區域就是所要保護的臨界區代碼;當用於線程間時,則與條件變量等效。

     此外,信號量還可用做資源計數器,即初始化信號量的值做爲某個資源當前可用的數量,使用時遞減釋放時遞增。這樣,原先一些保存隊列狀態的變量都再也不須要。

     最後,內核會記錄信號的存在,不會將信號丟失;而喚醒條件變量時若沒有線程在等待該條件變量,信號將被丟失。

相關文章
相關標籤/搜索