Linux——互斥鎖與條件變量(一)

爲容許在線程或進程間共享數據,同步一般是必需的。互斥鎖和條件變量是同步的基本組成部分。編程

使用互斥鎖和條件變量的經典例子:生產者-消費者問題。在本例中,使用多個線程而不是多個進程,由於讓多個線程共享本問題採用的公共數據緩衝區很是簡單,而在多個進程間共享一個公共數據緩衝區卻須要某種形式的共享內存區。數組

1、互斥鎖:上鎖與解鎖併發

互斥鎖指代相互排斥mutual exclusion),它是最基本的同步形式。互斥鎖用於保護臨界區(critical region),以保證任什麼時候刻只有一個線程在執行其中的代碼(假設互斥鎖由多個線程共享),或者任什麼時候刻只有一個進程在執行其中的代碼(假設互斥鎖由多個進程共享)。保護一個臨界區的代碼輪廓大致以下:函數

lock_the_mutex(...);
臨界區
unlock_the_mutex(...);

既然任什麼時候刻只有一個線程可以鎖住一個給定的互斥鎖,因而這樣的代碼保證在任什麼時候刻只有一個線程在執行其臨界區中的指令。ui

Posix互斥鎖被聲明爲具備pthread_mutex_t數據類型的變量。若是互斥鎖變量是靜態分配的,那麼咱們能夠把它初始化成常值PTHREAD_MUTEX_INITIALIZER,例如:spa

static pthread_mutex_t lock =  PTHREAD_MUTEX_INITIALIZER;

若是互斥鎖是動態分配的,或者分配在共享內存區中,那麼咱們必須在運行之時,經過調用pthread_mutex_init函數來初始化它。後面有所介紹。命令行

在Posix中,定義了三個函數來給一個互斥鎖上鎖和解鎖:線程

#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mptr);
int pthread_mutex_trylock(pthread_mutex_t *mptr);
int pthread_mutex_unlock(pthread_mutex_t *mptr);
均返回:若成功則爲0,若出錯則爲正的Exxx值

若是嘗試給一個已有某個線程鎖住的互斥鎖上鎖,那麼pthread_mutex_lock將阻塞到該互斥鎖解鎖爲止。pthread_mutex_trylock是對應的非阻塞函數,若是該互斥鎖已鎖住,它就返回一個EBUSY錯誤。指針

這裏有一個問題:code

若是有多個線程阻塞在等待同一個互斥鎖上,那麼當該互斥鎖解鎖時,哪個線程會開始運行呢?

===>>>提供一個優先級調度選項,不一樣線程可被賦予不一樣的優先級,同步函數(互斥鎖、讀寫鎖、信號量)將喚醒優先級最高(隊列實現?)的被阻塞線程。

臨界區實際上保護的是在臨界區中被操縱的數據,也就是說,互斥鎖一般用於保護由多個線程或多個進程分享的共享數據(shared data)

互斥鎖是協做性鎖。這就是說,若是共享數據是一個鏈表,那麼操縱該鏈表的全部線程都必須在實際操縱前獲取該互斥鎖。不過也沒有辦法防止某個線程不首先獲取該互斥鎖就操縱該鏈表。

2、生產者-消費者問題

同步中有一個稱爲「生產者-消費者(producer-consumer)」問題的經典問題,也稱爲有界緩衝區(bounded buffer)問題。一個或多個生產者(線程或進程)建立者一個個數據條目,而後這些條目由一個或多個消費者(線程或進程)處理。數據條目在生產者和消費者之間是使用某種類型的IPC(interprocess communication)傳遞的。

當共享存儲區用做生產者和消費者之間的IPC形式時,生產者和消費者必須執行某種類型的顯式(explicit)同步。咱們使用互斥鎖展現顯式同步。

在這裏,咱們選擇使用單個消費者線程、多個生產者線程的方法。首先,介紹一些相關信息:在單個進程中有多個生產者線程和單個消費者線程。整數數組buff含有被生產和消費的條目(也就是共享數據)。爲簡單起見,生產者只是把buff[0]設置爲0,把buff[1]設置爲1,如此等等。消費者只是沿着該數組行進,並驗證每一個數組元素都是正確的。

而後,寫出main函數以下:

#include 「unpipc.h」
 
#define MAXNITEMS 1000000
#define MAXNTHREADS 100
 
int nitems;
 
struct{
    pthread_mutex_t mutex;
    int buff[MAXNITEMS];
    int nput;
    int nval;  
}shared= {PTHREAD_MUTEX_INITIALIZER};
 
