Java多線程進階(三五)—— J.U.C之collections框架:SynchronousQueue

圖片描述

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、SynchronousQueue簡介

SynchronousQueue是JDK1.5時,隨着J.U.C包一塊兒引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基於隊列實現:html

clipboard.png

沒有看錯,SynchronousQueue的底層實現包含兩種數據結構——隊列。這是一種很是特殊的阻塞隊列,它的特色簡要歸納以下:java

  1. 入隊線程和出隊線程必須一一匹配,不然任意先到達的線程會阻塞。好比ThreadA進行入隊操做,在有其它線程執行出隊操做以前,ThreadA會一直等待,反之亦然;
  2. SynchronousQueue內部不保存任何元素,也就是說它的容量爲0,數據直接在配對的生產者和消費者線程之間傳遞,不會將數據緩衝到隊列中。
  3. SynchronousQueue支持公平/非公平策略。其中非公平模式,基於內部數據結構——「棧」來實現,公平模式,基於內部數據結構——「隊列」來實現;
  4. SynchronousQueue基於一種名爲「Dual stack and Dual queue」的無鎖算法實現。
注意:上述的特色1,和咱們以前介紹的Exchanger其實很是類似,能夠類比Exchanger的功能來理解。

2、SynchronousQueue原理

構造

以前提到,SynchronousQueue根據公平/非公平訪問策略的不一樣,內部使用了兩種不一樣的數據結構:棧和隊列。咱們先來看下對象的構造,SynchronousQueue只有2種構造器:node

/**
 * 默認構造器.
 * 默認使用非公平策略.
 */
public SynchronousQueue() {
    this(false);
}
/**
 * 指定策略的構造器.
 */
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

能夠看到,對於公平策略,內部構造了一個TransferQueue對象,而非公平策略則是構造了TransferStack對象。這兩個類都繼承了內部類Transferer,SynchronousQueue中的全部方法,其實都是委託調用了TransferQueue/TransferStack的方法:算法

public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
 
    /**
     * tranferer對象, 構造時根據策略類型肯定.
     */
    private transient volatile Transferer<E> transferer;
 
    /**
     * Shared internal API for dual stacks and queues.
     */
    abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         *
         * @param e 非null表示 生產者 -> 消費者;
         *          null表示, 消費者 -> 生產者.
         * @return 非null表示傳遞的數據; null表示傳遞失敗(超時或中斷).
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }
 
    /**
     * Dual stack(雙棧結構).
     * 非公平策略時使用.
     */
    static final class TransferStack<E> extends Transferer<E> {
        // ...
    }
 
    /**
     * Dual Queue(雙端隊列).
     * 公平策略時使用.
     */
    static final class TransferQueue<E> extends Transferer<E> {
        // ...
    }
 
    // ...
}

棧結構

非公平策略由TransferStack類實現,既然TransferStack是棧,那就有結點。TransferStack內部定義了名爲SNode的結點:segmentfault

static final class SNode {
    volatile SNode next;
    volatile SNode match;       // 與當前結點配對的結點
    volatile Thread waiter;     // 當前結點對應的線程
    Object item;                // 實際數據或null
    int mode;                   // 結點類型
 
    SNode(Object item) {
        this.item = item;
    }
  
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long matchOffset;
    private static final long nextOffset;
 
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = SNode.class;
            matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    // ...

}

上述SNode結點的定義中有個mode字段,表示結點的類型。TransferStack一共定義了三種結點類型,任何線程對TransferStack的操做都會建立下述三種類型的某種結點:性能優化

  • REQUEST:表示未配對的消費者(當線程進行出隊操做時,會建立一個mode值爲REQUEST的SNode結點 )
  • DATA:表示未配對的生產者(當線程進行入隊操做時,會建立一個mode值爲DATA的SNode結點 )
  • FULFILLING:表示配對成功的消費者/生產者
static final class TransferStack<E> extends Transferer<E> {
 
    /**
     * 未配對的消費者
     */
    static final int REQUEST = 0;
    /**
     * 未配對的生產者
     */
    static final int DATA = 1;
    /**
     * 配對成功的消費者/生產者
     */
    static final int FULFILLING = 2;
 
