0MQ底層隊列設計

 

 

 

 

ypipe_t has a yqueue_t. pipe_t relates two ypipe(s)。pipe_t就是0MQ框架內使用的底層隊列。react

yqueue_t的設計目的。算法

 

 yqueue_t 的結構成員編程

這個隊列的設計與std::deque有的類似,都用chunk雙向鏈表實現,一樣避免FIFO隊列對內存過分頻繁的分配和釋放。yqueue_t不支持隨機訪問,以及遍歷迭代。yqueue_t採用了一個spare_chunk,保存最近一次訪問事後釋放或分配出來的chunk,提升空閒chunk的cache親和性。而且spare_chunk使用了lock-free的atomic_ptr_t。數組

begin_chunk指向chunk鏈表的開始端,back_chunk指向chunk鏈表的有效尾端。網絡

yqueue只支持相似std::deque的pop_front, pop_back, push_back。session

pop_front當消耗完一個chunk,而後空閒出一個chunk,替換spare_chunk。併發

push_back當接近可寫chunk的末端,就會分配出一個chunk或取出spare_chunk,鏈入到鏈表尾端,end_chunk指向這個新的chunk。框架

pop_back可能會使當前可寫chunk空閒,但這個空閒出來的chunk不會放到spare_chunk。socket

 

接着就是ypipe_t, 其設計目的:atom

ypipe_t 結構成員:

這裏的lock-free,一樣關鍵在atomic_ptr_t類型的成員。一樣是利用cas(compare_and_swap)替代解決某些狀況的鎖。這裏並無實現RCU這類lock-free讀寫併發算法,也不支持讀之間的併發。設計目的,明確指出,只有一條線程能夠在任何指定時刻讀,以及一條線程能夠在任何指定時刻寫,換句話說,讀寫能夠無鎖併發,但讀之間不可併發,寫之間也不能夠併發。

這裏的讀寫lock-free,w指向yqueue的back,r指向yqueue的front,f指向yqueue未flush的位置。也就是f在w前,r是不會指向到f和w之間的位置,也就是yqueue未空也是不可讀的。讀寫以前的競態關鍵就是當r到達了f的位置。這時使用了一個atomic_ptr_t類型的c去進行lock-free解決。準確來講,競態可能會發生在讀線程的讀和寫線程的flush時刻。

 

若是你要使用這個ypipe做爲你的程序的隊列的話,你仍是得用同步原語控制讀之間以及寫之間的同步。最理想的使用就是,同時只有一條線程可讀,另外一條線程可寫,讀寫兩條線程之間是能夠lock-free併發的。若是用鎖的話,當發生競態就有一條線程要進入內核態等待,另外一條線程也要進入內核態去喚醒。詳細請看前面關於futex的文章。

 

最後就是pipe_t。

主要結構成員:

 

pipe_t關聯inpipe的讀端,關聯outpipe的寫端。在這一層並提供一個事件槽。

 

==================================================================

ypipe_t用於兩個小組件, 一個是IO消息的底層upipe_t,上面封裝一層pipe_t,用於某些類型的0MQ socket (socket_base_t),另外一個是cmd命令的cpipe_t用於mailbox,而mailbox是向io_thread_t以及rearer_t對象通信的隊列。mailbox並非底層io文件,但借用了signaler_t(實現用某種文件系統文件),並實現i_poll_event接口,讓mailbox的"io"歸入到操做系統的poller。當經過mailbox發送命令與mailbox的擁有者通信時,就會在某個reactor線程中處理事件。參看mailbox::send

 

==========================================================

整個zmq框架內部,對象之間使用mailbox進行通信。

來看一下zmq context

cxt_t將reaper_t回收線程,io_thread_t 以及 socket_base_t 的mailbox 按放在 slots 指針數組。

slots[0] = &term_mailbox

slots[1] = reaper->get_mailbox()

slots[2 .. 2+io_nr] = io_thread->get_mailbox()

slots[...] = socket_base->get_mailbox()

這裏要清楚,

1. 能夠向一個io_thread發送command,並讓command在目標io_thread上運行。

2. 向socket_base_t發送command,不會喚醒io_thread,由於socket_base_t不負責io,負責io的stream_engine_t,這纔是咱們一般理解的stream connection。你zmq_poll這個socket_base_t的控制線程會被喚醒,並執行這個命令。可是通常不會直接向socket_base_t發送command。而是一般繼承它的tid(實質是slots的索引)的pipe_t,在寫入pipe_t後,向pipe_t發送send_activate_read(send_command),就會最終向它的父對象(不是繼承關係,而是建立關係)socket_base_t發送command。

 

======================================================

在zmq框架內部有這樣一張 pipe 網絡。

1. 首先0MQ socket 不是咱們理解的傳統意義的socket(某個鏈接的插口對),一個0MQ socket能夠有多個鏈接,而且是透明的。

2. 每一個0MQ socket會一對 pipe 用於編程者的訪問。

3. 0MQ socket裏面每一個鏈接(session)都有一對 pipe,0MQ socket會透明地將消息在它的 pipe 和 多個session的pipe之間進行交換。

4. ctx_t包含了slots(mailbox,一種借用了signaler能夠被socket poll的pipe),對象間的消息通信使用這些mailbox。

5. zmq_proxy,透明地將一個socket_base_t的pipe和另外一個socket_base_t的pipe的消息進行交換。

相關文章
相關標籤/搜索