JUC源碼分析-集合篇(九)SynchronousQueue

JUC源碼分析-集合篇(九)SynchronousQueue

SynchronousQueue 是一個同步阻塞隊列,它的每一個插入操做都要等待其餘線程相應的移除操做,反之亦然。SynchronousQueue 像是生產者和消費者的會合通道,它比較適合「切換」或「傳遞」這種場景:一個線程必須同步等待另一個線程把相關信息/時間/任務傳遞給它。java

SynchronousQueue(後面稱SQ)內部沒有容量,因此不能經過 peek 方法獲取頭部元素;也不能單獨插入元素,能夠簡單理解爲它的插入和移除是「一對」對稱的操做。爲了兼容 Collection 的某些操做(例如contains),SQ 扮演了一個空集合的角色。node

SQ 的一個典型應用場景是在線程池中,Executors.newCachedThreadPool() 就使用了它,這個構造使線程池根據須要(新任務到來時)建立新的線程,若是有空閒線程則會重複使用,線程空閒了 60s 後會被回收。算法

1. 實現本身的 SQ

SQ 實現原理參考:http://ifeve.com/java-synchronousqueue/數據結構

1.1 阻塞算法實現

阻塞算法實現一般在內部採用一個鎖來保證多個線程中的 put() 和 take() 方法是串行執行的。採用鎖的開銷是比較大的,還會存在一種狀況是線程 A 持有線程 B 須要的鎖,B 必須一直等待 A 釋放鎖,即便 A 可能一段時間內由於 B 的優先級比較高而得不到時間片運行。因此在高性能的應用中咱們經常但願規避鎖的使用。併發

public class NativeSynchronousQueue<E> {
    boolean putting = false;
    E item = null;

    public synchronized E take() throws InterruptedException {
        while (item == null)
            wait();
        E e = item;
        item = null;
        notifyAll();
        return e;
    }

    public synchronized void put(E e) throws InterruptedException {
        if (e==null) return;
        while (putting)
            wait();
        putting = true;
        item = e;
        notifyAll();
        while (item!=null)
            wait();
        putting = false;
        notifyAll();
    }
}

1.2 信號量實現

經典同步隊列實現採用了三個信號量,代碼很簡單,比較容易理解:ide

public class SemaphoreSynchronousQueue<E> {
    E item = null;
    Semaphore sync = new Semaphore(0);
    Semaphore send = new Semaphore(1);
    Semaphore recv = new Semaphore(0);

    public E take() throws InterruptedException {
        recv.acquire();
        E x = item;
        sync.release();
        send.release();
        return x;
    }

    public void put (E x) throws InterruptedException{
        send.acquire();
        item = x;
        recv.release();
        sync.acquire();
    }
}

2. SQ 源碼分析

SQ 爲等待過程當中的生產者或消費者線程提供可選的公平策略(默認非公平模式)。非公平模式經過棧(LIFO)實現,公平模式經過隊列(FIFO)實現。使用的數據結構是雙重隊列(Dual queue)和雙重棧(Dual stack)。FIFO 一般用於支持更高的吞吐量,LIFO 則支持更高的線程局部存儲(TLS)。源碼分析

// 生產者
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
// 消費者
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

put 和 take 都是直接委託 transferer 完成的。本節以公平式 TransferQueue 爲例分析 JDK8 的實現原理。性能

2.1 TransferQueue 數據結構

TransferQueue數據結構

以上是 TransferQueue 的大體結構,能夠看到 TransferQueue 是一個普通的隊列,同時存在一個指向隊列頭部的指針 head,和一個指向隊列尾部的指針 tail;cleanMe 的存在主要是解決不可清楚隊列的尾節點的問題;隊列的節點經過內部類 QNode 封裝,QNode 是一個單鏈表結構,包含四個變量:優化

static final class QNode {
    volatile Object item;         // 節點包含的數據,非空表示生產者,空者是消費者
    final boolean isData;         // 表示該節點由生產者建立仍是由消費者建立,生產者true,消費者false  
    volatile Thread waiter;       // 等待在該節點上的線程。to control park/unpark
    volatile QNode next;          // 指向隊列中的下一個節點
}

2.2 SQ 阻塞算法

SQ 的阻塞算法能夠歸結爲如下幾點:ui

(1) 雙重隊列

和典型的單向鏈表結構不一樣,SQ 使用了雙重隊列(Dual queue)和雙重棧(Dual stack)存儲數據,隊列中的每一個節點均可以是一個生產者或是消費者。

在消費者獲取元素時,若是隊列爲空,當前消費者就會做爲一個「元素爲null」的節點被放入隊列中等待,因此 QNode 中 的節點存儲了生產者節點(item!=null & isData=true)和消費者節點(item=null & isData=false),這兩種節點就是經過 isData 來區分的。但同一時間鏈表中要麼全是生產者,要麼全是消費者。

(2) 節點匹配

