操做系統思考 第十章 條件變量

第十章 條件變量

做者:Allen B. Downeyjavascript

原文:Chapter 10 Condition variableshtml

譯者:飛龍java

協議:CC BY-NC-SA 4.0nginx

像上一章所展現的那樣,許多簡單的同步問題均可以用互斥體解決。這一章中我會介紹一個更大的挑戰,著名的「生產者-消費者」問題,以及一個用於解決它的新工具,條件變量。git

10.1 工做隊列

在一些多線程的程序中,線程被組織用於執行不一樣的任務。一般它們使用隊列來相互通訊,其中一些線程叫作「生產者」,向隊列中放入數據,另外一些線程叫作「消費者」,從隊列取出數據。程序員

例如,在GUI應用中,可能有一個運行GUI的線程響應用戶事件,而其它線程負責處理用戶的請求。這裏,GUI線程可能將數據放入隊列中,而「後臺」線程從隊列中取出請求並執行。github

爲了支持這種組織,咱們須要一個「線程安全」的隊列實現,也就是說每一個線程均可以同時訪問隊列。咱們至少須要處理一個特殊狀況,隊列是空的,以及若是隊列的大小有限制,隊列是滿的。web

我會從一個非線程安全的簡單隊列開始,以後咱們會觀察其中的錯誤並修復它。這個示例的代碼在本書倉庫的queue目錄中。queue.c文件包含了一個環形緩衝區的基本實現。你能夠在環形緩衝區的維基百科查詢更多信息。數組

下面是結構體的定義:安全

typedef struct {
  int *array;
  int length;
  int next_in;
  int next_out;
} Queue;

array是包含隊列元素的數組。在這個例子中,元素都是整數,可是一般它們都是一些結構體,包含用戶事件、工做項目以及其它。

length是數組的長度,next_in是數組的下標,用於索引下個元素應該添加到哪裏;與之類似, next_out是應該被移除的下個元素的下標。

make_queue爲這個結構體分配空間,而且初始化全部字段:

Queue *make_queue(int length)
{
  Queue *queue = (Queue *) malloc(sizeof(Queue));
  queue->length = length;
  queue->array = (int *) malloc(length * sizeof(int));
  queue->next_in = 0;
  queue->next_out = 0;
  return queue;
}

next_out的初始值須要一些解釋。因爲隊列一開始爲空,沒有可移除的下一個元素,因此next_out是無效的。next_out==next_in是個特殊狀況,它表示隊列爲空,因此咱們能夠編寫:

int queue_empty(Queue *queue) {
  return (queue->next_in == queue->next_out);
}

如今咱們可使用queue_push向隊列裏面添加元素:

void queue_push(Queue *queue, int item) {
  if (queue_full(queue)) {
    perror_exit("queue is full");
  }
  
  queue->array[queue->next_in] = item;
  queue->next_in = queue_incr(queue, queue->next_in);
}

若是隊列滿了,queue_push打印出錯誤信息並退出,我以後會解釋queue_full

若是隊列沒有滿,queue_push插入新元素,以後使用queue_incr增長next_in

int queue_incr(Queue *queue, int i) {
  return (i+1) % queue->length;
}

當索引i到達隊列末尾時,它會轉換爲0。因而這樣就很微妙了。若是咱們持續向隊列添加元素,最後next_in會遇上next_out。可是若是next_in == next_out咱們會錯誤地認爲隊列是空的。

爲了不這種狀況,咱們定義另外一種特殊狀況來表示隊列是滿的:

int queue_full(Queue *queue) {
  return (queue_incr(queue, queue->next_in) == queue->next_out);
}

若是next_in增長後與next_out重合,那麼咱們若是添加新的元素,就會使隊列看起來是空的。因此咱們在「末尾」留出一個元素(要記住隊列的末尾可能位於任何地方,不必定是數組末尾)。

如今咱們能夠編寫queue_pop,它移除並返回隊列的下一個元素:

