Linux平臺上C語言實現異步隊列的兩種方法

Linux上目前有兩種事件通知方式,一種是線程條件變量,一種是利用eventfd實現事件通知,下面介紹一下利用這兩種方法實現異步隊列的方法。 ###線程條件變量 ####相關函數介紹異步

  • pthread_cond_init:初始化一個線程條件變量。
  • pthread_cond_wait:等待條件觸發。
  • pthread_cond_signal:通知一個線程,線程條件發生。
  • pthread_cond_timedwait:等待條件觸發,能夠設置超時時間。
  • pthread_cond_reltimedwait_np:和pthread_cond_timedwait使用基本相同,區別是使用的是相對時間間隔而不是絕對時間間隔。
  • pthread_cond_broadcast:通知全部等待線程,線程條件發生。
  • pthread_cond_destroy:銷燬條件變量。

####喚醒丟失問題 若是線程未持有與條件相關聯的互斥鎖,則調用 pthread_cond_signal() 或 pthread_cond_broadcast() 會產生喚醒丟失錯誤。知足如下全部條件時,即會出現喚醒丟失問題:async

  • 一個線程調用 pthread_cond_signal() 或 pthread_cond_broadcast()
  • 另外一個線程已經測試了該條件,可是還沒有調用 pthread_cond_wait()
  • 沒有正在等待的線程

信號不起做用,所以將會丟失,僅當修改所測試的條件但未持有與之相關聯的互斥鎖時,纔會出現此問題。只要僅在持有關聯的互斥鎖同時修改所測試的條件,便可調用 pthread_cond_signal() 和 pthread_cond_broadcast(),而不管這些函數是否持有關聯的互斥鎖。函數

####線程條件變量使用方法性能

get_resources(int amount)    
{    
    pthread_mutex_lock(&rsrc_lock);    
    while (resources < amount) 
    {    
        pthread_cond_wait(&rsrc_add, &rsrc_lock);
    }   
    resources -= amount;
    pthread_mutex_unlock(&rsrc_lock);
}

add_resources(int amount)
{
    pthread_mutex_lock(&rsrc_lock);
    resources += amount;
    pthread_cond_broadcast(&rsrc_add);
    pthread_mutex_unlock(&rsrc_lock);
 }

###eventfd測試

int eventfd(unsigned int initval, int flags);線程

eventfd是Linux提供內核態的事件等待/通知機制,內核維護了一個8字節的整型數,該整型數由initval來初始化,flags參數能夠由如下值位或而來:code

  • EFD_CLOEXEC:設置該描述符的O_CLOEXEC標誌。
  • EFD_NONBLOCK:設置描述符爲非阻塞模式。
  • EFD_SEMAPHORE:設置描述符爲信號量工做模式,在此模式下,read模式會使整型數減1並返回數值1。

當內核維護的8字節整型數爲0時,read操做會阻塞,若是爲fd設置爲非阻塞模式,則返回EAGAIN錯誤。隊列

###簡單的喚醒隊列事件

下面咱們實現一個簡單的環形隊列:ci

#define default_size 1024

typedef struct queue
{
    int header;
    int tail;
    int size;
    int capcity;
    void **_buf;
} queue_t;

queue_t *queue_create(int size)
{
    queue_t *q = malloc(sizeof (queue_t));
    if (q != NULL)
    {
        if (size > 0)
        {
            q->_buf = malloc(size);
            q->capcity = size;
        }
        else
        {
            q->_buf = malloc(default_size * sizeof (void *));
            q->capcity = default_size;
        }
        q->header = q->tail = q->size = 0;
    }

    return q;
}

int queue_is_full(queue_t *q)
{
    return q->size == q->capcity;
}

int queue_is_empty(queue_t *q)
{
    return q->size == 0;
}

void queue_push_tail(queue_t *q, void *data)
{
    if (!queue_is_full(q))
    {
        q->_buf[q->tail] = data;
        q->tail = (q->tail + 1) % q->capcity;
        q->size++;
    }
}

void *queue_pop_head(queue_t *q)
{
    void *data = NULL;
    if (!queue_is_empty(q))
    {
        data = q->_buf[(q->header)];
        q->header = (q->header + 1) % q->capcity;
        q->size--;
    }
    return data;
}

int *queue_free(queue_t *q)
{
    free(q->_buf);
    free(q);
}

###線程變量實現的異步隊列

typedef struct async_queue
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int waiting_threads;
    queue_t *_queue;
} async_queue_t;
async_queue_t *async_queue_create(int size)
{
    async_queue_t *q = malloc(sizeof (async_queue_t));
    q->_queue = queue_create(size);
    q->waiting_threads = 0;
    pthread_mutex_init(&(q->mutex), NULL);
    pthread_cond_init(&(q->cond), NULL);

    return q;
}

void async_queue_push_tail(async_queue_t *q, void *data)
{
    if (!queue_is_full(q->_queue))
    {
        pthread_mutex_lock(&(q->mutex));
        queue_push_tail(q->_queue, data);
        if (q->waiting_threads > 0)
        {
            pthread_cond_signal(&(q->cond));
        }
        pthread_mutex_unlock(&(q->mutex));
    }

}

void *async_queue_pop_head(async_queue_t *q, struct timeval *tv)
{
    void *retval = NULL;
    pthread_mutex_lock(&(q->mutex));
    if (queue_is_empty(q->_queue))
    {
        q->waiting_threads++;
        while (queue_is_empty(q->_queue))
        {
            pthread_cond_wait(&(q->cond), &(q->mutex));
        }
        q->waiting_threads--;
    }
    retval = queue_pop_head(q->_queue);
    pthread_mutex_unlock(&(q->mutex));
    return retval;
}

void async_queue_free(async_queue_t *q)
{
    queue_free(q->_queue);
    pthread_cond_destroy(&(q->cond));
    pthread_mutex_destroy(&(q->mutex));
    free(q);
}

###eventfd實現的異步隊列

typedef struct async_queue
{
    int efd; //event fd
    fd_set rdfds; //for select
    queue_t *_queue;
} async_queue_t;
async_queue_t *async_queue_create(int size)
{
    async_queue_t *q = malloc(sizeof (async_queue_t));

    q->efd = eventfd(0, EFD_SEMAPHORE|EFD_NONBLOCK);
    q->_queue = queue_create(size);
    FD_ZERO(&(q->rdfds));
    FD_SET(q->efd, &(q->rdfds));

    return q;
}

void async_queue_push_tail(async_queue_t *q, void *data)
{
    unsigned long long i = 1;
    if (!queue_is_full(q->_queue))
    {
        queue_push_tail(q->_queue, data);
        write(q->efd, &i, sizeof (i));
    }
}

void *async_queue_pop_head(async_queue_t *q, struct timeval *tv)
{
    unsigned long long i = 0;
    void *data = NULL;
    if (select(q->efd + 1, &(q->rdfds), NULL, NULL, tv) == 0)
    {
        return data;
    }
    else
    {
        read(q->efd, &i, sizeof (i));
        return queue_pop_head(q->_queue);
    }
}

void async_queue_free(async_queue_t *q)
{
    queue_free(q->_queue);
    close(q->efd);
    free(q);
}

###總結 兩種實現方法線程條件變量比較複雜,可是性能略高,而eventfd實現簡單,可是性能略低。

相關文章
相關標籤/搜索