深刻Java集合學習系列:SynchronousQueue實現原理

http://m.blog.csdn.net/blog/luoyuyou/30256497java


背景:node

一個BlockingQueue的是一個這樣的隊列,每一個插入操做都必須等待另外一個刪除操做,反過來也同樣。一個同步隊列沒有內部容量這個概念。你不能使用peek操做,由於一個元素僅在你試着刪除它的時候纔可以被取得。你不能插入一個元素(任何方法),直到另外一個線程試着刪除它。你不能迭代它們,由於沒有東西能夠被迭代。queue的頭元素head是第一個進入隊列中的元素,而且插入線程正在爲它等待。若是隊列中沒有像以上的元素存在,那麼調用poll的方法會返回null。對於Collection的其餘方法(好比contains),SynchronousQueue表現得像一個空的集合。它不容許null入隊。算法

這個隊列相似於CSP和Ada中使用的會合信道。它們適合於切換的設計,好比一個線程中的對象必須同步等待另外一個線程中運行的對象從而傳遞一些信息/事件/任務。數據結構

這個類支持可選的公平策略從而制訂生產者和等待者的等待順序。默認狀況下,這個順序是沒有保證的,使用true能夠確保隊列是FIFO的。app

這個類以及它的迭代器實現了某些Collection和Iterator中的方法。less


算法:ide

這個算法實現了雙重棧和雙重隊列算法。oop

(LIfo)棧用做非公平模式,(Fifo)隊列用於公平模式。這二者的性能類似。Fifo常常能在競爭狀況下提供更高的吞吐量,可是Lifo可以在通常應用中維持更高的線程局部性。性能

雙重隊列(以及類似的棧)在任什麼時候刻都是持有「數據」--item經過put操做提供或者「請求」--slo經過take操做提供,或者爲空。一個調用試着「知足」(一個請求的調用獲得數據或者一個數據的調用匹配了請求)結果是出隊了一個模式互補的節點。最有趣的的地方在於任何操做都可以明確當前隊列處於哪一種模式,以及表現得好像不須要鎖。
this

隊列和棧都擴展了抽象的Transferer接口,它定義了惟一一個transfer方法用於put或者take。這些方法統一在一個方法下是由於在雙重數據結構中,put和take方法是對稱的兩種方法,因此幾乎全部代碼能夠被組合。結果transfer方法是比較長的,可是這樣相對於把他們分紅幾乎重複的幾部分代碼仍是更好的。

這個隊列和棧共享了不少類似的概念可是不多的具體細節。爲了簡單性,他們保持不一樣從而在後續能夠分開演化。

這個同步隊列的算法不一樣於之前的算法,包括消除的機制。主要的不一樣在於:

  • 其餘的算法使用位-掩碼的方式,可是如今的節點直接使用了模式位,從而致使了其餘的不一樣。
  • 同步隊列必須等待直到被其餘線程來「知足」。
  • 提供了超時和中斷的支持,包括從鏈表中清理節點/線程從而避免垃圾數據的持有和內存消耗。
阻塞操做主要經過LockSupport的park/unpark方法來實現,而那些極可能被下一個調用知足的節點會首先自旋必定次數(僅在多核處理器上)。在很是繁忙的同步隊列中,自旋可以顯著地提高吞吐量。可是在通常狀況下,這個自旋次數也是足夠小從而不那麼明顯。

清理操做在隊列和棧中的方式是不一樣的。隊列中,咱們幾乎使用O(1)的時間來清除被取消的節點。可是若是它正被做爲tail元素,那麼就必須等待後續的清除操做來清理它。棧中,咱們須要潛在的O(n)時間來遍歷,從而確認咱們可以清除這個節點,可是這個操做能夠和其餘線程共同做用於這個棧。

當垃圾回收關注於大部分節點的回收問題時,複雜的非阻塞算法會試着「忘記」數據、其餘節點的引用,以及線程會持有這些數據在一段長的阻塞時間內。在這種狀況下給一個節點中的值設置null將會違背主要的算法,咱們會代替使用將引用指向節點自身的方式。這個狀況不怎麼會在棧中發生(由於阻塞線程不會掛在舊的頭節點上),可是隊列中的節點引用必須積極地忘記,從而避免已經結束的節點的可達性。

實現:
實現的重點在於transfer這個方法:
  • 不能放入null元素,不然拋出NullPointerException異常。
  • 不限時版本(timed==false),返回非null爲成功插入,返回null拋出InterruptedException異常。
  • 限時版本(timed==true),返回非null爲成功插入,返回null的確狀況下,若處於中斷狀態則拋出InterruptedException異常,不然代表超時。
Transferer接口分爲queue和stack兩個版本, 咱們先分析queue版本(源碼):