     volatile SNode head;
 
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
 
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TransferStack.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
    // ...
}

核心操做——put/take

SynchronousQueue的入隊操做調用了put方法:數據結構

/**
 * 入隊指定元素e.
 * 若是沒有另外一個線程進行出隊操做, 則阻塞該入隊線程.
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

SynchronousQueue的出隊操做調用了take方法:併發

/**
 * 出隊一個元素.
 * 若是沒有另外一個線程進行出隊操做, 則阻塞該入隊線程.
 */
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

能夠看到,SynchronousQueue同樣不支持null元素,實際的入隊/出隊操做都是委託給了transfer方法,該方法返回null表示出/入隊失敗(一般是線程被中斷或超時):框架

/**
 * 入隊/出隊一個元素.
 */
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // s表示新建立的結點
    // 入參e==null, 說明當前是出隊線程(消費者), 不然是入隊線程(生產者)
    // 入隊線程建立一個DATA結點, 出隊線程建立一個REQUEST結點
    int mode = (e == null) ? REQUEST : DATA;

    for (; ; ) {    // 自旋
        SNode h = head;
        if (h == null || h.mode == mode) {          // CASE1: 棧爲空 或 棧頂結點類型與當前mode相同
            if (timed && nanos <= 0) {              // case1.1: 限時等待的狀況
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 將當前結點壓入棧
                SNode m = awaitFulfill(s, timed, nanos);        // 阻塞當前調用線程
                if (m == s) {                                   // 阻塞過程當中被中斷
                    clean(s);
                    return null;
                }

                // 此時m爲配對結點
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);

                // 入隊線程null, 出隊線程返回配對結點的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
            // 執行到此處說明入棧失敗(多個線程同時入棧致使CAS操做head失敗),則進入下一次自旋繼續執行

        } else if (!isFulfilling(h.mode)) {          // CASE2: 棧頂結點還未配對成功
            if (h.isCancelled())                     // case2.1: 元素取消狀況(因中斷或超時)的處理
                casHead(h, h.next);
            else if (casHead(h, s = snode(s, e,
                h, FULFILLING | mode))) {      // case2.2: 將當前結點壓入棧中
                for (; ; ) {
                    SNode m = s.next;       // s.next指向原棧頂結點(也就是與當前結點匹配的結點)
                    if (m == null) {        // m==null說明被其它線程搶先匹配了, 則跳出循環, 從新下一次自旋
                        casHead(s, null);
                        s = null;
                        break;
                    }

                    SNode mn = m.next;
                    if (m.tryMatch(s)) {    // 進行結點匹配
                        casHead(s, mn);     // 匹配成功, 將匹配的兩個結點所有彈出棧
                        return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
                    } else                  // 匹配失敗
                        s.casNext(m, mn);   // 移除原待匹配結點
                }
            }
        } else {                            // CASE3: 其它線程正在匹配
            SNode m = h.next;
            if (m == null)                  // 棧頂的next==null, 則直接彈出, 從新進入下一次自旋
                casHead(h, null);
            else {                          // 嘗試和其它線程競爭匹配
                SNode mn = m.next;
                if (m.tryMatch(h))
                    casHead(h, mn);         // 匹配成功
                else
                    h.casNext(m, mn);       // 匹配失敗(被其它線程搶先匹配成功了)
            }
        }
    }
}

整個transfer方法考慮了限時等待的狀況,且入隊/出隊其實都是調用了同一個方法,其主幹邏輯就是在一個自旋中完成如下三種狀況之一的操做,直到成功,或者被中斷或超時取消:性能

  1. 棧爲空,或棧頂結點類型與當前入隊結點相同。這種狀況,調用線程會阻塞;
  2. 棧頂結點還未配對成功,且與當前入隊結點能夠配對。這種狀況,直接進行配對操做;
  3. 棧頂結點正在配對中。這種狀況,直接進行下一個結點的配對。

出/入隊示例講解

爲了便於理解,咱們來看下面這個調用示例(假設不考慮限時等待的狀況),假設一共有三個線程ThreadA、ThreadB、ThreadC:

①初始棧結構

