Swoole 源碼分析——基礎模塊之Queue隊列

前言

swoole 的底層隊列有兩種:進程間通訊 IPC 的消息隊列 swMsgQueue,與環形隊列 swRingQueueIPC 的消息隊列用於 task_worker 進程接受投遞消息,環形隊列用於 SW_MODE_THREAD 線程模式下 task_worker 接受投遞消息的方法。php

swMsgQueue 消息隊列數據結構

swoole 使用的消息隊列並非 POSIX 下的 mq_xx 系統函數,而是 SystemV 下的 msgxxx 系列函數,緣由猜想是 systemv 系統函數能夠指定 mtype,也就是消息的類型,這樣就能夠實現對指定的 task_worker 的投放。數組

swMsgQueue 的數據結構比較簡單,blocking 指定消息隊列是不是阻塞式,msg_id 是建立的消息隊列的 idflags 也是指定阻塞式仍是非阻塞式,perms 指定消息隊列的權限。安全

typedef struct _swMsgQueue
{
    int blocking;
    int msg_id;
    int flags;
    int perms;
} swMsgQueue;

swMsgQueue 消息隊列

swMsgQueue 消息隊列的建立

建立消息隊列就是調用 msgget 函數,這個函數的 msg_key 就是 server 端配置的 message_queue_keytask 隊列在 server 結束後不會銷燬,從新啓動程序後,task 進程仍然會接着處理隊列中的任務。若是不設置該值,那麼程序會自動生成: ftok($php_script_file, 1)swoole

void swMsgQueue_set_blocking(swMsgQueue *q, uint8_t blocking)
{
    if (blocking == 0)
    {
        q->flags = q->flags | IPC_NOWAIT;
    }
    else
    {
        q->flags = q->flags & (~IPC_NOWAIT);
    }
}

int swMsgQueue_create(swMsgQueue *q, int blocking, key_t msg_key, int perms)
{
    if (perms <= 0 || perms >= 01000)
    {
        perms = 0666;
    }
    int msg_id;
    msg_id = msgget(msg_key, IPC_CREAT | perms);
    if (msg_id < 0)
    {
        swSysError("msgget() failed.");
        return SW_ERR;
    }
    else
    {
        bzero(q, sizeof(swMsgQueue));
        q->msg_id = msg_id;
        q->perms = perms;
        q->blocking = blocking;
        swMsgQueue_set_blocking(q, blocking);
    }
    return 0;
}

swMsgQueue 消息隊列的發送

消息隊列的發送主要利用 msgsnd 函數,flags 指定發送是阻塞式仍是非阻塞式,在 task_worker 進程中都是採用阻塞式發送的方法。數據結構

int swMsgQueue_push(swMsgQueue *q, swQueue_data *in, int length)
{
    int ret;

    while (1)
    {
        ret = msgsnd(q->msg_id, in, length, q->flags);
        if (ret < 0)
        {
            SwooleG.error = errno;
            if (errno == EINTR)
            {
                continue;
            }
            else if (errno == EAGAIN)
            {
                return -1;
            }
            else
            {
                swSysError("msgsnd(%d, %d, %ld) failed.", q->msg_id, length, in->mtype);
                return -1;
            }
        }
        else
        {
            return ret;
        }
    }
    return 0;
}

swMsgQueue 消息隊列的接受

消息隊列的接受是利用 msgrcv 函數,其中 mtype 是消息的類型,該參數會取出指定類型的消息,若是 task_ipc_mode 設定的是爭搶模式,該值會統一爲 0,不然該值就是消息發送目的 task_workerid函數

task_worker 進程的主循環會阻塞在本函數中,直到有消息到達。fetch

int swMsgQueue_pop(swMsgQueue *q, swQueue_data *data, int length)
{
    int ret = msgrcv(q->msg_id, data, length, data->mtype, q->flags);
    if (ret < 0)
    {
        SwooleG.error = errno;
        if (errno != ENOMSG && errno != EINTR)
        {
            swSysError("msgrcv(%d, %d, %ld) failed.", q->msg_id, length, data->mtype);
        }
    }
    return ret;
}

swRingQueue 環形隊列的數據結構

環形隊列在以前的文章中歷來沒有出現,由於該隊列是用於 SW_MODE_THREAD 模式下的 worker 線程中。因爲並非進程間的通信,而是線程間的通信,所以效率會更高。優化

swoole 的環形隊列有兩種,一種是普通的環形隊列,另外一種是線程安全的環形隊列,本文只會講線程安全的環形隊列,ui

