// Queue class template: T is the element type; Allocator (default AlignedAllocator<Slot<T>>) is the allocator type the constructor uses to obtain slot storage.
// The static_asserts below reject, at compile time, any T that is neither nothrow copy-assignable nor nothrow move-assignable, and any T that is not nothrow destructible.
template <typename T, typename Allocator = AlignedAllocator<Slot<T>>> class Queue { private: static_assert(std::is_nothrow_copy_assignable<T>::value || std::is_nothrow_move_assignable<T>::value, "T must be nothrow copy or move assignable"); static_assert(std::is_nothrow_destructible<T>::value, "T must be nothrow destructible");
public:
/// Constructs a queue with room for `capacity` elements.
///
/// @param capacity   number of slots; must be at least 1.
/// @param allocator  allocator copied into the queue and used for slot storage.
/// @throws std::invalid_argument if `capacity` is less than 1.
/// @throws std::bad_alloc if the allocator returns storage that is not
///         aligned to alignof(Slot<T>) (the storage is released first).
explicit Queue(const size_t capacity, const Allocator &allocator = Allocator())
    : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) {
  if (capacity_ < 1) {
    throw std::invalid_argument("capacity < 1");
  }

  // Allocate one extra slot to prevent false sharing on the last slot.
  slots_ = allocator_.allocate(capacity_ + 1);

  // Allocators are not required to honor alignment for over-aligned types
  // (see http://eel.is/c++draft/allocator.requirements#10), so the alignment
  // is verified here instead of being assumed.
  if (reinterpret_cast<size_t>(slots_) % alignof(Slot<T>) != 0) {
    allocator_.deallocate(slots_, capacity_ + 1);
    throw std::bad_alloc();
  }

  // Only the first capacity_ slots are constructed; the extra slot stays raw.
  for (size_t i = 0; i < capacity_; ++i) {
    new (&slots_[i]) Slot<T>();
  }

  static_assert(
      alignof(Slot<T>) == hardwareInterferenceSize,
      "Slot must be aligned to cache line boundary to prevent false sharing");
  static_assert(sizeof(Slot<T>) % hardwareInterferenceSize == 0,
                "Slot size must be a multiple of cache line size to prevent false sharing between adjacent slots");
  static_assert(sizeof(Queue) % hardwareInterferenceSize == 0,
                "Queue size must be a multiple of cache line size to prevent false sharing between adjacent queues");
  static_assert(
      offsetof(Queue, tail_) - offsetof(Queue, head_) ==
          static_cast<std::ptrdiff_t>(hardwareInterferenceSize),
      "head and tail must be a cache line apart to prevent false sharing");
}
~Queue() noexcept { for (size_t i = 0; i < capacity_; ++i) { slots_[i].~Slot(); } allocator_.deallocate(slots_, capacity_ + 1); } // non-copyable and non-movable Queue(const Queue &) = delete; Queue &operator=(const Queue &) = delete;
template <typename... Args> void emplace(Args &&... args) noexcept { static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); auto const head = head_.fetch_add(1); auto &slot = slots_[idx(head)]; while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire)) ; slot.construct(std::forward<Args>(args)...); slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); }
MPMCQueue 類使用 head_ 和 tail_ 兩個數據成員作爲隊列首元素和尾元素的索引標識:head_ 爲隊列首元素的下一個元素索引,即下一個插入位置的索引值,tail_ 爲隊列尾元素的索引。但是這兩個數據不會有減少的操作,而是一直 fetch_add(1),取元素的時候使用 idx(head) 獲得真正的索引值。這裏 idx 輔助函數就是 head % capacity_,而 turn 函數的實現爲 head / capacity_。可以這麼理解:turn 的返回值代表了 head 遍歷當前隊列的趟數。假設 capacity_ = 5,則:
head = 0,turn(head) = 0,當前head遍歷了隊列0趟。
head = 1, turn(head) = 0, head前進了一個單位,但仍是0趟。
head = 5,turn(head) = 1,head 又指向了隊列的第一個 Slot(因爲 idx(head) = 0),而且已經是第 1 趟遍歷隊列了。
調用 head_.fetch_add 函數,這個函數首先修改 head_ 保存的值,然後返回修改之前的值,這樣就原子地更新了下一個插入操作的位置,並得到本次插入位置的索引值,再通過 auto& slot = slots_[idx(head)] 獲得該 Slot 的引用。
接下來是一個 while 循環,通過不斷比較 turn(head) * 2 和 slot.turn 的值,相等的時候認爲該 Slot 是空的,否則就在這裏循環等待,直到 slot.turn 的值改變。之後就調用 construct 函數在 Slot 對象中構造 T 類型對象,並給 slot.turn 賦值爲 turn(head) * 2 + 1。
暫且忽略原子操作的內存一致性選項(之後分析),可以看出每個 Slot 對象的 turn 值代表了該 Slot 對象中是否存在 T 類型對象:當 slot.turn = turn(head) * 2 時不存在,當 slot.turn = turn(head) * 2 + 1 時存在。
head_ 第 0 趟遍歷到該 Slot 對象的時候,slot.turn = 0,等待條件滿足(0 * 2 == 0),構造對象,slot.turn 被賦值爲 1。假設一直沒有 pop 操作而不斷插入數據,head_ 不斷增加直到又回到這個 Slot 對象(這個時候隊列已經滿了),這時候 head_ 的趟數變爲 1,等待條件不滿足(1 * 2 != 1),表示這個 Slot 對象中已經含有 T 類型對象,不能插入。分析到這裏可以知道,pop 函數中也在不斷修改 slot.turn 的值:當 tail_ 第 0 趟遍歷隊列的時候,會把 slot.turn 從 1 變爲 2,這時 emplace 操作的等待條件就會滿足,即可以插入 T 類型對象。
slot.turn = 0 // init.
wait slot.turn == 0 :
slot.turn = 1
construct object.
wait slot.turn == 1 :
slot.turn = 2
destruct object.
wait slot.turn == 2 :
slot.turn = 3
construct object
// Calls destroy() only when turn is odd.
~Slot() noexcept {
  const bool turnIsOdd = (turn & 1) != 0;
  if (turnIsOdd) {
    destroy();
  }
}
當turn爲奇數的時候turn & 1的結果爲真,調用destroy函數。
/// Removes the next element and assigns it to `v`, spinning until the
/// claimed slot's turn counter shows it holds an element for this lap.
///
/// @param v  receives the result of the slot's move().
void pop(T &v) noexcept {
  // Claim a unique removal position; fetch_add returns the value before the
  // increment.
  auto const ticket = tail_.fetch_add(1);
  auto &cell = slots_[idx(ticket)];

  // The slot holds this lap's element once its turn equals 2 * lap + 1.
  auto const filledTurn = turn(ticket) * 2 + 1;
  while (cell.turn.load(std::memory_order_acquire) != filledTurn) {
    // busy-wait
  }

  v = cell.move();
  cell.destroy();
  // Hand the slot back: the next even turn marks it free for the next lap.
  cell.turn.store(filledTurn + 1, std::memory_order_release);
}
可以看到,pop 函數先等待 slot.turn 變爲 turn(tail) * 2 + 1,然後 move 出對象,並修改 slot.turn 爲 turn(tail) * 2 + 2。
template <typename... Args> bool try_emplace(Args &&... args) noexcept { static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); //獲取當前時刻的插入位置索引值 auto head = head_.load(std::memory_order_acquire); for (;;) { //獲取idx(head)對應的元素,注意此刻slot已經不必定是插入位置索引了。 auto &slot = slots_[idx(head)]; //判斷插入位置是否是空的,若是是空的的話 if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) { //判斷head_是否被更新過,若是沒有的話cas操做成功,構造對象,更新turn,最後返回true便可,cas操做失敗,head會被更新爲head_的新值,從新進入循環。 if (head_.compare_exchange_strong(head, head + 1)) { slot.construct(std::forward<Args>(args)...); slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); return true; } } else { //插入位置不是空的,此時判斷head_節點是否被更新過,若是沒有更新過就意味着隊列已經滿了,插入節點已是隊尾節點了,所以返回false,若是被更新過,則更新了head,從新進入循環判斷。 auto const prevHead = head; head = head_.load(std::memory_order_acquire); if (head == prevHead) { return false; } } } }