ZMQ Source Code Analysis (1): Implementation of the Basic Data Structures

yqueue and ypipe

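zmq bills itself as "the fastest message queue ever", so it is no surprise that the most important data structure in zmq is the queue.

zmq's queues are implemented mainly by yqueue and ypipe. yqueue provides the basic queue operations, so its implementation is analyzed first.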

//  Individual memory chunk to hold N elements.
        struct chunk_t
        {
             T values [N];
             chunk_t *prev;
             chunk_t *next;
        };

        //  Back position may point to invalid memory if the queue is empty,
        //  while begin & end positions are always valid. Begin position is
        //  accessed exclusively by queue reader (front/pop), while back and
        //  end positions are accessed exclusively by queue writer (back/push).
        chunk_t *begin_chunk;
        int begin_pos;
        chunk_t *back_chunk;
        int back_pos;
        chunk_t *end_chunk;
        int end_pos;

        //  People are likely to produce and consume at similar rates.  In
        //  this scenario holding onto the most recently freed chunk saves
        //  us from having to call malloc/free.
        atomic_ptr_t<chunk_t> spare_chunk;

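yqueue contains an important struct, chunk_t, which is the key to yqueue's efficiency.

Allocating and freeing memory is expensive. To avoid frequent memory operations, yqueue never allocates room for a single element. It allocates a whole batch at a time, and that batch is stored in a chunk_t. yqueue uses three pointers and three cursors to record the chunks and the indices of the valid data inside them. Take the push operation as an example: when an element is appended to the end of the queue, push first checks whether the chunk at the tail still has a free slot, that is, whether end_pos reaches N once it is incremented (it was N-1 beforehand). If so, a new chunk_t is needed; otherwise moving end_pos is enough. In addition, because producers and consumers in many queues run at similar rates, yqueue keeps the most recently freed chunk in spare_chunk, so that when a new chunk is needed the one recorded in spare_chunk can be reused directly. Besides push, yqueue also provides pop and unpush, which work on the same principle.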

// Adds an element to the back end of the queue.
        inline void push ()
        {
            back_chunk = end_chunk;
            back_pos = end_pos;

            if (++end_pos != N)
                return;

            chunk_t *sc = spare_chunk.xchg (NULL);
            if (sc) {
                end_chunk->next = sc;
                sc->prev = end_chunk;
            } else {
                end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
                alloc_assert (end_chunk->next);
                end_chunk->next->prev = end_chunk;
            }
            end_chunk = end_chunk->next;
            end_pos = 0;
        }
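To make the chunk and spare-chunk idea concrete, here is a minimal single-threaded sketch. It is not zmq's yqueue_t: the class name chunk_queue, the allocations() counter and the use of new/delete instead of malloc are assumptions made for illustration, and spare_chunk is a plain pointer rather than an atomic one.

```cpp
#include <cassert>

// Hypothetical, single-threaded simplification of the chunked-queue idea.
// chunk_queue and allocations() are illustrative names, not part of zmq.
template <typename T, int N> class chunk_queue
{
  public:
    chunk_queue () : begin_pos (0), end_pos (0), spare_chunk (nullptr), allocs (0)
    {
        begin_chunk = end_chunk = alloc_chunk ();
    }

    ~chunk_queue ()
    {
        while (begin_chunk) {
            chunk_t *next = begin_chunk->next;
            delete begin_chunk;
            begin_chunk = next;
        }
        delete spare_chunk;
    }

    void push (const T &value)
    {
        end_chunk->values[end_pos] = value;
        if (++end_pos != N)
            return;
        //  The tail chunk is full: reuse the spare chunk if there is one,
        //  otherwise allocate a fresh chunk.
        chunk_t *next = spare_chunk;
        if (next)
            spare_chunk = nullptr;
        else
            next = alloc_chunk ();
        next->next = nullptr;
        end_chunk->next = next;
        end_chunk = next;
        end_pos = 0;
    }

    //  Precondition: the queue is not empty.
    T pop ()
    {
        T value = begin_chunk->values[begin_pos];
        if (++begin_pos == N) {
            chunk_t *old = begin_chunk;
            begin_chunk = begin_chunk->next;
            begin_pos = 0;
            //  Keep only the most recently freed chunk.
            delete spare_chunk;
            spare_chunk = old;
        }
        return value;
    }

    int allocations () const { return allocs; }

  private:
    struct chunk_t
    {
        T values[N];
        chunk_t *next;
    };

    chunk_t *alloc_chunk ()
    {
        ++allocs;
        chunk_t *chunk = new chunk_t ();
        chunk->next = nullptr;
        return chunk;
    }

    chunk_t *begin_chunk;
    int begin_pos;
    chunk_t *end_chunk;
    int end_pos;
    chunk_t *spare_chunk;
    int allocs;
};
```

With N = 4, pushing four elements and popping four elements, repeated three times, allocates only two chunks in this sketch: after the first round the freed chunk is always picked up again from spare_chunk.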

Next comes ypipe. ypipe inherits from ypipe_base_t, which abstracts the basic operations shared by ypipe and ypipe_conflate (analyzed later):

template <typename T> class ypipe_base_t
    {
    public:
        virtual ~ypipe_base_t () {}
        virtual void write (const T &value_, bool incomplete_) = 0;
        virtual bool unwrite (T *value_) = 0;
        virtual bool flush () = 0;
        virtual bool check_read () = 0;
        virtual bool read (T *value_) = 0;
        virtual bool probe (bool (*fn)(const T &)) = 0;
    };

ypipe contains a yqueue and four very important pointers. Here are ypipe's member variables:

//  Allocation-efficient queue to store pipe items.
        //  Front of the queue points to the first prefetched item, back of
        //  the pipe points to last un-flushed item. Front is used only by
        //  reader thread, while back is used only by writer thread.
        yqueue_t <T, N> queue;

        //  Points to the first un-flushed item. This variable is used
        //  exclusively by writer thread.
        T *w;

        //  Points to the first un-prefetched item. This variable is used
        //  exclusively by reader thread.
        T *r;

        //  Points to the first item to be flushed in the future.
        T *f;

        //  The single point of contention between writer and reader thread.
        //  Points past the last flushed item. If it is NULL,
        //  reader is asleep. This pointer should be always accessed using
        //  atomic operations.
        atomic_ptr_t <T> c;

These four pointers are important, so let us look at the role of each:

//  Initialises the pipe.
        inline ypipe_t ()
        {
            //  Insert terminator element into the queue.
            queue.push ();

            //  Let all the pointers to point to the terminator.
            //  (unless pipe is dead, in which case c is set to NULL).
            r = w = f = &queue.back ();
            c.set (&queue.back ());
        }

On initialization, an empty element is first pushed onto the queue as a terminator, and all the pointers point at this terminator.

// Write an item to the pipe. Don't flush it yet. If incomplete is
        // set to true the item is assumed to be continued by items
        // subsequently written to the pipe. Incomplete items are never
        // flushed down the stream.
        inline void write (const T &value_, bool incomplete_)
        {
            // Place the value to the queue, add new terminator element.
            queue.back () = value_;
            queue.push ();

            // Move the "flush up to here" pointer.
            if (!incomplete_)
                f = &queue.back ();
        }

        // Pop an incomplete item from the pipe. Returns true if such
        // item exists, false otherwise.
        inline bool unwrite (T *value_)
        {
            if (f == &queue.back ())
                return false;
            queue.unpush ();
            *value_ = queue.back ();
            return true;
        }

The f pointer points to the first element that has not been flushed yet. If a complete message has just been written, f points to the terminator.

//  Flush all the completed items into the pipe. Returns false if
        //  the reader thread is sleeping. In that case, caller is obliged to
        //  wake the reader up before using the pipe again.
        inline bool flush ()
        {
            //  If there are no un-flushed items, do nothing.
            if (w == f)
                return true;

            //  Try to set 'c' to 'f'.
            if (c.cas (w, f) != w) {

                //  Compare-and-swap was unsuccessful because 'c' is NULL.
                //  This means that the reader is asleep. Therefore we don't
                //  care about thread-safeness and update c in non-atomic
                //  manner. We'll return false to let the caller know
                //  that reader is sleeping.
                c.set (f);
                w = f;
                return false;
            }

            //  Reader is alive. Nothing special to do now. Just move
            //  the 'first un-flushed item' pointer to 'f'.
            w = f;
            return true;
        }

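The flush operation is important. Besides moving w up to f, it has to determine whether the reader of the pipe is asleep. It does this by comparing c with w: from the writer's point of view c can hold only two values, either w or NULL. c is NULL when the previous check_read found nothing to read, because check_read returns false and sets c to NULL at the same time.

The return value of check_read determines what the upper layer does next, and the return value of flush in turn tells the writer whether an earlier check_read returned false.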

//  Check whether item is available for reading.
        inline bool check_read ()
        {
            //  Was the value prefetched already? If so, return.
            if (&queue.front () != r && r)
                return true;

            //  There's no prefetched value, so let us prefetch more values.
            //  Prefetching is to simply retrieve the
            //  pointer from c in atomic fashion. If there are no
            //  items to prefetch, set c to NULL (using compare-and-swap).
            r = c.cas (&queue.front (), NULL);

            //  If there are no elements prefetched, exit.
            //  During pipe's lifetime r should never be NULL, however,
            //  it can happen during pipe shutdown when items
            //  are being deallocated.
            if (&queue.front () == r || !r)
                return false;

            //  There was at least one value prefetched.
            return true;
        }

        //  Reads an item from the pipe. Returns false if there is no value
        //  available.
        inline bool read (T *value_)
        {
            //  Try to prefetch a value.
            if (!check_read ())
                return false;

            //  There was at least one value prefetched.
            //  Return it to the caller.
            *value_ = queue.front ();
            queue.pop ();
            return true;
        }

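As mentioned above, the return value of check_read tells whether the queue holds any data. It uses the r pointer to mark the position up to which reading is possible: if r is not at the front position, there are elements to read. Otherwise it compares c with front to decide whether anything is available, and if nothing is, it sets c to NULL to signal that the reader is going to sleep.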

The way ypipe uses these pointers is fairly intricate. Besides pointing at concrete positions they also encode state, which is a neat trick.
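The interplay of w, r, f and c is easier to follow when stepped through in a single thread. The sketch below is a hypothetical model, not zmq's ypipe_t: the name mini_pipe is invented, std::deque stands in for yqueue_t (it keeps element addresses stable when pushing at the back), and std::atomic's compare_exchange_strong stands in for zmq's atomic_ptr_t::cas. It is meant for walking through the protocol, not for use across threads.

```cpp
#include <atomic>
#include <cassert>
#include <deque>

// Hypothetical single-threaded model of the w/r/f/c protocol.
// mini_pipe is an illustrative name; T must be default-constructible.
template <typename T> class mini_pipe
{
  public:
    mini_pipe ()
    {
        queue.emplace_back (); //  terminator element
        r = w = f = &queue.back ();
        c.store (&queue.back ());
    }

    void write (const T &value, bool incomplete)
    {
        queue.back () = value;
        queue.emplace_back (); //  new terminator
        if (!incomplete)
            f = &queue.back ();
    }

    //  Returns false if the reader is asleep (c was NULL).
    bool flush ()
    {
        if (w == f)
            return true;
        T *expected = w;
        if (!c.compare_exchange_strong (expected, f)) {
            c.store (f);
            w = f;
            return false;
        }
        w = f;
        return true;
    }

    bool check_read ()
    {
        //  Something was prefetched earlier and is still unread.
        if (&queue.front () != r && r)
            return true;

        //  Emulates zmq's cas (), which returns the previous value of c:
        //  after the call, 'seen' holds whatever c contained before it.
        T *seen = &queue.front ();
        c.compare_exchange_strong (seen, nullptr);
        r = seen;

        return !(&queue.front () == r || !r);
    }

    bool read (T *value)
    {
        if (!check_read ())
            return false;
        *value = queue.front ();
        queue.pop_front ();
        return true;
    }

  private:
    std::deque<T> queue;
    T *w;
    T *r;
    T *f;
    std::atomic<T *> c;
};
```

In this model, a read on an empty pipe returns false and leaves c at NULL, so the next flush returns false, which is the cue for the writer to wake the reader. If the reader has not gone to sleep, the same write followed by flush returns true. An item written with incomplete set to true stays invisible to the reader until a later complete item is written and flushed.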

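dbuffer_t and ypipe_conflate_t

ypipe_conflate_t is another implementation of ypipe_base_t. Compared with ypipe it is more efficient, but the data is not safe.

Underneath, it is built on dbuffer_t.

ypipe_conflate_t is a data structure newly added in zmq 4.x. It suits use cases that do not need strong data integrity, and its implementation is relatively simple, so it is not analyzed in detail here.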
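To illustrate what giving up data integrity can mean in a conflating pipe, here is a minimal last-value-wins buffer. It is an assumption-laden sketch rather than zmq's dbuffer_t: the class name latest_value and its members are invented, and a single mutex-protected slot is used purely to show the behavior of keeping only the newest item.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical last-value-wins buffer. This is NOT zmq's dbuffer_t;
// latest_value, slot and has_new are illustrative names.
template <typename T> class latest_value
{
  public:
    latest_value () : slot (), has_new (false) {}

    //  Overwrites any value that has not been read yet.
    void write (const T &value)
    {
        std::lock_guard<std::mutex> lock (sync);
        slot = value;
        has_new = true;
    }

    //  Returns false if nothing new was written since the last read.
    bool read (T *value)
    {
        std::lock_guard<std::mutex> lock (sync);
        if (!has_new)
            return false;
        *value = slot;
        has_new = false;
        return true;
    }

  private:
    std::mutex sync;
    T slot;
    bool has_new;
};
```

Writing 1, 2 and 3 in a row and then reading yields only 3 in this sketch; the earlier values are silently dropped, which is acceptable only when the reader cares about the most recent state.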

pipe

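pipe is a bidirectional channel that holds messages in zmq. It maintains two ypipe_base_t queues, an inpipe and an outpipe. It is mainly used to pass messages between socket_base objects (in-process communication) or between a socket_base and a session_base. Here are the more important member variables of pipe: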

//  Underlying pipes for both directions.
        upipe_t *inpipe;
        upipe_t *outpipe;

        //  Can the pipe be read from / written to?
        bool in_active;
        bool out_active;

        //  High watermark for the outbound pipe.
        int hwm;

        //  Low watermark for the inbound pipe.
        int lwm;

        //  Number of messages read and written so far.
        uint64_t msgs_read;
        uint64_t msgs_written;

        //  Last received peer's msgs_read. The actual number in the peer
        //  can be higher at the moment.
        uint64_t peers_msgs_read;

        //  The pipe object on the other side of the pipepair.
        pipe_t *peer;

        //  Sink to send events to.
        i_pipe_events *sink;

        //  States of the pipe endpoint:
        //  active: common state before any termination begins,
        //  delimiter_received: delimiter was read from pipe before
        //      term command was received,
        //  waiting_for_delimiter: term command was already received
        //      from the peer but there are still pending messages to read,
        //  term_ack_sent: all pending messages were already read and
        //      all we are waiting for is ack from the peer,
        //  term_req_sent1: 'terminate' was explicitly called by the user,
        //  term_req_sent2: user called 'terminate' and then we've got
        //      term command from the peer as well.
        enum {
            active,
            delimiter_received,
            waiting_for_delimiter,
            term_ack_sent,
            term_req_sent1,
            term_req_sent2
        } state;

        //  If true, we receive all the pending inbound messages before
        //  terminating. If false, we terminate immediately when the peer
        //  asks us to.
        bool delay;

        //  Identity of the writer. Used uniquely by the reader side.
        blob_t identity;

        //  Pipe's credential.
        blob_t credential;

        const bool conflate;

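in_active and out_active mark whether the queues in the pipe are active. in_active is set to false when the inbound queue is empty, and out_active is set to false when the outbound queue is full. The upper layer uses these flags to decide whether to go to sleep or take some other action. For example, when session_base sees false it puts the corresponding fd in the engine into the reset state. hwm and lwm are two thresholds: reaching hwm means the queue is full, while lwm means that every time msgs_read advances by lwm messages, an activation message is sent to the pipe on the other side, telling it that some data has been processed and that it may continue writing into the pipe.

The message sending mechanism is analyzed in a later chapter. i_pipe_events is an abstract class: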
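The watermark bookkeeping can be sketched as follows. This is a hypothetical model, not zmq's pipe_t: the struct name flow_control is invented, and both ends of the pipe are folded into one object, so the activation message that zmq would send to the peer as a command is replaced by a direct update of peers_msgs_read.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of the hwm/lwm accounting. In zmq the writer and the
// reader live in two peer pipe_t objects; here both sides share one struct.
struct flow_control
{
    flow_control (int hwm_, int lwm_) :
        hwm (hwm_), lwm (lwm_), msgs_written (0), msgs_read (0),
        peers_msgs_read (0), out_active (true)
    {
    }

    //  Writer side: the pipe is full once the number of messages the peer
    //  has not yet acknowledged reaches hwm.
    bool write ()
    {
        bool full = hwm > 0
                    && msgs_written - peers_msgs_read
                         >= static_cast<std::uint64_t> (hwm);
        if (full) {
            out_active = false;
            return false;
        }
        ++msgs_written;
        return true;
    }

    //  Reader side: returns true when an activation message would be sent,
    //  which happens every lwm messages.
    bool read ()
    {
        ++msgs_read;
        if (lwm > 0 && msgs_read % lwm == 0) {
            peers_msgs_read = msgs_read; //  stands in for the command
            out_active = true;
            return true;
        }
        return false;
    }

    int hwm;
    int lwm;
    std::uint64_t msgs_written;
    std::uint64_t msgs_read;
    std::uint64_t peers_msgs_read;
    bool out_active;
};
```

With hwm = 4 and lwm = 2, the fifth write is refused, a single read does not change that, and the second read triggers the activation that lets the writer continue.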

struct i_pipe_events
    {
        virtual ~i_pipe_events () {}
        virtual void read_activated (zmq::pipe_t *pipe_) = 0;
        virtual void write_activated (zmq::pipe_t *pipe_) = 0;
        virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
        virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
    };

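sink is a pointer to the upper-layer class that implements i_pipe_events (session_base or socket_base). When a queue becomes active, the pipe uses sink to notify the upper layer that it can now read from or write to the pipe.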
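The notification path can be sketched with a stand-in pipe and a trivial owner. Everything here is hypothetical: mock_pipe, pipe_events, counting_owner and the activate_* functions are illustrative names, and the interface is reduced to the two activation callbacks.

```cpp
#include <cassert>

struct mock_pipe; //  stand-in for zmq::pipe_t

// Reduced, hypothetical version of the events interface.
struct pipe_events
{
    virtual ~pipe_events () {}
    virtual void read_activated (mock_pipe *pipe_) = 0;
    virtual void write_activated (mock_pipe *pipe_) = 0;
};

struct mock_pipe
{
    mock_pipe () : sink (nullptr), in_active (false), out_active (false) {}

    //  Called when the peer signals that new messages can be read.
    void activate_read ()
    {
        if (!in_active) {
            in_active = true;
            if (sink)
                sink->read_activated (this);
        }
    }

    //  Called when the peer signals that there is room to write again.
    void activate_write ()
    {
        if (!out_active) {
            out_active = true;
            if (sink)
                sink->write_activated (this);
        }
    }

    pipe_events *sink;
    bool in_active;
    bool out_active;
};

// Plays the role of the upper layer (session_base or socket_base in zmq).
struct counting_owner : pipe_events
{
    counting_owner () : reads (0), writes (0) {}
    void read_activated (mock_pipe *) override { ++reads; }
    void write_activated (mock_pipe *) override { ++writes; }
    int reads;
    int writes;
};
```

The owner is notified only on the transition from inactive to active, so in this sketch calling activate_read twice in a row results in a single read_activated callback.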
