基於數組的無鎖隊列(譯)

原文html

1 引言

最近對於注重性能的應用程序,咱們有了一種能顯著提升程序性能的選擇:多線程.線程的概念實際上已經存在了很長時間.在過去,多數計算機只有一個處理器,線程主要用於將一個大的任務拆分紅一系列更小的執行單元.以使得當其中某些執行單元由於等待資源而被阻塞的時候剩餘的執行單元能繼續執行。舉個示例,一個網絡應用程序,它監聽一個TCP端口,當有外部請求到達時,處理請求.對於一個單線程的應用程序來講,只能在處理完一個請求以後再處理另外的請求,顯然這個應用程序對用戶很是不友好,它爲一個用戶服務的時候別的用戶就只能乾等.而對於多線程解決方案,咱們可讓另外的線程來處理接收到的請求,主線程繼續等待在服務端口上接受新的請求.node

在只有一個處理器的機器上,一個多線程應用程序可能沒法達到咱們對它的預期.由於全部的線程都要爭搶處理器以得到執行機會.而它的性能說不定比一個用單線程方式去解決一樣問題的程序還要差,由於線程之間的通信和數據共享也有不小的開銷.程序員

而在一個對稱多處理器機器上(SMP),一個多線程應用能夠真正的實現多任務並行執行.每一個線程均可以擁有單獨的處理器以得到執行的機會,不須要再像單處理器同樣,必需要等處處理器空閒才能得到執行機會.對一個有N個處理器的SMP系統來講,理論上一個N線程的應用程序只須要它的單線程版本的1/N的時間就能夠完成相同的任務(實際上這個理論值是沒法達到的,由於線程之間的通信和數據共享有不小的開銷).算法

SMP的機器在過去是很是昂貴的,只有一些大公司和組織才能負擔得起.而現今,多核心處理器已經至關廉價(現今市場上的pc處理器至少都擁有一個以上的核心),因此讓應用程序並行化以提升性能如今已經變得很是流行.編程

可是編寫多線程應用程序顯然不是一件簡單的事.線程之間須要共享數據,互相通訊,很快你就會發現又要面對之前就遇到過的老問題:死鎖, 對共享數據的非法訪問,多線程動態分配/釋放內存等.若是你足夠幸運,參與到一個對性能有極高要求的項目中,你將會遇到更多致使性能不達標的問題:數組

  • Cache顛簸(Cache trashing)
  • 在同步機制上的爭搶.隊列
  • 動態內存分配

本文主要介紹一種基於數組實現的無鎖隊列用於解決上述三個性能問題,特別是動態內存分配,由於此無鎖隊列最初的設計目的就是爲了解決這個問題.緩存

2 線程同步如何致使性能降低

2.1 Cache顛簸

線程是(wikipedia):"操做系統能夠調度運行的最小的執行單元".每種操做系統都有不一樣的線程實現方式,基原本說,進程擁有一組指令集(代碼)和一段內存,線程執行一個代碼片斷但與包含它的進程共享內存空間.對Linux來講,線程是另外一種"執行上下文", 它沒有線程的念.Linux經過標準的進程來實現線程.Linux內核沒有提供特殊的調度語義或數據結構用於表示線程.線程只是一個與其它進程共享某些資源的普通進程[1].安全

每個運行中的任務/線程或成爲執行上下文,使用了一組CPU寄存器,包含各類內部狀態的數據,如當前正在執行的指令所在的內存地址,當前正在執行操做的操做數和/或操做結果,棧指針等等.全部的這些信息被統稱爲"上下文".任何搶佔式操做系統(幾乎全部現代操做系統都是搶佔式的)都必須具有幾乎在任什麼時候刻中止一個正在運行的任務並在未來將它恢復運行的能力(有些例外狀況,例如一個進程聲明它在某一段時間內是不可被搶佔的).一但任務恢復執行,它會從上次中止的地方繼續執行,就好像歷來沒被打斷過同樣.這是一件好事,全部的任務共享處理器,當一個任務在等待I/O時,它被其它任務搶佔.即便一個單處理的系統也表現得與多處理系統同樣,可是如在現實生活中同樣,沒有免費的午飯:由於處理器被共享,每次任務搶佔都有額外的開銷用於保存被搶佔任務的上下文,將得到運行權的任務的上下文恢復.網絡

在保存和恢復上下文的過程當中還隱藏了額外的開銷:Cache中的數據會失效,由於它緩存的是將被換出任務的數據,這些數據對於新換進的任務是沒用的.處理器的運行速度比主存快N倍,因此大量的處理器時間被浪費在處理器與主存的數據傳輸上.這就是在處理器和主存之間引入Cache的緣由.Cache是一種速度更快但容量更小的內存(也更加昂貴),當處理器要訪問主存中的數據時,這些數據首先被拷貝到Cache中,由於這些數據在不久的未來可能又會被處理器訪問.Cache misses對性能有很是大的影響,由於處理器訪問Cache中的數據將比直接訪問主存快得多.數據結構

