3、對比上鎖與等待數組
在上述的生產者-消費者問題中,咱們在實現同步的時候還能夠用如下的方法來實現,這裏只是說明與上述實現中的不一樣之處。併發
一、首先,說明一下此版本中的相關特徵:函數
在此版本中,消費者線程在生產者線程建立完畢後立刻就建立,而不是等待全部生產者線程完成而終止後再建立。測試
二、再則,如何來實現:spa
A、Set_concurrency(..)中參量併發線程從nthreads變爲nthreads+1;線程
B、同時,爲了實現同步,咱們必須設置一個consume_wait(int i)函數,其實現的功能是檢測到對應i的pthread_mutex_unlock後,便啓動consume對應的i判斷部分。這裏將這兩個函數源代碼實現呈現以下:設計
void consume_wait(int i){ for(;;){ pthread_mutex_lock(&shared.mutex); if(i < nput){ pthread_mutex_unlock(&shared.mutex); return ; } pthread_mutex_unlock(&shared.mutex); } } void *consume(void *arg){ int i; for(i=0;i<nitems;i++){ consume_wait(i); if(shared.buff[i] != i) printf("buff[%d] = %d\n",i,shared.buff[i]); } return NULL; }
這裏,特別說明一些關於consume_wait(int i)的實現及具體運行方式:code
首先,當臨界區出於未上鎖時,consume_wait便調用pthread_mutex_lock(..)來上鎖互斥鎖,得到對臨界區的全部權,這時,判斷i<nput條件是否成立,以判斷生產者線程是否產生了第i個條目。若是檢測到以後,即可返回以通告consume能夠處理第i個條目,同時解鎖互斥鎖。ip
而後,若第i個條目未產生,這時,函數便一直循環,每次給互斥鎖上鎖又解鎖,這樣稱爲「輪詢(polling)」,這對CPU是一種浪費,但着實能夠實現同步問題。同步
4、條件變量(Condition):等待與信號發送
互斥鎖用於上鎖,條件變量用於等待。這兩種不一樣類型的同步都是須要的。
條件變量是類型爲pthread_cond_t的變量,如下兩個函數使用了這些變量:
#include<pthread.h> int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr); int pthread_cond_signal(pthread_cond_t *cptr); 均返回:若成功則爲0,若出錯則爲正的Exxx值
其中第二個函數的名字中的「signal」一詞指的不是Unix_SIGxxx信號。
這兩個函數所等待或由之得以通知的「條件」,其定義由咱們選擇:咱們在代碼中測試這種條件。
每一個條件變量老是有一個互斥鎖與之關聯。咱們調用pthread_cond_wait等待某個條件爲真時,還會指定其條件變量的地址和所關聯的互斥鎖的地址。
咱們利用條件變量和互斥鎖從新設計生產者-消費者同步問題,其改變以下:
全局變量聲明:
#include "unpipc.h" #define MAXNITEMS 1000000 #define MAXNTHREADS 100 int nitems; int buff[MAXITEMS]; struct { pthread_mutex_t mutex; int nput; int nval; }put ={PTHREAD_MUTEX_INITIALIZER}; struct { pthread_mutex_t mutex; pthread_cond_t cond; int nready; }nready={PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER};
把生產者變量和互斥鎖收集到一個結構中;
把計數器、條件變量和互斥鎖收集到一個結構中,nready指的是對消費者已準備好的線程數;
main函數同上鎖與等待時的main函數;
Produce和consume函數:
void *produce(void *arg){ for(;;){ pthread_mutex__lock(&put.mutex); if(put.nput >= nitems){ pthread_mutex_unlock(&put.mutex); return NULL; } buff[put.nput]=put.nval; put.nput++; put.nval++; pthread_mutex_unlock(&put.mutex); pthread_mutex_lock(&nready.mutex); if(nready.nready == 0){ pthread_cond_signal(&nready.cond); } nready.nready++; pthread_mutex_unlock(&nready.mutex); } } void *consume(void *arg){ int i; for(i=0;i<nitems;i++){ pthread_mutex_lock(&nready.mutex); while(nready.nready == 0) pthread_cond_wait(&nready.cond,&nready.mutex); nready.nready--; pthread_mutex_unlock(&nready.mutex); if(buff[i]!=i){ printf("buff[%d]=%d\n",i,buff[i]); } } return NULL; }
這裏的分析很重要:
Produce函數中的for語句部分前半部分實現的是:當生產者往數組buff中放置一個新條目時,咱們改用put.mutex來爲臨界區上鎖。
For語句後半部分實現的是通知消費者,給用來統計由消費者處理的條目數的計數器nready.nready加1。在加1以前,若是該計數器的值爲0,就調用pthread_cond_signal喚醒可能正等待其值變爲非零的任意線程(如消費者)。如今能夠看出與該計數器關聯的互斥鎖和條件變量的相互做用。該計數器在生產者和消費者之間共享,所以,只有鎖住與之關聯的互斥鎖(nready.mutex)時才能訪問它。與之關聯的條件變量則用於等待和發送信號。
對於消費者,消費者只是等待計數器nready.nready變爲非零。既然該計數器是在全部的生產者和消費者之間共享的,那麼只有鎖住與之關聯的互斥鎖(nready.mutex)時才能測試它的值。若是在鎖住該互斥鎖期間該計數器的值爲0,咱們就調用pthread_cond_wait進入睡眠。該函數原子地執行如下兩個動做:
A、給互斥鎖nready.mutex解鎖;
B、把調用線程投入睡眠,直到另外某個線程就本條件變量調用pthread_cond_signal。同時,pthread_cond_wait在返回前從新給互斥鎖nready.mutex上鎖。
5、條件變量:定時等待和廣播
一般pthread_cond_signal只喚醒等待在相應條件變量上的一個線程。在某些狀況下,一個線程認定有多個其餘線程應被喚醒,這時它能夠調用pthread_cond_broadcast喚醒阻塞在相應條件變量上的全部線程。
#include<pthread.h> int pthread_cond_broadcast(pthread_cond_t *cptr); int pthread_cond_timewait(pthread_cond_t *cptr,pthread_mutex_t *mptr,const struct timespc *abstime);
對於pthread_cond_timewait(...),其容許線程就阻塞時間設置一個限制值。Abstime參數是一個timespec結構體,該結構體指定這個函數必須返回的時間,即使當時相應的條件變量還沒收到信號,若是發生這種超時狀況,該函數返回ETIMEDOUT錯誤。該時間值是絕對時間。而不是時間差。
6、互斥鎖和條件變量的屬性
在前面的互斥鎖和條件變量的講解中,咱們用兩個常量PTHREAD_MUTEX_INITIALIZER和PTHREAD_COND_INITIALIZER來初始化它們。有這種方式初始化的互斥鎖和條件變量具有默認屬性,不過咱們還能以非默認屬性來初始化它們。
#include<pthread.h> int pthread_mutex_init(pthread_mutex_t *mptr,const pthread_mutex_mutexattr_t *attr); int pthread_mutex_destory(pthread_mutex_t *mptr); int pthread_cond_init(pthread_cond_t *cptr,const pthread_cond_condattr_t *attr); int pthread_cond_destory(pthread_cond_t *cptr);