linux 互斥鎖和條件變量

爲何有條件變量?

請參看一個線程等待某種事件發生html

注意:本文是linux c版本的條件變量和互斥鎖(mutex),不是C++的。

mutex : mutual exclusion(相互排斥)

1,互斥鎖的初始化,有如下2種方式。linux

  • 調用方法的初始化:互斥鎖是用malloc動態分配,或者分配在內存共享區的時候使用。
  • 不調用方法的初始化:靜態分配的時候使用。
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  • 返回值:成功0;失敗errno

2,互斥鎖的銷燬函數

int pthread_mutex_destroy(pthread_mutex_t *mutex);
  • 返回值:成功0;失敗errno
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

3,加鎖和解鎖測試

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
  • pthread_mutex_lock:加鎖。若是是沒有加鎖的狀態,則加鎖並返回不阻塞。若是是已經被加鎖的狀態,這阻塞在這裏,並一直等待,直到解鎖。
  • pthread_mutex_trylock:嘗試去加鎖。若是是沒有加鎖的狀態,則加鎖並返回不阻塞。果是已經被加鎖的狀態,則不阻塞,當即返回,返回值爲EBUSY。

4,條件變量的2個函數線程

int pthread_cond_wait(pthread_cond_t *restrict cond,
           pthread_mutex_t *restrict mutex);
int pthread_cond_signal(pthread_cond_t *cond);
  • pthread_cond_wait:rest

    • 調用此函數時點的處理:code

      1,給互斥鎖解鎖。htm

      2,把調用此函數的線程投入睡眠,直到另外某個線程就本條件變量調用pthread_cond_signal。blog

    • 被喚醒後的處理:返回前從新給互斥鎖加鎖。事件

  • pthread_cond_signal:喚醒調用pthread_cond_wait函數的線程

條件變量一般用於生產者和消費者模式。

什麼是生成者和消費者模式?

版本1:全部生產者線程是並行執行的,消費者線程是等待全部的生產者線性執行結束後,消費者線程纔開始執行。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define MAXITEM  100000000
#define MAXTHREAD  100
#define min(x,y) ( x>y?y:x )

int nitem;

struct {
  pthread_mutex_t mutex;
  int buf[MAXITEM];
  int idx;
  int val;
}shared = {
  PTHREAD_MUTEX_INITIALIZER
};

void* produce(void*);
void* consume(void*);

int main(int argc, char** argv){
  int i;
  int nthreads;
  int count[MAXTHREAD];

  pthread_t tid_produce[MAXTHREAD], tid_consume;

  if(argc != 3){
    printf("arg error\n");
    return 1;
  }

  nitem = min(MAXITEM,atoi(argv[1]));
  nthreads = min(MAXTHREAD, atoi(argv[2]));

  for(i = 0; i < nthreads; ++i){
    count[i] = 0;
    pthread_create(&tid_produce[i], NULL, produce, &count[i]);
  }

  for(i = 0; i < nthreads; ++i){
    pthread_join(tid_produce[i], NULL);
    printf("cout[%d] = %d\n", i, count[i]);
  }

  pthread_create(&tid_consume, NULL, consume, NULL);
  pthread_join(tid_consume, NULL);

  return 0;
}

void* produce(void* arg){
  while(1){
    pthread_mutex_lock(&shared.mutex);
    if(shared.idx >= nitem){
      pthread_mutex_unlock(&shared.mutex);
      return NULL;
    }
    shared.buf[shared.idx] = shared.val;
    shared.idx++;
    shared.val++;
    pthread_mutex_unlock(&shared.mutex);
    *((int*)arg) +=1;
  }
}

void* consume(void* arg){
  int i;
  for(i = 0; i < nitem; ++i){
    if(shared.buf[i] != i){
      printf("buf[%d] = %d\n", i, shared.buf[i]);
    }
  }
}

版本2:全部生產者線程和消費者線程都是並行執行的。這時會有個問題,就是消費者線程被先執行的狀況下,生產者線程尚未生產數據,這時消費者線程就只能循環給互斥鎖解鎖又上鎖。這成爲輪轉(spinning)或者輪詢(polling),是一種多CPU時間的浪費。咱們也能夠睡眠很短的一段時間,可是不知道睡多久。這時,條件變量就登場了。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define MAXITEM  100000000
#define MAXTHREAD  100
#define min(x,y) ( x>y?y:x )

int nitem;

struct {
  pthread_mutex_t mutex;
  int buf[MAXITEM];
  int idx;
  int val;
}shared = {
  PTHREAD_MUTEX_INITIALIZER
};

void* produce(void*);
void* consume(void*);

int main(int argc, char** argv){
  int i;
  int nthreads;
  int count[MAXTHREAD];

  pthread_t tid_produce[MAXTHREAD], tid_consume;

  if(argc != 3){
    printf("arg error\n");
    return 1;
  }

  nitem = min(MAXITEM,atoi(argv[1]));
  nthreads = min(MAXTHREAD, atoi(argv[2]));

  for(i = 0; i < nthreads; ++i){
    count[i] = 0;
    pthread_create(&tid_produce[i], NULL, produce, &count[i]);
  }
  pthread_create(&tid_consume, NULL, consume, NULL);
  
  for(i = 0; i < nthreads; ++i){
    pthread_join(tid_produce[i], NULL);
    printf("cout[%d] = %d\n", i, count[i]);
  }
  pthread_join(tid_consume, NULL);

  return 0;
}