節點命中後修改 item 的狀態,已取消節點引用指向自身,避免垃圾保留和內存損耗。經過自旋和 LockSupport 的 park/unpark 實現阻塞,在高爭用環境下,自旋能夠顯著提升吞吐量。

  • 當隊列中已經有生產者(item!=null & isData=true)線程時,若是有一個消費者過來,這裏會讓隊列中的生產者節點出隊,並修改節點狀態爲 (item=null & next=this & isData=true)
  • 當隊列中已經有消費者(item=null & isData=false)線程時,若是有一個生產者過來,這裏會讓隊列中的消費者節點出隊,並修改節點狀態爲 (item!=null & next=this & isData=false)
  • 若是隊列中的節點取消或超時了,修改節點的狀態爲 item=this

若是全是生產者線程,當消費者線程調用 take 時會匹配鏈表中的元素,將第一個生產者線程節點 node 出隊,也就是 transfer 的過程。數據從一個線程 transfer 到另外一個線程,同時修改該節點 node 的狀態。若是全是消費者線程亦然。

TransferQueue隊列

以生產者線程入隊爲例:

  1. 默認 head=tail 都指向同一個空節點。head 節點永遠都是一個哨兵節點,真正的數據節點是從下一個節點開始。tail 節點永遠指向真正的尾節點。
  2. 當隊列爲空或隊列中所有是生產者線程時,即沒法當即匹配時,節點入到隊尾。入到隊列後,不會馬上將該線程掛起,線程會自旋必定的次數後,始終無消費者線程過來,則掛起線程。
  3. 當生產者線程過來後,發現隊列中生產者線程在等待,則消費者線程直接和隊列中的頭節點進行匹配,將 head.next.item 置爲 null。
    這時生產者線程若是是在自旋,則會發現 item 已屬性改變,直接退出自旋,繼續執行。
    若是生產者線程已經被掛起,則消費者線程會喚醒該生產者線程。

下面主要分析 TransferQueue 的三個重要方法:transfer、awaitFulfill、clean。這三個方法是 TransferQueue 的核心,入口是 transfer(),下面具體看代碼。

2.3 transfer

