template <typename T, typename Allocator = AlignedAllocator<Slot<T>>> class Queue { private: static_assert(std::is_nothrow_copy_assignable<T>::value || std::is_nothrow_move_assignable<T>::value, "T must be nothrow copy or move assignable"); static_assert(std::is_nothrow_destructible<T>::value, "T must be nothrow destructible");
第一個模板參數是隊列存儲的對象類型,第二個模板參數爲內存分配器,默認使用AlignedAllocator,即上文定義的內存分配器。c++
要求T類型的拷貝賦值,移動賦值函數和析構函數都要是noexcept的。併發
public: explicit Queue(const size_t capacity, const Allocator &allocator = Allocator()) : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) { if (capacity_ < 1) { throw std::invalid_argument("capacity < 1"); } // Allocate one extra slot to prevent false sharing on the last slot slots_ = allocator_.allocate(capacity_ + 1); // Allocators are not required to honor alignment for over-aligned types (see http://eel.is/c++draft/allocator.requirements#10) so we verify alignment here if (reinterpret_cast<size_t>(slots_) % alignof(Slot<T>) != 0) { allocator_.deallocate(slots_, capacity_ + 1); throw std::bad_alloc(); } for (size_t i = 0; i < capacity_; ++i) { new (&slots_[i]) Slot<T>(); } static_assert( alignof(Slot<T>) == hardwareInterferenceSize, "Slot must be aligned to cache line boundary to prevent false sharing"); static_assert(sizeof(Slot<T>) % hardwareInterferenceSize == 0, "Slot size must be a multiple of cache line size to prevent false sharing between adjacent slots"); static_assert(sizeof(Queue) % hardwareInterferenceSize == 0, "Queue size must be a multiple of cache line size to prevent false sharing between adjacent queues"); static_assert( offsetof(Queue, tail_) - offsetof(Queue, head_) == static_cast<std::ptrdiff_t>(hardwareInterferenceSize), "head and tail must be a cache line apart to prevent false sharing"); }
上面代碼是Queue的構造函數,爲何要多申請一個Slot避免最後一個Slot的僞共享(不懂)?
而後檢查了分配的內存起始地址是否是以Slot<T>的對齊數對齊,由於分配器並不被要求對over-aligned(過分對齊)的類型進行對齊,若是不知足對齊要求,則避免僞共享的要求達不到,釋放內存拋出異常。
接着在capacity_個內存塊上構造Slot<T>對象,最後進行一系列的靜態斷言,確保各個對象的內存分佈與大小符合設計要求。函數
~Queue() noexcept { for (size_t i = 0; i < capacity_; ++i) { slots_[i].~Slot(); } allocator_.deallocate(slots_, capacity_ + 1); } // non-copyable and non-movable Queue(const Queue &) = delete; Queue &operator=(const Queue &) = delete;
析構函數沒有什麼意外之處.fetch
template <typename... Args> void emplace(Args &&... args) noexcept { static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); auto const head = head_.fetch_add(1); auto &slot = slots_[idx(head)]; while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire)) ; slot.construct(std::forward<Args>(args)...); slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); }
向隊列首部插入一個元素,這個函數涉及到MPMCQueue核心的設計思路,所以對源碼的分析先暫停,研究下MPMCQueue入隊出隊的操做實現。ui
MPMCQueue類使用head_和tail_兩個數據成員做爲隊列的首元素和尾元素的索引標識 head_爲隊列首元素的下一個元素索引,即下一個插入位置的索引值,tail_爲隊列尾元素的索引,可是這兩個數據不會有減少的操做,而是一直fetch_add(1),取元素的時候使用idx(head)得到真正的索引值,這裏idx輔助函數就是head % capacity_,而turn函數的實現爲head / capacity_,能夠這麼理解,turn的返回值表明了head遍歷當前隊列的趟數,假設capacity_ = 5,則:設計
head = 0,turn(head) = 0,當前head遍歷了隊列0趟。
head = 1, turn(head) = 0, head前進了一個單位,但仍是0趟。
...
head = 5,turn(head) = 1, head又指向了隊列的第一個Slot(由於idx(head) = 0),而已是第1趟遍歷隊列了。code
所以,emplace函數中首先遞增head_,這樣就經過idx(head)得到了隊首Slot的前一個Slot索引,也就是本次構造T類型對象的Slot的位置,
調用head_.fetch_add函數,這個函數首先修改head_保存的值而後返回修改以前的值,這樣原子的更新了下一個插入操做的位置並獲得本次插入位置的索引值,經過auto& slot = slots_[idx(head)]得到該Slot的引用。
接下來是一個while循環,經過不斷比較turn(head) * 2 和slot.turn的值,相等的時候認爲該Slot是空的,不然在這裏無限循環,等待slot.turn的值改變。以後就調用construct函數在Slot對象中構造T類型對象,並給slot.turn賦值爲turn(head) * 2 + 1。對象
暫且忽略原子操做的內存一致性選項(以後分析),能夠分析每一個Slot對象turn的值表明了該Slot對象中是否存在T類型對象,當slot.turn = turn(head) * 2時不存在,當slot.turn = turn(head) * 2 + 1時存在。
head_第0趟遍歷到該Slot對象的時候,slot.turn = 0, while判斷成功,構造對象,slot.turn被賦值爲1。假設一直沒有pop操做而不斷插入數據,head_不斷增長直到又找到了這個Slot對象(這個時候隊列已經滿了),這時候head_的趟數變爲1,因此while判斷(1 * 2 != 1)失敗,表示這個Slot對象中已經含有T類型對象,不能插入。分析到這裏能夠知道,pop函數中也在不斷修改slot.turn值,當tail_第0趟遍歷隊列的時候,會把slot.turn從1變爲2,這時emplace操做的while判斷就會成功,便可以插入T類型對象。
所以對於每一個slot.turn其實在不斷經歷以下過程:
slot.turn = 0 // init.索引
//emplace
wait slot.turn == 0 :
slot.turn = 1
construct object.接口
//pop
wait slot.turn == 1 :
slot.turn = 2
destruct object.
//emplace
wait slot.turn == 2 :
slot.turn = 3
construct object
...
當slot.turn爲奇數的時候Slot中存在對象,當slot.turn爲偶數的時候Slot中不存在對象,這時候咱們回顧下Slot的析構函數:
~Slot() noexcept { if (turn & 1) { destroy(); } }
當turn爲奇數的時候turn & 1的結果爲真,調用destroy函數。
爲了驗證這個猜測下面看下pop函數的代碼:
void pop(T &v) noexcept { auto const tail = tail_.fetch_add(1); auto &slot = slots_[idx(tail)]; while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire)) ; v = slot.move(); slot.destroy(); slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release); }
能夠看到,pop函數中在等待slot.turn變爲turn(tail) * 2 + 1,而後move出對象,並修改slot.turn爲turn(tail) * 2 + 2。
Queue類中還有try_emplace,push,try_push,try_pop接口函數,核心邏輯與emplace和pop大同小異,預計和原子操做的內存一致性選項一塊兒分析吧。
template <typename... Args> bool try_emplace(Args &&... args) noexcept { static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); //獲取當前時刻的插入位置索引值 auto head = head_.load(std::memory_order_acquire); for (;;) { //獲取idx(head)對應的元素,注意此刻slot已經不必定是插入位置索引了。 auto &slot = slots_[idx(head)]; //判斷插入位置是否是空的,若是是空的的話 if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) { //判斷head_是否被更新過,若是沒有的話cas操做成功,構造對象,更新turn,最後返回true便可,cas操做失敗,head會被更新爲head_的新值,從新進入循環。 if (head_.compare_exchange_strong(head, head + 1)) { slot.construct(std::forward<Args>(args)...); slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); return true; } } else { //插入位置不是空的,此時判斷head_節點是否被更新過,若是沒有更新過就意味着隊列已經滿了,插入節點已是隊尾節點了,所以返回false,若是被更新過,則更新了head,從新進入循環判斷。 auto const prevHead = head; head = head_.load(std::memory_order_acquire); if (head == prevHead) { return false; } } } }
上面是try_emplace函數的代碼,分析以註釋的方式寫在源代碼裏,屬於比較精巧的部分了,由於要處理時刻可能存在的併發問題,所以須要引入關鍵的cas判斷與更新操做。原子讀寫採用Require-Release模型,見:https://zhuanlan.zhihu.com/p/...
其他的接口函數,try_pop的實現思路與try_emplace大同小異,push,try_push只是emplace版本的套殼實習,故不作贅述。