做者:Allen B. Downeyjavascript
原文:Chapter 10 Condition variableshtml
譯者:飛龍java
協議:CC BY-NC-SA 4.0nginx
像上一章所展現的那樣,許多簡單的同步問題均可以用互斥體解決。這一章中我會介紹一個更大的挑戰,著名的「生產者-消費者」問題,以及一個用於解決它的新工具,條件變量。git
在一些多線程的程序中,線程被組織用於執行不一樣的任務。一般它們使用隊列來相互通訊,其中一些線程叫作「生產者」,向隊列中放入數據,另外一些線程叫作「消費者」,從隊列取出數據。程序員
例如,在GUI應用中,可能有一個運行GUI的線程響應用戶事件,而其它線程負責處理用戶的請求。這裏,GUI線程可能將數據放入隊列中,而「後臺」線程從隊列中取出請求並執行。github
爲了支持這種組織,咱們須要一個「線程安全」的隊列實現,也就是說每一個線程均可以同時訪問隊列。咱們至少須要處理一個特殊狀況,隊列是空的,以及若是隊列的大小有限制,隊列是滿的。web
我會從一個非線程安全的簡單隊列開始,以後咱們會觀察其中的錯誤並修復它。這個示例的代碼在本書倉庫的queue
目錄中。queue.c
文件包含了一個環形緩衝區的基本實現。你能夠在環形緩衝區的維基百科查詢更多信息。數組
下面是結構體的定義:安全
typedef struct {
int *array;
int length;
int next_in;
int next_out;
} Queue;
array
是包含隊列元素的數組。在這個例子中,元素都是整數,可是一般它們都是一些結構體,包含用戶事件、工做項目以及其它。
length
是數組的長度,next_in
是數組的下標,用於索引下個元素應該添加到哪裏;與之類似, next_out
是應該被移除的下個元素的下標。
make_queue
爲這個結構體分配空間,而且初始化全部字段:
Queue *make_queue(int length)
{
Queue *queue = (Queue *) malloc(sizeof(Queue));
queue->length = length;
queue->array = (int *) malloc(length * sizeof(int));
queue->next_in = 0;
queue->next_out = 0;
return queue;
}
next_out
的初始值須要一些解釋。因爲隊列一開始爲空,沒有可移除的下一個元素,因此next_out
是無效的。next_out==next_in
是個特殊狀況,它表示隊列爲空,因此咱們能夠編寫:
int queue_empty(Queue *queue) {
return (queue->next_in == queue->next_out);
}
如今咱們可使用queue_push
向隊列裏面添加元素:
void queue_push(Queue *queue, int item) {
if (queue_full(queue)) {
perror_exit("queue is full");
}
queue->array[queue->next_in] = item;
queue->next_in = queue_incr(queue, queue->next_in);
}
若是隊列滿了,queue_push
打印出錯誤信息並退出,我以後會解釋queue_full
。
若是隊列沒有滿,queue_push
插入新元素,以後使用queue_incr
增長next_in
:
int queue_incr(Queue *queue, int i) {
return (i+1) % queue->length;
}
當索引i
到達隊列末尾時,它會轉換爲0。因而這樣就很微妙了。若是咱們持續向隊列添加元素,最後next_in
會遇上next_out
。可是若是next_in == next_out
咱們會錯誤地認爲隊列是空的。
爲了不這種狀況,咱們定義另外一種特殊狀況來表示隊列是滿的:
int queue_full(Queue *queue) {
return (queue_incr(queue, queue->next_in) == queue->next_out);
}
若是next_in
增長後與next_out
重合,那麼咱們若是添加新的元素,就會使隊列看起來是空的。因此咱們在「末尾」留出一個元素(要記住隊列的末尾可能位於任何地方,不必定是數組末尾)。
如今咱們能夠編寫queue_pop
,它移除並返回隊列的下一個元素:
int queue_pop(Queue *queue) {
if (queue_empty(queue)) {
perror_exit("queue is empty");
}
int item = queue->array[queue->next_out];
queue->next_out = queue_incr(queue, queue->next_out);
return item;
}
若是你嘗試從空隊列中彈出元素,queue_pop
會打印錯誤信息並退出。
如今讓咱們建立一些訪問這個隊列的線程。下面是生產者的代碼:
void *producer_entry(void *arg)
{
int i;
Shared *shared = (Shared *) arg;
for (i=0; i<QUEUE_LENGTH-1; i++) {
printf("adding item %d\n", i);
queue_push(shared->queue, i);
}
pthread_exit(NULL);
}
下面是消費者的代碼:
void *consumer_entry(void *arg)
{
int i;
int item;
Shared *shared = (Shared *) arg;
for (i=0; i<QUEUE_LENGTH-1; i++) {
item = queue_pop(shared->queue);
printf("consuming item %d\n", item);
}
pthread_exit(NULL);
}
下面是用於啓動線程並等待它們的主線程代碼:
int i;
pthread_t child[NUM_CHILDREN];
Shared *shared = make_shared();
child[0] = make_thread(producer_entry, shared);
child[1] = make_thread(consumer_entry, shared);
for (i=0; i<NUM_CHILDREN; i++) {
join_thread(child[i]);
}
最後,下面是包含隊列的共享結構:
typedef struct {
Queue *queue;
} Shared;
Shared *make_shared()
{
Shared *shared = check_malloc(sizeof(Shared));
shared->queue = make_queue(QUEUE_LENGTH);
return shared;
}
到目前爲止咱們所寫的代碼是一個好的開始,可是有以下幾種問題:
隊列的訪問不是線程安全的。不一樣的線程能同時訪問array
、next_in
和next_out
,而且會使隊列處於損壞的、「不一致」的狀態。
若是消費者首先被調度,它會發現隊列爲空,打印錯誤信息並退出。咱們應該阻塞住消費者,直到隊列非空。與之類似,咱們應該在隊列滿了的狀況下阻塞住生產者。
在下一節中,咱們會使用互斥體解決這一個問題。以後的章節中咱們會使用條件變量解決第二個問題。
咱們可使用互斥體使隊列線程安全。這個版本的代碼在queue_mutex.c
中。
首先咱們向隊列結構中添加一個互斥體指針:
typedef struct {
int *array;
int length;
int next_in;
int next_out;
Mutex *mutex; //-- this line is new
} Queue;
以後在make_queue
中初始化互斥體:
Queue *make_queue(int length)
{
Queue *queue = (Queue *) malloc(sizeof(Queue));
queue->length = length;
queue->array = (int *) malloc(length * sizeof(int));
queue->next_in = 0;
queue->next_out = 0;
queue->mutex = make_mutex(); //-- new
return queue;
}
接下來向queue_push
添加同步代碼:
void queue_push(Queue *queue, int item) {
mutex_lock(queue->mutex); //-- new
if (queue_full(queue)) {
mutex_unlock(queue->mutex); //-- new
perror_exit("queue is full");
}
queue->array[queue->next_in] = item;
queue->next_in = queue_incr(queue, queue->next_in);
mutex_unlock(queue->mutex); //-- new
}
在檢查隊列是否已滿以前,咱們須要鎖住互斥體。若是隊列是滿的,咱們須要在退出以前解鎖互斥體。不然線程應該保持互斥體鎖住,使其它線程不能前進。
queue_pop
的同步代碼與之類似:
int queue_pop(Queue *queue) {
mutex_lock(queue->mutex);
if (queue_empty(queue)) {
mutex_unlock(queue->mutex);
perror_exit("queue is empty");
}
int item = queue->array[queue->next_out];
queue->next_out = queue_incr(queue, queue->next_out);
mutex_unlock(queue->mutex);
return item;
}
要注意其它隊列函數,queue_full
、queue_empty
和queue_incr
都不須要鎖住互斥體。任何調用這些函數的線程都須要首先鎖住互斥體。這些要求是這些函數的接口文檔的一部分。
使用這些額外的代碼,隊列就線程安全了。若是你運行它,你不會看到任何的同步錯誤。可是彷佛消費者會在某個時間上退出,由於隊列是空的。或者生產者會因爲隊列是知足而退出。
下一步就是添加條件變量。
條件變量是條件相關的數據結構。它容許線程在某些條件變爲真以前被阻塞。例如,thread_push
可能但願檢查隊列是否已滿,若是是這樣,就在隊列未滿以前阻塞。因此咱們感興趣的「條件」就是「隊列未滿」。
與之類似,thread_pop
但願等待「隊列非空」的條件。
下面是咱們向代碼添加這些功能的方式。首先咱們向隊列結構中添加兩個條件變量:
typedef struct {
int *array;
int length;
int next_in;
int next_out;
Mutex *mutex;
Cond *nonempty; //-- new
Cond *nonfull; //-- new
} Queue;
以後在make_queue
中初始化它們:
Queue *make_queue(int length)
{
Queue *queue = (Queue *) malloc(sizeof(Queue));
queue->length = length;
queue->array = (int *) malloc(length * sizeof(int));
queue->next_in = 0;
queue->next_out = 0;
queue->mutex = make_mutex();
queue->nonempty = make_cond(); //-- new
queue->nonfull = make_cond(); //-- new
return queue;
}
如今在queue_pop
中,若是咱們發現隊列爲空,咱們不要退出,而是使用條件變量來阻塞:
int queue_pop(Queue *queue) {
mutex_lock(queue->mutex);
while (queue_empty(queue)) {
cond_wait(queue->nonempty, queue->mutex); //-- new
}
int item = queue->array[queue->next_out];
queue->next_out = queue_incr(queue, queue->next_out);
mutex_unlock(queue->mutex);
cond_signal(queue->nonfull); //-- new
return item;
}
cond_wait
有點複雜,因此讓咱們慢慢來。第一個參數是條件變量。這裏咱們須要等待的條件是「隊列非空」。第二個變量是保護隊列的互斥體。在你調用cond_wait
以前,你須要先鎖住互斥體,不然它不會生效。
當鎖住互斥體的線程調用cond_wait
時,它首先解鎖互斥體,以後阻塞。這很是重要。若是cond_wait
不在阻塞以前解鎖互斥體,其它線程就不能訪問隊列,不能添加任何物品,隊列會永遠爲空。
因此當消費者阻塞在nonempty
的時候,生產者也能夠運行。讓咱們來觀察生產者運行queue_push
時會發生什麼:
void queue_push(Queue *queue, int item) {
mutex_lock(queue->mutex);
while (queue_full(queue)) {
cond_wait(queue->nonfull, queue->mutex); //-- new
}
queue->array[queue->next_in] = item;
queue->next_in = queue_incr(queue, queue->next_in);
mutex_unlock(queue->mutex);
cond_signal(queue->nonempty); //-- new
}
讓咱們假設隊列如今未滿,因而生產者並不會調用cond_wait
也不會阻塞。它會向隊列添加新的元素並解鎖互斥體。可是在退出以前,它作了額外的一件事:它向nonempty
條件變量發送信號。
向條件變量發送更新好表示條件爲真,或者至少它可能爲真。若是沒有任何線程在等待條件變量,信號就不起做用。
若是有線程在等待條件變量,它們所有會從cond_wait
解除阻塞而且恢復執行。可是在被喚醒的進程從cond_wait
返回以前,它須要等待並再次鎖住互斥體。
如今咱們回到queue_pop
來觀察當線程從cond_wait
返回時會發生什麼。它會循環到while
語句的開頭,並再次檢查條件。我會在以後解釋其緣由,可是如今讓咱們假設條件爲真,也就是說隊列非空。
當線程從while
循環退出以後,咱們知道了兩件事情:(1)條件爲真,因此隊列中至少有一個物品,(2)互斥體是鎖住的,因此訪問隊列是安全的。
在移除物品以後,queue_pop
解鎖了互斥體,發送了隊列未滿的信號,以後退出。
在下一節我會向你展現個人Cond
的工做緣由,可是首先我想回答兩個常見問題:
爲何cond_wait
在while
循環中,而不是if
語句中?也就是說,爲何在從cond_wait
返回以後要再次檢查條件?
須要再次檢查條件的首要緣由就是信號攔截的可能性。假設線程A在等待`nonempty`,線程B向隊列添加元素,以後向`nonempty`發送信號。線程A被喚醒而且嘗試鎖住互斥體,可是在輪到它以前,邪惡的線程C插進來了,鎖住了互斥體,從隊列中彈出物品而且解鎖了互斥體。如今隊列再次爲空,可是線程A沒有被阻塞。線程A會鎖住互斥體而且從`cond_wait`返回。若是線程A再也不次檢查條件,它會嘗試從空隊列中彈出元素,可能會產生錯誤。
> 譯者注:有些條件變量的實現能夠每次只喚醒一個線程,好比Java對象的`notify`方法。這種狀況就可使用`if`。
當人們瞭解條件變量時,另外一個問題是「條件變量怎麼知道它關聯了哪一個條件?」
這一問題能夠理解,由於在`Cond`結構和有關條件之間沒有明顯的關聯。在它的使用方式中,關聯是隱性的。 下面是一種理解它的辦法:當你調用`cond_wait`時,`Cond`所關聯的條件爲假;當你調用`cond_signal`時它爲真。固然,可能有一些條件第一種狀況下爲真,第二種狀況下爲假。正確的狀況只在程序員的腦子中,因此它應該在文檔中有詳細的解釋。
我在上一節中使用的條件變量是pthread_cond_t
類型的包裝,它定義在POSIX線程API中。這很是相似於Mutex
,它是pthread_mutex_t
的包裝。兩個包裝都定義在utils.c
和utils.h
中。
下面是類型定義:
typedef pthread_cond_t Cond;
make_cond
分配空間,初始化條件變量,以後返回指針:
Cond *make_cond()
{
Cond *cond = check_malloc(sizeof(Cond));
int n = pthread_cond_init(cond, NULL);
if (n != 0) perror_exit("make_cond failed");
return cond;
}
下面是cond_wait
和cond_signal
的包裝:
void cond_wait(Cond *cond, Mutex *mutex) {
int n = pthread_cond_wait(cond, mutex);
if (n != 0) perror_exit("cond_wait failed");
}
void cond_signal(Cond *cond) {
int n = pthread_cond_signal(cond);
if (n != 0) perror_exit("cond_signal failed");
}
到這裏就應該沒有什麼意外的東西了。