queue

        /**
         * Puts or takes an item.
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }


這裏的基本算法是循環試着執行下列行動之一:

  • 若是隊列是空的或者擁有和當前調用一樣的模式,試着在隊列中增長等待節點,等待這個節點被知足(或者取消),而後返回相匹配的數據。
  • 若是隊列不爲空以及隊列擁有和當前調用相補的模式,試着去經過CAS操做來改變等待節點的item域,彈出隊列,以及返回相匹配的數據。
在每一種狀況中,都會檢查而且幫助遞增head和tail域,在其餘的線程相對停滯和緩慢的狀況下。
一開始對於null的檢查是爲了不看到沒有初始化的head和tail值。在當前的SynchronousQueue中這種狀況不會發生,除非調用者使用的是非volatile/final的Transferer。
這個檢查存在的緣由是,放在循環的頂部快於將它在循環中穿插地放置。

詳情以下:
  • 取得當前調用的模式isData,進入循環,首先排除head和tail都爲null的狀況。
  • 當隊列爲空(head=tail)或者隊尾節點的模式和當前調用模式相同的狀況下:假如tail元素已經改變則更新重來,假如當前是限時版本而且時間參數不大於0則返回null,建立新節點而且以CAS方式入隊,更新tail節點,而後等待相補的調用的來臨(awaitFulfill),假如返回的數據是節點自己(說明是中斷或者超市)則作清理,並返回null,不然假如節點還在隊列中則更新head元素而且修改當前節點的item和waiter域,而後返回得到的x和e中非空的元素(此處一定是1空1非空)。
  • 不然,隊列中有相補模式的節點,則試着取得隊首元素即head.next,嘗試知足他(fulfill),假如失敗則重試,成功以後會遞增head的值,而後喚醒(unpark)等待線程以及返回相匹配的數據。


接着咱們來看阻塞操做awaitFulfill:

        /**
         * Spins/blocks until node s is fulfilled.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
        */
        Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

這裏的s是根據調用構造的節點,e爲傳遞的數據(null是請求,不然數據),timed爲限時標示,nanos時間量(納秒):

  • 首先咱們根據timed取得結束時間,假如非限時則爲0,而後取得線程對象以及自旋次數:隊首元素&非限時(maxUntimedSpins最大);隊首元素&限時(maxTimedSpins次之);不然爲0。
  • 進入循環:假如中斷則嘗試取消節點,假如item變化則返回數據,假如限時版本而且超時則嘗試取消並重試或者更新nanos,遞減自旋次數直到0----->設置節點中的線程對象----->調用非限時掛起版本或者根據閾值判斷是否調用限時掛起版本。
注意:以上循環在有相補的調用發生時老是會返回對應的數據,在被中斷或者超時處理成功狀況下會返回當前節點自己。


咱們最後看transfer方法中的清理操做:

        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node. To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h)
                    return;
                QNode tn = t.next;
                if (t != tail)
                    continue;
                if (tn != null) {
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
                    return;          // Postpone cleaning s
            }
        }

方法中的pred爲s入隊時的前驅,這個方法中註釋裏有以下說明:

在任一時刻,只有一個鏈表中的節點不能被刪除---最後一個插入的節點。爲了遷就這個原則,若是咱們沒法刪除s,那麼咱們就保存它的前驅節點爲「cleanMe」,首先刪除以前保存的版本。因此至少s或者以前保存的節點可以被刪除,因此最後老是可以被刪除!

詳情以下:

  • 首先既然s已經被取消,則設置它的等待者waiter爲null,進入循環(條件:pred.next爲s推斷出s不爲head,s爲head的話不須要刪除了)。
  • 取得head和tail,探測他們是否停滯,過於停滯時更新他們的值。
  • 當s節點不爲隊尾節點時候(嘗試更新前驅pred,或者當s處於OffList時返回)。
  • s節點爲尾元素:假如cleanMe不爲空,則說明有以前並未刪除的尾節點,那麼則嘗試刪除以前的cleanMe以後的節點,不然嘗試設置當前的pred爲cleanMe,等待下一次的刪除,而後返回。