// 生產者e!=null,消費者e=null。timed=true表示超時等待,不然無限等待
E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        // 1.1 h==t 表示尚未節點入隊
        // 1.2 isData==isData 表示該隊列中的等待的線程與當前線程是相同模式
        //     (同爲生產者,或者同爲消費者,隊列中只存在一種模式的線程)
        // 總之只有生產者或只有消費者時,須要將該線程插入到隊列中進行等待
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            if (t != tail)                  // 其它線程修改了尾節點,continue
                continue;
            if (tn != null) {               // 其它線程有節點入隊,幫助其它線程修改尾節點 tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // can't wait
                return null;
            if (s == null)                  // 僅初始化一次s,經過區分isData生產者和消費者
                s = new QNode(e, isData);

            // 2. 最重要的一步,上面判斷了這麼多數據不一致的狀況,最終完成節點入隊,失敗重試。
            //    其實上面兩個 continue 不執行也沒有關係,大不了在這一步失敗後重試
            //    t 若是不是尾節點 next 確定不爲空。要麼指定本身(失效),要麼指向下一個節點。
            if (!t.casNext(null, s))        // failed to link in
                continue;
            // 執行失敗沒有關係,會有其餘線程幫忙執行完成的 ok
            advanceTail(t, s);   // swing tail and wait

            // 3. 等待其它線程匹配。二種狀況:一是匹配完成,返回數據;二是等待超時/取消,返回原節點s
            Object x = awaitFulfill(s, e, timed, nanos);
            // 3.1 等待超時/取消,返回原節點s
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }
            // 3.2 匹配成功了,可是還須要將該節點從隊列中移除
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        // 4. 若是隊列已有線程在等待,直接進行匹配便可
        } else {                            // complementary-mode
            // 進行匹配,從隊列的頭部開始,即head.next
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            // 5.1 前面已經說過匹配成功會修改 item,併發時可能頭節點已經匹配過了
            //     isData == (x != null) 相等則說明 m 已經匹配過了,由於正常狀況是不相等纔對
            // 5.2 x==m 說明 m 被取消了,見 QNode#tryCancel()
            // 5.3 CAS失敗說明 m 已經被其餘線程匹配了,因此將其出隊,而後 retry
            //     CAS設置m.item爲e,這裏的e,若是是生產者則是數據,消費者則是null,
            //     因此m若是是生產者,則item變爲null,消費者則變爲生產者的數據
            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            // 6. 與m匹配成功,將m出隊,並喚醒等待在m上的線程m.waiter
            //    同上,失敗則說明有其它線程修改了頭節點 ok
            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

從上面的代碼能夠看出 TransferQueue.transfer() 的總體流程:

  1. 判斷當前隊列是否爲空或者隊尾節點(頭節點可能被修改)線程是否與當前線程匹配,隊列爲空或者不匹配都將進行入隊操做
  2. 入隊分紅兩步:修改 tail.next 爲新節點,同時修改 tail 爲新節點,這兩步操做有可能分在兩個不一樣的線程執行,不過不影響執行結果
  3. 入隊以後須要將當前線程阻塞,調用 LockSupport.park() 方法,直到打斷/超時/被匹配的線程喚醒。若是 ①打斷/超時,返回節點自己;②匹配成功,返回隊列中節點的數據,若是是隊列中是生產者線程則返回數據自己,不然返回 null
  4. 若是被取消,則須要調用 clean() 方法進行清除
  5. 因爲 FIFO,因此匹配老是發生在隊列的頭部,修改節點的 item 屬性傳遞數據,同時喚醒等待在節點上的線程
// advanceHead 更新頭節點並將失效的頭節點踢出隊列(h.next = h)
void advanceHead(QNode h, QNode nh) {
    if (h == head &&
        UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
        h.next = h; // forget old next
}

2.4 等待匹配 awaitFulfill

/**
 *  等待匹配,該方法會進入阻塞,直到三種狀況下才返回:
 *  a. 超時被取消了,返回值爲 s
 *  b. 匹配上了,返回另外一個線程傳過來的值
 *  c. 線程被打斷,會取消,返回值爲 s
 */
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 1. 自旋鎖次數,若是不是隊列的第一個元素則不自旋,由於壓根輪不上他,自旋只是浪費 CPU
    //    若是等待的話則自旋的次數少些,不等待就多些
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 2. 支持打斷
        if (w.isInterrupted())
            s.tryCancel(e);

        // 3. 若是s的item不等於e,有如下二種狀況,不論是哪一種狀況都不要再等待了,返回便可
        //    a. 超時或線程被打斷了,此時x==s
        //    b. 匹配上了,此時x==另外一個線程傳過來的值
        Object x = s.item;
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)      // 自旋,直到spins==0,進入等待
            --spins;
        else if (s.waiter == null)
            s.waiter = w;  // 設置等待線程才能被喚醒
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

awaitFulfill() 主要涉及自旋以及 LockSupport.park() 兩個關鍵點,自旋可去了解自旋鎖的原理。

自旋鎖原理:經過空循環則霸佔着 CPU,避免當前線程進入睡眠,由於睡眠/喚醒是須要進行線程上下文切換的,因此若是線程睡眠的時間很段,那麼使用空循環可以避免線程進入睡眠的耗時,從而快速響應。可是因爲空循環會浪費 CPU,因此也不能一直循環。自旋鎖通常適合同步快很小,競爭不是很激烈的場景。

java 中大量運用了這樣的技術。凡有阻塞的操做都會這樣作,包括內置鎖在內,內置鎖其實也是這樣的,內置鎖分爲偏向鎖,輕量級鎖和重量級鎖,其中輕量級鎖其實就是自旋來替代阻塞。

固然須要自旋多長時間。這是一個根據不一樣狀況來設定的值並無一個準確的結論,一般來講競爭越激烈這樣多自旋一段時間老是好的,效果也越明顯,可是自旋時間過長會浪費 cpu 時間因此,設定時間仍是一個很依靠經驗的值。

在這裏實際上是這樣作的,首先看一下當前 cpu 的數量,NCPUS 而後分兩種狀況一種是設定了時間限的自旋時間。若是設定了時間限則使用 maxTimedSpins,若是 NCPUS 數量大於等於 2 則設定爲爲 32 不然爲 0,既一個 CPU 時不自旋;這是顯然了,由於惟一的 cpu 在自旋顯然不能進行其餘操做來知足條件。 若是沒有設定時間限則使用 maxUntimedSpins,若是 NCPUS 數量大於等於 2 則設定爲爲 32 * 16,不然爲 0;

另外還有一個參數 spinForTimeoutThreshold 這個參數是爲了防止自定義的時間限過長,而設置的,若是設置的時間限長於這個值則取這個 spinForTimeoutThreshold 爲時間限。這是爲了優化而考慮的。這個的單位爲納秒。

2.5 節點清除 clean

大概總結一下 clean 方法在作什麼?。

首先,這裏的隊列實際上是單向鏈表。因此只能設置後繼的節點而不能設置前向的節點,這會產生一個問題,就是加入隊列尾的節點失效了要刪除怎麼辦?咱們沒辦法引用隊列尾部倒數第二個節點。因此這裏採用了一個方法就是講當前的尾結點保存問 cleanMe 節點,這樣在下次再次清除的時候一般 cleanMe 一般就不是尾結點了,這樣就能夠刪除了。也就是每次調用的時候刪除的實際上是上次須要結束的節點。更多關於清除節點 clean

參考:

  1. 《JUC源碼分析-集合篇(八):SynchronousQueue》:https://www.jianshu.com/p/c4855acb57ec
  2. 《源碼分析-SynchronousQueue》:https://blog.csdn.net/u011518120/article/details/53906484

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索