所以每當一個任務被搶佔,Cache中的內容都須要被接下來得到運行權的進程覆蓋,這意味着,接下來運行的進程須要花費必定的時間來將Cache預熱才能達到良好的運行效率(某些操做系統,例如Linux,嘗試將進任務恢復到它被搶佔時運行的處理器上以減緩這個預熱的開銷).固然這並不意味任務搶佔是壞事,搶佔機制使得操做系統得以正常的工做.但對某些系統而言,線程被頻繁搶佔產生的Cache顛簸將致使應用程序性能降低.

一個運行中的任務在什麼狀況下會被搶佔?這依賴於你所使用的操做系統,但通常來講,中斷處理,定時器超時,執行系統調等,有極大的可能致使操做系統考慮將處理器的使用權交給系統中的其它進程.這其實是操做系統要處理的一個難題,一方面,沒人願意有進程由於沒法得到處理器而長期處於飢餓狀態.另外一方面,有些系統調用是阻塞的,這意味着一個任務向操做系統請求某些資源,這個任務要等待請求的資源被準備好,由於它須要這些資源以執行後續的操做.這是任務搶佔的一個正面的例子,由於在資源被準備好以前那個任務將無所事事,操做系統能夠將它投入到等待狀態,將處理器讓給其它任務運行.

資源通常來講指的是在內存,磁盤,網絡或外設中的數據,但一些會致使阻塞的同步機制,例如信號量和互斥鎖也能夠被認爲是資源.當一個任務請求得到一個已經被其它任務持有的互斥鎖,它將被搶佔,直到那個請求的互斥鎖被釋放,它纔會被從新投入到可運行任務隊列中.因此若是你擔憂你的進程被過於頻繁的搶佔,你應該儘可能避免使用會致使阻塞的同步機制.

可是,顯然一切沒那麼簡單.若是你使用多於物理處理器數量的線程來執行計算密集型的任務,而且不使用任何會致使阻塞的同步機制,你的系統的響應及時性將變差(操做延時加大).由於操做系統切換任務的次數越少意味着當前處於非運行狀態的進程須要等待更長的時間,直到有一個空閒的處理器使得它能恢復執行.而你的應用程序極可能也會受到影響,由於它可能在等待某個線程執行完一些計算才能進行下一步的處理,而那個線程卻沒法得到處理器來完成它的計算,致使全部其它的線程都在等待它.沒有通用的方法解決全部這些問題,這一般依賴於你的應用程序,機器和操做系統.例如對一個實時的計算密集型應用程序,我會選擇避免使用致使阻塞的同步機制,同時使用更少的線程.而對於那些須要花費大量時間等待外部(例如網絡)數據到來的應用程序,使用非阻塞同步機制可能會殺死你.每種方法都有本身的優勢和缺點,一切皆取決於你是如何使用的.

2.2 在同步機制上的爭搶.隊列

隊列可被應用到大多數多線程解決方案中.若是兩個或更多的線程須要經過有序事件來進行交流,我腦子裏第一個跳出來的就是隊列.簡單易懂,使用方便,被良好的測試.世界上任何一個程序員都要面對隊列.它們無處不在.

隊列對一個單線程應用程序來講,使用很是簡便,對它作一些簡單的處理就能夠適用於多線程的應用.你所須要的只是一個非線程安全的隊列(例如C++的std::queue)和一種阻塞的同步機制(例如互斥鎖和條件變量).我隨本文一塊兒附上一個用glib編寫的簡單的阻塞隊列.雖然沒有任何須要去重複發明輪子,glib中已經包含了一個線程安全的隊列GAsyncQueue[7],但這段代碼展現瞭如何將一個標準隊列轉換成線程安全的隊列.

讓咱們來看一下這個隊列中最經常使用的幾個方法的實現:IsEmpty, PushPop.最基本的存儲隊列使用的是std::queue它被聲明爲成員變量,std::queue m_theQueue.我使用glib的互斥鎖和條件變動量的封裝來做爲同步機制(GMutex* m_mutexCond* m_cond).能夠與本文一塊兒被下載的隊列實現中還包括了TryPushTryPop兩個方法,它們在隊列已滿或爲空的時候不會阻塞調用線程.

template <typename T>
    bool BlockingQueue<T>::IsEmpty()
    {
        bool rv;

        g_mutex_lock(m_mutex);
        rv = m_theQueue.empty();
        g_mutex_unlock(m_mutex);

        return rv;
    }

當隊列中沒有元素的時候IsEmpty返回true,在任何線程能夠訪問到不安全的隊列實現以前,必須首先得到互斥鎖.這意味着調用線程會被阻塞直到互斥鎖被釋放.

template <typename T>
    bool BlockingQueue<T>::Push(const T &a_elem)
    {
        g_mutex_lock(m_mutex);

        while (m_theQueue.size() >= m_maximumSize)
        {
            g_cond_wait(m_cond, m_mutex);
        }

        bool queueEmpty = m_theQueue.empty();

        m_theQueue.push(a_elem);

        if (queueEmpty)
        {
            // wake up threads waiting for stuff
            g_cond_broadcast(m_cond);
        }

        g_mutex_unlock(m_mutex);

        return true;
    }

Push將一個元素插入隊列尾部.調用線程將會阻塞,若是有其它線程持有互斥鎖.若是隊列已滿調用線程也會阻塞直到有一個線程從隊列中Pop一個元素出列.

