1.條件變量建立 靜態建立:pthread_cond_t cond=PTHREAD_COND_INITIALIZER; 動態建立:pthread_cond _t cond; pthread_cond_init(&cond,NULL); 其中的第二個參數NULL表示條件變量的屬性,雖然POSIX中定義了條件變量的屬性,但在LinuxThread中並無實現,所以經常忽略。 2.條件等待 pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; pthread_mutex_lock(&mutex); while(條件1) pthread_cond_wait(&cond,&mutex); 函數操做1; pthread_mutex_unlock(&mutex); 當條件1成立的時候,執行pthread_cond_wait(&cond,&mutex)這一句,開放互斥鎖,而後線程被掛起。當條件1不成立的時候,跳過while循環體,執行函數操做1,而後開放互斥鎖。 3.條件激發 pthread_mutex_lock(&mutex); 函數操做2; if(條件1不成立) pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); 先執行函數操做2,改變條件狀態,使得條件1不成立的時候,執行pthread_cond_signal(&cond)這句話。這句話的意思是激發條件變量cond,使得被掛起的線程被喚醒。 pthread_cond_broadcast(&cond); 這句話也是激發條件變量cond,可是,這句話是激發全部因爲cond條件被掛起的線程。而signal的函數則是激發一個因爲條件變量cond被掛起的線程。 4.條件變量的銷燬 pthread_cond_destroy(&cond); 在linux中,因爲條件變量不佔用任何資源,因此,這句話除了檢查有沒有等待條件變量cond的線程外,不作任何操做。
#include <pthread.h> #include <stdio.h> #include <unistd.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; int count = 5; //pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //pthread_mutex_lock(&mutex); //while(條件1) // pthread_cond_wait(&cond, &mutex); //函數操做1 //pthread_mutex_unlock(&mutex); //解釋:當條件1成立的時候,執行pthread_cond_wait(&cond, &mutex)這一句,開放互斥鎖,而後線程被掛起。 // 當條件1不成立的時候,跳過while循環體,執行函數操做1,而後開放互斥鎖 // 即不管走哪一個分支,都會放開互斥鎖 //此線程先啓動 void* decrement(void* arg) { while(1) { //條件等待 pthread_mutex_lock(&mutex); while(count <= 0) { printf("count<=0,thread is hanging!\n"); pthread_cond_wait(&cond, &mutex); sleep(1); printf("sleep!\n"); } count -= 1; pthread_mutex_unlock(&mutex); if (count == 9) { printf("count==9,thread1 is over!\n"); return NULL; } } } //條件激發 //pthread_mutex_lock(&mutex); //函數操做2; //if(條件1不成立) // pthread_cond_signal(&cond); // pthread_mutex_unlock(&mutex); //解釋:先執行函數操做2,改變條件變量,使得條件1不成立的時候,執行pthread_cond_signal(&cond)這句話的意思是激發條件變量cond,使得被掛起的線程被喚醒。 // pthread_cond_broadcast(&cond); 激發全部因爲cond條件而被掛起的線程。而signal的函數則是激發一個因爲條件變量cond被掛起的線程 void *increment(void *arg) { sleep(1); while(1) { pthread_mutex_lock(&mutex); count += 1; if(count > 0) { printf("count=%d, change cond state!\n", count); pthread_cond_signal(&cond); } pthread_mutex_unlock(&mutex); if (count == 10) { printf("count=10,thread is over!\n"); return NULL; } } } int main(void) { int i1=1, i2=1; pthread_t id1, id2; pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cond, NULL); pthread_create(&id1, NULL, decrement, NULL); pthread_create(&id2, NULL, increment, NULL); i2 = pthread_join(id2, NULL); i1 = pthread_join(id1, NULL); if ( (i2==0) && (i1==0) ) { printf("count=%d, the main thread!\n", count); pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); return 0; } return -1; }
#include <stdio.h> #include <pthread.h> #include <queue> #include <string> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; std::queue<std::string> que; void* consumer(void* arg) { while (true) { pthread_mutex_lock(&mutex); while ( que.empty() ) { printf("que is empty , thread consumer hanging!\n"); pthread_cond_wait(&cond, &mutex); } while (!que.empty()) { printf("%s\n", que.front().c_str()); que.pop(); } pthread_mutex_unlock(&mutex); } return NULL; } void* producer(void* arg) { int count = 0; while (true) { pthread_mutex_lock(&mutex); for (int i=0; i<10; i++) { char arr[20]; snprintf(arr, sizeof(arr), "my num is: %d", i); que.push(arr); pthread_cond_signal(&cond); } pthread_mutex_unlock(&mutex); count++; if (count > 10) { printf("stop producer %d s\n", 5); sleep(15); count = 0; } } return NULL; } int main(void) { pthread_t arr[2]; pthread_create(&arr[0], NULL, consumer, NULL); sleep(3); printf("sleep 3 s\n"); pthread_create(&arr[1], NULL, producer, NULL); for(int i=0; i< 2; i++) { pthread_join(arr[i], NULL); } pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); }
http://blog.csdn.net/hemmanhui/article/details/4417433
互斥鎖:用來上鎖。linux
條件變量:用來等待,當條件變量用來自動阻塞一個線程,直到某特殊狀況發生爲止。一般條件變量和互斥鎖同時使用。算法
函數介紹:編程
1.服務器
名稱:網絡 |
pthread_cond_init多線程 |
目標:框架 |
條件變量初始化異步 |
頭文件:async |
#include < pthread.h>函數 |
函數原形: |
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); |
參數: |
cptr 條件變量 attr 條件變量屬性 |
返回值: |
成功返回0,出錯返回錯誤編號。 |
pthread_cond_init函數能夠用來初始化一個條件變量。他使用變量attr所指定的屬性來初始化一個條件變量,若是參數attr爲空,那麼它將使用缺省的屬性來設置所指定的條件變量。
2.
名稱: |
pthread_cond_destroy |
目標: |
條件變量摧毀 |
頭文件: |
#include < pthread.h> |
函數原形: |
int pthread_cond_destroy(pthread_cond_t *cond); |
參數: |
cptr 條件變量 |
返回值: |
成功返回0,出錯返回錯誤編號。 |
pthread_cond_destroy函數能夠用來摧毀所指定的條件變量,同時將會釋放所給它分配的資源。調用該函數的進程也並不要求等待在參數所指定的條件變量上。
3.
名稱: |
pthread_cond_wait/pthread_cond_timedwait |
目標: |
條件變量等待 |
頭文件: |
#include < pthread.h> |
函數原形: |
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex); int pthread_cond_timedwait(pthread_cond_t *cond,pthread_mutex_t mytex,const struct timespec *abstime); |
參數: |
cond 條件變量 mutex 互斥鎖 |
返回值: |
成功返回0,出錯返回錯誤編號。 |
第一個參數*cond是指向一個條件變量的指針。第二個參數*mutex則是對相關的互斥鎖的指針。函數pthread_cond_timedwait函數類型與函數pthread_cond_wait,區別在於,若是達到或是超過所引用的參數*abstime,它將結束並返回錯誤ETIME.pthread_cond_timedwait函數的參數*abstime指向一個timespec結構。該結構以下:
typedef struct timespec{
time_t tv_sec;
long tv_nsex;
}timespec_t;
3.名稱:
pthread_cond_signal/pthread_cond_broadcast |
|
目標: |
條件變量通知 |
頭文件: |
#include < pthread.h> |
函數原形: |
int pthread_cond_signal(pthread_cond_t *cond); int pthread_cond_broadcast(pthread_cond_t *cond); |
參數: |
cond 條件變量 |
返回值: |
成功返回0,出錯返回錯誤編號。 |
參數*cond是對類型爲pthread_cond_t 的一個條件變量的指針。當調用pthread_cond_signal時一個在相同條件變量上阻塞的線程將被解鎖。若是同時有多個線程阻塞,則由調度策略肯定接收通知的線程。若是調用pthread_cond_broadcast,則將通知阻塞在這個條件變量上的全部線程。一旦被喚醒,線程仍然會要求互斥鎖。若是當前沒有線程等待通知,則上面兩種調用實際上成爲一個空操做。若是參數*cond指向非法地址,則返回值EINVAL。
下面是一個簡單的例子,咱們能夠從程序的運行來了解條件變量的做用。
#include <pthread.h> sleep(1); } if(i%3!=0) sleep(1); } |
程序建立了2個新線程使他們同步運行,實現進程t_b打印20之內3的倍數,t_a打印其餘的數,程序開始線程t_b不知足條件等待,線程t_a運行使a循環加1並打印。直到i爲3的倍數時,線程t_a發送信號通知進程t_b,這時t_b知足條件,打印i值。
下面是運行結果:
#cc –lpthread –o cond cond.c
#./cond
thread1:1
thread1:2
thread2:3
thread1:4
thread1:5
thread2:6
thread1:7
thread1:8
thread2:9
備註:
pthread_cond_wait 執行的流程首先將這個mutex解鎖, 而後等待條件變量被喚醒, 若是沒有被喚醒, 該線程將一直休眠, 也就是說, 該線程將一直阻塞在這個pthread_cond_wait調用中, 而當此線程被喚醒時, 將自動將這個mutex加鎖,而後再進行條件變量判斷(緣由是「驚羣效應」,若是是多個線程都在等待這個條件,而同時只能有一個線程進行處理,此時就必需要再次條件判斷,以使只有一個線程進入臨界區處理。),若是知足,則線程繼續執行,最後解鎖,
也就是說pthread_cond_wait實際上能夠看做是如下幾個動做的合體:
解鎖線程鎖
等待線程喚醒,而且條件爲true
加鎖線程鎖.
pthread_cond_signal僅僅負責喚醒正在阻塞在同一條件變量上的一個線程,若是存在多個線程,系統自動根據調度策略決定喚醒其中的一個線程,在多處理器上,該函數是可能同時喚醒多個線程,同時該函數與鎖操做無關,解鎖是由pthread_mutex_unlock(&mutex)完成
喚醒丟失問題
在線程並無阻塞在條件變量上時,調用pthread_cond_signal或pthread_cond_broadcast函數可能會引發喚醒丟失問題。
喚醒丟失每每會在下面的狀況下發生:
一個線程調用pthread_cond_signal或pthread_cond_broadcast函數;
另外一個線程正處在測試條件變量和調用pthread_cond_wait函數之間;
沒有線程正在處在阻塞等待的狀態下
由來: 最近一直在想怎麼高效率的在IO線程接收到數據時通知邏輯線程(基於線程池)工做的問題,像網絡編程的服務器模型的一些模型都須要用到這個實現,下面我這裏簡單的羅列一個多線程的網絡服務器模型 半同步/半異步(half-sync/half-async): 許多餐廳使用 半同步/半異步 模式的變體。例如,餐廳經常僱傭一個領班負責迎接顧客,並在餐廳繁忙時留意給顧客安排桌位,爲等待就餐的顧客按序排隊是必要的。領班由全部顧客「共享」,不能被任何特定顧客佔用太多時間。當顧客在一張桌子入坐後,有一個侍應生專門爲這張桌子服務。 對於上面羅列的這種模型,本文討論的問題是當領班接到客人時,如何高效率的通知侍應生去服務顧客. 在咱們使用很普遍的線程池實現中,也會有同樣的問題 方法實現: 1.使用鎖+輪詢 使用這種方法能夠很簡單的實現,可是會有必定的性能消耗,其還有一個點要好好把握,就是一次輪詢沒有結果後相隔多久進行下一次的輪詢,間隔時間過短,消耗的CPU資源較多,間隔時間太長,不能很及時的響應請求。這就至關於上面的這個例子,侍應生時不時的取詢問領班有沒有顧客到來 2.使用條件變量的線程同步 線程條件變量pthread_cond_t 線程等待某個條件 int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime); int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 通知函數 通知全部的線程 int pthread_cond_broadcast(pthread_cond_t *cond); 只通知一個線程 int pthread_cond_signal(pthread_cond_t *cond); 正確的使用方法 pthread_cond_wait用法: pthread_mutex_lock(&mutex); while(condition_is_false) { pthread_cond_wait(&cond,&mutex); } condition_is_false=true; //此操做是帶鎖的,也就是說只有一個線程同時進入這塊 pthread_mutex_unlock(&mutex); pthread_cond_signal用法: pthread_mutex_lock(&mutex); condition_is_false=false; pthread_cond_signal(&cond) pthread_mutex_unlock(&mutex) 我剛初用的時候,以爲很是的奇怪,爲何要這樣用,加了mutex後還須要一個condition_is_false變量來表示有沒有活幹。其實這樣子的一個操做主要是爲了解決「假激活」問題,由於我麼您這裏的使用場景,只須要激活一個線程,由於一個線程幹一個活,而不是多個線程幹一個活,因此爲了不線程被激活了,但實際又沒有事情幹,因此使用了這麼一套機制。 實際上,信號和pthread_cond_broadcast是兩個常見的致使假喚醒的狀況。假如條件變量上有多個線程在等待,pthread_cond_broadcast會喚醒全部的等待線程,而pthread_cond_signal只會喚醒其中一個等待線程。這樣,pthread_cond_broadcast的狀況也許要在pthread_cond_wait前使用while循環來檢查條件變量。 來個例子: 複製代碼 1 #include <pthread.h> 2 #include <stdio.h> 3 #include<stdlib.h> 4 #include<unistd.h> 5 6 /* For safe condition variable usage, must use a boolean predicate and */ 7 /* a mutex with the condition. */ 8 int workToDo = 0; 9 pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 10 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 11 12 #define NTHREADS 20 13 14 static void checkResults(char *string, int rc) { 15 if (rc) { 16 printf("Error on : %s, rc=%d", 17 string, rc); 18 exit(EXIT_FAILURE); 19 } 20 return; 21 } 22 23 void *threadfunc(void *parm) 24 { 25 int rc; 26 27 while (1) { 28 /* Usually worker threads will loop on these operations */ 29 rc = pthread_mutex_lock(&mutex); 30 checkResults("pthread_mutex_lock()\n", rc); 31 32 while (!workToDo) { 33 printf("Thread blocked\n"); 34 rc = pthread_cond_wait(&cond, &mutex); 35 checkResults("pthread_cond_wait()\n", rc); 36 } 37 printf("Thread awake, finish work!\n"); 38 sleep(2); 39 /* Under protection of the lock, complete or remove the work */ 40 /* from whatever worker queue we have. Here it is simply a flag */ 41 workToDo = 0; 42 printf("In mutex lock\n"); 43 rc = pthread_mutex_unlock(&mutex); 44 sleep(2); 45 printf("Out mutex lock\n"); 46 checkResults("pthread_mutex_lock()\n", rc); 47 } 48 return NULL; 49 } 50 51 int main(int argc, char **argv) 52 { 53 int rc=0; 54 int i; 55 pthread_t threadid[NTHREADS]; 56 57 printf("Enter Testcase - %s\n", argv[0]); 58 59 printf("Create %d threads\n", NTHREADS); 60 for(i=0; i<NTHREADS; ++i) { 61 rc = pthread_create(&threadid[i], NULL, threadfunc, NULL); 62 checkResults("pthread_create()\n", rc); 63 } 64 65 sleep(5); /* Sleep is not a very robust way to serialize threads */ 66 67 for(i=0; i<5; ++i) { 68 printf("Wake up a worker, work to do...\n"); 69 70 rc = pthread_mutex_lock(&mutex); 71 checkResults("pthread_mutex_lock()\n", rc); 72 73 /* In the real world, all the threads might be busy, and */ 74 /* we would add work to a queue instead of simply using a flag */ 75 /* In that case the boolean predicate might be some boolean */ 76 /* statement like: if (the-queue-contains-work) */ 77 if (workToDo) { 78 printf("Work already present, likely threads are busy\n"); 79 } 80 workToDo = 1; 81 rc = pthread_cond_broadcast(&cond); 82 // rc = pthread_cond_signal(&cond); 83 checkResults("pthread_cond_broadcast()\n", rc); 84 85 rc = pthread_mutex_unlock(&mutex); 86 checkResults("pthread_mutex_unlock()\n", rc); 87 sleep(5); /* Sleep is not a very robust way to serialize threads */ 88 } 89 90 printf("Main completed\n"); 91 exit(0); 92 return 0; 93 } 複製代碼 事實上上面的例子不管是使用pthread_cond_signal仍是pthread_cond_broadcast,都只會打印Thread awake, finish work5次,你們可能會以爲很是奇怪,可是實際狀況就是這樣的。 爲了明白其pthread_cont_wait內部幹了什麼工做,有必要深刻一下其內部實現。 關於其內部實現僞代碼以下: 複製代碼 1 pthread_cond_wait(mutex, cond): 2 value = cond->value; /* 1 */ 3 pthread_mutex_unlock(mutex); /* 2 */ 4 pthread_mutex_lock(cond->mutex); /* 10 */ pthread_cond_t自帶一個mutex來互斥對waiter等待鏈表的操做 5 if (value == cond->value) { /* 11 */ 檢查一次是否是cond有被其餘線程設置過,至關於單例模式的第二次檢測是否爲NULL 6 me->next_cond = cond->waiter; 7 cond->waiter = me;//鏈表操做 8 pthread_mutex_unlock(cond->mutex); 9 unable_to_run(me); 10 } else 11 pthread_mutex_unlock(cond->mutex); /* 12 */ 12 pthread_mutex_lock(mutex); /* 13 */ 13 14 pthread_cond_signal(cond): 15 pthread_mutex_lock(cond->mutex); /* 3 */ 16 cond->value++; /* 4 */ 17 if (cond->waiter) { /* 5 */ 18 sleeper = cond->waiter; /* 6 */ 19 cond->waiter = sleeper->next_cond; /* 7 */ //鏈表操做 20 able_to_run(sleeper); /* 8 */ 運行sleep的線程,即上面的me 21 } 22 pthread_mutex_unlock(cond->mutex); /* 9 */ 複製代碼 pthread_cond_broadcast雖然可以激活全部的線程,可是激活以後會有mutex鎖,也就是說他的激活是順序進行的,只有第一個激活的線程調用pthread_mutex_unlock(&mutex)後,後一個等待的線程纔會繼續運行.由於從pthread_cond_wait(&cond,&mutex)到pthread_mutex_unlock(&mutex)區間是加的獨佔鎖,從wait激活後的第一個線程佔用了這個鎖,因此其餘的線程不能運行,只能等待。因此當第一個被激活的線程修改了condition_is_false後(上面測試代碼的workToDo),接着調用pthread_mutex_unlock(&mutex)後,此時其餘等待在cond的一個線程會激活,可是此時condition_is_false已經被設置,因此他跑不出while循環,當調用pthread_cond_wait時,其內部pthread_mutex_unlock(mutex)調用會致使另外一個在它後面的等待在cond的線程被激活。 因此,經過這種方式,即使是誤調用了pthread_cond_broadcast或者因爲信號中斷的緣由激活了全部在等待條件的線程,也能保證其結果是正確的。 另外說一句題外話,不少人寫的基於條件變量線程同步的框架,說本身是無鎖的,其實這是不對的,只是內部鎖的機制在pthread_cond_wait實現了而已,其仍是基於互斥鎖的實現。真正想要達到無鎖的能夠關注一下lockfree相關的CAS算法,其內部使用一個intel CPU的cmpxchg8指令完成的,其實這種實現我的認爲和傳統鎖相比只是一個非阻塞鎖和阻塞鎖的區別。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> // for safe condition variable usage, must use a boolean predicate and // a mutex with the condition int work_to_do = 0; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; #define NUM_THREADS 20 static void check_result(char *str, int rc) { if (rc) { printf("Error on: %s, rc=%d", str, rc); exit(EXIT_FAILURE); } return; } void *threadfunc(void *param) { int rc; while (1) { //usually woker threads will loop on these operations rc = pthread_mutex_lock(&mutex); check_result("pthread_mutex_lock()\n", rc); while (!work_to_do) { printf("thread hanging\n"); rc = pthread_cond_wait(&cond, &mutex); check_result("pthread_cond_wait()\n", rc); } printf("thread awake, finish work!\n"); sleep(2); // under protection of the lock, complete or remove the work // from whatever worker queue we have, here it is simply a flag work_to_do = 0; printf("in mutex lock\n"); rc = pthread_mutex_unlock(&mutex); sleep(2); printf("out mutex lock\n"); check_result("pthread_mutex_unlock()\n", rc); } return NULL; } int main(void) { int rc = 0; int i; pthread_t threadid[NUM_THREADS]; printf("Enter %d threads\n", NUM_THREADS); for (i=0; i<NUM_THREADS; ++i) { rc = pthread_create(&threadid[i], NULL, threadfunc, NULL); check_result("pthread_create()\n", rc); } sleep(5); //sleep is not a very robust way to serialize threads for(i=0; i<5; ++i) { printf("wake up a worker, work to do...\n"); rc = pthread_mutex_lock(&mutex); check_result("pthread_mutex_lock()\n", rc); // in the real world, all the threads might be busy, and // we would add work to a queue instead of simply using a flag // in that case the boolean predicate might be some boolean // statement like: if (the-queue-contains-work) if(work_to_do) { printf("work already present, likely threads are busy\n"); } work_to_do = 1; //rc = pthread_cond_broadcast(&cond); rc = pthread_cond_signal(&cond); check_result("pthread_cond_broadcast()\n", rc); rc = pthread_mutex_unlock(&mutex); check_result("pthread_mutex_unlock()\n", rc); sleep(5); // sleep is not a very robust way to serialize threads } printf("mian complete\n"); exit(0); return 0; }