初始棧爲空,head爲棧頂指針,始終指向棧頂結點:

clipboard.png

②ThreadA(生產者)執行入隊操做

因爲此時棧爲空,因此ThreadA會進入CASE1,建立一個類型爲DATA的結點:

if (h == null || h.mode == mode) {          // CASE1: 棧爲空 或 棧頂結點類型與當前mode相同
    if (timed && nanos <= 0) {              // case1.1: 限時等待的狀況
        if (h != null && h.isCancelled())
            casHead(h, h.next);
        else
            return null;
    } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 將當前結點壓入棧
        SNode m = awaitFulfill(s, timed, nanos);        // 阻塞當前調用線程
        if (m == s) {                                   // 阻塞過程當中被中斷
            clean(s);
            return null;
        }

        // 此時m爲配對結點
        if ((h = head) != null && h.next == s)
            casHead(h, s.next);

        // 入隊線程null, 出隊線程返回配對結點的值
        return (E) ((mode == REQUEST) ? m.item : s.item);
    }
    // 執行到此處說明入棧失敗(多個線程同時入棧致使CAS操做head失敗),則進入下一次自旋繼續執行
}

CASE1分支中,將結點壓入棧後,會調用awaitFulfill方法,該方法會阻塞調用線程:

/**
 * 阻塞當前調用線程, 並將線程信息記錄在s.waiter字段上.
 *
 * @param s 等待的結點
 * @return 返回配對的結點 或 當前結點(說明線程被中斷了)
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    // 性能優化操做(計算自旋次數)
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (; ; ) {
        if (w.isInterrupted())
            s.tryCancel();
        /**
         * s.match保存當前結點的匹配結點.
         * s.match==null說明尚未匹配結點
         * s.match==s說明當前結點s對應的線程被中斷了
         */
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins - 1) : 0;
        else if (s.waiter == null)  // 尚未匹配結點, 則保存當前線程
            s.waiter = w;           // s.waiter保存當前阻塞線程
        else if (!timed)
            LockSupport.park(this); // 阻塞當前線程
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

此時棧結構以下,結點的waiter字段保存着建立該結點的線程ThreadA,ThreadA等待着被配對消費者線程喚醒:
clipboard.png

③ThreadB(生產者)執行入隊操做

此時棧頂結點的類型和ThreadB建立的結點相同(都是DATA類型的結點),因此依然走CASE1分支,直接將結點壓入棧:
clipboard.png

④ThreadC(消費者)執行出隊操做

此時棧頂結點的類型和ThreadC建立的結點匹配(棧頂DATA類型,ThreadC建立的是REQUEST類型),因此走CASE2分支,該分支會將匹配的兩個結點彈出棧:

else if (!isFulfilling(h.mode)) {          // CASE2: 棧頂結點還未配對成功
    if (h.isCancelled())                     // case2.1: 元素取消狀況(因中斷或超時)的處理
        casHead(h, h.next);
    else if (casHead(h, s = snode(s, e,
        h, FULFILLING | mode))) {      // case2.2: 將當前結點壓入棧中
        for (; ; ) {
            SNode m = s.next;       // s.next指向原棧頂結點(也就是與當前結點匹配的結點)
            if (m == null) {        // m==null說明被其它線程搶先匹配了, 則跳出循環, 從新下一次自旋
                casHead(s, null);
                s = null;
                break;
            }

            SNode mn = m.next;
            if (m.tryMatch(s)) {    // 進行結點匹配
                casHead(s, mn);     // 匹配成功, 將匹配的兩個結點所有彈出棧
                return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
            } else                  // 匹配失敗
                s.casNext(m, mn);   // 移除原待匹配結點
        }
    }
}

上述isFulfilling方法就是判斷結點是否匹配:

/**
 * 判斷m是否已經配對成功.
 */
static boolean isFulfilling(int m) {
    return (m & FULFILLING) != 0;
}

ThreadC建立結點並壓入棧後,棧的結構以下:
clipboard.png

