swoole
的底層隊列有兩種:進程間通訊 IPC
的消息隊列 swMsgQueue
,與環形隊列 swRingQueue
。IPC
的消息隊列用於 task_worker
進程接受投遞消息,環形隊列用於 SW_MODE_THREAD
線程模式下 task_worker
接受投遞消息的方法。php
swMsgQueue
消息隊列數據結構swoole
使用的消息隊列並非 POSIX
下的 mq_xx
系統函數,而是 SystemV
下的 msgxxx
系列函數,緣由猜想是 systemv
系統函數能夠指定 mtype
,也就是消息的類型,這樣就能夠實現對指定的 task_worker
的投放。數組
swMsgQueue
的數據結構比較簡單,blocking
指定消息隊列是不是阻塞式,msg_id
是建立的消息隊列的 id
,flags
也是指定阻塞式仍是非阻塞式,perms
指定消息隊列的權限。安全
typedef struct _swMsgQueue { int blocking; int msg_id; int flags; int perms; } swMsgQueue;
swMsgQueue
消息隊列swMsgQueue
消息隊列的建立建立消息隊列就是調用 msgget
函數,這個函數的 msg_key
就是 server
端配置的 message_queue_key
,task
隊列在 server
結束後不會銷燬,從新啓動程序後,task
進程仍然會接着處理隊列中的任務。若是不設置該值,那麼程序會自動生成: ftok($php_script_file, 1)
swoole
void swMsgQueue_set_blocking(swMsgQueue *q, uint8_t blocking) { if (blocking == 0) { q->flags = q->flags | IPC_NOWAIT; } else { q->flags = q->flags & (~IPC_NOWAIT); } } int swMsgQueue_create(swMsgQueue *q, int blocking, key_t msg_key, int perms) { if (perms <= 0 || perms >= 01000) { perms = 0666; } int msg_id; msg_id = msgget(msg_key, IPC_CREAT | perms); if (msg_id < 0) { swSysError("msgget() failed."); return SW_ERR; } else { bzero(q, sizeof(swMsgQueue)); q->msg_id = msg_id; q->perms = perms; q->blocking = blocking; swMsgQueue_set_blocking(q, blocking); } return 0; }
swMsgQueue
消息隊列的發送消息隊列的發送主要利用 msgsnd
函數,flags
指定發送是阻塞式仍是非阻塞式,在 task_worker
進程中都是採用阻塞式發送的方法。數據結構
int swMsgQueue_push(swMsgQueue *q, swQueue_data *in, int length) { int ret; while (1) { ret = msgsnd(q->msg_id, in, length, q->flags); if (ret < 0) { SwooleG.error = errno; if (errno == EINTR) { continue; } else if (errno == EAGAIN) { return -1; } else { swSysError("msgsnd(%d, %d, %ld) failed.", q->msg_id, length, in->mtype); return -1; } } else { return ret; } } return 0; }
swMsgQueue
消息隊列的接受消息隊列的接受是利用 msgrcv
函數,其中 mtype
是消息的類型,該參數會取出指定類型的消息,若是 task_ipc_mode
設定的是爭搶模式,該值會統一爲 0,不然該值就是消息發送目的 task_worker
的 id
。函數
task_worker
進程的主循環會阻塞在本函數中,直到有消息到達。fetch
int swMsgQueue_pop(swMsgQueue *q, swQueue_data *data, int length) { int ret = msgrcv(q->msg_id, data, length, data->mtype, q->flags); if (ret < 0) { SwooleG.error = errno; if (errno != ENOMSG && errno != EINTR) { swSysError("msgrcv(%d, %d, %ld) failed.", q->msg_id, length, data->mtype); } } return ret; }
swRingQueue
環形隊列的數據結構環形隊列在以前的文章中歷來沒有出現,由於該隊列是用於 SW_MODE_THREAD
模式下的 worker
線程中。因爲並非進程間的通信,而是線程間的通信,所以效率會更高。優化
swoole
的環形隊列有兩種,一種是普通的環形隊列,另外一種是線程安全的環形隊列,本文只會講線程安全的環形隊列,ui
swoole
爲了環形隊列更加高效,並無使用線程鎖,而是使用了無鎖結構,只會利用 atomic
原子鎖。atom
值得注意的是數據結構中的 flags
,該值只會是 0-4 中的一個,該值都是利用原子鎖來改動,以此來實現互斥的做用。
typedef struct _swRingQueue { void **data; /* 隊列空間 */ char *flags; // 0:push ready 1: push now // 2:pop ready; 3: pop now uint size; /* 隊列總尺寸 */ uint num; /* 隊列當前入隊數量 */ uint head; /* 頭部,出隊列方向*/ uint tail; /* 尾部,入隊列方向*/ } swRingQueue;
swRingQueue
環形隊列swRingQueue
環形隊列的建立環形隊列的建立很簡單,就是初始化隊列數據結構中的各類屬性。
int swRingQueue_init(swRingQueue *queue, int buffer_size) { queue->size = buffer_size; queue->flags = (char *)sw_malloc(queue->size); if (queue->flags == NULL) { return -1; } queue->data = (void **)sw_calloc(queue->size, sizeof(void*)); if (queue->data == NULL) { sw_free(queue->flags); return -1; } queue->head = 0; queue->tail = 0; memset(queue->flags, 0, queue->size); memset(queue->data, 0, queue->size * sizeof(void*)); return 0; }
swRingQueue
環形隊列的消息入隊發送消息首先要肯定環形隊列的隊尾。queue->flags
是一個數組,裏面存儲着全部的隊列元素當前的狀態。若是當前隊尾元素的狀態不是 0
,說明已經有其餘線程對該隊列元素進行操做,咱們當前線程暫時不能對當前隊尾進行操做,要等其餘線程將隊尾元素向後移動一位,咱們才能進行更新。
當線程將當前隊尾的狀態從 0 改變爲 1 以後,咱們就要馬上更新隊尾的 offset
,讓其餘線程繼續入隊數據。接着將數據放入 queue->data
,僅僅將數據的地址保存便可。
最後,將 cur_tail_flag_index
原子加 1,將隊列元素狀態改成待讀;將 queue->num
原子加 1
int swRingQueue_push(swRingQueue *queue, void * ele) { if (!(queue->num < queue->size)) { return -1; } int cur_tail_index = queue->tail; char * cur_tail_flag_index = queue->flags + cur_tail_index; //TODO Scheld while (!sw_atomic_cmp_set(cur_tail_flag_index, 0, 1)) { cur_tail_index = queue->tail; cur_tail_flag_index = queue->flags + cur_tail_index; } // 兩個入隊線程之間的同步 //TODO 取模操做能夠優化 int update_tail_index = (cur_tail_index + 1) % queue->size; // 若是已經被其餘的線程更新過,則不須要更新; // 不然,更新爲 (cur_tail_index+1) % size; sw_atomic_cmp_set(&queue->tail, cur_tail_index, update_tail_index); // 申請到可用的存儲空間 *(queue->data + cur_tail_index) = ele; sw_atomic_fetch_add(cur_tail_flag_index, 1); sw_atomic_fetch_add(&queue->num, 1); return 0; }
swRingQueue
環形隊列的消息出隊與入隊相反,出隊須要肯定當前隊列的隊首位置,若是隊首的狀態不是 2,那麼說明有其餘線程已經進行了出隊操做,等待其餘線程更新隊首位置便可。
獲取到隊首元素以後,要馬上更新隊首的新位置,而後將數據的首地址傳遞給 ele
,而後將隊首元素狀態復原,減小隊列的 num
。
int swRingQueue_pop(swRingQueue *queue, void **ele) { if (!(queue->num > 0)) return -1; int cur_head_index = queue->head; char * cur_head_flag_index = queue->flags + cur_head_index; while (!sw_atomic_cmp_set(cur_head_flag_index, 2, 3)) { cur_head_index = queue->head; cur_head_flag_index = queue->flags + cur_head_index; } //TODO 取模操做能夠優化 int update_head_index = (cur_head_index + 1) % queue->size; sw_atomic_cmp_set(&queue->head, cur_head_index, update_head_index); *ele = *(queue->data + cur_head_index); sw_atomic_fetch_sub(cur_head_flag_index, 3); sw_atomic_fetch_sub(&queue->num, 1); return 0; }