Swoole 源碼分析——基礎模塊之 Channel 隊列

前言

內存數據結構 Channel,相似於 Gochan 通道,底層基於 共享內存 + Mutex 互斥鎖實現,可實現用戶態的高性能內存隊列。Channel 可用於多進程環境下,底層在讀取寫入時會自動加鎖,應用層不須要擔憂數據同步問題。數據結構

channel 在以前的文章中出現過,當時用於 managerworker 進程之間進行通訊的重要數據結構,主要用於 worker 進程通知 manager 進程重啓相應 worker 進程。性能

channel 數據結構

channel 數據結構的屬性比較多,head 是隊列的頭部位置,tail 是隊列的尾部位置,size 是申請的隊列內存大小,maxlen 是每一個隊列元素的大小,head_tagtail_tag 用於指定隊列的頭尾是否循環被重置回頭部。bytes 是當前 channel 隊列佔用的內存大小,flag 用來指定是否使用共享內存、是否使用鎖、是否使用 pipe 通知。memchannel 的內存首地址。spa

typedef struct _swChannel_item
{
    int length;
    char data[0];
} swChannel_item;

typedef struct _swChannel
{
    off_t head;
    off_t tail;
    size_t size;
    char head_tag;
    char tail_tag;
    int num;
    int max_num;
    /**
     * Data length, excluding structure
     */
    size_t bytes;
    int flag;
    int maxlen;
    /**
     * memory point
     */
    void *mem;
    swLock lock;
    swPipe notify_fd;
} swChannel;

channel 隊列

swChannel_new 建立隊列

建立隊列就是根據 flags 來初始化隊列的各個屬性,值得注意的是 maxlen,當申請內存的時候會多申請這些內存,用來防止內存越界。code

swChannel* swChannel_new(size_t size, int maxlen, int flags)
{
    assert(size >= maxlen);
    int ret;
    void *mem;

    //use shared memory
    if (flags & SW_CHAN_SHM)
    {
        mem = sw_shm_malloc(size + sizeof(swChannel) + maxlen);
    }
    else
    {
        mem = sw_malloc(size + sizeof(swChannel) + maxlen);
    }

    if (mem == NULL)
    {
        swWarn("swChannel_create: malloc(%ld) failed.", size);
        return NULL;
    }
    swChannel *object = mem;
    mem += sizeof(swChannel);

    bzero(object, sizeof(swChannel));

    //overflow space
    object->size = size;
    object->mem = mem;
    object->maxlen = maxlen;
    object->flag = flags;

    //use lock
    if (flags & SW_CHAN_LOCK)
    {
        //init lock
        if (swMutex_create(&object->lock, 1) < 0)
        {
            swWarn("mutex init failed.");
            return NULL;
        }
    }
    //use notify
    if (flags & SW_CHAN_NOTIFY)
    {
        ret = swPipeNotify_auto(&object->notify_fd, 1, 1);
        if (ret < 0)
        {
            swWarn("notify_fd init failed.");
            return NULL;
        }
    }
    return object;
}

swChannel_push 入隊

入隊的時候,首先要先加鎖,而後調用 swChannel_in隊列

swChannel_in 邏輯很簡單,向隊列的尾部推送數據,若是當前 channel 尾部被重置,head 還未被重置,就須要先判斷剩餘的內存是否夠用。進程

若是當前 channel 尾部未被重置,就能夠放心的追加元素,由於 object->size 和真正申請的內存以前還有 maxlen 能夠富餘,沒必要考慮內存越界的問題。ip

int swChannel_push(swChannel *object, void *in, int data_length)
{
    assert(object->flag & SW_CHAN_LOCK);
    object->lock.lock(&object->lock);
    int ret = swChannel_in(object, in, data_length);
    object->lock.unlock(&object->lock);
    return ret;
}

#define swChannel_full(ch) ((ch->head == ch->tail && ch->tail_tag != ch->head_tag) || (ch->bytes + sizeof(int) * ch->num == ch->size))
int swChannel_in(swChannel *object, void *in, int data_length)
{
    assert(data_length <= object->maxlen);
    if (swChannel_full(object))
    {
        return SW_ERR;
    }
    swChannel_item *item;
    int msize = sizeof(item->length) + data_length;

    if (object->tail < object->head)
    {
        //no enough memory space
        if ((object->head - object->tail) < msize)
        {
            return SW_ERR;
        }
        item = object->mem + object->tail;
        object->tail += msize;
    }
    else
    {
        item = object->mem + object->tail;
        object->tail += msize;
        if (object->tail >= object->size)
        {
            object->tail = 0;
            object->tail_tag = 1 - object->tail_tag;
        }
    }
    object->num++;
    object->bytes += data_length;
    item->length = data_length;
    memcpy(item->data, in, data_length);
    return SW_OK;
}

swChannel_push 出隊

swChannel_push 出隊的邏輯比較簡單,獲取隊列頭部位置,而後拷貝首部數據便可。當 head 超過 size 值,便可重置 head內存

int swChannel_pop(swChannel *object, void *out, int buffer_length)
{
    assert(object->flag & SW_CHAN_LOCK);
    object->lock.lock(&object->lock);
    int n = swChannel_out(object, out, buffer_length);
    object->lock.unlock(&object->lock);
    return n;
}

#define swChannel_empty(ch) (ch->num == 0)

int swChannel_out(swChannel *object, void *out, int buffer_length)
{
    if (swChannel_empty(object))
    {
        return SW_ERR;
    }

    swChannel_item *item = object->mem + object->head;
    assert(buffer_length >= item->length);
    memcpy(out, item->data, item->length);
    object->head += (item->length + sizeof(item->length));
    if (object->head >= object->size)
    {
        object->head = 0;
        object->head_tag = 1 - object->head_tag;
    }
    object->num--;
    object->bytes -= item->length;
    return item->length;
}
相關文章
相關標籤/搜索