template <typename T>
    void BlockingQueue<T>::Pop(T &out_data)
    {
        g_mutex_lock(m_mutex);

        while (m_theQueue.empty())
        {
            g_cond_wait(m_cond, m_mutex);
        }

        bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;

        out_data = m_theQueue.front();
        m_theQueue.pop();

        if (queueFull)
        {
            // wake up threads waiting for stuff
            g_cond_broadcast(m_cond);
        }

        g_mutex_unlock(m_mutex);
    }

Pop將隊首元素出列,調用線程將會阻塞,若是有其它線程持有互斥鎖.若是隊列已滿調用線程也會阻塞直到有一個線程Push一個元素到隊列中.

如我在前面的章節中解釋過的,阻塞不是微不足道的操做.它致使操做系統暫停當前的任務或使其進入睡眠狀態(等待,不佔用任何的處理器).直到資源(例如互斥鎖)可用,被阻塞的任務才能夠解除阻塞狀態(喚醒).在一個負載較重的應用程序中使用這樣的阻塞隊列來在線程之間傳遞消息會致使嚴重的爭用問題.也就是說,任務將大量的時間(睡眠,等待,喚醒)浪費在得到保護隊列數據的互斥鎖,而不是處理隊列中的數據上.

在最簡單的情形下,只有一個線程向隊列插入數據(生產者),也只有一個線程從隊列中提取數據(消費者),這兩個線程爭用保護隊列的互斥鎖.咱們也能夠把咱們的實現從只使用一個互斥鎖改成使用兩個,一個用於插入數據一個用於提取數據.使用這種方式則只有在隊列爲空或滿的時候纔會發生爭搶.可是若是有多於一個線程插入數據和提取數據,那麼咱們的問題又回來了.生產者們和消費者們仍是須要繼續爭搶各自互斥鎖.

這種狀況下,非阻塞機制大展伸手的機會到了.任務之間不爭搶任何資源,在隊列中預約一個位置,而後在這個位置上插入或提取數據.這中機制使用了一種被稱之爲CAS(比較和交換)的特殊操做,這個特殊操做是一種特殊的指令,它能夠原子的完成如下操做:它須要3個操做數m,A,B,其中m是一個內存地址,操做將m指向的內存中的內容與A比較,若是相等則將B寫入到m指向的內存中並返回true,若是不相等則什麼也不作返回false.例如:

volatile int a;
    a = 1;

    // this will loop while 'a' is not equal to 1. If it is equal to 1 the operation will atomically
    // set a to 2 and return true
    while (!CAS(&a, 1, 2))
    {
        ;
    }

使用CAS實現無鎖隊列已經不是什麼新鮮事物了.有不少實現的方案,但大多數都是使用鏈表的實現.有興趣的讀者能夠查看[2][3]或[4].
雖然本文的目的不是描述這種實現,但作一個簡單的介紹仍是有必要的:

  • 向隊列中插入一個元素會動態分配一個新的節點,經過CAS操做將這個節點插入到隊列中
  • 移除元素首先經過CAS操做移動鏈表的指針將節點出列,而後再訪問那個出列節點中的數據.

下面是一個簡單的基於鏈表實現的無鎖隊列(從[2]中拷貝過來的,它的實現基於[5])

