linux producer consumer sources

除了提供互斥以外,信號量的另一個做用是調度對共享資源的訪問。在這種情景中,一個線程用信號量來通知另外一個線程,程序狀態中的某個條件已經爲真了。 函數

下圖給出了生產者-------消費者問題。生產者和消費者線程共享一個有n個槽的有限緩衝區。 post

生產者線程反覆的生成新的項目(item),並把它們插入到緩衝區中。消費者線程不斷的從緩衝區中取出這些項目,而後消費( 使用它們)
 spa

 

由於插入和取出項目都涉及更新共享變量,因此咱們必須保證對緩衝區的訪問時互斥的。 線程

可是隻保證互斥訪問仍是不夠的,咱們還須要調度  對緩衝區的訪問 (若是緩衝區是滿的即沒有可用的槽,那麼生產者必須等待直到有一個空的槽變爲可用爲止。與之類似,若緩衝區是空的即沒有可取用的槽, 那麼消費者必須等待直到有一個項目變爲可用。索引

接下來咱們爲生產者----消費者  定義一個結構體: 來存儲數據 ci

Typedef struct {資源

       Int * buf;      //  item存放在一個動態分配的n項整數buf rem

       Int n;         //同步

       Int front;     //  索引值記錄第一項 和最後一項 string

       Int rear;     //

       Sem_t mutex;  //  三個信號量同步對緩衝區的訪問。提供互斥的緩衝區的訪問。

       Sem_t slots; //   分別記錄空槽和可用item的數量

       Sem_t items; //

}sbuf_t;

/*

Buffer array

Maximum number of slots

Buf[(front+1)%n]  is first item

Buf[rear%n] is last item

Protects accessed to buf

Counts available slots

Counts available items

*/

此結構體包含使用的有限緩衝區

咱們使用一個函數sbuf_init來初始化此緩衝區,並設置front rear 表示一個空的緩衝區,併爲三個信號量賦予初始值。使用sbuf_deinit函數來刪除緩衝區(當程序使用完以後)

Sbuf_insert函數等待一個可用的槽,對互斥鎖加鎖,添加項目item。對互斥鎖解鎖,而後宣佈有一個新的item可用。

Sbuf_remove函數式與上一個函數對應的。在等待一個可用的緩衝區以後,對互斥鎖加鎖,從緩衝區的前面取出該項目,對互斥鎖解鎖。而後發信號通知一個新的槽可供使用。

 

 

void  sbuf_init(sbuf_t *sp,int n)

{

       If((Sp->buf=calloc(n,sizeof(int))==NULL)

              Printf(「calloc  錯誤\n」);

       Sp->n=n; //Buffer holds max of n items

       Sp->front=sp->rear=0;  //Empty buffer iff front==rear

       Sem_init(&sp->mutex,0,1);//binary semaphore for locking

       Sem_init(&sp->slots,0,n); //Initially ,buf has n empty slots

       Sem_init(&sp->items,0,0); //initially, buf has zero data items

}

Void sbuf_deinit(sbuf_t *sp)

{

       Free(sp->buf);

}

Void sbuf_insert(sbuf_t *sp,int item)

{

       Sem_wait(&sp->slots);//wait for available slot

       Sem_wait(&sp->mutex);//lock the buffer

       Sp->buf[(++sp->rear)%(sp->n)]=item;//insert the item

       Sem_post(&sp->mutex);//unlock the buffer

       Sem_post(&sp->items);//announce available item

}

 

Int sbuf_remove(sbuf_t *sp)

{

       Int item;

       Sem_wait(&sp->items);//wait for available item

       Sem_wait(&sp->mutex);//lock the buffer

       Item=sp->buf[(++sp->front)%(sp->n)];// remove the item

       Sem_post(&sp->mutex);

       Sem_post(&sp->slots);

       Return item;

}

#include  <stdio.h>

#include  <stdlib.h> #include  <string.h> #include  <pthread.h> #include  <unistd.h> #include  <semaphore.h> typedef struct {     int * buf;      //  item存放在一個動態分配的n項整數buf中     int n;         //     int front;     //  索引值記錄第一項 和最後一項     int rear;     //     sem_t mutex;  //  三個信號量同步對緩衝區的訪問。提供互斥的緩衝區的訪問。     sem_t slots; //   分別記錄空槽和可用item的數量     sem_t items; // }sbuf_t; sbuf_t g_sp; void  sbuf_init(sbuf_t *sp,int n) {     sp->buf=(int*)calloc(n,sizeof(int));     if( NULL == sp->buf)         printf("calloc  錯誤\n");     sp->n=n; //Buffer holds max of n items     sp->front=sp->rear=0;  //Empty buffer iff front==rear     sem_init(&sp->mutex,0,1);//binary semaphore for locking     sem_init(&sp->slots,0,n); //Initially ,buf has n empty slots     sem_init(&sp->items,0,0); //initially, buf has zero data items } void sbuf_deinit(sbuf_t *sp) {     free(sp->buf); } void sbuf_insert(sbuf_t *sp,int item) {     sem_wait(&sp->slots);//wait for available slot     sem_wait(&sp->mutex);//lock the buffer     sp->buf[(++sp->rear)%(sp->n)]=item;//insert the item     sem_post(&sp->mutex);//unlock the buffer     sem_post(&sp->items);//announce available item } int sbuf_remove(sbuf_t *sp) {     int item;     sem_wait(&sp->items);//wait for available item     sem_wait(&sp->mutex);//lock the buffer     item=sp->buf[(++sp->front)%(sp->n)];// remove the item     sem_post(&sp->mutex);     sem_post(&sp->slots);     return item; } void* producer(void* arg) {     int i,item,index;     //index 是線程編號     index=*(int*)arg;     for(int i=0;i<g_sp.n;i++)     {         item =(i*2+1);         sbuf_insert(&g_sp,item);         printf("[P%d] Producing %d ...\n",index,item);         fflush(stdout);         sleep(1);     } } void* consumer(void* arg) {     int i,item,index;     index=*(int*)arg;     for(int i=0;i<g_sp.n;i++)     {         item =sbuf_remove(&g_sp);         printf("------>[C%d] Consuming %d ...\n",index,item);         sleep(1);     } } int main(int argc,char* argv[]) {     sbuf_init(&g_sp,10);     int NP=3,NC=3;     pthread_t idp[NP],idc[NC];     //1     for(int i=0;i<NP;i++)     {             pthread_create(&idp[i],NULL,producer,(void*)&i);     }     //2     for(int i=0;i<NC;i++)     {             pthread_create(&idc[i],NULL,consumer,(void*)&i);     }     //3     for(int i=0;i<NP;i++)         pthread_join(idp[i],NULL);     for(int i=0;i<NC;i++)         pthread_join(idc[i],NULL);              sbuf_deinit(&g_sp);     return 0; }

相關文章
相關標籤/搜索