一個無鎖消息隊列引起的血案(三)——地:q3.h 與 RingBuffer

目錄

(一)原由 (二)混合自旋鎖 (三)q3.h 與 RingBuffer html

(四)RingQueue(上) 自旋鎖  (五)RingQueue(中) 休眠的藝術git

(六)RingQueue(中) 休眠的藝術 [續]github

無鎖隊列

  第一篇文章末尾咱們提到的《無鎖隊列的實現》(陳皓(hào)),該文末尾提到的「用數組實現無鎖隊列」,即用 RingBuffer 實現的無鎖隊列:shell

  RingBuffer 是一個很好的東西,用在無鎖/有鎖隊列實在是太棒了,如該文提到的同樣,RingBuffer因爲使用的是序號(或可稱爲索引),且用數組存儲的隊列,跟使用鏈表存儲隊列相比,優勢是能夠避免ABA問題(關於ABA問題能夠參考該文,或Google、百度本身搜),使用鏈表和指針來構造FIFO Queue(先進先出隊列),只有使用Double CAS(Double Compare And Swap)才能避免ABA問題,這裏不作討論。《無鎖隊列的實現》一文雖有提到,可是其描述是亂七八糟的,足見其只是大天然的搬運工,本身其實徹底不懂。不過,東西是點到了。編程

  包括上面截圖中的 RingBuffer 的描述,不少地方也是錯漏百出,RingBuffer 更新 head 和 tail 根本不須要使用Double CAS,咱們也並不須要把(TAIL, EMPTY) 更新成 (x, TAIL),只須要使用CAS更新HEAD或TAIL便可。不過對於咱們仍是有幫助的,至少你知道了這個東西,還有個好處,就是你看這個東西須要本身腦補一下,也算是一個功德吧,否則一點腦子都不動也很差玩。哈哈。數組

Compare-and-swap

  Compare and Swap 或 Compare and Set(CAS)是 lock-free(無鎖編程) 中常見的技術,能夠參考 wikipedia 的 Compare-and-swap,用C語言的實現就是:緩存

int val_compare_and_swap(volatile int *dest_ptr, int old_value, int new_value) { int orig_value = *dest_ptr; if (*dest_ptr == old_value) *dest_ptr = new_value; return orig_value; }

意思就是,先保存*dest_ptr的值(dest_ptr是一個指針),再檢查 *dest_ptr 的值是否等於舊址 old_value,若是等於的話,就把新值 new_value 寫入*dest_ptr,不然什麼都不作,最後返回值是 *dest_ptr 的原始值。CAS操做在CPU裏是能保證原子性的,即整個CAS操做不會被CPU的中斷或別的緣由打斷。在x86下對應的彙編指令是 LOCKCMPXCHG 。優化

Compare and Swap 還有一種形式,即:ui

bool bool_compare_and_swap(volatile int *dest_ptr, int old_value, int new_value) { if (*dest_ptr == old_value) { *dest_ptr = new_value; return true; } return false; }

這實際上是 val_compare_and_swap() 的變種,即若是 *dest_ptr 跟 old_value 相等則寫入新值 new_value,並返回 true,不相等則返回 false。編碼

RingBuffer

  下面咱們來看看什麼叫作 RingBuffer,先忘掉前面那篇文章,咱們以 q3.h 的實現爲範例:

  文件地址:https://github.com/shines77/RingQueue/blob/master/douban/q3.h

定義

  咱們把 RingBuffer 這麼定義:

  上面定義的是一個容量(Capacity)爲8的RingBuffer,通常RingBuffer的容量取爲2的N次冪,這樣計算索引時可使用 index = sequence & mask; (其中mask = capacity - 1;) 以提升代碼效率。其中索引表示數組的下標,數組中保存的數據是A, B, C, D, Empty等。Head 表示隊列的頭指針(即首個能夠壓入數據的位置),Tail 表示隊列的尾指針(即首個能夠彈出數據的位置),Head、Tail都是以序號的形式存在,且是單調遞增的,且能夠大於等於Capacity(8),若是Head = 8 時表示數組的第一個元素(由於迴轉回來了,即 index = 8 & (8 - 1) = 8 & 7 = 0;),Head = 9 表示的實際上是存儲在數組的第二個元素。

  隊列內元素的實際長度爲:Length = Head - Tail,其中 (Head - Tail) 必須 <= Capacity(最大容量),由於這個時候表示隊列已經滿了,若是(Head - Tail) = 0,則表明隊列爲空。爲何叫 RingBuffer,就是由於它是一個環,遊標到了數組的尾部又迴轉到數組的頭部。

  至於定義哪邊是頭,哪邊是尾,這不是很重要,邏輯上更換一下便可。可是這樣定義比較直觀,你試想一下,若是把上面那個圖豎起來,你以爲哪邊是頭哪邊是尾?還有個緣由是由於 q3.h 也是這麼定義的,而云風本身寫的 mq.c 裏則是恰好反過來定義的。不過如何定義的確不重要,只是不方便討論而已。