int queue_pop(Queue *queue) {
  if (queue_empty(queue)) {
    perror_exit("queue is empty");
  }
  
  int item = queue->array[queue->next_out];
  queue->next_out = queue_incr(queue, queue->next_out);
  return item;
}

若是你嘗試從空隊列中彈出元素,queue_pop會打印錯誤信息並退出。

10.2 生產者和消費者

如今讓咱們建立一些訪問這個隊列的線程。下面是生產者的代碼:

void *producer_entry(void *arg)
{
  int i;
  Shared *shared = (Shared *) arg;

  for (i=0; i<QUEUE_LENGTH-1; i++) {
    printf("adding item %d\n", i);
    queue_push(shared->queue, i);
  }
  pthread_exit(NULL);
}

下面是消費者的代碼:

void *consumer_entry(void *arg)
{
  int i;
  int item;
  Shared *shared = (Shared *) arg;

  for (i=0; i<QUEUE_LENGTH-1; i++) {
    item = queue_pop(shared->queue);
    printf("consuming item %d\n", item);
  }
  pthread_exit(NULL);
}

下面是用於啓動線程並等待它們的主線程代碼:

int i;
pthread_t child[NUM_CHILDREN];

Shared *shared = make_shared();

child[0] = make_thread(producer_entry, shared);
child[1] = make_thread(consumer_entry, shared);

for (i=0; i<NUM_CHILDREN; i++) {
    join_thread(child[i]);
}

最後,下面是包含隊列的共享結構:

typedef struct {
  Queue *queue;
} Shared;

Shared *make_shared()
{
  Shared *shared = check_malloc(sizeof(Shared));
  shared->queue = make_queue(QUEUE_LENGTH);
  return shared;
}

到目前爲止咱們所寫的代碼是一個好的開始,可是有以下幾種問題:

  • 隊列的訪問不是線程安全的。不一樣的線程能同時訪問arraynext_innext_out,而且會使隊列處於損壞的、「不一致」的狀態。

  • 若是消費者首先被調度,它會發現隊列爲空,打印錯誤信息並退出。咱們應該阻塞住消費者,直到隊列非空。與之類似,咱們應該在隊列滿了的狀況下阻塞住生產者。

在下一節中,咱們會使用互斥體解決這一個問題。以後的章節中咱們會使用條件變量解決第二個問題。

10.3 互斥體

咱們可使用互斥體使隊列線程安全。這個版本的代碼在queue_mutex.c中。

首先咱們向隊列結構中添加一個互斥體指針:

typedef struct {
  int *array;
  int length;
  int next_in;
  int next_out;
  Mutex *mutex;          //-- this line is new
} Queue;

以後在make_queue中初始化互斥體:

Queue *make_queue(int length)
{
  Queue *queue = (Queue *) malloc(sizeof(Queue));
  queue->length = length;
  queue->array = (int *) malloc(length * sizeof(int));
  queue->next_in = 0;
  queue->next_out = 0;
  queue->mutex = make_mutex();   //-- new
  return queue;
}

接下來向queue_push添加同步代碼:

void queue_push(Queue *queue, int item) {
  mutex_lock(queue->mutex);   //-- new
  if (queue_full(queue)) {
    mutex_unlock(queue->mutex);   //-- new
    perror_exit("queue is full");
  }
  
  queue->array[queue->next_in] = item;
  queue->next_in = queue_incr(queue, queue->next_in);
  mutex_unlock(queue->mutex);   //-- new
}

在檢查隊列是否已滿以前,咱們須要鎖住互斥體。若是隊列是滿的,咱們須要在退出以前解鎖互斥體。不然線程應該保持互斥體鎖住,使其它線程不能前進。

queue_pop的同步代碼與之類似:

int queue_pop(Queue *queue) {
  mutex_lock(queue->mutex);
  if (queue_empty(queue)) {
    mutex_unlock(queue->mutex);
    perror_exit("queue is empty");
  }
  
  int item = queue->array[queue->next_out];
  queue->next_out = queue_incr(queue, queue->next_out);
  mutex_unlock(queue->mutex);
  return item;
}