void* produce(void* arg){
  while(1){
    pthread_mutex_lock(&shared.mutex);
    if(shared.idx >= nitem){
      pthread_mutex_unlock(&shared.mutex);
      return NULL;
    }
    shared.buf[shared.idx] = shared.val;
    shared.idx++;
    shared.val++;
    pthread_mutex_unlock(&shared.mutex);
    *((int*)arg) +=1;
  }
}

void consume_wait(int i){
  while(1){
    pthread_mutex_lock(&shared.mutex);
    if(i < shared.idx){
      pthread_mutex_unlock(&shared.mutex);
      return;
    }
    pthread_mutex_unlock(&shared.mutex);
  }
}

void* consume(void* arg){
  int i;
  for(i = 0; i < nitem; ++i){
    consume_wait(i);
    if(shared.buf[i] != i){
      printf("buf[%d] = %d\n", i, shared.buf[i]);
    }
  }
  return NULL;
}

版本3:全部生產者線程和消費者線程都是並行執行的。解決版本2的輪詢問題。使用條件變量。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define MAXITEM  100000000
#define MAXTHREAD  100
#define min(x,y) ( x>y?y:x )

int nitem;
int buf[MAXITEM];

struct {
  pthread_mutex_t mutex;
  int idx;
  int val;
} shared = {
  PTHREAD_MUTEX_INITIALIZER
};

struct {
  pthread_mutex_t mutex;
  pthread_cond_t  cond;
  int nready;
} nready = {
  PTHREAD_MUTEX_INITIALIZER,
  PTHREAD_COND_INITIALIZER
};

void* produce(void*);
void* consume(void*);

int main(int argc, char** argv){
  int i;
  int nthreads;
  int count[MAXTHREAD];

  pthread_t tid_produce[MAXTHREAD], tid_consume;

  if(argc != 3){
    printf("arg error\n");
    return 1;
  }

  nitem = min(MAXITEM,atoi(argv[1]));
  nthreads = min(MAXTHREAD, atoi(argv[2]));

  for(i = 0; i < nthreads; ++i){
    count[i] = 0;
    pthread_create(&tid_produce[i], NULL, produce, &count[i]);
  }
  pthread_create(&tid_consume, NULL, consume, NULL);
  
  for(i = 0; i < nthreads; ++i){
    pthread_join(tid_produce[i], NULL);
    printf("cout[%d] = %d\n", i, count[i]);
  }
  pthread_join(tid_consume, NULL);

  return 0;
}

void* produce(void* arg){
  while(1){
    pthread_mutex_lock(&shared.mutex);
    if(shared.idx >= nitem){
      pthread_mutex_unlock(&shared.mutex);
      return NULL;
    }
    buf[shared.idx] = shared.val;
    shared.idx++;
    shared.val++;
    pthread_mutex_unlock(&shared.mutex);

    pthread_mutex_lock(&nready.mutex);
    if(nready.nready == 0){
      pthread_cond_signal(&nready.cond);//--------------②
    }
    nready.nready++;
    pthread_mutex_unlock(&nready.mutex);//--------------③

    *((int*) arg) += 1;
  }
}

void* consume(void* arg){
  int i;
  for(i = 0; i < nitem; ++i){
    pthread_mutex_lock(&nready.mutex);
    while(nready.nready == 0){//--------------①
      pthread_cond_wait(&nready.cond, &nready.mutex);
    }
    nready.nready--;
    pthread_mutex_unlock(&nready.mutex);

    if(buf[i] != i){
      printf("buf[%d] = %d\n", i, buf[i]);
    }
  }
  printf("buf[%d] = %d\n", nitem-1, buf[nitem-1]);
}

關於互斥鎖和條件變量的最佳實踐:

1,把要多個線程共享的數據定義和互斥鎖定義在一個結構體裏。

2,把條件變量,互斥鎖,和臨界條件定義在一個結構體裏。

3,在①的地方,最後不要用if,理由是,pthread_cond_wait返回後,有可能另外一個消費者線程把它消費掉了,因此要再次測試相應的條件成立與否,防止發生虛假的(spurious)喚醒。各類線程都應該試圖最大限度減小這些虛假喚醒,可是仍有可能發生。

4,注意②處的代碼pthread_cond_signal,設想一下最壞的狀況,調用該函數後,另一個等待的線程當即被喚醒,因此被喚醒的pthread_cond_wait函數要當即加鎖,可是調用pthread_cond_signal函數的線程尚未執行到③處的pthread_mutex_unlock,因此被喚醒的線程又當即終止了。因此爲了不這種狀況發生,把②處的代碼pthread_cond_signal放在③處的下一行。

參考下面的僞代碼:

int flag;    
pthread_mutex_lock(&nready.mutex);
int = nready.nready == 0);
nready.nready++;
pthread_mutex_unlock(&nready.mutex);

if(flag){
  pthread_cond_signal(&nready.cond);
}
相關文章
相關標籤/搜索