Linux上目前有兩種事件通知方式,一種是線程條件變量,一種是利用eventfd實現事件通知,下面介紹一下利用這兩種方法實現異步隊列的方法。 ###線程條件變量 ####相關函數介紹異步
####喚醒丟失問題 若是線程未持有與條件相關聯的互斥鎖,則調用 pthread_cond_signal() 或 pthread_cond_broadcast() 會產生喚醒丟失錯誤。知足如下全部條件時,即會出現喚醒丟失問題:async
信號不起做用,所以將會丟失,僅當修改所測試的條件但未持有與之相關聯的互斥鎖時,纔會出現此問題。只要僅在持有關聯的互斥鎖同時修改所測試的條件,便可調用 pthread_cond_signal() 和 pthread_cond_broadcast(),而不管這些函數是否持有關聯的互斥鎖。函數
####線程條件變量使用方法性能
get_resources(int amount) { pthread_mutex_lock(&rsrc_lock); while (resources < amount) { pthread_cond_wait(&rsrc_add, &rsrc_lock); } resources -= amount; pthread_mutex_unlock(&rsrc_lock); } add_resources(int amount) { pthread_mutex_lock(&rsrc_lock); resources += amount; pthread_cond_broadcast(&rsrc_add); pthread_mutex_unlock(&rsrc_lock); }
###eventfd測試
int eventfd(unsigned int initval, int flags);
線程
eventfd
是Linux提供內核態的事件等待/通知機制,內核維護了一個8字節的整型數,該整型數由initval
來初始化,flags
參數能夠由如下值位或而來:code
O_CLOEXEC
標誌。read
模式會使整型數減1並返回數值1。當內核維護的8字節整型數爲0時,read
操做會阻塞,若是爲fd設置爲非阻塞模式,則返回EAGAIN
錯誤。隊列
###簡單的喚醒隊列事件
下面咱們實現一個簡單的環形隊列:ci
#define default_size 1024 typedef struct queue { int header; int tail; int size; int capcity; void **_buf; } queue_t; queue_t *queue_create(int size) { queue_t *q = malloc(sizeof (queue_t)); if (q != NULL) { if (size > 0) { q->_buf = malloc(size); q->capcity = size; } else { q->_buf = malloc(default_size * sizeof (void *)); q->capcity = default_size; } q->header = q->tail = q->size = 0; } return q; } int queue_is_full(queue_t *q) { return q->size == q->capcity; } int queue_is_empty(queue_t *q) { return q->size == 0; } void queue_push_tail(queue_t *q, void *data) { if (!queue_is_full(q)) { q->_buf[q->tail] = data; q->tail = (q->tail + 1) % q->capcity; q->size++; } } void *queue_pop_head(queue_t *q) { void *data = NULL; if (!queue_is_empty(q)) { data = q->_buf[(q->header)]; q->header = (q->header + 1) % q->capcity; q->size--; } return data; } int *queue_free(queue_t *q) { free(q->_buf); free(q); }
###線程變量實現的異步隊列
typedef struct async_queue { pthread_mutex_t mutex; pthread_cond_t cond; int waiting_threads; queue_t *_queue; } async_queue_t; async_queue_t *async_queue_create(int size) { async_queue_t *q = malloc(sizeof (async_queue_t)); q->_queue = queue_create(size); q->waiting_threads = 0; pthread_mutex_init(&(q->mutex), NULL); pthread_cond_init(&(q->cond), NULL); return q; } void async_queue_push_tail(async_queue_t *q, void *data) { if (!queue_is_full(q->_queue)) { pthread_mutex_lock(&(q->mutex)); queue_push_tail(q->_queue, data); if (q->waiting_threads > 0) { pthread_cond_signal(&(q->cond)); } pthread_mutex_unlock(&(q->mutex)); } } void *async_queue_pop_head(async_queue_t *q, struct timeval *tv) { void *retval = NULL; pthread_mutex_lock(&(q->mutex)); if (queue_is_empty(q->_queue)) { q->waiting_threads++; while (queue_is_empty(q->_queue)) { pthread_cond_wait(&(q->cond), &(q->mutex)); } q->waiting_threads--; } retval = queue_pop_head(q->_queue); pthread_mutex_unlock(&(q->mutex)); return retval; } void async_queue_free(async_queue_t *q) { queue_free(q->_queue); pthread_cond_destroy(&(q->cond)); pthread_mutex_destroy(&(q->mutex)); free(q); }
###eventfd實現的異步隊列
typedef struct async_queue { int efd; //event fd fd_set rdfds; //for select queue_t *_queue; } async_queue_t; async_queue_t *async_queue_create(int size) { async_queue_t *q = malloc(sizeof (async_queue_t)); q->efd = eventfd(0, EFD_SEMAPHORE|EFD_NONBLOCK); q->_queue = queue_create(size); FD_ZERO(&(q->rdfds)); FD_SET(q->efd, &(q->rdfds)); return q; } void async_queue_push_tail(async_queue_t *q, void *data) { unsigned long long i = 1; if (!queue_is_full(q->_queue)) { queue_push_tail(q->_queue, data); write(q->efd, &i, sizeof (i)); } } void *async_queue_pop_head(async_queue_t *q, struct timeval *tv) { unsigned long long i = 0; void *data = NULL; if (select(q->efd + 1, &(q->rdfds), NULL, NULL, tv) == 0) { return data; } else { read(q->efd, &i, sizeof (i)); return queue_pop_head(q->_queue); } } void async_queue_free(async_queue_t *q) { queue_free(q->_queue); close(q->efd); free(q); }
###總結 兩種實現方法線程條件變量比較複雜,可是性能略高,而eventfd實現簡單,可是性能略低。