要注意其它隊列函數,queue_fullqueue_emptyqueue_incr都不須要鎖住互斥體。任何調用這些函數的線程都須要首先鎖住互斥體。這些要求是這些函數的接口文檔的一部分。

使用這些額外的代碼,隊列就線程安全了。若是你運行它,你不會看到任何的同步錯誤。可是彷佛消費者會在某個時間上退出,由於隊列是空的。或者生產者會因爲隊列是知足而退出。

下一步就是添加條件變量。

10.4 條件變量

條件變量是條件相關的數據結構。它容許線程在某些條件變爲真以前被阻塞。例如,thread_push可能但願檢查隊列是否已滿,若是是這樣,就在隊列未滿以前阻塞。因此咱們感興趣的「條件」就是「隊列未滿」。

與之類似,thread_pop但願等待「隊列非空」的條件。

下面是咱們向代碼添加這些功能的方式。首先咱們向隊列結構中添加兩個條件變量:

typedef struct {
  int *array;
  int length;
  int next_in;
  int next_out;
  Mutex *mutex;
  Cond *nonempty;   //-- new
  Cond *nonfull;    //-- new
} Queue;

以後在make_queue中初始化它們:

Queue *make_queue(int length)
{
  Queue *queue = (Queue *) malloc(sizeof(Queue));
  queue->length = length;
  queue->array = (int *) malloc(length * sizeof(int));
  queue->next_in = 0;
  queue->next_out = 0;
  queue->mutex = make_mutex();
  queue->nonempty = make_cond();   //-- new
  queue->nonfull = make_cond();    //-- new
  return queue;
}

如今在queue_pop中,若是咱們發現隊列爲空,咱們不要退出,而是使用條件變量來阻塞:

int queue_pop(Queue *queue) {
  mutex_lock(queue->mutex);
  while (queue_empty(queue)) {
    cond_wait(queue->nonempty, queue->mutex);  //-- new
  }
  
  int item = queue->array[queue->next_out];
  queue->next_out = queue_incr(queue, queue->next_out);
  mutex_unlock(queue->mutex);
  cond_signal(queue->nonfull);   //-- new
  return item;
}

cond_wait有點複雜,因此讓咱們慢慢來。第一個參數是條件變量。這裏咱們須要等待的條件是「隊列非空」。第二個變量是保護隊列的互斥體。在你調用cond_wait以前,你須要先鎖住互斥體,不然它不會生效。

當鎖住互斥體的線程調用cond_wait時,它首先解鎖互斥體,以後阻塞。這很是重要。若是cond_wait不在阻塞以前解鎖互斥體,其它線程就不能訪問隊列,不能添加任何物品,隊列會永遠爲空。

因此當消費者阻塞在nonempty的時候,生產者也能夠運行。讓咱們來觀察生產者運行queue_push時會發生什麼:

void queue_push(Queue *queue, int item) {
  mutex_lock(queue->mutex);
  while (queue_full(queue)) {
    cond_wait(queue->nonfull, queue->mutex);    //-- new
  }
  
  queue->array[queue->next_in] = item;
  queue->next_in = queue_incr(queue, queue->next_in);
  mutex_unlock(queue->mutex);
  cond_signal(queue->nonempty);  //-- new
}

讓咱們假設隊列如今未滿,因而生產者並不會調用cond_wait也不會阻塞。它會向隊列添加新的元素並解鎖互斥體。可是在退出以前,它作了額外的一件事:它向nonempty條件變量發送信號。

向條件變量發送更新好表示條件爲真,或者至少它可能爲真。若是沒有任何線程在等待條件變量,信號就不起做用。

若是有線程在等待條件變量,它們所有會從cond_wait解除阻塞而且恢復執行。可是在被喚醒的進程從cond_wait返回以前,它須要等待並再次鎖住互斥體。

