在POSIX多線程【一】:簡單隊列simple queue的基礎上使用內部互斥鎖和條件變量來控制併發以達到線程安全的目的,其主要用於 [生產者-消費者] 隊列.html
1.BlockingQueue初始化時會肯定隊列容量(_capacity),若是隊列已滿(capacity=0),則會阻塞enqueue操做.node
2.關閉BlockingQueue(調用queue_free)是一個延遲的操做,它會等待全部元素都dequeue,期間,該隊列的一切enqueue操做將無效.安全
3.此代碼未經生產環境檢驗,僅供學習參考.多線程
BlockingQueue.h併發
#ifndef CUR_BLOCKINGQUEUE_H #define CUR_BLOCKINGQUEUE_H #include <stdlib.h> #include <pthread.h> struct node{ int value; struct node * next; }; typedef struct BlockingQueue_ST{ int capacity,remaining,closed; struct node * head, *tail; pthread_mutex_t queue_mutex; pthread_cond_t cond_not_full; pthread_cond_t cond_not_empty; pthread_cond_t cond_empty; }BlockingQueue; extern BlockingQueue* empty_queue(int _capacity); extern int queue_free(BlockingQueue *q); extern int is_empty(const BlockingQueue *q); extern int is_full(const BlockingQueue *q); extern int enqueue(struct node *item, BlockingQueue *q); extern struct node* dequeue(BlockingQueue *q); #endif
BlockingQueue.c學習
#include "BlockingQueue.h" #include <stdio.h> BlockingQueue* empty_queue(int _capacity) { BlockingQueue *q = malloc(sizeof(BlockingQueue)); q->head = q->tail = NULL; q->capacity = q->remaining = _capacity; q->closed = 0; pthread_mutex_init(&q->queue_mutex , NULL); pthread_cond_init(&q->cond_not_full , NULL); pthread_cond_init(&q->cond_not_empty , NULL); pthread_cond_init(&q->cond_empty , NULL); return q; } int queue_free(BlockingQueue *q) { pthread_mutex_lock(&q->queue_mutex); printf("close queue...\n"); q->closed = 1; //等待cond_empty while(!is_empty(q)) { pthread_cond_wait(&q->cond_empty, &q->queue_mutex); } free(q); pthread_mutex_unlock(&q->queue_mutex); printf("closed...\n"); } int is_empty(const BlockingQueue *q) { return q->capacity == q->remaining; } int is_full(const BlockingQueue *q) { return q->remaining == 0; } int enqueue(struct node *item, BlockingQueue *q) { if(q->closed) goto err; //lock pthread_mutex_lock(&q->queue_mutex); //等待cond_not_full while(is_full(q)) { pthread_cond_wait(&q->cond_not_full, &q->queue_mutex); } if(is_empty(q)) { q->head = q->tail = item; //通知全部等待cond_not_empty的線程 pthread_cond_broadcast(&q->cond_not_empty); } else { q->tail->next = item; q->tail = item; } q->remaining--; //unlock pthread_mutex_unlock(&q->queue_mutex); return 0; err : return -1; } struct node* dequeue(BlockingQueue *q) { //已經關閉的空隊列 if(q->closed && is_empty(q)) goto err; //lock pthread_mutex_lock(&q->queue_mutex); //空隊列,等待cond_not_empty while(!q->closed && is_empty(q)) { pthread_cond_wait(&q->cond_not_empty, &q->queue_mutex); } //take struct node * temp = q->head; q->head = q->head->next; //在未關閉隊列的狀況下,喚醒enqueue等待線程 if(!q->closed && is_full(q)) { pthread_cond_broadcast(&q->cond_not_full); //TODO 1 } q->remaining++; //喚醒關閉隊列線程 if(q->closed && is_empty(q)) { pthread_cond_signal(&q->cond_empty);//TODO 2 } //注意:TODO 1和TODO 2實際上是互斥的,不可能同時知足條件 //必須先判斷是否激活cond_not_full而後remaining++ //最後再判斷是否激活cond_empty //unlock pthread_mutex_unlock(&q->queue_mutex); return temp; err: return NULL; }
測試代碼 : main.c測試
#include<stdio.h> #include<stdlib.h> #include "BlockingQueue.h" extern void* func_put(void* _q); BlockingQueue *q; pthread_t thread1,thread2; void main() { q = empty_queue(5); pthread_create(&thread1,NULL,func_put,(void*)q); pthread_create(&thread2,NULL,func_put,(void*)q); int i; for(i=1; i<=10; i++) { struct node * item = (struct node *)malloc(sizeof(struct node)); item->value = i; item->next = NULL; enqueue(item,q); printf("enqueue -> thread : %d, value : %d, remaining : %d\n",pthread_self(),i,q->remaining); sleep(1); } queue_free(q); } void* func_put(void* _q) { BlockingQueue *q = (BlockingQueue*)_q; struct node *item; while((item = dequeue(q)) != NULL) { printf("dequeue -> thread : %d, value : %d, remaining : %d\n",pthread_self(), item->value,q->remaining); free(item); sleep(3); } }
測試結果 :spa