pthread 條件變量

在上一篇博客互斥量中,解決了線程如何互斥訪問臨界資源的問題。html

在開始本文以前,咱們先保留一個問題:爲何須要條件變量,若是隻有互斥量不能解決什麼問題?api

API

init/destroy

條件變量的數據類型是 pthread_cond_t .app

初始化,銷燬 API 爲:函數

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);

pthread_cond_wait

函數原型:ui

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

做用:atom

The pthread_cond_wait function atomically blocks the current thread waiting on the condition variable specified by cond, and releases the mutex specified by mutex. The waiting thread unblocks only after another thread calls pthread_cond_signal, or pthread_cond_broadcast with the same condition variable, and the current thread re-acquires the lock on mutex.線程

—— Manual on MacOS.code

在條件變量 cond 上阻塞線程,加入 cond 的等待隊列,並釋放互斥量 mutex . 若是其餘線程使用同一個條件變量 cond 調用了 pthread_cond_signal/broadcast ,喚醒的線程會從新得到互斥鎖 mutex .htm

pthread_cond_timedwait

函數原型:blog

int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);

做用:與 pthread_cond_wait 相似,但該線程被喚醒的條件是其餘線程調用了 signal/broad ,或者系統時間到達了 abstime

pthread_cond_signal

函數原型:

int pthread_cond_signal(pthread_cond_t *cond)

做用:

The pthread_cond_signal function shall unblock at least one of the threads that are blocked no the specified condition variable cond (if any threads are blocked on cond).

If more than one thread is blocked on a condition variable, the scheduling policy shall determine the order which threads are unblocked.

When each thread unblocked as a result of a pthread_cond_broadcast() or pthread_cond_signal() returns from its call to pthread_cond_wait() or pthread_cond_timedwait(), the thread shall own the mutex with which it called pthread_cond_wait() or pthread_cond_timedwait().

The thread(s) that are unblocked shall contend for the mutex according to the scheduling policy (if applicable), and as if each had called pthread_mutex_lock().

The pthread_cond_broadcast() and pthread_cond_signal() functions shall have no effect if there are no threads currently blocked on cond.

——Manual on Ubuntu.

喚醒一個在 cond 上等待的至少一個線程,若是 cond 上阻塞了多個線程,那麼將根據調度策略選取一個。

當被喚醒的線程從 wait/timedwait 函數返回,將從新得到 mutex (但可能須要競爭,由於可能喚醒多個線程)。

pthread_cond_broadcast

函數原型:

int pthread_cond_broadcast(pthread_cond_t *cond);

做用:喚醒全部在 cond 上等待的線程。

生產者消費者問題

又稱 PC (Producer - Consumer) 問題。

詳細問題定義能夠看:

具體要求:

  • 系統中有3個線程:生產者、計算者、消費者
  • 系統中有2個容量爲4的緩衝區:buffer一、buffer2
  • 生產者生產'a'、'b'、'c'、‘d'、'e'、'f'、'g'、'h'八個字符,放入到buffer1; 計算者從buffer1取出字符,將小寫字符轉換爲大寫字符,放入到buffer2
  • 消費者從buffer2取出字符,將其打印到屏幕上

buffer 的定義

buffer 實質上是一個隊列。

const int CAPACITY = 4;
typedef struct
{
    char items[CAPACITY];
    int in, out;
} buffer_t;
void buffer_init(buffer_t *b) { b->in = b->out = 0; }
int buffer_is_full(buffer_t *b) { return ((b->in + 1) % CAPACITY) == (b->out); }
int buffer_is_empty(buffer_t *b) { return b->in == b->out; }
void buffer_put_item(buffer_t *buf, char item)
{
    buf->items[buf->in] = item;
    buf->in = (buf->in + 1) % CAPACITY;
}
char buffer_get_item(buffer_t *buf)
{
    char item = buf->items[buf->out];
    buf->out = (buf->out + 1) % CAPACITY;
    return item;
}

一些全局變量