void *produce(void *),*consumer(void *);//生產者、消費者函數
 
int main(int argc , char **argv){
    int i, nthreads, count[MAXNTHREADS];
    pthread_t tid_produce[MAXNTHREADS],tid_consume;
    if(argc!=3){
        err_quit("usage:prodcons <#items> <#threads>"); 
    }
    nitems = min(atoi(argv[1]), MAXNITEMS);
    nthreads = min(atoi(argv[2]), MAXNTHREADS);
  
    Set_concurrency(nthreads);
  
    /*start all the producer threads*/
    for(i=0;i<nthreads;i++){
    count[i] = 0;
    pthread_create(&tid_produce[i],NULL,produce,&count[i]); 
    }

    /*wait for all producer threads  */
    for(i=0;i<nthreads;i++){
    pthread_join(tid_produce[i],NULL);
    printf("count[%d]=%d",i,count[i]); 
    }
  
    /*start, then wait for the consumer thread*/
    pthread_create(&tid_consume,NULL,consume,NULL);
    pthread_join(tid_consume,NULL);
  
    exit(0);
}
 
void *produce(void *arg){
    for(;;){
        pthread_mutex_lock(&shared.mutex);
        if(shared.nput >= nitems){
        pthread_mutex_unlock(&shared.mutex);
        return NULL; 
        /* array is full,we are done*/
        }
        shared.buff[shared.nput] = shared.nval;
        shared.nput++;
        shared.nval++;
        pthread_mutex_unlock(&shared.mutex);
        *((int *)arg)+=1;
    } 
}
 
void *consume(void *arg){
    int i;
    for(i=0;i<nitems;i++){
        if(shared.buff[i] != i)
        printf("buff[%d] = %d\n",i,shared.buff[i]);
    }
    return NULL;
}

在上面的main函數,咱們須要注意這樣幾個方面:

一、struct shared的做用:

首先咱們分析這些成員變量,mutex是互斥鎖,buff數組是數據空間,nputbuff數組中下一次存放數據的元素下標,nval是下一次存放的值。咱們將這些變量整合放在一個結構體中的目的是爲了強調這些變量只應該在擁有互斥鎖時訪問。咱們分配這個結構,並初始化其中用於生產者線程間同步的互斥鎖。

對於這裏,咱們要考慮這樣一個問題:將全部受互斥鎖控制的臨界區變量都整合放在一個結構體中是否十分完善?這是不完善的,把共享數據和它們的同步變量收集到一個結構體中,這是一個很好的編程技巧,然而在不少狀況下共享數據是動態分配的,譬如說一個鏈表,咱們能夠把一個鏈表的頭及該鏈表的同步變量存放到一個結構體中,可是其餘共享數據(該鏈表的其餘部分)卻不在結構體中,所以這種方法是不完善的。

二、命令行輸入參數分析:形如「prodcons items threads」,其中,items指定生產者存放的條目數,threads指定待建立生產者線程的數目。

三、Set_concurrency(nthreads)函數的做用:

告訴線程系統咱們但願併發運行多少線程。

四、建立生產者線程:

在這裏,咱們用線程建立函數pthread_create(...)來建立生產者進程,每一個線程執行produce。在tid_produce數組中保存每一個生成線程的pid。傳遞給每一個生產者線程的參數是指向count數組中某個元素的指針。那麼count數組的做用是什麼?代碼中,咱們首先把count計數器初始化爲0,而後每一個線程在每次往緩衝區中存放一個條目時給這個計數器加1(*(int*)arg)+=1),。當一切生產完畢時,咱們輸出這個計數器數組中各元素的值,以查看每一個生產者線程分別存放了多少條目。

五、等待生產者線程,而後啓動消費者線程:

等待全部生產者線程終止,同時輸出每一個線程的計數器值,此後才啓動單個消費者線程。此處,咱們實現了生產者與消費者之間的同步問題,接着等待消費者線程完成,而後終止進程。

六、Produce函數產生數據條目:

在這裏,咱們使用互斥鎖來實現對臨界區的保護,同時保證在一切生產完畢的狀況下給該互斥鎖解鎖。注意count元素的增長不屬於該臨界區,由於每一個線程有各自的計數器(main函數中的count)。既然如此,咱們就不把這行代碼包括在由互斥鎖鎖住的臨界區中,由於做爲一個通用的編程原則,咱們老是應該努力減小由一個互斥鎖鎖住的代碼量。

七、消費者驗證數組內容:

這裏不須要任何同步。

未完待續!

相關文章
相關標籤/搜索