除了提供互斥以外,信號量的另一個做用是調度對共享資源的訪問。在這種情景中,一個線程用信號量來通知另外一個線程,程序狀態中的某個條件已經爲真了。 函數
下圖給出了生產者-------消費者問題。生產者和消費者線程共享一個有n個槽的有限緩衝區。 post
生產者線程反覆的生成新的項目(item),並把它們插入到緩衝區中。消費者線程不斷的從緩衝區中取出這些項目,而後消費( 使用它們)。
spa
由於插入和取出項目都涉及更新共享變量,因此咱們必須保證對緩衝區的訪問時互斥的。 線程
可是隻保證互斥訪問仍是不夠的,咱們還須要調度 對緩衝區的訪問 (若是緩衝區是滿的即沒有可用的槽,那麼生產者必須等待直到有一個空的槽變爲可用爲止。與之類似,若緩衝區是空的即沒有可取用的槽, 那麼消費者必須等待直到有一個項目變爲可用。) 索引
接下來咱們爲生產者----消費者 定義一個結構體: 來存儲數據 ci
Typedef struct {資源
Int * buf; // item存放在一個動態分配的n項整數buf中 rem
Int n; //同步
Int front; // 索引值記錄第一項 和最後一項 string
Int rear; //
Sem_t mutex; // 三個信號量同步對緩衝區的訪問。提供互斥的緩衝區的訪問。
Sem_t slots; // 分別記錄空槽和可用item的數量
Sem_t items; //
}sbuf_t;
/*
Buffer array
Maximum number of slots
Buf[(front+1)%n] is first item
Buf[rear%n] is last item
Protects accessed to buf
Counts available slots
Counts available items
*/
此結構體包含使用的有限緩衝區
咱們使用一個函數sbuf_init來初始化此緩衝區,並設置front 和rear 表示一個空的緩衝區,併爲三個信號量賦予初始值。使用sbuf_deinit函數來刪除緩衝區(當程序使用完以後)
Sbuf_insert函數等待一個可用的槽,對互斥鎖加鎖,添加項目item。對互斥鎖解鎖,而後宣佈有一個新的item可用。
Sbuf_remove函數式與上一個函數對應的。在等待一個可用的緩衝區以後,對互斥鎖加鎖,從緩衝區的前面取出該項目,對互斥鎖解鎖。而後發信號通知一個新的槽可供使用。
void sbuf_init(sbuf_t *sp,int n)
{
If((Sp->buf=calloc(n,sizeof(int))==NULL)
Printf(「calloc 錯誤\n」);
Sp->n=n; //Buffer holds max of n items
Sp->front=sp->rear=0; //Empty buffer iff front==rear
Sem_init(&sp->mutex,0,1);//binary semaphore for locking
Sem_init(&sp->slots,0,n); //Initially ,buf has n empty slots
Sem_init(&sp->items,0,0); //initially, buf has zero data items
}
Void sbuf_deinit(sbuf_t *sp)
{
Free(sp->buf);
}
Void sbuf_insert(sbuf_t *sp,int item)
{
Sem_wait(&sp->slots);//wait for available slot
Sem_wait(&sp->mutex);//lock the buffer
Sp->buf[(++sp->rear)%(sp->n)]=item;//insert the item
Sem_post(&sp->mutex);//unlock the buffer
Sem_post(&sp->items);//announce available item
}
Int sbuf_remove(sbuf_t *sp)
{
Int item;
Sem_wait(&sp->items);//wait for available item
Sem_wait(&sp->mutex);//lock the buffer
Item=sp->buf[(++sp->front)%(sp->n)];// remove the item
Sem_post(&sp->mutex);
Sem_post(&sp->slots);
Return item;
}
#include <stdio.h>
#include <stdlib.h> #include <string.h> #include <pthread.h> #include <unistd.h> #include <semaphore.h> typedef struct { int * buf; // item存放在一個動態分配的n項整數buf中 int n; // int front; // 索引值記錄第一項 和最後一項 int rear; // sem_t mutex; // 三個信號量同步對緩衝區的訪問。提供互斥的緩衝區的訪問。 sem_t slots; // 分別記錄空槽和可用item的數量 sem_t items; // }sbuf_t; sbuf_t g_sp; void sbuf_init(sbuf_t *sp,int n) { sp->buf=(int*)calloc(n,sizeof(int)); if( NULL == sp->buf) printf("calloc 錯誤\n"); sp->n=n; //Buffer holds max of n items sp->front=sp->rear=0; //Empty buffer iff front==rear sem_init(&sp->mutex,0,1);//binary semaphore for locking sem_init(&sp->slots,0,n); //Initially ,buf has n empty slots sem_init(&sp->items,0,0); //initially, buf has zero data items } void sbuf_deinit(sbuf_t *sp) { free(sp->buf); } void sbuf_insert(sbuf_t *sp,int item) { sem_wait(&sp->slots);//wait for available slot sem_wait(&sp->mutex);//lock the buffer sp->buf[(++sp->rear)%(sp->n)]=item;//insert the item sem_post(&sp->mutex);//unlock the buffer sem_post(&sp->items);//announce available item } int sbuf_remove(sbuf_t *sp) { int item; sem_wait(&sp->items);//wait for available item sem_wait(&sp->mutex);//lock the buffer item=sp->buf[(++sp->front)%(sp->n)];// remove the item sem_post(&sp->mutex); sem_post(&sp->slots); return item; } void* producer(void* arg) { int i,item,index; //index 是線程編號 index=*(int*)arg; for(int i=0;i<g_sp.n;i++) { item =(i*2+1); sbuf_insert(&g_sp,item); printf("[P%d] Producing %d ...\n",index,item); fflush(stdout); sleep(1); } } void* consumer(void* arg) { int i,item,index; index=*(int*)arg; for(int i=0;i<g_sp.n;i++) { item =sbuf_remove(&g_sp); printf("------>[C%d] Consuming %d ...\n",index,item); sleep(1); } } int main(int argc,char* argv[]) { sbuf_init(&g_sp,10); int NP=3,NC=3; pthread_t idp[NP],idc[NC]; //1 for(int i=0;i<NP;i++) { pthread_create(&idp[i],NULL,producer,(void*)&i); } //2 for(int i=0;i<NC;i++) { pthread_create(&idc[i],NULL,consumer,(void*)&i); } //3 for(int i=0;i<NP;i++) pthread_join(idp[i],NULL); for(int i=0;i<NC;i++) pthread_join(idc[i],NULL); sbuf_deinit(&g_sp); return 0; }