const int CAPACITY = 4;  // buffer 的容量
const int N = 8;         // 依據題意,須要轉換 8 個字符
buffer_t buf1, buf2;
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;  // 保證只有一個線程訪問 buf1
pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;  // 保證只有一個線程訪問 buf2
pthread_cond_t empty1 = PTHREAD_COND_INITIALIZER;
pthread_cond_t empty2 = PTHREAD_COND_INITIALIZER;
pthread_cond_t full1 = PTHREAD_COND_INITIALIZER;
pthread_cond_t full2 = PTHREAD_COND_INITIALIZER;

幾個條件變量的做用以下:

  • empty1 表示當 buf1 爲空的時候,從 buf1 取數據的線程要在此條件變量上等待。
  • full1 表示當 buf1 爲滿的時候,向 buf1 寫數據的線程要在此條件變量上等待。

其餘同理。

producer

代碼思路解析:

  • 由於要對 buf1 操做,首先寫一對 pthread_mutex_lock/unlock ,保證臨界代碼區內只有 producer 操做 buf1
  • 若是 buf1 是滿的,那麼就將 producer 線程阻塞在條件變量 full1 上,釋放互斥量 mutex1 (雖然不能寫入,但要讓別的線程可以讀取 buf1 的數據);
  • 進入臨界區,把數據寫入 buf1
  • 離開臨界區,由於寫入了一次數據,buf1 一定不爲空,所以喚醒一個在 empty1 上等待的線程,最後釋放 mutex1 .
void *producer(void *arg)
{
    int i = 0;
    // can be while(true)
    for (; i < N; i++)
    {
        pthread_mutex_lock(&mutex1);
        while (buffer_is_full(&buf1))
            pthread_cond_wait(&full1, &mutex1);
        buffer_put_item(&buf1, (char)('a' + i));
        printf("Producer put [%c] in buffer1. \n", (char)('a' + i));
        pthread_cond_signal(&empty1);
        pthread_mutex_unlock(&mutex1);
    }
    return NULL;
}

consumer

思路與 producer 相似。

void *consumer(void *arg)
{
    int i = 0;
    for (; i < N; i++)
    {
        pthread_mutex_lock(&mutex2);
        while (buffer_is_empty(&buf2))
            pthread_cond_wait(&empty2, &mutex2);
        char item = buffer_get_item(&buf2);
        printf("\tConsumer get [%c] from buffer2. \n", item);
        pthread_cond_signal(&full2);
        pthread_mutex_unlock(&mutex2);
    }
    return NULL;
}

calculator

這是 produerconsumer 的結合體。

void *calculator(void *arg)
{
    int i = 0;
    char item;
    for (; i < N; i++)
    {
        pthread_mutex_lock(&mutex1);
        while (buffer_is_empty(&buf1))
            pthread_cond_wait(&empty1, &mutex1);
        item = buffer_get_item(&buf1);
        pthread_cond_signal(&full1);
        pthread_mutex_unlock(&mutex1);

        pthread_mutex_lock(&mutex2);
        while (buffer_is_full(&buf2))
            pthread_cond_wait(&full2, &mutex2);
        buffer_put_item(&buf2, item - 'a' + 'A');
        pthread_cond_signal(&empty2);
        pthread_mutex_unlock(&mutex2);
    }
    return NULL;
}

main 函數

int main()
{
    pthread_t calc, prod, cons;
    // init buffer
    buffer_init(&buf1), buffer_init(&buf2);
	// create threads
    pthread_create(&calc, NULL, calculator, NULL);
    pthread_create(&prod, NULL, producer, NULL);
    pthread_create(&cons, NULL, consumer, NULL);
    pthread_join(calc, NULL);
    pthread_join(prod, NULL);
    pthread_join(cons, NULL);
    // destroy mutex
    pthread_mutex_destroy(&mutex1), pthread_mutex_destroy(&mutex2);
    // destroy cond
    pthread_cond_destroy(&empty1), pthread_cond_destroy(&empty2);
    pthread_cond_destroy(&full1), pthread_cond_destroy(&full2);
}

爲何須要條件變量