這裏的難點在於: 當咱們取得cleanMe時,若是不爲空,並不瞭解是否有其餘線程進過一樣的操做,因此咱們首先要判斷d=dp.nex不爲空(若爲空則修改cleanMe而後返回)、d != dp(不然說明dp已經offList)、d.isCancelled(不然說明以前的節點已經被刪除、以及d不爲尾節點&&dn=d.next不爲空&&dn != d(不然d已經offList),以上狀況下才能夠修改dp的next爲dn從而刪除了以前存儲的待刪除尾節點,而且修改cleanMe的值從dp變爲null。假如cleanMe爲空,則能夠嘗試設置本身的pred節點,在競爭失敗的狀況下能夠重試是能夠取得進展的,由於任什麼時候候只有一個節點可以做爲tail。
因此咱們會嘗試清理以前被刪除的尾節點,以及嘗試設置本身的前驅爲cleanMe節點。

if (dp == pred)
這一行在我看來是不必的,由於永遠不可能出現。


咱們接着分析stack:

stack

        /**
         * Puts or takes an item.
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {


            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }


這裏的Node實際上擁有三種狀態:REQUEST/DATA/FULFILLING,基本算法是循環試着執行下列3種行動之一:

  • 若是棧爲空或者包含相同模式的節點,試着入棧而後等待匹配,返回匹配數據,在取消時返回null。
  • 若是包含相補的節點,試着入棧一個fulfill模式的節點,匹配相對應的等待節點,彈出這兩個節點,返回匹配數據。這裏的匹配或者取消連接操做可能沒有被實際執行,由於第三種行動:
  • 若是棧頂元素爲fulfill模式的節點,嘗試幫助它執行match以及pop操做,而後再重試。這裏的代碼和fulfill的行爲幾乎相同的,只不過不返回數據。
詳情以下:
  • 取得當前調用的模式mode,REQUEST或則DATA,而後進入循環。
  • 取得棧頂元素h,當隊列爲空或者棧頂元素模式與當前模式相同時:首先排除限時調用和nanos不大於0的狀況,而後返回null。不然連接到h,並嘗試入棧,成功以後經過awaitFulfill等待相補調用的來臨,而後根據返回SNode節點是否爲s自己來判斷是否須要清理操做,假如獲取了數據那麼便協助更新head,以及根據當前模式返回數據。
  • 當前棧頂元素與當前調用模式不一樣,那麼假如當前棧頂元素模式不爲fulfill時(經過 (mode & FULFILLING) != 0來判斷),進入循環,s爲當前棧頂元素,m爲下一個待匹配元素(一定不爲null),嘗試取得mn(m.next),試着使用m.tryMatch(s),來完成m和s的匹配,而且傳遞s到了m.match(注意,這一步可能也會由另外一種狀況完成),成功以後改變head值,以及返回當前數據或者匹配數據。假如失敗(意味着最後探測到的match不爲s,惟一的場景爲等待線程中斷或者超時),則從新設置s的next值爲mn,而後重試。
  • 當前棧頂元素的模式爲fulfill時,嘗試取得棧頂元素h和next節點m,而後試着匹配m和h的值,這裏的做用幾乎和上一種狀況中相似,只不過不返回數據,以及只協助一次。

接着咱們來看阻塞操做awaitFulfill:

        /**
         * Spins/blocks until node s is matched by a fulfill operation.
         *
         * @param s the waiting node
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched node, or s if cancelled
         */
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

這個操做事實上和隊列版本中的相似,首先來解釋下注釋中的內容:當一個節點/線程試着去阻塞,它會在設置waiter域以後至少檢查一次狀態,而後纔會調用parking(阻塞),這樣子能夠經過waiter從而和它的知足者協做從而確保不會丟失信號。若是當前調用的幾點位於棧頂,那麼在park以前會首先嚐試自旋,這樣能夠在生產者和消費者很是接近時避免阻塞。可是這個只在多核處理器下才會有用。從代碼中的檢查狀況能夠看出,在優先級上,中斷狀態--->正式返回---->超時。(因此最後一個檢查是用來探測超時的)除了非限時的同步隊列。{poll/offer}方法不會檢查中斷以及等待過久,因此對於中斷和超時的判斷被放置於transfer方法中,這樣要好於調用awaitFulfill。詳情以下(與隊列版本中相似):取得當前的結束時間,當前線程,以及自旋次數。而後進入循環。首先判斷是否中斷,判斷限時版本下的時間流逝,判斷自旋,以及根據當前節點s所處的位置來設置自旋次數。設置線程對象(用於喚醒)。最後根據是否限時來阻塞當前線程,限時版本下會根據閾值來判斷是否須要阻塞。最後咱們來看處理中斷和超時狀況下的清理操做clean:

 void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }

這裏的s爲被取消了的節點,這裏的註釋有以下說明:

最壞狀況下咱們須要遍歷整個棧才能取消s的連接。若是有其餘的取消操做同時在進行,咱們可能看不到s,由於它已經被其餘的線程刪除了。可是咱們能夠觀察跟隨s以後的節點,若是這個節點也是取消狀態,那麼咱們會使用下一個節點,咱們不會再檢查,由於不想要遍歷兩遍僅僅是爲了找到哨兵節點。

詳情以下:
  • 設置s節點的item和waiter都爲null,由於已經不須要了,而且它的狀態能夠由match爲this來判斷。
  • 取得s的下一個節點past,假如past也是取消的,那麼再取下一節點。
  • 從頭p=head開始,逐步斷開past以前的那些被取消的節點。
  • 再從p開始刪除嵌在棧中的節點,知道棧爲空或者找到哨兵節點(past)。
相關文章
相關標籤/搜索