環形無鎖隊列
Table of Contents
1 環形無鎖隊列的實現
數據結構定義:數組
template class LockFreeQueue { private: ElementT *mArray; int mCapacity; int mFront; int mTail; }
因爲出隊操做是在隊首進行,入隊操做是在隊尾進行,所以,咱們能夠嘗試用mFront和mTail來實現多個線程之間的協調。這其中會用到CAS操做:安全
入隊操做僞碼:數據結構
…… do { 獲取當前的mTail的值:curTailIndex; 計算新的mTail的值:newTailIndex = (newTailIndex + 1) % size; } while(!CAS(mTail, curTailIndex, newTailIndex)); 插入元素到curTailIndex;
其中的do-while循環實現的是一個忙式等待:線程試圖獲取當前的隊列尾部空間的控制權;一旦獲取成功,則向其中插入元素。多線程
可是這樣出隊的時候就出現了問題:如何判斷隊首的位置裏是否有相應元素呢?僅使用mFront來判斷是不行的,這隻能保證出隊進程不會對同一個索引位置進行出隊操做,而不能保證mFront的位置中必定有有效的元素。所以,爲了保證出隊隊列與入隊隊列之間的協調,須要在LockFreeQueue中添加一個標誌數組:less
char *mFlagArray;
mFlagArray中的元素標記mArray中與之對應的元素位置是否有效。mFlagArray中的元素有4個取值:函數
- 0表示對應的mArray中的槽位爲空;
- 1表示對應槽位已被申請,正在寫入;
- 2表示對應槽位中爲有效的元素,能夠對其進行出對操做;
- 3則表示正在彈出操做。
修改後的無鎖隊列的代碼以下:post
template class LockFreeQueue { public: LockFreeQueue(int s = 0) { mCapacity = s; mFront = 0; mTail = 0; mSize = 0; } ~LockFreeQueue() {} /** * 初始化queue。分配內存,設定size * 非線程安全,需在單線程環境下使用 */ bool initialize() { mFlagArray = new char[mCapacity]; if (NULL == mFlagArray) return false; memset(mFlagArray, 0, mCapacity); mArray = reinterpret_cast(new char[mCapacity * sizeof(ElementT)]); if (mArray == NULL) return false; memset(mArray, 0, mCapacity * sizeof(ElementT)); return true; } const int capacity(void) const { return mCapacity; } const int size(void) const { return mSize; } /** * 入隊函數,線程安全 */ bool push(const ElementT & ele) { if (mSize >= mCapacity) return false; int curTailIndex = mTail; char *cur_tail_flag_index = mFlagArray + curTailIndex; //// 忙式等待 // while中的原子操做:若是當前tail的標記爲已佔用(1),則更新cur_tail_flag_index,繼續循環;不然,將tail標記設爲已經佔用 while (!__sync_bool_compare_and_swap(cur_tail_flag_index, 0, 1)) { curTailIndex = mTail; cur_tail_flag_index = mFlagArray + curTailIndex; } //// 兩個入隊線程之間的同步 int update_tail_index = (curTailIndex + 1) % mCapacity; // 若是已經被其餘的線程更新過,則不須要更新; // 不然,更新爲 (curTailIndex+1) % mCapacity; __sync_bool_compare_and_swap(&mTail, curTailIndex, update_tail_index); // 申請到可用的存儲空間 *(mArray + curTailIndex) = ele; // 寫入完畢 __sync_fetch_and_add(cur_tail_flag_index, 1); // 更新size;入隊線程與出隊線程之間的協做 __sync_fetch_and_add(&mSize, 1); return true; } /** * 出隊函數,線程安全 */ bool pop(ElementT *ele) { if (mSize <= 0) return false; int cur_head_index = mFront; char *cur_head_flag_index = mFlagArray + cur_head_index; while (!__sync_bool_compare_and_swap(cur_head_flag_index, 2, 3)) { cur_head_index = mFront; cur_head_flag_index = mFlagArray + cur_head_index; } // 取模操做能夠優化 int update_head_index = (cur_head_index + 1) % mCapacity; __sync_bool_compare_and_swap(&mFront, cur_head_index, update_head_index); *ele = *(mArray + cur_head_index); // 彈出完畢 __sync_fetch_and_sub(cur_head_flag_index, 3); // 更新size __sync_fetch_and_sub(&mSize, 1); return true; } private: ElementT *mArray; int mCapacity; // 環形數組的大小 int mSize; //隊列中元素的個數 int mFront; int mTail; char *mFlagArray; // 標記位,標記某個位置的元素是否被佔用 };
2 死鎖及飢餓
LockFreeQueue實現了基本的多線程之間的協調,不會存在多個線程同時對同一個資源進行操做的狀況,也就不會產生數據競跑,這保證了對於這個隊列而言,基本的訪問操做(出隊、入隊)的執行都是安全的,其結果是可預期的。fetch
在多線程環境下,LockFreeQueue會不會出現死鎖的狀況呢?死鎖有四個必要條件:優化
- 對資源的訪問是互斥的;
- 請求和保持請求;
- 資源不可剝奪;
- 循環等待。
在LockFreeQueue中,全部的線程都是對資源進行申請後再使用,一個線程若申請到了資源(這裏的資源主要指環形隊列中的內存槽位),就會當即使用,而且在使用完後釋放掉該資源。不存在一個線程使用A資源的同時去申請B資源的狀況,所以並不會出現死鎖。
但LockFreeQueue可能出現飢餓狀態。例如,對兩個出隊線程A、B,二者都循環進行出隊操做。當隊列中有元素時,A總能申請到這個元素而且執行到彈出操做,而B則只能在DeQueue函數的while循環中一直循環下去。
3 一些優化
對LockFreeQueue能夠進行一些優化。好比:
- 對於環形數組大小,能夠設定爲2的整數倍,如1024。這樣取模的操做便可以簡化爲與mCapacity-1的按位與操做。
- 忙式等待的時候可能會出現某個線程一直佔用cpu的狀況。此時可使用sleep(0),讓該線程讓出CPU時間片,從就緒態轉爲掛起態。