做者:Allen B. Downeyphp
原文:Chapter 11 Semaphores in Chtml
譯者:飛龍git
協議:CC BY-NC-SA 4.0github
信號量是學習同步的一個好方式,可是它們實際上並無像互斥體和條件變量同樣被普遍使用。安全
儘管如此,仍是有一些同步問題能夠用信號量簡單解決,產生顯然更加合適的解決方案。ruby
這一章展現了C語言用於處理信號量的API,以及我用於使它更加容易使用的代碼。並且它展現了一個終極挑戰:你能不能使用互斥體和條件變量來實現一個信號量?數據結構
這一章的代碼在本書倉庫的semaphore
目錄中。函數
信號量是用於使線程協同工做而不互相影響的數據結構。post
POSIX標準規定了信號量的接口,它並非pthread
的一部分,可是多數實現pthread
的UNIX系統也實現了信號量。學習
POSIX信號量的類型是sem_t
。這個類型表現爲結構體,因此若是你將它賦值給一個變量,你會獲得它的內容副本。複製信號量徹底是一個壞行爲,在POSIX中,它的複製行爲是未定義的。
幸運的是,包裝sem_t
使之更安全並易於使用至關容易。個人包裝API在sem.h
中:
typedef sem_t Semaphore;
Semaphore *make_semaphore(int value);
void semaphore_wait(Semaphore *sem);
void semaphore_signal(Semaphore *sem);
Semaphore
是sem_t
的同義詞,可是我認爲它更加可讀,並且大寫的首字母會提醒我將它當作對象並使用指針傳遞它。
這些函數的實如今sem.c
中:
Semaphore *make_semaphore(int value)
{
Semaphore *sem = check_malloc(sizeof(Semaphore));
int n = sem_init(sem, 0, value);
if (n != 0) perror_exit("sem_init failed");
return sem;
}
make_semaphore
接收信號量的初始值做爲參數。它爲信號量分配空間,將信號量初始化,以後返回指向Semaphore
的指針。
若是執行成功,sem_init
返回0;若是有任何錯誤,它返回-1。使用包裝函數的一個好處就是你能夠封裝錯誤檢查代碼,這會使使用這些函數的代碼更加易讀。
下面是semaphore_wait
的實現:
void semaphore_wait(Semaphore *sem) {
int n = sem_wait(sem);
if (n != 0) perror_exit("sem_wait failed");
}
下面是semaphore_signal
:
void semaphore_signal(Semaphore *sem) {
int n = sem_post(sem);
if (n != 0) perror_exit("sem_post failed");
}
我更喜歡把這個這個操做叫作「signal」而不是「post」,雖然它們是一個意思(發射)。
譯者注:若是你習慣了互斥體(鎖)的操做,也能夠改爲
lock
和unlock
。互斥體其實就是信號量容量爲1時的特殊形態。
下面是一個例子,展現瞭如何將信號量用做互斥體:
Semaphore *mutex = make_semaphore(1);
semaphore_wait(mutex);
// protected code goes here
semaphore_signal(mutex);
當你將信號量用做互斥體時,一般須要將它初始化爲1,來表示互斥體是未鎖的。也就是說,只有一個線程能夠經過信號量而不被阻塞。
這裏我使用了變量名稱mutex
來代表信號量被用做互斥體。可是要記住信號量的行爲和pthread
互斥體不徹底相同。
使用這些信號量的包裝函數,咱們能夠編寫出生產者-消費者問題的解決方案。這一節的代碼在queue_sem.c
。
下面是Queue
的一個新定義,使用信號量來代替互斥體和條件變量:
typedef struct {
int *array;
int length;
int next_in;
int next_out;
Semaphore *mutex; //-- new
Semaphore *items; //-- new
Semaphore *spaces; //-- new
} Queue;
下面是make_queue
的新版本:
Queue *make_queue(int length)
{
Queue *queue = (Queue *) malloc(sizeof(Queue));
queue->length = length;
queue->array = (int *) malloc(length * sizeof(int));
queue->next_in = 0;
queue->next_out = 0;
queue->mutex = make_semaphore(1);
queue->items = make_semaphore(0);
queue->spaces = make_semaphore(length-1);
return queue;
}
mutex
用於確保隊列的互斥訪問,初始值爲1,說明互斥體最開始是未鎖的。
item
是隊列中物品的數量,它也是可非阻塞執行queue_pop
的消費者線程的數量。最開始隊列中沒有任何物品。
spaces
是隊列中剩餘空間的數量,也是可非阻塞執行queue_push
的線程數量。最開始的空間數量就是隊列的容量length - 1
。
下面是queue_push
的新版本,它由生產者線程調用:
void queue_push(Queue *queue, int item) {
semaphore_wait(queue->spaces);
semaphore_wait(queue->mutex);
queue->array[queue->next_in] = item;
queue->next_in = queue_incr(queue, queue->next_in);
semaphore_signal(queue->mutex);
semaphore_signal(queue->items);
}
要注意queue_push
並不須要調用queue_full
,由於信號量跟蹤了有多少空間可用,而且在隊列滿了的時候阻塞住生產者。
下面是queue_pop
的新版本:
int queue_pop(Queue *queue) {
semaphore_wait(queue->items);
semaphore_wait(queue->mutex);
int item = queue->array[queue->next_out];
queue->next_out = queue_incr(queue, queue->next_out);
semaphore_signal(queue->mutex);
semaphore_signal(queue->spaces);
return item;
}
這個解決方案在《The Little Book of Semaphores》中的第四章以僞代碼解釋。
爲了使用本書倉庫的代碼,你須要編譯並運行這個解決方案,你應該執行:
$ make queue_sem
$ ./queue_sem
任何可使用信號量解決的問題也可使用條件變量和互斥體來解決。一個證實方法就是可使用條件變量和互斥體來實現信號量。
在你繼續以前,你可能想要將其作爲一個練習:編寫函數,使用條件變量和互斥體實現sem.h
中的信號量API。你能夠將你的解決方案放到本書倉庫的mysem.c
和mysem.h
中,你會在 mysem_soln.c
和mysem_soln.h
中找到個人解決方案。
若是你在開始時遇到了麻煩,你可使用下面來源於個人代碼的結構體定義,做爲提示:
typedef struct {
int value, wakeups;
Mutex *mutex;
Cond *cond;
} Semaphore;
value
是信號量的值。wakeups
記錄了掛起信號的數量,也就是說它是已被喚醒可是尚未恢復執行的線程數量。wakeups
的緣由是確保咱們的信號量擁有《The Little Book of Semaphores》中描述的性質3。
mutex
提供了value
和wakeups
的互斥訪問,cond
是線程在須要等待信號量時所等待的條件變量。
下面是這個結構體的初始化代碼:
Semaphore *make_semaphore(int value)
{
Semaphore *semaphore = check_malloc(sizeof(Semaphore));
semaphore->value = value;
semaphore->wakeups = 0;
semaphore->mutex = make_mutex();
semaphore->cond = make_cond();
return semaphore;
}
下面是我使用POSIX互斥體和條件變量的信號量實現:
void semaphore_wait(Semaphore *semaphore)
{
mutex_lock(semaphore->mutex);
semaphore->value--;
if (semaphore->value < 0) {
do {
cond_wait(semaphore->cond, semaphore->mutex);
} while (semaphore->wakeups < 1);
semaphore->wakeups--;
}
mutex_unlock(semaphore->mutex);
}
當線程等待信號量時,須要在減小value
以前對互斥體加鎖。若是信號量的值爲負,線程會被阻塞直到wakeups
可用。要注意當它被阻塞時,互斥體是未鎖的,因此其它線程能夠向條件變量發送信號。
semaphore_signal
的代碼以下:
void semaphore_signal(Semaphore *semaphore)
{
mutex_lock(semaphore->mutex);
semaphore->value++;
if (semaphore->value <= 0) {
semaphore->wakeups++;
cond_signal(semaphore->cond);
}
mutex_unlock(semaphore->mutex);
}
一樣,線程在增長value
以前須要對互斥體加鎖。若是信號量是負的,說明還有等待線程,因此發送線程須要增長wakeups
並向條件變量發送信號。
此時等待線程可能會喚醒,可是互斥體仍然會鎖住它們,直到發送線程解鎖了它。
這個時候,某個等待線程從cond_wait
中返回,以後檢查是否wakeup
仍然有效。若是沒有它會循環並再次等待條件變量。若是有效,它會減小wakeup
,解鎖互斥體並退出。
這個解決方案使用do-while
循環的緣由可能並非很明顯。你知道爲何不使用更廣泛的while
循環嗎?會出現什麼問題呢?
問題就是while
循環的實現不知足性質3。一個發送線程能夠在以後的運行中收到它本身的信號。
使用do-while
循環,就確保[1]了當一個線程發送信號時,另外一個等待線程會收到信號,即便發送線程在某個等待線程恢復以前繼續運行並對互斥體加鎖。
1] 好吧,幾乎是這樣。實際上一個時機恰當的[虛假喚醒會打破這一保證。