swoole 爲了環形隊列更加高效,並無使用線程鎖,而是使用了無鎖結構,只會利用 atomic 原子鎖。atom

值得注意的是數據結構中的 flags,該值只會是 0-4 中的一個,該值都是利用原子鎖來改動,以此來實現互斥的做用。

typedef struct _swRingQueue
{
    void **data; /* 隊列空間 */
    char *flags; 
    // 0:push ready 1: push now
    // 2:pop ready; 3: pop now
    uint size; /* 隊列總尺寸 */
    uint num; /* 隊列當前入隊數量 */
    uint head; /* 頭部,出隊列方向*/
    uint tail; /* 尾部,入隊列方向*/

} swRingQueue;

swRingQueue 環形隊列

swRingQueue 環形隊列的建立

環形隊列的建立很簡單,就是初始化隊列數據結構中的各類屬性。

int swRingQueue_init(swRingQueue *queue, int buffer_size)
{
    queue->size = buffer_size;
    queue->flags = (char *)sw_malloc(queue->size);
    if (queue->flags == NULL)
    {
        return -1;
    }
    queue->data = (void **)sw_calloc(queue->size, sizeof(void*));
    if (queue->data == NULL)
    {
        sw_free(queue->flags);
        return -1;
    }
    queue->head = 0;
    queue->tail = 0;
    memset(queue->flags, 0, queue->size);
    memset(queue->data, 0, queue->size * sizeof(void*));
    return 0;
}

swRingQueue 環形隊列的消息入隊

發送消息首先要肯定環形隊列的隊尾。queue->flags 是一個數組,裏面存儲着全部的隊列元素當前的狀態。若是當前隊尾元素的狀態不是 0,說明已經有其餘線程對該隊列元素進行操做,咱們當前線程暫時不能對當前隊尾進行操做,要等其餘線程將隊尾元素向後移動一位,咱們才能進行更新。

當線程將當前隊尾的狀態從 0 改變爲 1 以後,咱們就要馬上更新隊尾的 offset,讓其餘線程繼續入隊數據。接着將數據放入 queue->data,僅僅將數據的地址保存便可。

最後,將 cur_tail_flag_index 原子加 1,將隊列元素狀態改成待讀;將 queue->num 原子加 1

int swRingQueue_push(swRingQueue *queue, void * ele)
{
    if (!(queue->num < queue->size))
    {
        return -1;
    }
    int cur_tail_index = queue->tail;
    char * cur_tail_flag_index = queue->flags + cur_tail_index;
    //TODO Scheld
    while (!sw_atomic_cmp_set(cur_tail_flag_index, 0, 1))
    {
        cur_tail_index = queue->tail;
        cur_tail_flag_index = queue->flags + cur_tail_index;
    }

    // 兩個入隊線程之間的同步
    //TODO 取模操做能夠優化
    int update_tail_index = (cur_tail_index + 1) % queue->size;

    // 若是已經被其餘的線程更新過,則不須要更新;
    // 不然,更新爲 (cur_tail_index+1) % size;
    sw_atomic_cmp_set(&queue->tail, cur_tail_index, update_tail_index);

    // 申請到可用的存儲空間
    *(queue->data + cur_tail_index) = ele;

    sw_atomic_fetch_add(cur_tail_flag_index, 1);
    sw_atomic_fetch_add(&queue->num, 1);
    return 0;
}

swRingQueue 環形隊列的消息出隊

與入隊相反,出隊須要肯定當前隊列的隊首位置,若是隊首的狀態不是 2,那麼說明有其餘線程已經進行了出隊操做,等待其餘線程更新隊首位置便可。

獲取到隊首元素以後,要馬上更新隊首的新位置,而後將數據的首地址傳遞給 ele,而後將隊首元素狀態復原,減小隊列的 num

int swRingQueue_pop(swRingQueue *queue, void **ele)
{
    if (!(queue->num > 0))
        return -1;
    int cur_head_index = queue->head;
    char * cur_head_flag_index = queue->flags + cur_head_index;

    while (!sw_atomic_cmp_set(cur_head_flag_index, 2, 3))
    {
        cur_head_index = queue->head;
        cur_head_flag_index = queue->flags + cur_head_index;
    }
    //TODO 取模操做能夠優化
    int update_head_index = (cur_head_index + 1) % queue->size;
    sw_atomic_cmp_set(&queue->head, cur_head_index, update_head_index);
    *ele = *(queue->data + cur_head_index);

    sw_atomic_fetch_sub(cur_head_flag_index, 3);
    sw_atomic_fetch_sub(&queue->num, 1);
    return 0;
}
相關文章
相關標籤/搜索