內存數據結構 Channel
,相似於 Go
的 chan
通道,底層基於 共享內存 + Mutex
互斥鎖實現,可實現用戶態的高性能內存隊列。Channel
可用於多進程環境下,底層在讀取寫入時會自動加鎖,應用層不須要擔憂數據同步問題。數據結構
channel
在以前的文章中出現過,當時用於 manager
和 worker
進程之間進行通訊的重要數據結構,主要用於 worker
進程通知 manager
進程重啓相應 worker
進程。性能
channel
數據結構channel
數據結構的屬性比較多,head
是隊列的頭部位置,tail
是隊列的尾部位置,size
是申請的隊列內存大小,maxlen
是每一個隊列元素的大小,head_tag
和 tail_tag
用於指定隊列的頭尾是否循環被重置回頭部。bytes
是當前 channel
隊列佔用的內存大小,flag
用來指定是否使用共享內存、是否使用鎖、是否使用 pipe
通知。mem
是 channel
的內存首地址。spa
typedef struct _swChannel_item { int length; char data[0]; } swChannel_item; typedef struct _swChannel { off_t head; off_t tail; size_t size; char head_tag; char tail_tag; int num; int max_num; /** * Data length, excluding structure */ size_t bytes; int flag; int maxlen; /** * memory point */ void *mem; swLock lock; swPipe notify_fd; } swChannel;
channel
隊列swChannel_new
建立隊列建立隊列就是根據 flags
來初始化隊列的各個屬性,值得注意的是 maxlen
,當申請內存的時候會多申請這些內存,用來防止內存越界。code
swChannel* swChannel_new(size_t size, int maxlen, int flags) { assert(size >= maxlen); int ret; void *mem; //use shared memory if (flags & SW_CHAN_SHM) { mem = sw_shm_malloc(size + sizeof(swChannel) + maxlen); } else { mem = sw_malloc(size + sizeof(swChannel) + maxlen); } if (mem == NULL) { swWarn("swChannel_create: malloc(%ld) failed.", size); return NULL; } swChannel *object = mem; mem += sizeof(swChannel); bzero(object, sizeof(swChannel)); //overflow space object->size = size; object->mem = mem; object->maxlen = maxlen; object->flag = flags; //use lock if (flags & SW_CHAN_LOCK) { //init lock if (swMutex_create(&object->lock, 1) < 0) { swWarn("mutex init failed."); return NULL; } } //use notify if (flags & SW_CHAN_NOTIFY) { ret = swPipeNotify_auto(&object->notify_fd, 1, 1); if (ret < 0) { swWarn("notify_fd init failed."); return NULL; } } return object; }
swChannel_push
入隊入隊的時候,首先要先加鎖,而後調用 swChannel_in
。隊列
swChannel_in
邏輯很簡單,向隊列的尾部推送數據,若是當前 channel
尾部被重置,head
還未被重置,就須要先判斷剩餘的內存是否夠用。進程
若是當前 channel
尾部未被重置,就能夠放心的追加元素,由於 object->size
和真正申請的內存以前還有 maxlen
能夠富餘,沒必要考慮內存越界的問題。ip
int swChannel_push(swChannel *object, void *in, int data_length) { assert(object->flag & SW_CHAN_LOCK); object->lock.lock(&object->lock); int ret = swChannel_in(object, in, data_length); object->lock.unlock(&object->lock); return ret; } #define swChannel_full(ch) ((ch->head == ch->tail && ch->tail_tag != ch->head_tag) || (ch->bytes + sizeof(int) * ch->num == ch->size)) int swChannel_in(swChannel *object, void *in, int data_length) { assert(data_length <= object->maxlen); if (swChannel_full(object)) { return SW_ERR; } swChannel_item *item; int msize = sizeof(item->length) + data_length; if (object->tail < object->head) { //no enough memory space if ((object->head - object->tail) < msize) { return SW_ERR; } item = object->mem + object->tail; object->tail += msize; } else { item = object->mem + object->tail; object->tail += msize; if (object->tail >= object->size) { object->tail = 0; object->tail_tag = 1 - object->tail_tag; } } object->num++; object->bytes += data_length; item->length = data_length; memcpy(item->data, in, data_length); return SW_OK; }
swChannel_push
出隊swChannel_push
出隊的邏輯比較簡單,獲取隊列頭部位置,而後拷貝首部數據便可。當 head
超過 size
值,便可重置 head
。內存
int swChannel_pop(swChannel *object, void *out, int buffer_length) { assert(object->flag & SW_CHAN_LOCK); object->lock.lock(&object->lock); int n = swChannel_out(object, out, buffer_length); object->lock.unlock(&object->lock); return n; } #define swChannel_empty(ch) (ch->num == 0) int swChannel_out(swChannel *object, void *out, int buffer_length) { if (swChannel_empty(object)) { return SW_ERR; } swChannel_item *item = object->mem + object->head; assert(buffer_length >= item->length); memcpy(out, item->data, item->length); object->head += (item->length + sizeof(item->length)); if (object->head >= object->size) { object->head = 0; object->head_tag = 1 - object->head_tag; } object->num--; object->bytes -= item->length; return item->length; }