環形無鎖隊列

 

環形無鎖隊列html

 

環形無鎖隊列

1 環形無鎖隊列的實現

數據結構定義:數組

template class LockFreeQueue
{
  private:
    ElementT *mArray;
    int mCapacity;
    int mFront;
    int mTail;
}

因爲出隊操做是在隊首進行,入隊操做是在隊尾進行,所以,咱們能夠嘗試用mFront和mTail來實現多個線程之間的協調。這其中會用到CAS操做:安全

入隊操做僞碼:數據結構

……

do {
    獲取當前的mTail的值:curTailIndex;
    計算新的mTail的值:newTailIndex = (newTailIndex + 1) % size;
} while(!CAS(mTail, curTailIndex, newTailIndex));
插入元素到curTailIndex;

其中的do-while循環實現的是一個忙式等待:線程試圖獲取當前的隊列尾部空間的控制權;一旦獲取成功,則向其中插入元素。多線程

可是這樣出隊的時候就出現了問題:如何判斷隊首的位置裏是否有相應元素呢?僅使用mFront來判斷是不行的,這隻能保證出隊進程不會對同一個索引位置進行出隊操做,而不能保證mFront的位置中必定有有效的元素。所以,爲了保證出隊隊列與入隊隊列之間的協調,須要在LockFreeQueue中添加一個標誌數組:less

char *mFlagArray;

mFlagArray中的元素標記mArray中與之對應的元素位置是否有效。mFlagArray中的元素有4個取值:函數

  • 0表示對應的mArray中的槽位爲空;
  • 1表示對應槽位已被申請,正在寫入;
  • 2表示對應槽位中爲有效的元素,能夠對其進行出對操做;
  • 3則表示正在彈出操做。

修改後的無鎖隊列的代碼以下:post

template class LockFreeQueue
{
  public:
    LockFreeQueue(int s = 0)
    {
        mCapacity = s;
        mFront = 0;
        mTail = 0;
        mSize = 0;
    }

    ~LockFreeQueue() {}

    /**
     * 初始化queue。分配內存,設定size
     * 非線程安全,需在單線程環境下使用
     */
    bool initialize()
    {
        mFlagArray = new char[mCapacity];
        if (NULL == mFlagArray)
            return false;
        memset(mFlagArray, 0, mCapacity);

        mArray = reinterpret_cast(new char[mCapacity * sizeof(ElementT)]);
        if (mArray == NULL)
            return false;
        memset(mArray, 0, mCapacity * sizeof(ElementT));

        return true;
    }

    const int capacity(void) const
    {
        return mCapacity;
    }

    const int size(void) const
    {
        return mSize;
    }

    /**
     * 入隊函數,線程安全
     */
    bool push(const ElementT & ele)
    {
        if (mSize >= mCapacity)
            return false;

        int curTailIndex = mTail;

        char *cur_tail_flag_index = mFlagArray + curTailIndex;

        //// 忙式等待
        // while中的原子操做:若是當前tail的標記爲已佔用(1),則更新cur_tail_flag_index,繼續循環;不然,將tail標記設爲已經佔用
        while (!__sync_bool_compare_and_swap(cur_tail_flag_index, 0, 1))
        {
            curTailIndex = mTail;
            cur_tail_flag_index = mFlagArray +  curTailIndex;
        }

        //// 兩個入隊線程之間的同步
        int update_tail_index = (curTailIndex + 1) % mCapacity;

        // 若是已經被其餘的線程更新過,則不須要更新;
        // 不然,更新爲 (curTailIndex+1) % mCapacity;
        __sync_bool_compare_and_swap(&mTail, curTailIndex, update_tail_index);

        // 申請到可用的存儲空間
        *(mArray + curTailIndex) = ele;

        // 寫入完畢
        __sync_fetch_and_add(cur_tail_flag_index, 1);

        // 更新size;入隊線程與出隊線程之間的協做
        __sync_fetch_and_add(&mSize, 1);
        return true;
    }

    /**
     * 出隊函數,線程安全
     */
    bool pop(ElementT *ele)
    {
        if (mSize <= 0)
            return false;

        int cur_head_index = mFront;
        char *cur_head_flag_index = mFlagArray + cur_head_index;
        while (!__sync_bool_compare_and_swap(cur_head_flag_index, 2, 3))
        {
            cur_head_index = mFront;
            cur_head_flag_index = mFlagArray + cur_head_index;
        }

        // 取模操做能夠優化
        int update_head_index = (cur_head_index + 1) % mCapacity;
        __sync_bool_compare_and_swap(&mFront, cur_head_index, update_head_index);
        *ele = *(mArray + cur_head_index);

        // 彈出完畢
        __sync_fetch_and_sub(cur_head_flag_index, 3);

        // 更新size
        __sync_fetch_and_sub(&mSize, 1);

        return true;
    }
  private:
    ElementT *mArray;
    int mCapacity; // 環形數組的大小
    int mSize; //隊列中元素的個數
    int mFront;
    int mTail;
    char *mFlagArray; // 標記位,標記某個位置的元素是否被佔用
};

2 死鎖及飢餓

LockFreeQueue實現了基本的多線程之間的協調,不會存在多個線程同時對同一個資源進行操做的狀況,也就不會產生數據競跑,這保證了對於這個隊列而言,基本的訪問操做(出隊、入隊)的執行都是安全的,其結果是可預期的。fetch

在多線程環境下,LockFreeQueue會不會出現死鎖的狀況呢?死鎖有四個必要條件:優化

  1. 對資源的訪問是互斥的;
  2. 請求和保持請求;
  3. 資源不可剝奪;
  4. 循環等待。

在LockFreeQueue中,全部的線程都是對資源進行申請後再使用,一個線程若申請到了資源(這裏的資源主要指環形隊列中的內存槽位),就會當即使用,而且在使用完後釋放掉該資源。不存在一個線程使用A資源的同時去申請B資源的狀況,所以並不會出現死鎖。

但LockFreeQueue可能出現飢餓狀態。例如,對兩個出隊線程A、B,二者都循環進行出隊操做。當隊列中有元素時,A總能申請到這個元素而且執行到彈出操做,而B則只能在DeQueue函數的while循環中一直循環下去。

3 一些優化

對LockFreeQueue能夠進行一些優化。好比:

  1. 對於環形數組大小,能夠設定爲2的整數倍,如1024。這樣取模的操做便可以簡化爲與mCapacity-1的按位與操做。
  2. 忙式等待的時候可能會出現某個線程一直佔用cpu的狀況。此時可使用sleep(0),讓該線程讓出CPU時間片,從就緒態轉爲掛起態。

Date: 2015-12-01T23:33+0800

Author: ruleless

Org version 7.9.3f with Emacs version 24

Validate XHTML 1.0
相關文章
相關標籤/搜索