邊界問題

  這裏討論一下邊界的問題,q3.h 的邊界斷定不是很準確,if ((mask + tail - head) < 1U) return -1; 這樣的寫法會致使隊列的實際最大長度爲 Capacity - 1,而不是 Capacity。這還不是最大的問題,咱們從上面得知 RingBuffer 的實際長度爲:Length = Head - Tail,只要 Head, Tail 是無符號整型,即便 Head 從 4294967295 迴繞到 0 的時候,這個公式依然成立。例如:Head = 2, Tail = 4294967295,Length = 2 - 4294967295 = -4294967293,而 4294967293 = 0xFFFFFFFD,一個數的負數計算機裏是用補碼,即取反再加1,0xFFFFFFFD 取反等於 0x00000002,再加1,就是3,因此實際長度爲3,正確。因此只要RingQueue的定義一旦肯定,只要Head,Tail使用無符號整型表示,實際元素長度公式 Length = Head - Tail 是永遠成立的。

  那麼咱們來看看 (mask + tail - head) < 1U 意味着什麼,tail - head其實等於-length,則有:(mask - length) < 1U,由於兩邊都是無符號整型,因此只有 (mask - length) = 0 的時候才能成立,其餘時候都不會成立,即 length = mask 的時候條件纔會成立,而 mask = capacity - 1; 因此 length = capacity - 1 的時候,q3.h 就認爲隊列已經滿了,因此 q3.h 最大的實際長度只能爲 capacity - 1 個元素。

  並且他還有一個問題,可能你也看到了,由於只有 length = mask 這一種狀況是被認爲隊列滿了,當 length < mask 還好一點,一旦 length > (mask + 1) 的時候,實際上隊列已經滿了,但 q3.h 不知道,它依舊認爲是能夠push()的,爲何它沒出錯呢?緣由是這樣的,你分析過 q3.h 的push()代碼後,能夠得知,因爲head,tail都是單調遞增的,因爲線程可能隨時被中斷,因此head,tail只會比當前實際的head,tail值小,而head不等於當前實際值時,是通不過CAS的原子性檢驗的,因此咱們能夠認爲head永遠等於實際當前值,那麼只考慮一種狀況,就是tail可能比當前實際的tail值小,因爲length = head - tail,那麼意味着咱們計算的length值,要比實際的length要大,也就是說隊列實際沒滿,而 q3.h 可能會認爲滿了,從而致使push()失敗。沒有滿卻push()失敗沒有什麼壞處,大不了再繼續push()就是了,因此它並無出現問題。但從邏輯上講是不嚴密的。

  因此經過上面的分析,咱們得知判斷隊列是否已滿的邏輯爲:隊列實際長度 >= 隊列最大容量,即:if ((head - tail) >= capacity) return -1; 因爲 mask = capacity - 1,還能夠簡化爲:if ((head - tail) > mask) return -1; 。

  一樣的道理,q3.h 裏判斷隊列爲空的邏輯:if ((tail - head) < 1U) return NULL; 也是存在問題的,因爲都是無符號整型,因此其等價於:if ((tail - head) == 0) return NULL; 同理,它在 q3.h 裏也不會出現問題,不過更嚴密的判斷邏輯應該爲:if ((tail == head) || (tail > head && (head - tail) > mask)) return NULL; 。

struct queue

  咱們再來看一下 q3.h,地址是:

  https://github.com/shines77/RingQueue/blob/master/douban/q3.h

q3.h 裏 struct queue 是這麼定義的:

struct queue {
    struct {
        uint32_t mask;
        uint32_t size;
        volatile uint32_t head;
        volatile uint32_t tail;
    } p;
    char pad[CACHE_LINE_SIZE - 4 * sizeof(uint32_t)];

    struct {
        uint32_t mask;
        uint32_t size;
        volatile uint32_t head;
        volatile uint32_t tail;
    } c;
    char pad2[CACHE_LINE_SIZE - 4 * sizeof(uint32_t)];

    void        *msgs[0];
};

說實話,這樣寫並非很準確,並且難於理解,其所定義的 struct p 和 struct c,嚴格意義來說,p 應該叫 head,c 應該叫 tail,而p,c裏面定義的 head,tail 應該叫 first,second,他們實際上是一個pair,有點相似於std::pair,因此你也就知道我爲何叫它們 first,second 了。(注:在 disruptor 裏面,first, second 被稱之爲 next,cursor(next是首個可壓入數據的位置,cursor(遊標)是最新一個已經成功提交(publish)數據的位置,這裏咱們仍是使用個人叫法)。

disruptor 裏next,cursor的示意圖以下:

 

因此 struct queue 更準確的定義應該是這樣:

struct queue {
    struct {
        uint32_t mask;
        uint32_t size;
        volatile uint32_t first;
        volatile uint32_t second;
    } head;
    char pad1[CACHE_LINE_SIZE - 4 * sizeof(uint32_t)];

    struct {
        uint32_t mask;
        uint32_t size;
        volatile uint32_t first;
        volatile uint32_t second;
    } tail;
    char pad2[CACHE_LINE_SIZE - 4 * sizeof(uint32_t)];

    void        *msgs[0];
};

這裏面mask,size其實都是常量,徹底能夠不用放進來,不過這個不算過重要,暫時忽略。

因此 q3.h 裏的 push() 和 pop() 能夠改寫爲,同時咱們把邊界判斷也修改了:

static inline int
push(struct queue *q, void *m)
{
    uint32_t head, tail, mask, next;
    int ok;

    mask = q->head.mask;

    do {
        head = q->head.first;
        tail = q->tail.second;
        if ((head - tail) > mask)
            return -1;
        next = head + 1;
        ok = __sync_bool_compare_and_swap(&q->head.first, head, next);
    } while (!ok);

    q->msgs[head & mask] = m;
    asm volatile ("":::"memory");

    while (unlikely((q->head.second != head)))
        _mm_pause();

    q->head.second = next;

    return 0;
}

static inline void *
pop(struct queue *q)
{
    uint32_t tail, head, mask, next;
    int ok;
    void *ret;

    mask = q->tail.mask;

    do {
        tail = q->tail.first;
        head = q->head.second;
        if ((tail == head) || (tail > head && (head - tail) > mask))
            return NULL;
        next = tail + 1;
        ok = __sync_bool_compare_and_swap(&q->tail.first, tail, next);
    } while (!ok);

    ret = q->msgs[tail & mask];
    asm volatile ("":::"memory");

    while (unlikely((q->tail.second != tail)))
        _mm_pause();

    q->tail.second = next;

    return ret;
}

這樣應該比原來要清晰明瞭,容易理解了吧。其中「asm volatile ("":::"memory");」是編譯器內存屏障,若是你不理解的話,能夠忽略它,大意就是編譯器作優化的時候,全部該屏障以前的寫入或讀入操做都不能越過該屏障,反之屏障以後的也相似。

該文件已上傳至:https://github.com/shines77/RingQueue/blob/master/douban/q3_new.h

push()

下面咱們來分析一下push()是怎麼工做的?代碼以下:

static inline int
push(struct queue *q, void *m)
{
    uint32_t head, tail, mask, next;
    int ok;

    mask = q->head.mask;

    do {
        head = q->head.first;
        tail = q->tail.second;
        if ((head - tail) > mask)
            return -1;
        next = head + 1;
        ok = __sync_bool_compare_and_swap(&q->head.first, head, next);
    } while (!ok);

    q->msgs[head & mask] = m;
    asm volatile ("":::"memory");

    while (unlikely((q->head.second != head)))
        _mm_pause();

    q->head.second = next;

    return 0;
}

  先來看看 head 裏的 head.first,head.second 的示意圖:

  根據前面也提到過的 disruptor 的 next 與 cursor,相似的,這裏 head.first 是首個能夠壓入數據的位置,head.second 是最新一個已經成功提交數據的位置。head.first 經過do while() 和 CAS 操做的保證,讓每一個線程取得一個惟一的序號(sequence),由於CAS的原子性,便可保證每次只有一個線程有令 head.first 增一的機會。這有點相似你去銀行取錢的時候,一進門先得去一臺機器上領個號碼,而後銀行再按照這個號碼的前後順序爲客戶服務。咱們稱這個號碼爲序號(sequence),好比你領到的是5號,如上圖,可是銀行目前只處理完了2號客戶以前的客戶(包括2號客戶,即上圖中的 head.second),3, 4號客服都是正在辦理中的,而5號是你新領到的。跟現實銀行不太同樣地方是,這裏只要你一領到序號,窗口就馬上開始爲你服務了,而各個窗口(各個線程)辦理完成的時間是不定的,可能4號窗口先辦理完,而後再是3號窗口,最後5號窗口,也就是你領到的號碼。還有一點跟現實銀行不同的地方,就是無論3, 4, 5窗口誰先辦理完成,都必須按照序號的大小來結束辦理過程,即4號窗口即便完成了,也要等3號窗口完成了之後纔算完成,不然只能等3號窗口完成。若是5號窗口先辦理完畢,也必須等4號窗口完成,才能算最終完成。由於只有這麼依次完成,咱們纔好挪動 head.second 到最新成功提交數據的那個位置,若是不按照這個順序,跳着挪動 head.second,那麼 head.second 就亂套了。

  這個按順序提交數據是經過 while (unlikely((q->head.second != head))) _mm_pause(); 實現的。只要最後成功提交數據的位置跟你領的序號(sequence)不同的話,就必須一直等,直到等於你的序號爲止。這樣就實現了按順序提交數據(按領到的序號的順序)。這裏再提一下邊界判斷時講過的,這裏的 tail 能夠認爲小於等於實時的 tail.first 值(tail = tail.second,而 tail.second <= tail.first,並且 tail 也能夠小於實時的 tail.second值),而 head 能夠認爲永遠等於 head.first 值(由於若是不等於,則CAS是通不過的,必須重來),所以 (head - tail) 是比實時的 (head - tail) 值大的,全部 push() 可能會在隊列未滿的時候就可能認爲隊列滿了,於是提早退出,這樣是無害的,大不了從新push()。

  另外,提交數據是由 q->msgs[head & mask] = m; 這句實現的,即按照你領到的序號寫入數據便可,由於這個序號是惟一的,只要保證隊列不溢出或負溢出,每個獨立的序號都會在數組中有惟一的一個存儲位置。

  咱們再來分析一下Cache Line(緩存行)的佔用,咱們看到,前面那個do while() + CAS裏,CAS操做會鎖住 head.first 所在的緩存行(即讓其失效)。然後面的確認提交的循環中,while (unlikely((q->head.second != head))) 有對 head.second 的內存引用,而根據 q3.h 和 q3_new.h 的定義,二者是在同一條Cache Line(內存行)上的(雖然不必定是100%在一條Cache Line上,但因爲如今大部分編譯器內存對齊默認都是8字節,因此在一行上的機率幾乎接近100%)。而再後面的那句:q->head.second = next; 對 head.second 的寫入操做也將時其餘線程的 do while() + CAS 循環裏的 head.first 的緩存失效。因此這是 q3.h 設計上的一個失誤,雖然考慮了Cache Line Padding(緩存行填充),可是沒有考慮周全,仍是存在 False Sharing (僞共享)。這些問題在 disruptor 裏都很好的規避了,由於它給每個序號變量都運用了Cache Line Padding,即 Sequence 類。其實 False Sharing 也不是很嚴重的事情,只不過線程越多,被鎖區域的運行時間越短,False Sharing形成的不利影響將越明顯。

  從循環的角度來看,前面那個do while() + CAS,能夠認爲是一個自旋計數爲0的自旋鎖,然後面那個確認提交的循環,則是一個真正意義的自旋,只有條件知足了才能退出,兩個循環中都要依賴或等待別的線程才能確保退出循環,因此咱們大體認爲他們是兩個自旋在運做,這個我在「第二篇:自旋鎖」裏也有提到。

  q3.h 還有一個問題就是,有可能會活鎖(livelock),雖然沒有徹底死鎖(deadlock),可是 push() 和 pop() 執行得很慢很慢,超出常規的效率,緣由還不太明白,有空再好好想一想。出現這種狀況的條件是(如下兩種描述由於有點忘記了,若是說得不對我會及時糾正):若是不開CPU親緣性的話,且當PUSH_CNT + POP_CNT 大於CPU的實際核心數的時候,會很慢;若是開啓了CPU親緣性的話,當PUSH_CNT + POP_CNT 大於CPU的實際核心數的時候,會很是慢,比前者還要慢。除了這些狀況之外,執行效率還算正常。

pop()

  咱們再來看看pop(),代碼以下:

static inline void *
pop(struct queue *q)
{
    uint32_t tail, head, mask, next;
    int ok;
    void *ret;

    mask = q->tail.mask;

    do {
        tail = q->tail.first;
        head = q->head.second;
        if ((tail == head) || (tail > head && (head - tail) > mask))
            return NULL;
        next = tail + 1;
        ok = __sync_bool_compare_and_swap(&q->tail.first, tail, next);
    } while (!ok);

    ret = q->msgs[tail & mask];
    asm volatile ("":::"memory");

    while (unlikely((q->tail.second != tail)))
        _mm_pause();

    q->tail.second = next;

    return ret;
}

  tail 裏的 tail.first,tail.second 的示意圖:

  同上可知,這裏 tail.first 是首個能夠彈出數據的位置,tail.second 是最新一個已經成功彈出數據的位置。彈出數據的語句是:ret = q->msgs[tail & mask]; 最終確認彈出的語句是:q->tail.second = next; 其餘分析同 push() 相似。

問題

  縱觀 push() 和 pop(),你會發現 head.first,head.second,tail.first,tail.second 這四個變量環環相扣,互相影響,push() 一開始引用了 head.first 和 tail.second,而 pop() 一開始引用了 tail.first 和 head.second,但 head.first 和 head.second 是一條Cache Line上的,而 tail.first 和 tail.second 也是在一條Cache Line上的,不管誰的值更新或進入CAS操做,都會形成 False Sharing(僞共享) 讓緩存失效而互相影響。十足有當年赤壁之戰曹軍水軍連環船的風範……(火燒連環船)。這是 q3.h 設計上的一個大失誤,若是想提升一點效率,能夠把這四個變量分別放到不一樣的 Cache Line 上,會好不少,而 disruptor 在這方面考慮得比較周全。

  前面提到的關於 disruptor 的那篇原理文章是:http://blog.codeaholics.org/2011/the-disruptor-lock-free-publishing/ ,這不是我惟一看過的 disruptor 文章,我看了不少,不過這篇文章讓我能更好的來寫本文。關於 disruptor 的一些原理介紹,請自行 Google 或 百度,由於 disruptor 版本升級很快,而各篇文章描述所用的代碼、名稱或結構不必定跟最新版的相同,但原理上大體是同樣的,我也是看到了各類版本,各類說話,須要本身綜合起來。

休眠

  暫時寫到這了,先休息,晚安,地球……。

RingQueue

  RingQueue 的GitHub地址是:https://github.com/shines77/RingQueue,也能夠下載UTF-8編碼版:https://github.com/shines77/RingQueue-utf8。 我敢說是一個不錯的混合自旋鎖,你能夠本身去下載回來看看,支持Makefile,支持CodeBlocks, 支持Visual Studio 2008, 2010, 2013等,還支持CMake,支持Windows, MinGW, cygwin, Linux, Mac OSX等等,固然可能不支持ARM。

目錄

(一)原由 (二)混合自旋鎖 (三)q3.h 與 RingBuffer 

(四)RingQueue(上) 自旋鎖  (五)RingQueue(中) 休眠的藝術

(六)RingQueue(中) 休眠的藝術 [續]

 

上一篇:一個無鎖消息隊列引起的血案(二)——月:自旋鎖

下一篇:一個無鎖消息隊列引起的血案(四)——月:關於RingQueue(上)

.

相關文章
相關標籤/搜索