在上一篇博客互斥量中,解決了線程如何互斥訪問臨界資源的問題。html
在開始本文以前,咱們先保留一個問題:爲何須要條件變量,若是隻有互斥量不能解決什麼問題?api
條件變量的數據類型是 pthread_cond_t
.app
初始化,銷燬 API 爲:函數
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); int pthread_cond_destroy(pthread_cond_t *cond);
函數原型:ui
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
做用:atom
The
pthread_cond_wait
function atomically blocks the current thread waiting on the condition variable specified bycond
, and releases the mutex specified bymutex
. The waiting thread unblocks only after another thread callspthread_cond_signal
, orpthread_cond_broadcast
with the same condition variable, and the current thread re-acquires the lock on mutex.線程—— Manual on MacOS.code
在條件變量 cond
上阻塞線程,加入 cond
的等待隊列,並釋放互斥量 mutex
. 若是其餘線程使用同一個條件變量 cond
調用了 pthread_cond_signal/broadcast
,喚醒的線程會從新得到互斥鎖 mutex
.htm
函數原型:blog
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
做用:與 pthread_cond_wait
相似,但該線程被喚醒的條件是其餘線程調用了 signal/broad
,或者系統時間到達了 abstime
。
函數原型:
int pthread_cond_signal(pthread_cond_t *cond)
做用:
The
pthread_cond_signal
function shall unblock at least one of the threads that are blocked no the specified condition variablecond
(if any threads are blocked oncond
).If more than one thread is blocked on a condition variable, the scheduling policy shall determine the order which threads are unblocked.
When each thread unblocked as a result of a
pthread_cond_broadcast()
orpthread_cond_signal()
returns from its call topthread_cond_wait()
orpthread_cond_timedwait()
, the thread shall own themutex
with which it calledpthread_cond_wait()
orpthread_cond_timedwait()
.The thread(s) that are unblocked shall contend for the
mutex
according to the scheduling policy (if applicable), and as if each had calledpthread_mutex_lock()
.The
pthread_cond_broadcast()
andpthread_cond_signal()
functions shall have no effect if there are no threads currently blocked oncond
.——Manual on Ubuntu.
喚醒一個在 cond
上等待的至少一個線程,若是 cond
上阻塞了多個線程,那麼將根據調度策略選取一個。
當被喚醒的線程從 wait/timedwait
函數返回,將從新得到 mutex
(但可能須要競爭,由於可能喚醒多個線程)。
函數原型:
int pthread_cond_broadcast(pthread_cond_t *cond);
做用:喚醒全部在 cond
上等待的線程。
又稱 PC (Producer - Consumer) 問題。
詳細問題定義能夠看:
具體要求:
buffer
實質上是一個隊列。
const int CAPACITY = 4; typedef struct { char items[CAPACITY]; int in, out; } buffer_t; void buffer_init(buffer_t *b) { b->in = b->out = 0; } int buffer_is_full(buffer_t *b) { return ((b->in + 1) % CAPACITY) == (b->out); } int buffer_is_empty(buffer_t *b) { return b->in == b->out; } void buffer_put_item(buffer_t *buf, char item) { buf->items[buf->in] = item; buf->in = (buf->in + 1) % CAPACITY; } char buffer_get_item(buffer_t *buf) { char item = buf->items[buf->out]; buf->out = (buf->out + 1) % CAPACITY; return item; }
const int CAPACITY = 4; // buffer 的容量 const int N = 8; // 依據題意,須要轉換 8 個字符 buffer_t buf1, buf2; pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER; // 保證只有一個線程訪問 buf1 pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER; // 保證只有一個線程訪問 buf2 pthread_cond_t empty1 = PTHREAD_COND_INITIALIZER; pthread_cond_t empty2 = PTHREAD_COND_INITIALIZER; pthread_cond_t full1 = PTHREAD_COND_INITIALIZER; pthread_cond_t full2 = PTHREAD_COND_INITIALIZER;
幾個條件變量的做用以下:
empty1
表示當 buf1
爲空的時候,從 buf1
取數據的線程要在此條件變量上等待。full1
表示當 buf1
爲滿的時候,向 buf1
寫數據的線程要在此條件變量上等待。其餘同理。
代碼思路解析:
buf1
操做,首先寫一對 pthread_mutex_lock/unlock
,保證臨界代碼區內只有 producer
操做 buf1
;buf1
是滿的,那麼就將 producer
線程阻塞在條件變量 full1
上,釋放互斥量 mutex1
(雖然不能寫入,但要讓別的線程可以讀取 buf1
的數據);buf1
;buf1
一定不爲空,所以喚醒一個在 empty1
上等待的線程,最後釋放 mutex1
.void *producer(void *arg) { int i = 0; // can be while(true) for (; i < N; i++) { pthread_mutex_lock(&mutex1); while (buffer_is_full(&buf1)) pthread_cond_wait(&full1, &mutex1); buffer_put_item(&buf1, (char)('a' + i)); printf("Producer put [%c] in buffer1. \n", (char)('a' + i)); pthread_cond_signal(&empty1); pthread_mutex_unlock(&mutex1); } return NULL; }
思路與 producer
相似。
void *consumer(void *arg) { int i = 0; for (; i < N; i++) { pthread_mutex_lock(&mutex2); while (buffer_is_empty(&buf2)) pthread_cond_wait(&empty2, &mutex2); char item = buffer_get_item(&buf2); printf("\tConsumer get [%c] from buffer2. \n", item); pthread_cond_signal(&full2); pthread_mutex_unlock(&mutex2); } return NULL; }
這是 produer
和 consumer
的結合體。
void *calculator(void *arg) { int i = 0; char item; for (; i < N; i++) { pthread_mutex_lock(&mutex1); while (buffer_is_empty(&buf1)) pthread_cond_wait(&empty1, &mutex1); item = buffer_get_item(&buf1); pthread_cond_signal(&full1); pthread_mutex_unlock(&mutex1); pthread_mutex_lock(&mutex2); while (buffer_is_full(&buf2)) pthread_cond_wait(&full2, &mutex2); buffer_put_item(&buf2, item - 'a' + 'A'); pthread_cond_signal(&empty2); pthread_mutex_unlock(&mutex2); } return NULL; }
int main() { pthread_t calc, prod, cons; // init buffer buffer_init(&buf1), buffer_init(&buf2); // create threads pthread_create(&calc, NULL, calculator, NULL); pthread_create(&prod, NULL, producer, NULL); pthread_create(&cons, NULL, consumer, NULL); pthread_join(calc, NULL); pthread_join(prod, NULL); pthread_join(cons, NULL); // destroy mutex pthread_mutex_destroy(&mutex1), pthread_mutex_destroy(&mutex2); // destroy cond pthread_cond_destroy(&empty1), pthread_cond_destroy(&empty2); pthread_cond_destroy(&full1), pthread_cond_destroy(&full2); }
從上面的 Producer - Consumer 問題能夠看出,mutex
僅僅能表達「線程可否得到訪問臨界資源的權限」這一層面的信息,而不能表達「臨界資源是否足夠」這個問題。
假設沒有條件變量,producer
線程得到了 buf1
的訪問權限( buf1
的空閒位置對於 producer
來講是一種資源),但若是 buf1
是滿的,producer
就無法對 buf1
操做。
對於 producer
來講,它不能佔用訪問 buf1
的互斥鎖,但卻又什麼都不作。所以,它只能釋放互斥鎖 mutex
,讓別的線程可以訪問 buf1
,並取走數據,等到 buf1
有空閒位置,producer
再對 buf1
寫數據。用僞代碼表述以下:
pthread_mutex_lock(&mutex1); if (buffer_is_full(&buf1)) { pthread_mutex_unlock(&mutex1); wait_until_not_full(&buf1); pthread_mutex_lock(&mutex); } buffer_put_item(&buf1, item); pthread_mutex_unlock(&mutex1);
而條件變量其實是對上述一系列操做的一種封裝。
在上面代碼中,使用 pthread_cond_wait
的時候,咱們是經過這樣的方式的:
while (...) pthread_cond_wait(&cond, &mutex);
但這裏爲何不是 if
而是 while
呢?
參考文章:http://www.javashuo.com/article/p-qkxrvctp-nv.html
#include <pthread.h> struct msg { struct msg *m_next; /* value...*/ }; struct msg* workq; pthread_cond_t qready = PTHREAD_COND_INITIALIZER; pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER; void process_msg() { struct msg* mp; for (;;) { pthread_mutex_lock(&qlock); while (workq == NULL) { pthread_cond_wait(&qread, &qlock); } mq = workq; workq = mp->m_next; pthread_mutex_unlock(&qlock); /* now process the message mp */ } } void enqueue_msg(struct msg* mp) { pthread_mutex_lock(&qlock); mp->m_next = workq; workq = mp; pthread_mutex_unlock(&qlock); /** 此時第三個線程在signal以前,執行了process_msg,恰好把mp元素拿走*/ pthread_cond_signal(&qready); /** 此時執行signal, 在pthread_cond_wait等待的線程被喚醒, 可是mp元素已經被另一個線程拿走,因此,workq仍是NULL ,所以須要繼續等待*/ }
代碼解析:
這裏
process_msg
至關於消費者,enqueue_msg
至關於生產者,struct msg* workq
做爲緩衝隊列。在
process_msg
中使用while (workq == NULL)
循環判斷條件,這裏主要是由於在enqueue_msg
中unlock
以後才喚醒等待的線程,會出現上述註釋出現的狀況,形成workq==NULL
,所以須要繼續等待。可是若是將
pthread_cond_signal
移到pthread_mutex_unlock()
以前執行,則會避免這種競爭,在unlock
以後,會首先喚醒pthread_cond_wait
的線程,進而workq != NULL
老是成立。所以建議使用
while
循環進行驗證,以便可以容忍這種競爭。
pthread_cond_signal
在多核處理器上可能同時喚醒多個線程。
//thread 1 while(0<x<10) pthread_cond_wait(...); //thread 2 while(5<x<15) pthread_cond_wait(...);
若是某段時間內 x == 8
,那麼兩個線程相繼進入等待。
而後第三個線程,進行了以下操做:
x = 12 pthread_cond_signal(...) // or call pthread_cond_broadcast()
那麼可能線程 一、2 都被喚醒了(由於 signal
可能喚醒多個),可是,此時線程 1 仍然不知足 while
,須要再次判斷,而後進入下一次等待。
其次,即便 signal
只喚醒一個,上面咱們提到,若是有多個線程都阻塞在同一個 cond
上,signal
會根據調度策略選取一個喚醒,那若是根據調度策略,喚醒的是線程 1 ,顯然它還須要再一次判斷是否須要繼續等待(不然就違背了 pthead_cond_wait
的本意)。