從上面的 Producer - Consumer 問題能夠看出,mutex 僅僅能表達「線程可否得到訪問臨界資源的權限」這一層面的信息,而不能表達「臨界資源是否足夠」這個問題。

假設沒有條件變量,producer 線程得到了 buf1 的訪問權限( buf1 的空閒位置對於 producer 來講是一種資源),但若是 buf1 是滿的,producer 就無法對 buf1 操做。

對於 producer 來講,它不能佔用訪問 buf1 的互斥鎖,但卻又什麼都不作。所以,它只能釋放互斥鎖 mutex ,讓別的線程可以訪問 buf1 ,並取走數據,等到 buf1 有空閒位置,producer 再對 buf1 寫數據。用僞代碼表述以下:

pthread_mutex_lock(&mutex1);
if (buffer_is_full(&buf1))
{
    pthread_mutex_unlock(&mutex1);
    wait_until_not_full(&buf1);
    pthread_mutex_lock(&mutex);
}
buffer_put_item(&buf1, item);
pthread_mutex_unlock(&mutex1);

而條件變量其實是對上述一系列操做的一種封裝。

爲何是 while

在上面代碼中,使用 pthread_cond_wait 的時候,咱們是經過這樣的方式的:

while (...)
    pthread_cond_wait(&cond, &mutex);

但這裏爲何不是 if 而是 while 呢?

參考文章:http://www.javashuo.com/article/p-qkxrvctp-nv.html

解釋 1

#include <pthread.h>
struct msg {
  struct msg *m_next;
  /* value...*/
};
 
struct msg* workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
 
void
process_msg() {
  struct msg* mp;
  for (;;) {
    pthread_mutex_lock(&qlock);
    while (workq == NULL) {
      pthread_cond_wait(&qread, &qlock);
    }
    mq = workq;
    workq = mp->m_next;
    pthread_mutex_unlock(&qlock);
    /* now process the message mp */
  }
}
 
void
enqueue_msg(struct msg* mp) {
    pthread_mutex_lock(&qlock);
    mp->m_next = workq;
    workq = mp;
    pthread_mutex_unlock(&qlock);
    /** 此時第三個線程在signal以前,執行了process_msg,恰好把mp元素拿走*/
    pthread_cond_signal(&qready);
    /** 此時執行signal, 在pthread_cond_wait等待的線程被喚醒,
        可是mp元素已經被另一個線程拿走,因此,workq仍是NULL ,所以須要繼續等待*/
}

代碼解析:

這裏 process_msg 至關於消費者,enqueue_msg 至關於生產者,struct msg* workq 做爲緩衝隊列。

process_msg 中使用 while (workq == NULL) 循環判斷條件,這裏主要是由於在 enqueue_msgunlock 以後才喚醒等待的線程,會出現上述註釋出現的狀況,形成 workq==NULL,所以須要繼續等待。

可是若是將 pthread_cond_signal 移到 pthread_mutex_unlock() 以前執行,則會避免這種競爭,在 unlock 以後,會首先喚醒 pthread_cond_wait 的線程,進而 workq != NULL 老是成立。

所以建議使用 while 循環進行驗證,以便可以容忍這種競爭。

解釋 2

pthread_cond_signal 在多核處理器上可能同時喚醒多個線程。

//thread 1
while(0<x<10)
    pthread_cond_wait(...);

//thread 2
while(5<x<15)
    pthread_cond_wait(...);

若是某段時間內 x == 8,那麼兩個線程相繼進入等待。

而後第三個線程,進行了以下操做:

x = 12
pthread_cond_signal(...) 
// or call pthread_cond_broadcast()

那麼可能線程 一、2 都被喚醒了(由於 signal 可能喚醒多個),可是,此時線程 1 仍然不知足 while,須要再次判斷,而後進入下一次等待。

其次,即便 signal 只喚醒一個,上面咱們提到,若是有多個線程都阻塞在同一個 cond 上,signal 會根據調度策略選取一個喚醒,那若是根據調度策略,喚醒的是線程 1 ,顯然它還須要再一次判斷是否須要繼續等待(不然就違背了 pthead_cond_wait 的本意)。

相關文章
相關標籤/搜索