typedef struct _Node Node;
    typedef struct _Queue Queue;

    struct _Node {
        void *data;
        Node *next;
    };

    struct _Queue {
        Node *head;
        Node *tail;
    };

    Queue*
    queue_new(void)
    {
        Queue *q = g_slice_new(sizeof(Queue));
        q->head = q->tail = g_slice_new0(sizeof(Node));
        return q;
    }

    void
    queue_enqueue(Queue *q, gpointer data)
    {
        Node *node, *tail, *next;

        node = g_slice_new(Node);
        node->data = data;
        node->next = NULL;

        while (TRUE) {
            tail = q->tail;
            next = tail->next;
            if (tail != q->tail)
                continue;

            if (next != NULL) {
                CAS(&q->tail, tail, next);
                continue;
            }

            if (CAS(&tail->next, null, node)
                break;
        }

        CAS(&q->tail, tail, node);
    }

    gpointer
    queue_dequeue(Queue *q)
    {
        Node *node, *tail, *next;

        while (TRUE) {
            head = q->head;
            tail = q->tail;
            next = head->next;
            if (head != q->head)
                continue;

            if (next == NULL)
                return NULL; // Empty

            if (head == tail) {
                CAS(&q->tail, tail, next);
                continue;
            }

            data = next->data;
            if (CAS(&q->head, head, next))
                break;
        }

        g_slice_free(Node, head); // This isn't safe
        return data;
    }

在不支持垃圾回收的編程語言中,最後的g_slice_free調用是不安全的,具體緣由能夠參看ABA問題:

首先假設隊列中只有一個節點,q->head = N1,q->tail = N2,其中N1是哨兵節點

  • 1 線程T1在執行完data = next->data;以後被暫停.此時next = N2,head = N1
  • 2 線程T2執行queue_dequeue將節點(N1)出列,並將節點(N1)的內存釋放,操做完成後,q->head = N2,q->tail = N2,N2是哨兵節點
  • 3 線程T2執行queue_enqueue,產生的節點(N3)與(N1)內存地址一致(內存被重用),操做完成後,q->head = N2,q->tail = N3 = N1
  • 4 線程T2再次執行queue_dequeue,操做完成後q->head = q->tail = N3 = N1,N1從新變會哨兵節點
  • 4 T1恢復執行,接下來執行CAS成功(head = q->head = N1).而實際上此時隊列已經爲空,沒有元素可供出列.

ABA問題能夠經過給每一個節點增長引用計數來解決.在執行CAS操做以前首先要檢查計數值以肯定操做的是一個正確的節點.好消息是本文提供的這個無鎖鏈表隊列實現沒有ABA問題,由於它沒有使用動態內存分配.

2.3 動態內存分配

在多線程系統中,須要仔細的考慮動態內存分配.當一個任務從堆中分配內存時,標準的內存分配機制會阻塞全部與這個任務共享地址空間的其它任務(進程中的全部線程).這樣作的緣由是讓處理更簡單,且它工做得很好.兩個線程不會被分配到一塊相同的地址的內存,由於它們沒辦法同時執行分配請求.顯然線程頻繁分配內存會致使應用程序性能降低(必須注意,向標準隊列或map插入數據的時候都會致使堆上的動態內存分配)

存在一些非標準庫,提供了無鎖內存分配機制,以減小多線程對堆的爭搶,例如libhoard[6].除此以外還有更多的其它選擇,你能夠將標準的C++分配器替換成這些無鎖內存分配器,它們可能會大大的提升你的應用程序的性能.

3 基於循環數組的無鎖隊列

終於到了本文的重點部分,基於循環數組的無鎖隊列解決了上一節中提到的3個問題,首先歸納一下基於循環數組的無鎖隊列的特性:

  • 做爲一種無鎖同步機制,它顯著下降了任務搶佔的頻率,所以有效減緩了cache顛簸.
  • 如全部其它無鎖隊列同樣,線程之間的爭搶顯著下降,由於它不須要鎖去保護任何數據結構:線程所要作的就是索要一塊空間,而後將數據寫進去.
  • 隊列的操做不會致使動態內存分配
  • 沒有ABA問題,只是在數組處理上須要一些額外的開銷.

3.1 它是如何工做的?

隊列的實現使用了一個數組和3個做用不一樣的下標:

  • writeIndex:新元素入列時存放位置在數組中的下標
  • readIndex:下一個出列元素在數組中的下標
  • maximumReadIndex:最後一個已經完成入列操做的元素在數組中的下標.若是它的值跟writeIndex不一致,代表有寫請求還沒有完成.這意味着,有寫請求成功申請了空間但數據還沒徹底寫進隊列.因此若是有線程要讀取,必需要等到寫線程將數徹底據寫入到隊列以後.

必須指明的是使用3種不一樣的下標都是必須的,由於隊列容許任意數量的生產者和消費者圍繞着它工做.已經存在一種基於循環數組的無鎖隊列,使得惟一的生產者和惟一的消費者能夠良好的工做[11].它的實現至關簡潔很是值得閱讀.

3.1.1 CAS操做

此無鎖隊列基於CAS指令,CAS操做在GCC4.1.1中被包含進來.由於我使用GCC 4.4進行編譯,因此我使用了GCC內置的CAS操做__sync_bool_compare_and_swap.爲了支持更多的平臺和其它不一樣的編譯器,我在atomic_ops.h中用宏將它映射到名字CAS上.

/// @brief Compare And Swap
    ///        If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr
    /// @return true if the comparison is successful and a_newVal was written
    #define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)

若是你打算在不一樣的編譯器上編譯這個隊列,你所要作的就是實現一個在你的編譯器上可使用的CAS函數.你的實現必須符合如下接口:

  • 參數1是要被修改的變量的地址
  • 參數2是要被修改變量的老值
  • 參數3是要被修改爲的新值
  • 若是修改爲功返回true,不然返回false

3.1.2 向隊列插入元素

如下代碼用於向隊列插入元素:

/* ... */
    template <typename ELEM_T, uint32_t Q_SIZE>
    inline
    uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
    {
        return (a_count % Q_SIZE);
    }

    /* ... */

    template <typename ELEM_T>
    bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
    {
        uint32_t currentReadIndex;
        uint32_t currentWriteIndex;

        do
        {
            currentWriteIndex = m_writeIndex;
            currentReadIndex  = m_readIndex;
            if (countToIndex(currentWriteIndex + 1) ==
                countToIndex(currentReadIndex))
            {
                // the queue is full
                return false;
            }

        } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));

        // We know now that this index is reserved for us. Use it to save the data
        m_theQueue[countToIndex(currentWriteIndex)] = a_data;

        // update the maximum read index after saving the data. It wouldn't fail if there is only one thread 
        // inserting in the queue. It might fail if there are more than 1 producer threads because this
        // operation has to be done in the same order as the previous CAS

        while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
        {
            // this is a good place to yield the thread in case there are more
            // software threads than hardware processors and you have more
            // than 1 producer thread
            // have a look at sched_yield (POSIX.1b)
            sched_yield();
        }

        return true;
    }

