死磕 java集合之SynchronousQueue源碼分析

問題

(1)SynchronousQueue的實現方式?java

(2)SynchronousQueue真的是無緩衝的嗎?node

(3)SynchronousQueue在高併發情景下會有什麼問題?多線程

簡介

SynchronousQueue是java併發包下無緩衝阻塞隊列,它用來在兩個線程之間移交元素,可是它有個很大的問題,你知道是什麼嗎?請看下面的分析。併發

源碼分析

主要屬性

// CPU的數量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超時的狀況自旋多少次,當CPU數量小於2的時候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 沒有超時的狀況自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;
// 針對有超時的狀況,自旋了多少次後,若是剩餘時間大於1000納秒就使用帶時間的LockSupport.parkNanos()這個方法
static final long spinForTimeoutThreshold = 1000L;
// 傳輸器,即兩個線程交換元素使用的東西
private transient volatile Transferer<E> transferer;

經過屬性咱們能夠Get到兩個點:app

(1)這個阻塞隊列裏面是會自旋的;高併發

(2)它使用了一個叫作transferer的東西來交換元素;oop

主要內部類

// Transferer抽象類,主要定義了一個transfer方法用來傳輸元素
abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}
// 以棧方式實現的Transferer
static final class TransferStack<E> extends Transferer<E> {
    // 棧中節點的幾種類型:
    // 1. 消費者(請求數據的)
    static final int REQUEST    = 0;
    // 2. 生產者(提供數據的)
    static final int DATA       = 1;
    // 3. 兩者正在匹配中
    static final int FULFILLING = 2;

    // 棧中的節點
    static final class SNode {
        // 下一個節點
        volatile SNode next;        // next node in stack
        // 匹配者
        volatile SNode match;       // the node matched to this
        // 等待着的線程
        volatile Thread waiter;     // to control park/unpark
        // 元素
        Object item;                // data; or null for REQUESTs
        // 模式,也就是節點的類型,是消費者,是生產者,仍是正在匹配中
        int mode;
    }
    // 棧的頭節點
    volatile SNode head;
}
// 以隊列方式實現的Transferer
static final class TransferQueue<E> extends Transferer<E> {
    // 隊列中的節點
    static final class QNode {
        // 下一個節點
        volatile QNode next;          // next node in queue
        // 存儲的元素
        volatile Object item;         // CAS'ed to or from null
        // 等待着的線程
        volatile Thread waiter;       // to control park/unpark
        // 是不是數據節點
        final boolean isData;
    }

    // 隊列的頭節點
    transient volatile QNode head;
    // 隊列的尾節點
    transient volatile QNode tail;
}

(1)定義了一個抽象類Transferer,裏面定義了一個傳輸元素的方法;源碼分析

(2)有兩種傳輸元素的方法,一種是棧,一種是隊列;測試

(3)棧的特色是後進先出,隊列的特色是先進行出【本篇文章由公衆號「彤哥讀源碼」原創】;this

(4)棧只須要保存一個頭節點就能夠了,由於存取元素都是操做頭節點;

(5)隊列須要保存一個頭節點一個尾節點,由於存元素操做尾節點,取元素操做頭節點;

(6)每一個節點中保存着存儲的元素、等待着的線程,以及下一個節點;

(7)棧和隊列兩種方式有什麼不一樣呢?請看下面的分析。

主要構造方法

public SynchronousQueue() {
    // 默認非公平模式
    this(false);
}