此時,ThreadC會調用tryMatch方法進行匹配,該方法的主要做用有兩點:

  1. 將待結點的match字段置爲與當前配對的結點(如上圖中,結點m是待配對結點,最終m.math == s
  2. 喚醒待配對結點中的線程(如上圖中,喚醒結點m中ThreadB線程)
/**
 * 嘗試將當前結點和s結點配對.
 */
boolean tryMatch(SNode s) {
    if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        Thread w = waiter;
        if (w != null) {    // 喚醒當前結點對應的線程
            waiter = null;
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;      // 配對成功返回true
}

匹配完成後,會將匹配的兩個結點彈出棧,並返回匹配值:

if (m.tryMatch(s)) {    // 進行結點匹配
    casHead(s, mn);     // 匹配成功, 將匹配的兩個結點所有彈出棧
    return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
}

最終,ThreadC拿到了等待配對結點中的數據並返回,此時棧的結構以下:
clipboard.png

注意: CASE2分支中ThreadC建立的結點的mode值並非REQUEST,其mode值爲 FULFILLING | modeFULFILLING | mode的主要做用就是給棧頂結點置一個標識(二進制爲11或10), 表示當前有線程正在對棧頂匹配,這時若是有其它線程進入自旋(併發狀況),則CASE2必定失敗,由於 isFulfilling的結果必然爲true,因此會進入 CASE3分支——跳過棧頂結點進行匹配。
casHead(h, s = snode(s, e, h, FULFILLING | mode))

⑤ThreadB(生產者)喚醒後繼續執行

ThreadB被喚醒後,會從原阻塞處繼續執行,並進入下一次自旋,在下一次自旋中,因爲結點的match字段已經有了匹配結點,因此直接返回配對結點:

/**
 * 阻塞當前調用線程, 並將線程信息記錄在s.waiter字段上.
 *
 * @param s 等待的結點
 * @return 返回配對的結點 或 當前結點(說明線程被中斷了)
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    // 性能優化操做(計算自旋次數)
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (; ; ) {
        if (w.isInterrupted())
            s.tryCancel();
        /**
         * s.match保存當前結點的匹配結點.
         * s.match==null說明尚未匹配結點
         * s.match==s說明當前結點s對應的線程被中斷了
         */
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins - 1) : 0;
        else if (s.waiter == null)  // 尚未匹配結點, 則保存當前線程
            s.waiter = w;           // s.waiter保存當前阻塞線程
        else if (!timed)
            LockSupport.park(this); // 阻塞當前線程
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

最終,在下面分支中返回:

else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 將當前結點壓入棧
    SNode m = awaitFulfill(s, timed, nanos);        // 阻塞當前調用線程
    if (m == s) {                                   // 阻塞過程當中被中斷
        clean(s);
        return null;
    }

    // 此時m爲配對結點
    if ((h = head) != null && h.next == s)
        casHead(h, s.next);

    // 入隊線程null, 出隊線程返回配對結點的值
    return (E) ((mode == REQUEST) ? m.item : s.item);
}
注意:對於 入隊線程(生產者),返回的是它入隊時攜帶的 原有元素值。

隊列結構

SynchronousQueue的公平策略由TransferQueue類實現,TransferQueue內部定義了名爲QNode的結點,一個head隊首指針,一個tail隊尾指針:

/**
 * Dual Queue(雙端隊列).
 * 公平策略時使用.
 */
static final class TransferQueue<E> extends Transferer<E> {

    /**
     * Head of queue
     */
    transient volatile QNode head;
    /**
     * Tail of queue
     */
    transient volatile QNode tail;
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     */
    transient volatile QNode cleanMe;

    /**
     * 隊列結點定義.
     */
    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS'ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;
        // ...
    }
    
    // ...
}
關於TransferQueue的transfer方法就再也不贅述了,其思路和TransferStack大體相同,總之就是入隊/出隊必須一一匹配,不然任意一方就會加入隊列並等待匹配線程喚醒。讀者能夠自行閱讀TransferQueued的源碼。

3、總結

TransferQueue主要用於線程之間的數據交換,因爲採用無鎖算法,其性能通常比單純的其它阻塞隊列要高。它的最大特色時不存儲實際元素,而是在內部經過棧或隊列結構保存阻塞線程。後面咱們講JUC線程池框架的時候,還會再次看到它的身影。

相關文章
相關標籤/搜索