如下插圖展現了對隊列執行操做時各下標是如何變化的.若是一個位置被標記爲X,標識這個位置裏存放了數據.空白表示位置時空的.對於下圖的狀況,隊列中存放了兩個元素.WriteIndex指示的位置是新元素將會被插入的位置.ReadIndex指向的位置中的元素將會在下一次pop操做中被彈出.

Alt text

當生產者準備將數據插入到隊列中,它首先經過增長WriteIndex的值來申請空間.MaximumReadIndex指向最後一個存放有效數據的位置(也就是實際的隊列尾).

Alt text

一旦空間的申請完成,生產者就能夠將數據拷貝到剛剛申請到的位置中.完成以後增長MaximumReadIndex使得它與WriteIndex的一致.

Alt text

如今隊列中有3個元素,接着又有一個生產者嘗試向隊列中插入元素.

Alt text

在第一個生產者完成數據拷貝以前,又有另一個生產者申請了一個新的空間準備拷貝數據.如今有兩個生產者同時向隊列插入數據.

Alt text

如今生產者開始拷貝數據,在完成拷貝以後,對MaximumReadIndex的遞增操做必須嚴格遵循一個順序:第一個生產者線程首先遞增MaximumReadIndex,接着才輪到第二個生產者.這個順序必須被嚴格遵照的緣由是,咱們必須保證數據被徹底拷貝到隊列以後才容許消費者線程將其出列.

Alt text

第一個生產者完成了數據拷貝,並對MaximumReadIndex完成了遞增,如今第二個生產者能夠遞增MaximumReadIndex了

Alt text

第二個生產者完成了對MaximumReadIndex的遞增,如今隊列中有5個元素.

##3.1.3 從隊列中移除元素

如下代碼用於從隊列中移除元素:

/* ... */

    template <typename ELEM_T>
    bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
    {
        uint32_t currentMaximumReadIndex;
        uint32_t currentReadIndex;

        do
        {
            // to ensure thread-safety when there is more than 1 producer thread
            // a second index is defined (m_maximumReadIndex)
            currentReadIndex        = m_readIndex;
            currentMaximumReadIndex = m_maximumReadIndex;

            if (countToIndex(currentReadIndex) == 
                countToIndex(currentMaximumReadIndex))
            {
                // the queue is empty or
                // a producer thread has allocate space in the queue but is 
                // waiting to commit the data into it
                return false;
            }

            // retrieve the data from the queue
            a_data = m_theQueue[countToIndex(currentReadIndex)];

            // try to perfrom now the CAS operation on the read index. If we succeed
            // a_data already contains what m_readIndex pointed to before we 
            // increased it
            if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
            {
                return true;
            }

            // it failed retrieving the element off the queue. Someone else must
            // have read the element stored at countToIndex(currentReadIndex)
            // before we could perform the CAS operation        

        } while(1); // keep looping to try again!

        // Something went wrong. it shouldn't be possible to reach here
        assert(0);

        // Add this return statement to avoid compiler warnings
        return false;
    }

如下插圖展現了元素出列的時候各類下標是如何變化的,隊列中初始有2個元素.WriteIndex指示的位置是新元素將會被插入的位置.ReadIndex指向的位置中的元素將會在下一次pop操做中被彈出.

Alt text

消費者線程拷貝數組ReadIndex位置的元素,而後嘗試用CAS操做將ReadIndex加1.若是操做成功消費者成功的將數據出列.由於CAS操做是原子的,因此只有惟一的線程能夠在同一時刻更新ReadIndex的值.若是操做失敗,讀取新的ReadIndex值,比重複以上操做(copy數據,CAS).

Alt text

如今又有一個消費者將元素出列,隊列變成空.

Alt text

如今有一個生產者正在向隊列中添加元素.它已經成功的申請了空間,但還沒有完成數據拷貝.任何其它企圖從隊列中移除元素的消費者都會發現隊列非空(由於writeIndex不等於readIndex).但它不能讀取readIndex所指向位置中的數據,由於readIndex與MaximumReadIndex相等.消費者將會在do循環中不斷的反覆嘗試,直到生產者完成數據拷貝增長MaximumReadIndex的值,或者隊列變成空(這在多個消費者的場景下會發生).

Alt text

當生產者完成數據拷貝,隊列的大小是1,消費者線程能夠讀取這個數據了.

##3.1.4 在多於一個生產者線程的狀況下yielding處理器的必要性

讀者可能注意到了push函數中使用了sched_yield()來主動的讓出處理器,對於一個聲稱無鎖的算法而言,這個調用看起來有點奇怪.正如我在文章開始的部分解釋過的,多線程環境下影響性能的其中一個因素就是Cache顛簸.而產生Cache顛簸的一種狀況就是一個線程被搶佔,操做系統須要保存被搶佔線程的上下文,而後將被選中做爲下一個調度線程的上下文載入.此時Cache中緩存的數據都會失效,由於它是被搶佔線程的數據而不是新線程的數據.