如今咱們回到queue_pop來觀察當線程從cond_wait返回時會發生什麼。它會循環到while語句的開頭,並再次檢查條件。我會在以後解釋其緣由,可是如今讓咱們假設條件爲真,也就是說隊列非空。

當線程從while循環退出以後,咱們知道了兩件事情:(1)條件爲真,因此隊列中至少有一個物品,(2)互斥體是鎖住的,因此訪問隊列是安全的。

在移除物品以後,queue_pop解鎖了互斥體,發送了隊列未滿的信號,以後退出。

在下一節我會向你展現個人Cond的工做緣由,可是首先我想回答兩個常見問題:

  • 爲何cond_waitwhile循環中,而不是if語句中?也就是說,爲何在從cond_wait返回以後要再次檢查條件?

    須要再次檢查條件的首要緣由就是信號攔截的可能性。假設線程A在等待`nonempty`,線程B向隊列添加元素,以後向`nonempty`發送信號。線程A被喚醒而且嘗試鎖住互斥體,可是在輪到它以前,邪惡的線程C插進來了,鎖住了互斥體,從隊列中彈出物品而且解鎖了互斥體。如今隊列再次爲空,可是線程A沒有被阻塞。線程A會鎖住互斥體而且從`cond_wait`返回。若是線程A再也不次檢查條件,它會嘗試從空隊列中彈出元素,可能會產生錯誤。
    
    > 譯者注:有些條件變量的實現能夠每次只喚醒一個線程,好比Java對象的`notify`方法。這種狀況就可使用`if`。
  • 當人們瞭解條件變量時,另外一個問題是「條件變量怎麼知道它關聯了哪一個條件?」

    這一問題能夠理解,由於在`Cond`結構和有關條件之間沒有明顯的關聯。在它的使用方式中,關聯是隱性的。
    
    下面是一種理解它的辦法:當你調用`cond_wait`時,`Cond`所關聯的條件爲假;當你調用`cond_signal`時它爲真。固然,可能有一些條件第一種狀況下爲真,第二種狀況下爲假。正確的狀況只在程序員的腦子中,因此它應該在文檔中有詳細的解釋。

10.5 條件變量的實現

我在上一節中使用的條件變量是pthread_cond_t類型的包裝,它定義在POSIX線程API中。這很是相似於Mutex,它是pthread_mutex_t的包裝。兩個包裝都定義在utils.cutils.h中。

下面是類型定義:

typedef pthread_cond_t Cond;

make_cond分配空間,初始化條件變量,以後返回指針:

Cond *make_cond()
{
  Cond *cond = check_malloc(sizeof(Cond)); 
  int n = pthread_cond_init(cond, NULL);
  if (n != 0) perror_exit("make_cond failed");
 
  return cond;
}

下面是cond_waitcond_signal的包裝:

 
 
 
 
<button href="javascript:void(0);" _xhe_href="javascript:void(0);" class="copyCode btn btn-xs" data-clipboard-text="" void="" cond_wait(cond="" *cond,="" mutex="" *mutex)"="" data-toggle="tooltip" data-placement="bottom" title="" style="color: rgb(255, 255, 255); font-style: inherit; font-variant: inherit; font-stretch: inherit; font-size: 12px; line-height: 1.5; font-family: inherit; margin: 0px 0px 0px 5px; overflow: visible; cursor: pointer; vertical-align: middle; border: 1px solid transparent; white-space: nowrap; padding-right: 5px; padding-left: 5px; border-radius: 3px; -webkit-user-select: none; box-shadow: rgba(0, 0, 0, 0.0980392) 0px 1px 2px; background-image: none; background-color: rgba(0, 0, 0, 0.74902);">複製
void cond_wait(Cond *cond, Mutex *mutex) { int n = pthread_cond_wait(cond, mutex); if (n != 0) perror_exit("cond_wait failed"); } void cond_signal(Cond *cond) { int n = pthread_cond_signal(cond); if (n != 0) perror_exit("cond_signal failed"); }

到這裏就應該沒有什麼意外的東西了。

相關文章
相關標籤/搜索