public SynchronousQueue(boolean fair) {
    // 若是是公平模式就使用隊列,若是是非公平模式就使用棧
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

(1)默認使用非公平模式,也就是棧結構;

(2)公平模式使用隊列,非公平模式使用棧;

入隊

咱們這裏主要介紹以棧方式實現的傳輸模式,以put(E e)方法爲例。

public void put(E e) throws InterruptedException {
    // 元素不可爲空
    if (e == null) throw new NullPointerException();
    // 直接調用傳輸器的transfer()方法
    // 三個參數分別是:傳輸的元素,是否須要超時,超時的時間
    if (transferer.transfer(e, false, 0) == null) {
        // 若是傳輸失敗,直接讓線程中斷並拋出中斷異常
        Thread.interrupted();
        throw new InterruptedException();
    }
}

調用transferer的transfer()方法,傳入元素e,說明是生產者

出隊

咱們這裏主要介紹以棧方式實現的傳輸模式,以take()方法爲例。

public E take() throws InterruptedException {
    // 直接調用傳輸器的transfer()方法
    // 三個參數分別是:null,是否須要超時,超時的時間
    // 第一個參數爲null表示是消費者,要取元素
    E e = transferer.transfer(null, false, 0);
    // 若是取到了元素就返回
    if (e != null)
        return e;
    // 不然讓線程中斷並拋出中斷異常
    Thread.interrupted();
    throw new InterruptedException();
}

調用transferer的transfer()方法,傳入null,說明是消費者。

transfer()方法

transfer()方法同時實現了取元素和放元素的功能,下面我再來看看這個transfer()方法裏究竟幹了什麼。

// TransferStack.transfer()方法
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    // 根據e是否爲null決定是生產者仍是消費者
    int mode = (e == null) ? REQUEST : DATA;
    // 自旋+CAS,熟悉的套路,熟悉的味道
    for (;;) {
        // 棧頂元素
        SNode h = head;
        // 棧頂沒有元素,或者棧頂元素跟當前元素是一個模式的
        // 也就是都是生產者節點或者都是消費者節點
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 若是有超時並且已到期
            if (timed && nanos <= 0) {      // can't wait
                // 若是頭節點不爲空且是取消狀態
                if (h != null && h.isCancelled())
                    // 就把頭節點彈出,並進入下一次循環
                    casHead(h, h.next);     // pop cancelled node
                else
                    // 不然,直接返回null(超時返回null)
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 入棧成功(由於是模式相同的,因此只能入棧)
                // 調用awaitFulfill()方法自旋+阻塞當前入棧的線程並等待被匹配到
                SNode m = awaitFulfill(s, timed, nanos);
                // 若是m等於s,說明取消了,那麼就把它清除掉,並返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    // 被取消了返回null
                    return null;
                }
                
                // 到這裏說明匹配到元素了
                // 由於從awaitFulfill()裏面出來要不被取消了要不就匹配到了
                
                // 若是頭節點不爲空,而且頭節點的下一個節點是s
                // 就把頭節點換成s的下一個節點
                // 也就是把h和s都彈出了
                // 也就是把棧頂兩個元素都彈出了
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // 根據當前節點的模式判斷返回m仍是s中的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 到這裏說明頭節點和當前節點模式不同
            // 若是頭節點不是正在匹配中
            
            // 若是頭節點已經取消了,就把它彈出棧
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 頭節點沒有在匹配中,就讓當前節點先入隊,再讓他們嘗試匹配
                // 且s成爲了新的頭節點,它的狀態是正在匹配中
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    // 若是m爲null,說明除了s節點外的節點都被其它線程先一步匹配掉了
                    // 就清空棧並跳出內部循環,到外部循環再從新入棧判斷
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // 若是m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        // 返回匹配結果
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        // 嘗試匹配失敗,說明m已經先一步被其它線程匹配了
                        // 就協助清除它
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            // 到這裏說明當前節點和頭節點模式不同
            // 且頭節點是正在匹配中
            
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                // 若是m爲null,說明m已經被其它線程先一步匹配了
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                // 協助匹配,若是m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s
                if (m.tryMatch(h))          // help match
                    // 將棧頂的兩個元素彈出後,再讓s從新入棧
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    // 嘗試匹配失敗,說明m已經先一步被其它線程匹配了
                    // 就協助清除它【本篇文章由公衆號「彤哥讀源碼」原創】
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

// 三個參數:須要等待的節點,是否須要超時,超時時間
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 到期時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 當前線程
    Thread w = Thread.currentThread();
    // 自旋次數
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 當前線程中斷了,嘗試清除s
        if (w.isInterrupted())
            s.tryCancel();
        
        // 檢查s是否匹配到了元素m(有多是其它線程的m匹配到當前線程的s)
        SNode m = s.match;
        // 若是匹配到了,直接返回m
        if (m != null)
            return m;
        
        // 若是須要超時
        if (timed) {
            // 檢查超時時間若是小於0了,嘗試清除s
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            // 若是還有自旋次數,自旋次數減一,並進入下一次自旋
            spins = shouldSpin(s) ? (spins-1) : 0;
        
        // 後面的elseif都是自旋次數沒有了
        else if (s.waiter == null)
            // 若是s的waiter爲null,把當前線程注入進去,並進入下一次自旋
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // 若是不容許超時,直接阻塞,並等待被其它線程喚醒,喚醒後繼續自旋並查看是否匹配到了元素
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            // 若是容許超時且還有剩餘時間,就阻塞相應時間
            LockSupport.parkNanos(this, nanos);
    }
}

    // SNode裏面的方向,調用者m是s的下一個節點
    // 這時候m節點的線程應該是阻塞狀態的
    boolean tryMatch(SNode s) {
        // 若是m尚未匹配者,就把s做爲它的匹配者
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                // 喚醒m中的線程,二者匹配完畢
                LockSupport.unpark(w);
            }
            // 匹配到了返回true【本篇文章由公衆號「彤哥讀源碼」原創】
            return true;
        }
        // 可能其它線程先一步匹配了m,返回其是不是s
        return match == s;
    }

