按個人理解,消息隊列是Skynet的核心,Skynet就是圍繞着消息隊列來工做的。
這個消息隊列分爲兩部分:全局隊列和服務隊列。每一個服務都有一個本身的服務隊列,服務隊列被全局隊列引用。主進程經過多個線程來不斷的從全局隊列中取出服務隊列,而後分發服務隊列中的消息到對應的服務。算法
今天,我將撥開消息隊列的面紗,一探究竟。數組
既然是數據結構,就是用來存儲數據的,伴隨着它的就要有添加、刪除、訪問接口。因爲它是用來存儲消息的,不難想到:向某服務發送消息,就是向服務的服務隊列中添加消息。而Skynet是經過多線程來分發消息的,線程的工做就是遍歷全局隊列,分發服務隊列中的消息到服務。session
我就按照這個思路,帶着問題,去看看Skynet的實現:數據結構
- 全局隊列和服務隊列的結構
- 全局隊列和服務隊列的生成
- 如何向全局隊列添加/刪除服務隊列
- 如何向服務隊列添加/刪除消息
- 工做線程如何分發消息
結構
服務隊列結構
struct message_queue { uint32_t handle; int cap; int head; int tail; int lock; int release; int in_global; struct skynet_message *queue; struct message_queue *next; };
初看此結構,感受很像鏈表:next指向下一個節點,queue存儲消息數據。實際上是錯的,稍微思考一下:若是是鏈表的話,那message_queue
的其餘數據(handle,cap等)豈不是要被複制多份?這顯然不符合大神對代碼質量的要求。
既然不是經過鏈表的方式去實現的,那麼很容易就會想到:是經過數組的形式來實現的,queue
實際上是一個動態申請的數組,裏面存了不少條消息,而cap(容量)、head(頭)、tail(尾)是爲queue
服務的。可是next
指針又有什麼用呢?
先無論這麼多了,繼續讀代碼找答案吧。多線程
全局隊列結構
struct global_queue { uint32_t head; uint32_t tail; struct message_queue ** queue; struct message_queue *list; };
生成
全局隊列
一個Skynet進程中,只有一個全局隊列,在系統啓動的時候就會經過skynet_mq_init
生成它:app
void skynet_mq_init() { struct global_queue *q = skynet_malloc(sizeof(*q)); memset(q,0,sizeof(*q)); q->queue = skynet_malloc(MAX_GLOBAL_MQ * sizeof(struct message_queue *)); memset(q->queue, 0, sizeof(struct message_queue *) * MAX_GLOBAL_MQ); Q=q; }
須要注意的是:它直接申請了MAX_GLOBAL_MQ
個message_queue
用於存儲服務隊列,因此服務隊列的總數不能超過MAX_GLOBAL_MQ
。dom
服務隊列
因爲服務隊列是屬於服務的,因此服務隊列的生命週期應和服務一致:載入服務的時候生成,卸載服務的時候刪除。
服務是經過skynet_context_new
載入的,在此函數中,能夠找到對應的服務隊列的生成語句:函數
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle); struct message_queue * skynet_mq_create(uint32_t handle) { struct message_queue *q = skynet_malloc(sizeof(*q)); q->handle = handle; q->cap = DEFAULT_QUEUE_SIZE; q->head = 0; q->tail = 0; q->lock = 0; q->in_global = MQ_IN_GLOBAL; q->release = 0; q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap); q->next = NULL; return q; }
在Skynet內部,是經過handle來定位服務的,handle就至關與服務的地址,此函數保存了服務的handle,這樣,之後就能夠經過服務隊列的handle,直接找到對應的服務了。
默認的容量是DEFAULT_QUEUE_SIZE
(64),從這裏就能夠印證咱們上面的判斷了:message_queue
是經過數組保存消息的,不是經過鏈表。
全局隊列操做
全局隊列是一個用固定大小的數組模擬的循環隊列,此循環隊列向尾部添加,從頭部刪除,分別用head、tail記錄其首尾下標。
全局隊列保存全部的服務隊列,worker線程向全局隊列索取服務隊列。爲了效率,並非簡單的把全部的服務隊列都塞到全局隊列中,而是隻塞入非空的服務隊列,這樣worker線程就不會獲得空的服務隊列而浪費資源。
因爲工做線程有多個,爲了不衝突,Skynet運用了這樣的策略:每次worker線程取得一個服務隊列的時候,都把這個服務隊列從全局隊列中刪除,這樣其餘的worker線程就無法獲取到這個服務隊列了,當此worker線程操做完畢後,再將此服務隊列添加到全局隊列(若服務隊列非空的話)。
可能觸發全局隊列添加操做的狀況有:
- 向服務隊列中添加消息(空變非空)
- worker線程處理完畢,服務隊列非空
可能觸發全局隊列刪除操做的狀況有:
- 從服務隊列中刪除消息(非空變空)
- worker線程獲取消息隊列
添加
void skynet_globalmq_push(struct message_queue * queue) { struct global_queue *q= Q; uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1)); if (!__sync_bool_compare_and_swap(&q->queue[tail], NULL, queue)) { // The queue may full seldom, save queue in list assert(queue->next == NULL); struct message_queue * last; do { last = q->list; queue->next = last; } while(!__sync_bool_compare_and_swap(&q->list, last, queue)); return; } }
不要被那些原子操做函數嚇倒,它們其實要作的很簡單,只是爲了保證操做的原子性,防止多線程衝突問題,才單獨封裝成一個API,詳細解釋見:GCC內置原子內存存取函數。
當向這樣的固定大小的循環隊列添加元素的時候,會遇到以下狀況:
- tail溢出
- 隊列滿了
上述代碼中,tail溢出的問題是經過GP
取模操做來解決的:
#define GP(p) ((p) % MAX_GLOBAL_MQ)
若是隊列滿了,怎麼辦呢?通常的解決辦法有:擴大容量、直接返回操做失敗等。Skynet沒有采用這樣的方法,它是這麼作的:
struct message_queue * last; do { last = q->list; queue->next = last; } while(!__sync_bool_compare_and_swap(&q->list, last, queue));
由於要考慮多線程的問題,代碼顯的比較難讀,咱們簡化一下:
queue->next = q->list; q->list = queue;
這樣就很清晰了,實際上就是:將新的服務隊列queue
添加到全局隊列的額外服務隊列鏈表list
中。這樣,global_queue
的list
中,就存放了全部沒有成功添加的服務隊列(由於全局隊列滿了)。
刪除
刪除的算法就很簡單了:
- 非空檢查
- 取得head下標,作溢出處理(GP)
- 取出當前的頭節點
- 將head下標對應的指針值空
- head加1
這裏有一個細節,還記得上面的添加操做有可能遇到全局隊列滿的狀況嗎?這裏會嘗試將那些添加失敗的隊列添加到全局隊列中:
struct message_queue * list = q->list; if (list) { struct message_queue * newhead = list->next; if (__sync_bool_compare_and_swap(&q->list, list, newhead)) { list->next = NULL; skynet_globalmq_push(list); } }
由於每次都只會pop一個,因此,每次只從list中取一個push進全局隊列。
服務隊列操做
服務隊列中存儲了全部發給此服務的消息。
服務隊列是可變大小的循環隊列,其容量會在運行時動態增長。
添加
經過調用skynet_mq_push
來將消息添加到服務隊列:
void skynet_mq_push(struct message_queue *q, struct skynet_message *message) { q->queue[q->tail] = *message; if (++ q->tail >= q->cap) q->tail = 0; if (q->head == q->tail) expand_queue(q); if (q->in_global == 0) { q->in_global = MQ_IN_GLOBAL; skynet_globalmq_push(q); } }
同全局隊列同樣,它也會遇到:下標溢出、隊列滿的狀況,因爲它是可擴容的循環隊列,當隊列滿的時候,就調用expand_queue
來擴容(當前容量的兩倍)。
這裏須要注意的是,最後作了這樣的處理:若是當前的服務隊列沒有被添加到全局隊列,則將它添加進去,這是爲worker線程而作的優化。
刪除
刪除的操做就很簡單了:head+1。
細節上考慮了下標溢出的問題,並會在隊列爲空的時候,將隊列的in_global
值爲false。
爲何這裏只設置一個標記呢?爲何不從全局隊列中刪除呢?
哈哈!由於只有worker線程纔會操做服務隊列,而當worker線程獲取到服務隊列的時候,已經將它從全局隊列中刪除了。
消息分發
消息分發是經過啓動多個worker線程來作的,而worker線程則不斷的循環調用skynet_context_message_dispatch
,爲了便於理解,我刪掉了一些細節:
struct message_queue * skynet_context_message_dispatch(struct message_queue *q) { if (q == NULL) { q = skynet_globalmq_pop(); if (q==NULL) return NULL; } uint32_t handle = skynet_mq_handle(q); struct skynet_context * ctx = skynet_handle_grab(handle); struct skynet_message msg; if (skynet_mq_pop(q,&msg)) { skynet_context_release(ctx); return skynet_globalmq_pop(); } _dispatch_message(ctx, &msg); struct message_queue *nq = skynet_globalmq_pop(); if (nq) { skynet_globalmq_push(q); q = nq; } skynet_context_release(ctx); return q; }
這個函數有兩種狀況:
- 傳入的
message_queue
爲NULL - 傳入的
message_queue
非NULL
對於第一種狀況,它會到全局隊列中pop一個出來,後面的和第二種狀況同樣了。
分發步驟以下:
- 經過
message_queue
得到服務的handle - 經過handle查找到服務的
skynet_context
- 從
message_queue
中pop一個元素 - 調用
_dispatch_message
進行消息分發 - 若是全局隊列爲空,則直接返回此隊列(這樣下次就會繼續處理這個隊列,此函數是循環調用的)
- 若是全局隊列非空,則pop全局隊列,獲得下一個服務隊列
- 將此隊列插入全局隊列,返回下一個服務隊列
只因此不一次性處理玩當前隊列,而要用5~7的步驟,是爲了消息調度的公平性,對每個服務都公平。
_dispatch_message
以下:
static void _dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) { int type = msg->sz >> HANDLE_REMOTE_SHIFT; size_t sz = msg->sz & HANDLE_MASK; if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) skynet_free(msg->data); }
它從skynet_message
消息中分解出類型和大小,而後調用服務的callback。
這裏須要注意的是:若是消息的callback返回0,則消息的data
將被釋放。