請參看一個線程等待某種事件發生html
1,互斥鎖的初始化,有如下2種方式。linux
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
2,互斥鎖的銷燬函數
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
3,加鎖和解鎖測試
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
4,條件變量的2個函數線程
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_wait:rest
調用此函數時點的處理:code
1,給互斥鎖解鎖。htm
2,把調用此函數的線程投入睡眠,直到另外某個線程就本條件變量調用pthread_cond_signal。blog
被喚醒後的處理:返回前從新給互斥鎖加鎖。事件
pthread_cond_signal:喚醒調用pthread_cond_wait函數的線程
條件變量一般用於生產者和消費者模式。
什麼是生成者和消費者模式?
版本1:全部生產者線程是並行執行的,消費者線程是等待全部的生產者線性執行結束後,消費者線程纔開始執行。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #define MAXITEM 100000000 #define MAXTHREAD 100 #define min(x,y) ( x>y?y:x ) int nitem; struct { pthread_mutex_t mutex; int buf[MAXITEM]; int idx; int val; }shared = { PTHREAD_MUTEX_INITIALIZER }; void* produce(void*); void* consume(void*); int main(int argc, char** argv){ int i; int nthreads; int count[MAXTHREAD]; pthread_t tid_produce[MAXTHREAD], tid_consume; if(argc != 3){ printf("arg error\n"); return 1; } nitem = min(MAXITEM,atoi(argv[1])); nthreads = min(MAXTHREAD, atoi(argv[2])); for(i = 0; i < nthreads; ++i){ count[i] = 0; pthread_create(&tid_produce[i], NULL, produce, &count[i]); } for(i = 0; i < nthreads; ++i){ pthread_join(tid_produce[i], NULL); printf("cout[%d] = %d\n", i, count[i]); } pthread_create(&tid_consume, NULL, consume, NULL); pthread_join(tid_consume, NULL); return 0; } void* produce(void* arg){ while(1){ pthread_mutex_lock(&shared.mutex); if(shared.idx >= nitem){ pthread_mutex_unlock(&shared.mutex); return NULL; } shared.buf[shared.idx] = shared.val; shared.idx++; shared.val++; pthread_mutex_unlock(&shared.mutex); *((int*)arg) +=1; } } void* consume(void* arg){ int i; for(i = 0; i < nitem; ++i){ if(shared.buf[i] != i){ printf("buf[%d] = %d\n", i, shared.buf[i]); } } }
版本2:全部生產者線程和消費者線程都是並行執行的。這時會有個問題,就是消費者線程被先執行的狀況下,生產者線程尚未生產數據,這時消費者線程就只能循環給互斥鎖解鎖又上鎖。這成爲輪轉(spinning)或者輪詢(polling),是一種多CPU時間的浪費。咱們也能夠睡眠很短的一段時間,可是不知道睡多久。這時,條件變量就登場了。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #define MAXITEM 100000000 #define MAXTHREAD 100 #define min(x,y) ( x>y?y:x ) int nitem; struct { pthread_mutex_t mutex; int buf[MAXITEM]; int idx; int val; }shared = { PTHREAD_MUTEX_INITIALIZER }; void* produce(void*); void* consume(void*); int main(int argc, char** argv){ int i; int nthreads; int count[MAXTHREAD]; pthread_t tid_produce[MAXTHREAD], tid_consume; if(argc != 3){ printf("arg error\n"); return 1; } nitem = min(MAXITEM,atoi(argv[1])); nthreads = min(MAXTHREAD, atoi(argv[2])); for(i = 0; i < nthreads; ++i){ count[i] = 0; pthread_create(&tid_produce[i], NULL, produce, &count[i]); } pthread_create(&tid_consume, NULL, consume, NULL); for(i = 0; i < nthreads; ++i){ pthread_join(tid_produce[i], NULL); printf("cout[%d] = %d\n", i, count[i]); } pthread_join(tid_consume, NULL); return 0; } void* produce(void* arg){ while(1){ pthread_mutex_lock(&shared.mutex); if(shared.idx >= nitem){ pthread_mutex_unlock(&shared.mutex); return NULL; } shared.buf[shared.idx] = shared.val; shared.idx++; shared.val++; pthread_mutex_unlock(&shared.mutex); *((int*)arg) +=1; } } void consume_wait(int i){ while(1){ pthread_mutex_lock(&shared.mutex); if(i < shared.idx){ pthread_mutex_unlock(&shared.mutex); return; } pthread_mutex_unlock(&shared.mutex); } } void* consume(void* arg){ int i; for(i = 0; i < nitem; ++i){ consume_wait(i); if(shared.buf[i] != i){ printf("buf[%d] = %d\n", i, shared.buf[i]); } } return NULL; }
版本3:全部生產者線程和消費者線程都是並行執行的。解決版本2的輪詢問題。使用條件變量。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #define MAXITEM 100000000 #define MAXTHREAD 100 #define min(x,y) ( x>y?y:x ) int nitem; int buf[MAXITEM]; struct { pthread_mutex_t mutex; int idx; int val; } shared = { PTHREAD_MUTEX_INITIALIZER }; struct { pthread_mutex_t mutex; pthread_cond_t cond; int nready; } nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER }; void* produce(void*); void* consume(void*); int main(int argc, char** argv){ int i; int nthreads; int count[MAXTHREAD]; pthread_t tid_produce[MAXTHREAD], tid_consume; if(argc != 3){ printf("arg error\n"); return 1; } nitem = min(MAXITEM,atoi(argv[1])); nthreads = min(MAXTHREAD, atoi(argv[2])); for(i = 0; i < nthreads; ++i){ count[i] = 0; pthread_create(&tid_produce[i], NULL, produce, &count[i]); } pthread_create(&tid_consume, NULL, consume, NULL); for(i = 0; i < nthreads; ++i){ pthread_join(tid_produce[i], NULL); printf("cout[%d] = %d\n", i, count[i]); } pthread_join(tid_consume, NULL); return 0; } void* produce(void* arg){ while(1){ pthread_mutex_lock(&shared.mutex); if(shared.idx >= nitem){ pthread_mutex_unlock(&shared.mutex); return NULL; } buf[shared.idx] = shared.val; shared.idx++; shared.val++; pthread_mutex_unlock(&shared.mutex); pthread_mutex_lock(&nready.mutex); if(nready.nready == 0){ pthread_cond_signal(&nready.cond);//--------------② } nready.nready++; pthread_mutex_unlock(&nready.mutex);//--------------③ *((int*) arg) += 1; } } void* consume(void* arg){ int i; for(i = 0; i < nitem; ++i){ pthread_mutex_lock(&nready.mutex); while(nready.nready == 0){//--------------① pthread_cond_wait(&nready.cond, &nready.mutex); } nready.nready--; pthread_mutex_unlock(&nready.mutex); if(buf[i] != i){ printf("buf[%d] = %d\n", i, buf[i]); } } printf("buf[%d] = %d\n", nitem-1, buf[nitem-1]); }
關於互斥鎖和條件變量的最佳實踐:
1,把要多個線程共享的數據定義和互斥鎖定義在一個結構體裏。
2,把條件變量,互斥鎖,和臨界條件定義在一個結構體裏。
3,在①的地方,最後不要用if,理由是,pthread_cond_wait返回後,有可能另外一個消費者線程把它消費掉了,因此要再次測試相應的條件成立與否,防止發生虛假的(spurious)喚醒。各類線程都應該試圖最大限度減小這些虛假喚醒,可是仍有可能發生。
4,注意②處的代碼pthread_cond_signal,設想一下最壞的狀況,調用該函數後,另一個等待的線程當即被喚醒,因此被喚醒的pthread_cond_wait函數要當即加鎖,可是調用pthread_cond_signal函數的線程尚未執行到③處的pthread_mutex_unlock,因此被喚醒的線程又當即終止了。因此爲了不這種狀況發生,把②處的代碼pthread_cond_signal放在③處的下一行。
參考下面的僞代碼:
int flag; pthread_mutex_lock(&nready.mutex); int = nready.nready == 0); nready.nready++; pthread_mutex_unlock(&nready.mutex); if(flag){ pthread_cond_signal(&nready.cond); }