因此,當此算法調用sched_yield()意味着告訴操做系統:"我要把處理器時間讓給其它線程,由於我要等待某件事情的發生".無鎖算法和經過阻塞機制同步的算法的一個主要區別在於無鎖算法不會阻塞在線程同步上,那麼爲何在這裏咱們要主動請求操做系統搶佔本身呢?這個問題的答案沒那麼簡單.它與有多少個生產者線程在併發的往隊列中存放數據有關:每一個生產者線程所執行的CAS操做都必須嚴格遵循FIFO次序,一個用於申請空間,另外一個用於通知消費者數據已經寫入完成能夠被讀取了.

若是咱們的應用程序只有惟一的生產者操做這個隊列,sche_yield()將永遠沒有機會被調用,第2個CAS操做永遠不會失敗.由於在一個生產者的狀況下沒有人能破壞生產者執行這兩個CAS操做的FIFO順序.

而當多於一個生產者線程往隊列中存放數據的時候,問題就出現了.存放數據的完整過程能夠參看3.1.1小節,歸納來講,一個生產者經過第1個CAS操做申請空間,而後將數據寫入到申請到的空間中,而後執行第2個CAS操做通知消費者數據準備完畢可供讀取了.這第2個CAS操做必須遵循FIFO順序,也就是說,若是A線程第首先執行完第一個CAS操做,那麼它也要第1個執行完第2個CAS操做,若是A線程在執行完第一個CAS操做以後中止,而後B線程執行完第1個CAS操做,那麼B線程將沒法完成第2個CAS操做,由於它要等待A先完成第2個CAS操做.而這就是問題產生的根源.讓咱們考慮以下場景,3個消費者線程和1個消費者線程:

  • 線程1,2,3按順序調用第1個CAS操做申請了空間.那麼它們完成第2個CAS操做的順序也應該與這個順序一致,1,2,3.
  • 線程2首先嚐試執行第2個CAS,但它會失敗,由於線程1還沒完成它的第2此CAS操做呢.一樣對於線程3也是同樣的.
  • 線程2和3將會不斷的調用它們的第2個CAS操做,直到線程1完成它的第2個CAS操做爲止.
  • 線程1最終完成了它的第2個CAS,如今線程3必須等線程2先完成它的第2個CAS.
  • 線程2也完成了,最終線程3也完成.

在上面的場景中,生產者可能會在第2個CAS操做上自旋一段時間,用於等待先於它執行第1個CAS操做的線程完成它的第2次CAS操做.在一個物理處理器數量大於操做隊列線程數量的系統上,這不會有太嚴重的問題:由於每一個線程均可以分配在本身的處理器上執行,它們最終都會很快完成各自的第2次CAS操做.雖然算法致使線程處理忙等狀態,但這正是咱們所指望的,由於這使得操做更快的完成.也就是說在這種狀況下咱們是不須要sche_yield()的,它徹底能夠從代碼中刪除.

可是,在一個物理處理器數量少於線程數量的系統上,sche_yield()就變得相當重要了.讓咱們再次考查上面3個線程的場景,當線程3準備向隊列中插入數據:若是線程1在執行完第1個CAS操做,在執行第2個CAS操做以前被搶佔,那麼線程2,3就會一直在它們的第2個CAS操做上忙等(它們忙等,不讓出處理器,線程1也就沒機會執行,它們就只能繼續忙等),直到線程1從新被喚醒,完成它的第2個CAS操做.這就是須要sche_yield()的場合了,操做系統應該避免讓線程2,3處於忙等狀態.它們應該儘快的讓出處理器讓線程1執行,使得線程1能夠把它的第2個CAS操做完成.這樣線程2和3才能繼續完成它們的操做.

4 已知的問題

這個無鎖隊列的設計目標就是實現一個無須動態內存分配的無鎖隊列.顯然這個目標已經達到了,可是這個算法也存在一些缺點,在將此隊列用於生產環境以前你必須仔細考慮這些缺點會不會對你的應用程序形成什麼問題.

4.1 多於一個生產者線程

這個問題咱們在3.1.4小節已經詳細的討論過了.若是有多於一個的生產者線程,那麼將它們極可能花費大量的時間用於等待更新MaximumReadIndex(第2個CAS).這個隊列最初的設計場景是知足單一消費者,因此不用懷疑在多生產者的情形下會比單一輩子產者有大幅的性能降低.

另外若是你只打算將此隊列用於單一輩子產者的場合,那麼第2個CAS操做能夠去除.一樣m_maximumReadIndex也能夠一同被移除了,全部對m_maximumReadIndex的引用都改爲m_writeIndex.因此,在這樣的場合下push和pop能夠被改寫以下:

template <typename ELEM_T>
    bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
    {
        uint32_t currentReadIndex;
        uint32_t currentWriteIndex;

        currentWriteIndex = m_writeIndex;
        currentReadIndex  = m_readIndex;
        if (countToIndex(currentWriteIndex + 1) == 
            countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }

        // save the date into the q
        m_theQueue[countToIndex(currentWriteIndex)] = a_data;

        // increment atomically write index. Now a consumer thread can read
        // the piece of data that was just stored 
        AtomicAdd(&m_writeIndex, 1);

        return true;
    }

    template <typename ELEM_T>
    bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
    {
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;

    do
    {
        // m_maximumReadIndex doesn't exist when the queue is set up as
        // single-producer. The maximum read index is described by the current
        // write index
        currentReadIndex        = m_readIndex;
        currentMaximumReadIndex = m_writeIndex;

        if (countToIndex(currentReadIndex) == 
            countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is 
            // waiting to commit the data into it
            return false;
        }

        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];

        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we 
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
            return true;
        }

        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation        

    } while(1); // keep looping to try again!

    // Something went wrong. it shouldn't be possible to reach here
    assert(0);

    // Add this return statement to avoid compiler warnings
    return false;
    }

若是你打算將此隊列用於單一輩子產者和單一消費者的場合,那麼閱讀[11]將是十分有價值的,由於它的設計目標就是針對這種場合的.

4.2 與智能指針一塊兒使用隊列

若是你打算用這個隊列來存放智能指針對象.須要注意,將一個智能指針存入隊列以後,若是它所佔用的位置沒有被另外一個智能指針覆蓋,那麼它所指向的內存是沒法被釋放的(由於它的引用計數器沒法降低爲0).這對於一個操做頻繁的隊列來講沒有什麼問題,可是程序員須要注意的是,一旦隊列被填滿過一次那麼應用程序所佔用的內存就不會降低,即便隊列被清空.

4.3 計算隊列的大小

size函數可能會返回一個不正確的值,size的實現以下:

template <typename ELEM_T>    
    inline uint32_t ArrayLockFreeQueue<ELEM_T>::size()
    {
        uint32_t currentWriteIndex = m_writeIndex;
        uint32_t currentReadIndex  = m_readIndex;

        if (currentWriteIndex >= currentReadIndex)
        {
            return (currentWriteIndex - currentReadIndex);
        }
        else
        {
            return (m_totalSize + currentWriteIndex - currentReadIndex);
        }
    }

下面的場景描述了size爲什麼會返回一個不正確的值:

  • 1 當currentWriteIndex = m_writeIndex執行以後,m_writeIndex=3,m_readIndex = 2那麼實際size是1
  • 2 以後操做線程被搶佔,且在它中止運行的這段時間內,有2個元素被插入和從隊列中移除.因此m_writeIndex=5,m_readIndex = 4,而size仍是1
  • 3 如今被搶佔的線程恢復執行,讀取m_readIndex值,這個時候currentReadIndex=4,currentWriteIndex=3.
  • 4 currentReadIndex > currentWriteIndex'因此m_totalSize + currentWriteIndex - currentReadIndex`被返回,這個值意味着隊列幾乎是滿的,而實際上隊列幾乎是空的

與本文一塊兒上傳的代碼中包含了處理這個問題的解決方案.添加一個用於保存隊列中元素數量的成員count,這個成員能夠經過AtomicAdd/AtomicSub來實現原子的遞增和遞減.
但須要注意的是這增長了必定開銷,由於原子遞增,遞減操做比較昂貴也很難被編譯器優化.

例如,在core 2 duo E6400 2.13 Ghz 的機器上,單生產者單消費者,隊列數組的初始大小是1000,測試執行10,000k次的插入,沒有count成員的版本用時2.64秒,而維護了count成員的版本用時3.42秒.而對於2消費者,1生產者的狀況,沒有count成員的版本用時3.98秒,維護count的版本用時5.15秒.

這也就是爲何我把是否啓用此成員變量的選擇交給實際的使用者.使用者能夠根據本身的使用場合選擇是否承受額外的運行時開銷.
array_lock_free_queue.h中有一個名爲ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE的宏變量,若是它被定義那麼將啓用count變量,不然將size函數將有可能返回不正確的值.

5 編譯代碼

本文中的無鎖隊列和Glib阻塞隊列都是用模板實現的C++類型.模板代碼放在頭文件中,因此若是沒在.cpp文件中引用到相關的模板類型它是不會被編譯的.
我製做了一個.zip文件,裏面每種隊列都有一個對應的測試文件用於展現隊列的使用和相應的多線程測試用例.

測試代碼是用gomp編寫的,它是一個GNU實現的OpenMP應用程序接口,用於C/C++多平臺共享內存模式並行程序的開發[9].OpenMP是一個種簡單靈活的編程接口,
專門用於不一樣平臺的並行程序開發,使用它能夠方便快捷的編寫出多線程代碼.

本文附帶的代碼分紅3部分,每一部分都有不一樣的需求:

1.基於數組的無鎖隊列:

  • 有兩個版本的無鎖隊列.一個用於任意多線程,一個用於單一輩子產者.它們分別在array_lock_free_queue.h和array_lock_free_queue_single_producer.h
  • GCC的版本號必須大於等於4.1.0(CAS, AtomicAdd和AtomicSub).若是使用其它的編譯器,你須要在atomic_ops.h本身定義CAS實現.(這一般依賴於你的編譯器或平臺或者二者)
  • 若是在你的stdint.h中沒有定義uint32_t,那麼你必須本身定義,定義方式以下:

    typedef unsigned int uint32_t; // int is (normally) 32bit in both 32 and 64bit machines

    另一個必須注意的是這個隊列沒有在64位環境下測試過.若是不支持64位的原子操做,GCC可能會拋出編譯時錯誤.這也就是爲什麼選擇用32位的變量來實現這個隊列(在一個32位的機器上可能不支持64位的原子操做).若是你的機器能夠支持64位原子操做,我沒發現隊列有什麼地方會由於操做64位索引而致使錯誤的地方.

  • 對任意多線程的隊列版本(在push中執行2個CAS操做),還依賴於sched_yield().這個函數是POSIX[10]的一部分,因此任何兼容POSIX的操做系統都應該能夠成功編譯.

