數據結構【二】:簡單阻塞隊列BlockingQueue

POSIX多線程【一】:簡單隊列simple queue的基礎上使用內部互斥鎖條件變量來控制併發以達到線程安全的目的,其主要用於 [生產者-消費者] 隊列.html

1.BlockingQueue初始化時會肯定隊列容量(_capacity),若是隊列已滿(capacity=0),則會阻塞enqueue操做.node

2.關閉BlockingQueue(調用queue_free)是一個延遲的操做,它會等待全部元素都dequeue,期間,該隊列的一切enqueue操做將無效.安全

3.此代碼未經生產環境檢驗,僅供學習參考.多線程

BlockingQueue.h併發

#ifndef CUR_BLOCKINGQUEUE_H
#define CUR_BLOCKINGQUEUE_H
#include <stdlib.h>
#include <pthread.h>


struct node{
    int value;
    struct node * next;
};


typedef struct BlockingQueue_ST{
    int capacity,remaining,closed;
    struct node * head, *tail;
    pthread_mutex_t queue_mutex;
    pthread_cond_t  cond_not_full;
    pthread_cond_t  cond_not_empty;
    pthread_cond_t  cond_empty;
}BlockingQueue;


extern BlockingQueue* empty_queue(int _capacity);
extern int queue_free(BlockingQueue *q);
extern int is_empty(const BlockingQueue *q);
extern int is_full(const BlockingQueue *q);
extern int enqueue(struct node *item, BlockingQueue *q);
extern struct node* dequeue(BlockingQueue *q);

#endif

 

BlockingQueue.c學習

#include "BlockingQueue.h"
#include <stdio.h>

BlockingQueue* empty_queue(int _capacity)
{
     BlockingQueue *q = malloc(sizeof(BlockingQueue));
     q->head  = q->tail = NULL;
     q->capacity = q->remaining = _capacity;
     q->closed = 0;
     pthread_mutex_init(&q->queue_mutex , NULL);
     pthread_cond_init(&q->cond_not_full , NULL);
     pthread_cond_init(&q->cond_not_empty , NULL);
     pthread_cond_init(&q->cond_empty , NULL);
     return q;
}
 
int queue_free(BlockingQueue *q)
{
    pthread_mutex_lock(&q->queue_mutex);
    printf("close queue...\n");
    q->closed = 1;
    //等待cond_empty
    while(!is_empty(q))
    {
        pthread_cond_wait(&q->cond_empty, &q->queue_mutex);
    }
    free(q);
    pthread_mutex_unlock(&q->queue_mutex);
    printf("closed...\n");
}

int is_empty(const BlockingQueue *q)
{
    return q->capacity == q->remaining;
}

int is_full(const BlockingQueue *q)
{
    return q->remaining == 0;
}

int enqueue(struct node *item, BlockingQueue *q)
{

    if(q->closed) goto err;
    //lock
    pthread_mutex_lock(&q->queue_mutex);
    //等待cond_not_full
    while(is_full(q))
    {
        pthread_cond_wait(&q->cond_not_full, &q->queue_mutex);
    }
    
    if(is_empty(q))
    {
        q->head = q->tail = item;
        //通知全部等待cond_not_empty的線程
        pthread_cond_broadcast(&q->cond_not_empty);
    }
    else
    {
        q->tail->next = item;
        q->tail = item;
    }
    q->remaining--;
    //unlock
    pthread_mutex_unlock(&q->queue_mutex);

    return 0;
err : 
    return -1;
}

struct node* dequeue(BlockingQueue *q)
{
    
    //已經關閉的空隊列
    if(q->closed && is_empty(q)) goto err;
    //lock 
    pthread_mutex_lock(&q->queue_mutex);
    //空隊列,等待cond_not_empty
    while(!q->closed && is_empty(q))
    {
        pthread_cond_wait(&q->cond_not_empty, &q->queue_mutex);
    }
    //take
    struct node * temp = q->head;
    q->head = q->head->next;


    //在未關閉隊列的狀況下,喚醒enqueue等待線程
    if(!q->closed && is_full(q))
    {
        pthread_cond_broadcast(&q->cond_not_full); //TODO 1
    }
    q->remaining++; 
    //喚醒關閉隊列線程
    if(q->closed && is_empty(q))
    {
        pthread_cond_signal(&q->cond_empty);//TODO 2
    }

    //注意:TODO 1和TODO 2實際上是互斥的,不可能同時知足條件
    //必須先判斷是否激活cond_not_full而後remaining++
    //最後再判斷是否激活cond_empty
    //unlock
    pthread_mutex_unlock(&q->queue_mutex);
    return temp;
err:
    return NULL;
}

測試代碼 : main.c測試

#include<stdio.h>
#include<stdlib.h>
#include "BlockingQueue.h"
extern void* func_put(void* _q);

BlockingQueue *q;
pthread_t thread1,thread2;
void main()
{
    q = empty_queue(5);
    pthread_create(&thread1,NULL,func_put,(void*)q);
    pthread_create(&thread2,NULL,func_put,(void*)q);
    
    int i;
    for(i=1; i<=10; i++)
    {
        struct node * item = (struct node *)malloc(sizeof(struct node));
        item->value = i;
        item->next = NULL;
        enqueue(item,q);
        printf("enqueue -> thread : %d, value : %d, remaining : %d\n",pthread_self(),i,q->remaining);
        sleep(1);
    }
    queue_free(q);
}

void* func_put(void* _q)
{
    BlockingQueue *q = (BlockingQueue*)_q;
    struct node *item;
    while((item = dequeue(q)) != NULL)
    {
        printf("dequeue -> thread : %d, value : %d, remaining : %d\n",pthread_self(), item->value,q->remaining);
        free(item);
        sleep(3);
    }
}

測試結果 :spa

相關文章
相關標籤/搜索