整個邏輯比較複雜,這裏爲了簡單起見,屏蔽掉多線程處理的細節,只描述正常業務場景下的邏輯:

(1)若是棧中沒有元素,或者棧頂元素跟將要入棧的元素模式同樣,就入棧;

(2)入棧後自旋等待一會看有沒有其它線程匹配到它,自旋完了還沒匹配到元素就阻塞等待;

(3)阻塞等待被喚醒了說明其它線程匹配到了當前的元素,就返回匹配到的元素;

(4)若是二者模式不同,且頭節點沒有在匹配中,就拿當前節點跟它匹配,匹配成功了就返回匹配到的元素;

(5)若是二者模式不同,且頭節點正在匹配中,當前線程就協助去匹配,匹配完成了再讓當前節點從新入棧從新匹配;

若是直接閱讀這部分代碼仍是比較困難的,建議寫個測試用例,打個斷點一步一步跟蹤調試。

下面是個人測試用例,能夠參考下,在IDEA中可讓斷點只阻塞線程:

public class TestSynchronousQueue {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);

        new Thread(()->{
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();


        Thread.sleep(500);
        System.out.println(queue.take());
    }
}

修改斷點只阻塞線程的方法,右擊斷點,選擇Thread:

thread

交給你了

上面的源碼分析都是基於Stack的方式來分析的,那麼隊列是怎麼運行的呢?很簡單哦,測試用例中的false改爲true就能夠了,這就交給你了。

總結

(1)SynchronousQueue是java裏的無緩衝隊列,用於在兩個線程之間直接移交元素;

(2)SynchronousQueue有兩種實現方式,一種是公平(隊列)方式,一種是非公平(棧)方式;

(3)棧方式中的節點有三種模式:生產者、消費者、正在匹配中;

(4)棧方式的大體思路是若是棧頂元素跟本身同樣的模式就入棧並等待被匹配,不然就匹配,匹配到了就返回;

(5)隊列方式的大體思路是……不告訴你^^(二者的邏輯差異仍是挺大的)

彩蛋

(1)SynchronousQueue真的是無緩衝的隊列嗎?

經過源碼分析,咱們能夠發現其實SynchronousQueue內部或者使用棧或者使用隊列來存儲包含線程和元素值的節點,若是同一個模式的節點過多的話,它們都會存儲進來,且都會阻塞着,因此,嚴格上來講,SynchronousQueue並不能算是一個無緩衝隊列。

(2)SynchronousQueue有什麼缺點呢?

試想一下,若是有多個生產者,但只有一個消費者,若是消費者處理不過來,是否是生產者都會阻塞起來?反之亦然。

這是一件很危險的事,因此,SynchronousQueue通常用於生產、消費的速度大體至關的狀況,這樣纔不會致使系統中過多的線程處於阻塞狀態。


歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。

qrcode

相關文章
相關標籤/搜索