2.基於Glib的阻塞隊列:

  • 首先你的系統中必須安裝了glib.對於GNU-Linux而言這個條件必然是知足的.對於其它系統,能夠從下面下載一個完整的GTK+庫, 它在GNU-Linux,Windows,OSX下均可以良好的工做:http://www.gtk.org/download.html

  • 使用到了glib實現的互斥鎖和條件變量,它們是gthread庫的一部分.因此你必須把這個庫連接到你編譯的程序中.

  1. 測試程序:
  • 知足上面所提到的需求
  • 使用GNU make處理makefile,你能夠向編譯器提供一些編譯時選項,例如:

    make N_PRODUCERS=1 N_CONSUMERS=1 N_ITERATIONS=10000000 QUEUE_SIZE=1000

    其中:

    • N_PRODUCERS是生產者線程的數量
    • N_CONSUMERS是消費者線程的數量
    • N_ITERATIONS對隊列執行的插入和移除次數
    • QUEUE_SIZE隊列的數組大小
  • GCC版本號大於4.2,用於編譯Gomp
  • 在運行測試程序以前在命令行添加OMP_NESTED=TRUE參數.例如:

    OMP_NESTED=TRUE ./test_lock_free_q

6 一些圖片

下面的對比圖展現了測試程序在2核心的機器上,不一樣設置和不一樣線程配置的測試數據.

##6.1 第2個CAS操做對性能形成的影響

一個已知道的問題是在單一輩子產者的狀況下,第2個CAS將對性能產生影響.下圖對比了單生產者優化的隊列和任意生產者隊列(值越小越好).從對比圖能夠看出,單生產者優化的版本有30%的性能提高.

Alt text

6.2 無鎖 vs 阻塞隊列

下圖的測試是在各類線程配置下,併發的插入和移除100W元素所花費的時間(越小越好,隊列的數組大小初始爲16384).
在單生產者的狀況下,無鎖隊列打敗了阻塞隊列.而隨着生產者數量的增長,無鎖隊列的效率迅速降低.

Alt text

6.3 4線程的效率

下面的圖,展現了在不一樣線程數量的配置下,不一樣的隊列執行100W次push和pop操做的性能對比.

6.3.1 一個生產者線程

Alt text

6.3.2 兩個生產者線程

Alt text

6.3.3 三個生產者線程

Alt text

6.3.1 一個消費者線程

Alt text

6.3.2 兩個消費者線程

Alt text

6.3.3 三個消費者線程

Alt text

6.4 使用一臺4核心的機器

強烈推薦你在一臺擁有4個核心的機器上執行上述測試.這樣你就能夠觀察sched_yield()對性能產生的影響.

7 結論

基於數組的無鎖隊列的兩個版本已經被證實能夠正常的工做,一個版本是多生產者線程安全的,另外一個版本是單生產者但能夠有多消費者.兩個版本的隊列均可以安全的做爲多線程應用程序的同步機制使用,由於:

  • CAS操做是原子的,線程並行執行push/pop不會致使死鎖
  • 多生產者同時向隊列push數據的時候不會將數據寫入到同一個位置,產生數據覆蓋
  • 多消費者同時執行pop不會致使一個元素被出列多於1次
  • 線程不能將數據push進已經滿的隊列中,不能從空的隊列中pop元素
  • push和pop都沒有ABA問題

可是,雖然這個隊列是線程安全的,可是在多生產者線程的環境下它的性能仍是不如阻塞隊列.所以,在符合下述條件的狀況下能夠考慮使用這個隊列來代替阻塞隊列:

  • 只有一個生產者線程
  • 只有一個頻繁操做隊列的生產者,但偶爾會有其它生產者向隊列push數據

8 歷史

4rd January 2011: Initial version

27th April 2011: Highlighting of some key words. Removed unnecesary uploaded file. A few typos fixed

9 參考

[1] Love, Robert: "Linux Kernel Development Second Edition", Novell Press, 2005

[2] Introduction to lock-free/wait-free and the ABA problem

[3] Lock Free Queue implementation in C++ and C#

[4] High performance computing: Writing Lock-Free Code

[5] M.M. Michael and M.L. Scott, "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms," Proc. 15th Ann. ACM Symp. Principles of Distributed Computing, pp. 267-275, May 1996.

[6] The Hoard Memory Allocator

[7] Glib Asynchronous Queues

[8] boost::shared_ptr documentation

[9] GNU libgomp

[10] sched_yield documentation

[11] lock-free single producer - single consumer circular queue

相關文章
相關標籤/搜索