Java併發——阻塞隊列集(下)

SynchronousQueue

接着上集繼續,SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素,因此其peek()方法始終返回null,沒有數據緩存空間。SynchronousQueue支持公平與非公平訪問,默認採用非公平性策略訪問隊列。java

構造方法

public SynchronousQueue() {
        this(false);
    }
    
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }
複製代碼

相對於ArrayBlockingQueue利用ReentrantLock實現公平與非公平,而SynchronousQueue利用TransferQueue、TransferStack實現公平與非公平,從命名上來看前者隊列,後者棧,SynchronousQueue的入隊、出隊操做都是基於transfer來實現,ctrl+alt+h查看方法調用node

TransferQueue

TransferQueue內部定義以下編程

// 頭節點
    transient volatile QNode head;
    // 尾節點
    transient volatile QNode tail;
    // 指向一個可能還未出隊被取消的節點,由於它在被取消時是最後一個插入節點
    transient volatile QNode cleanMe;

    // 默認構造函數,建立一個假節點
    TransferQueue() {
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }
    
    static final class QNode {
        // 後繼節點
        volatile QNode next;
        // item數據
        volatile Object item;
        // 用來控制阻塞或喚醒
        volatile Thread waiter;       // to control park/unpark
        // 是不是生產者
        final boolean isData;

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }
        ...
    }    
    ...
複製代碼

公平策略

E transfer(E e, boolean timed, long nanos) {
            QNode s = null; // constructed/reused as needed
            // 判斷是不是生產者,true爲生產者,false爲消費者
            boolean isData = (e != null);
            // 死循環
            for (;;) {
                // 獲取尾節點
                QNode t = tail;
                // 獲取頭節點
                QNode h = head;
                // 若尾節點或尾節點爲空則跳出本次循序
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin
                // 若TransferQueue爲空或當前節點與尾節點模式同樣
                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    // 若t不是尾節點代表已有其餘線程操做過,跳出本次循環從新來
                    if (t != tail)                  // inconsistent read
                        continue;
                    // 若以前獲取的尾節點後繼不爲空代表已有其餘線程添加過節點
                    if (tn != null) {               // lagging tail
                        // CAS將tn置爲尾節點
                        advanceTail(t, tn);
                        continue;
                    }
                    // 若採用了時限模式且超時,直接返回null
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    // 若s爲null,構建一個新節點    
                    if (s == null)
                        s = new QNode(e, isData);
                    // CAS將新節點加入隊列中,若失敗從新來
                    if (!t.casNext(null, s))        // failed to link in
                        continue;
                    // CAS將新節點s置爲尾節點
                    advanceTail(t, s);              // swing tail and wait
                    // 自旋獲取匹配item
                    Object x = awaitFulfill(s, e, timed, nanos);
                    // 若x==s代表線程獲取匹配項時,超時或者被中斷,清除節點s
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }
                    // 判斷節點s是否已經出隊
                    if (!s.isOffList()) {           // not already unlinked
                        // CAS將節點s置爲head,移出隊列
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;
                }
                // else分支下述
            }
        }
複製代碼

咱們假定有線程A、B在put操做,線程C在take操做,當前TransferQueue初始化以下:數組

線程A添加元素A,head=tail走第一個分支,由於沒有采用鎖機制,因此可能會有其餘線程搶先操做,其採用各類判斷以及CAS來判斷是否有其餘線程操做過,添加完尾結點後,會調用awaitFulfill方法,其做用是自旋尋找匹配節點,若超過自旋次數此線程會阻塞,線程被中斷或採用時限模式時獲取超時這次操做會被取消。

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            // 獲取最後期限
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 獲取當前線程
            Thread w = Thread.currentThread();
            // 獲取自旋次數,若新節點s爲頭節點後繼節點才能自旋
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                // 判斷當前線程是否被中斷
                if (w.isInterrupted())
                    // 取消當前節點,cas將item置爲this
                    s.tryCancel(e);
                // 獲取節點s的item
                Object x = s.item;
                // 若線程中斷,節點s的item與x會不相等,直接返回x
                if (x != e)
                    return x;
                // 若採用了時限模式
                if (timed) {
                    // 計算剩餘時間
                    nanos = deadline - System.nanoTime();
                    // 若超時,取消節點
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                // 若還有自旋次數,自旋-1
                if (spins > 0)
                    --spins;
                // 若等待線程爲null,將節點s的等待線程置爲當前線程
                else if (s.waiter == null)
                    s.waiter = w;
                // 若沒有采用時限模式則調用LockSupport.park()直接阻塞線程
                else if (!timed)
                    LockSupport.park(this);
                // 若剩餘時間超過自旋時間閾值則指定時間阻塞
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
        
        void tryCancel(Object cmp) {
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
        }
複製代碼

從源碼中能夠看到只有頭節點後繼才能自旋,線程A自旋一段時間匹配節點,若自旋次數用光會一直阻塞,因此每個線程只有匹配到節點後或者因超時、中斷被取消才能繼續添加元素
緩存

線程A自旋,線程B接着put安全

那麼何時才匹配到呢?在開頭咱們提到每個put操做必須等待一個take操做,這時其餘線程take(),e爲null,isData爲false,與尾節點的isData屬性不一樣,走進else分支,先獲取頭節點的後繼節點數據,若沒有其餘線程搶先操做,且put操做未被取消,m.casItem(x, e)數據替換,將節點m的item屬性置爲null,若CAS替換成功代表匹配成功,在put自旋時會用item與e比較,take()將item置爲null,不相等返回null
併發

else {                            // complementary-mode
                    // 獲取頭節點後繼
                    QNode m = h.next;               // node to fulfill
                    // 若t不是尾節點或者m爲null或者h不是頭節點,即已有其餘線程搶先操做過
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read
                     
                    Object x = m.item;
                    if (isData == (x != null) ||    // 節點已被操做過
                        x == m ||                   // 節點被取消
                        !m.casItem(x, e)) {         // lost CAS
                        // CAS將m置爲頭節點,重來
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }
                    // 若走這裏,代表匹配成功
                    advanceHead(h, m);              // successfully fulfilled
                    // 喚醒m的等待線程
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
複製代碼

TransferStack

TransferStack內部定義以下app

// 未執行的消費者
    static final int REQUEST    = 0;
    // 未執行的生產者
    static final int DATA       = 1;
    // 線程正在匹配節點
    static final int FULFILLING = 2;
    volatile SNode head;
    
    static final class SNode {
        volatile SNode next;        // next node in stack
        volatile SNode match;       // the node matched to this
        volatile Thread waiter;     // to control park/unpark
        Object item;                // data; or null for REQUESTs
        int mode;
        ...
    }
    ...
複製代碼

TransferStack相對於TransferQueue中的節點,其數據項item與模式mode不須要用volatile修飾,由於它們老是寫在前讀在後less

非公平模式

E transfer(E e, boolean timed, long nanos) {
            SNode s = null; // constructed/reused as needed
            // REQUEST:消費者;DATA:生產者
            int mode = (e == null) ? REQUEST : DATA;
            for (;;) {
                SNode h = head;
                // 若棧爲空或者新增元素模式與首元素模式相同
                if (h == null || h.mode == mode) {  // empty or same-mode
                    // 超時
                    if (timed && nanos <= 0) {      // can't wait
                        // 若節點被取消,將取消節點出隊,從新來
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    //若不採用限時或者未超時,建立節點CAS將其置爲頭節點,s→h     
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        // 自旋匹配
                        SNode m = awaitFulfill(s, timed, nanos);
                        // 若m==s代表節點被取消
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                // 其他分支下述    
            }
        }
複製代碼

依然模擬場景,假定如今線程A、B在put,線程C、D在take。
線程A進行put新增元素A,CAS頭插元素A,調用awaitFulfill()自旋匹配,注意只有頭節點、空棧或者協助節點才能自旋,每次自旋都會進行條件判斷,爲了
dom

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            // 最後期限
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            // 自旋次數
            // 若棧爲空、節點爲首結點或者該節點模式爲FULFILLING才能自旋
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                // 若線程中斷,取消該節點
                if (w.isInterrupted())
                    s.tryCancel();
                // 匹配節點
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    // 超時,取消節點
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                // 每次自旋需先判斷是否知足自旋條件,知足次數-1
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                 // 若沒有采用時限模式則調用LockSupport.park()直接阻塞線程
                else if (!timed)
                    LockSupport.park(this);
                // 若剩餘時間超過自旋時間閾值則指定時間阻塞    
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
複製代碼

線程B接着put元素B,頭節點A的模式與put操做的模式一致,CAS頭插成功後,也調用awaitFulfill()自旋,因爲頭節點變爲線程B因此只有線程B才能自旋匹配,這也是不公平的體現

節點的取消與公平模式的差很少都是將屬性置爲其自己

void tryCancel() {
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }
複製代碼

這時線程C進行take操做,take的模式(REQUEST)明顯與當前頭節點B(DATA)不一致且頭節點模式也不爲FULFILLING,因此transfer走入else if分支。

// 若頭節點的模式不爲 FULFILLING
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    // 若頭節點被取消,將頭節點出隊從新來
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                // 將節點s出隊
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            // 獲取節點m的後繼節點
                            SNode mn = m.next;
                            // 嘗試匹配
                            if (m.tryMatch(s)) {
                                // 匹配成功,將節點s、m出隊
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                // 若匹配失敗,將m出隊
                                s.casNext(m, mn);   // help unlink
                        }
                    }
複製代碼

建立一個FULFILLING模式的節點並CAS將其置爲頭節點,與其後繼匹配,匹配方法以下

boolean tryMatch(SNode s) {
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }
複製代碼

若節點沒有被取消,其match爲null,被取消則爲其自身。成功匹配後將一對put、take操做的節點出隊。咱們假定另外一種場景,若線程C的take節點入隊後,未進行匹配線程D中途take

頭節點C模式爲FULFILLING,transfer走入最後一個分支,並不會先建立節點而是 幫助頭節點先行匹配完成入隊出隊操做後,再第二次循環繼續執行本身操做

// 頭節點模式爲 FULFILLING
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
複製代碼

LinkedTransferQueue

LinkedTransferQueue是由鏈表結構組成的無界阻塞FIFO隊列

主要字段

// 判斷是否多核處理器
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;
    // 自旋次數
    private static final int FRONT_SPINS   = 1 << 7;
    // 前驅節點正在操做,當前節點自旋的次數
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    static final int SWEEP_THRESHOLD = 32;
    // 頭節點
    transient volatile Node head;
    // 尾節點
    private transient volatile Node tail;
    // 刪除節點失敗的次數
    private transient volatile int sweepVotes;
    
    /**
     * xfer()方法中使用
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer
複製代碼

Node內部類

static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; 
        Node(Object item, boolean isData) {
            UNSAFE.putObject(this, itemOffset, item); // relaxed write
            this.isData = isData;
        }
        ...
    }    
複製代碼

是否是感受與SynchronousQueue中TransferQueue的QNode節點類定義很相似

xfer

LinkedTransferQueue的大多方法都是基於xfer()方法

/**
     * @param e 入隊數據
     * @param haveData true:入隊;flase:出隊
     * @param how NOW, ASYNC, SYNC, or TIMED
     * @param nanos 期限僅TIMED限時模式使用
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        // 如果入隊操做,但無數據拋異常
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed
        retry:
        for (;;) {                            // restart on append race
            // 從頭節點遍歷
            for (Node h = head, p = h; p != null;) { // find & match first node
                // 獲取模式isData
                boolean isData = p.isData;
                // 獲取數據項
                Object item = p.item;
                // 找到未匹配的節點
                if (item != p && (item != null) == isData) { // unmatched
                    // 若操做模式同樣,不匹配
                    if (isData == haveData)   // can't match
                        break;
                    // 若匹配,CAS將替換item
                    if (p.casItem(item, e)) { // match
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            // 更新 head
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        // 喚醒線程
                        LockSupport.unpark(p.waiter);
                        return LinkedTransferQueue.cast(item);
                    }
                }
                // 後繼
                Node n = p.next;
                // 若p的後繼是其自身,代表p已經有其餘線程操做過,從頭節點重寫開始
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }
            // 若沒有找到匹配節點,
            // NOW爲untimed poll, tryTransfer,不會入隊
            if (how != NOW) {                 // No matches available
                if (s == null)
                    // 建立節點
                    s = new Node(e, haveData);
                // 尾插入隊    
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                // 若不是異步操做    
                if (how != ASYNC)
                    // 阻塞等待匹配值
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
複製代碼

以put()方法爲例,假定隊列爲空此時有線程put(其內部xfer(e, true, ASYNC, 0)),由於不等於now,調用tryAppend()方法尾插入隊

private Node tryAppend(Node s, boolean haveData) {
        // 從尾節點開始
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            // 若隊列爲空CAS將S置爲頭節點
            if (p == null && (p = head) == null) {
                if (casHead(null, s))
                    return s;                 // initialize
            }
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            // 若不是最後節點    
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            // CAS設置將s置爲p的後繼
            else if (!p.casNext(null, s))
                // 若設置失敗從新來
                p = p.next;                   // re-read on CAS failure
            else {
                if (p != t) {                 // update if slack now >= 2
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }
複製代碼

從源碼中能夠得知,當第一次tryAppend()隊列爲空時只設置了頭節點,第二次tryAppend()纔會設置尾結點,入隊後,若不是ASYNC還會調用awaitMatch()方法阻塞匹配

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        // 若限時獲取最後期限
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed
         
        for (;;) {
            Object item = s.item;
            // 不相等代表已經匹配過,有其餘線程已操做過
            if (item != e) {                  // matched
                // assert item != s;
                // 取消節點
                s.forgetContents();           // avoid garbage
                return LinkedTransferQueue.cast(item);
            }
            // 若線程中斷或超時則取消節點
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                    s.casItem(e, s)) {        // cancel
                unsplice(pred, s);
                return e;
            }
            // 初始化自旋次數 
            if (spins < 0) {                  // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            // 自旋
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            // 若採用限時則限時阻塞
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
            }
            // 直接阻塞
            else {
                LockSupport.park(this);
            }
        }
    }
複製代碼

其整個隊列只存在一個操做(入隊或出隊),若不一樣操做會替換item喚醒相應另個線程,若相同操做則根據形參how判斷判斷
NOW:直接返回操做節點不入隊
ASYNC:操做節點尾插入隊,但不會阻塞等待直接返回,同一個線程隨便可以接着操做
SYNC:操做節點尾插入隊且會自旋匹配一段時間,自旋次數用完進入阻塞狀態,像SynchronousQueue同樣同一個線程操做完必須匹配到或被取消後才能繼續操做
TIMED:限時模式,在指定時間內若沒匹配到操做會被取消

相對於SynchronousQueue,LinkedTransferQueue能夠存儲元素且可支持不阻塞形式的操做,而相對於LinkedBlockingQueue維護了入隊鎖和出隊鎖,LinkedTransferQueue經過CAS保證線程安全更提升了效率

LinkedBlockingDeque

LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列,雙向隊列就意味着能夠從對頭、對尾兩端插入和移除元素。LinkedBlockingDeque默認構造容量Integer.MAX_VALUE,也能夠指定容量

主要屬性

// 頭節點
    transient Node first;
    // 尾節點
    transient Node last;
    // 元素個數
    private transient int count;
    // 容量
    private final int capacity;
    
    final ReentrantLock lock = new ReentrantLock();
    
    private final Condition notEmpty = lock.newCondition();
    
    private final Condition notFull = lock.newCondition();
複製代碼

Node內部類

static final class Node {
        // 數據項
        E item;
        // 前驅節點
        Node prev;
        // 後繼節點
        Node next;
        
        Node(E x) {
            item = x;
        }
    }
複製代碼

入隊

  • 頭插
  • public void putFirst(E e) throws InterruptedException {
            // 判空
            if (e == null) throw new NullPointerException();
            // 建立節點
            Node node = new Node(e);
            final ReentrantLock lock = this.lock;
            // 獲取鎖
            lock.lock();
            try {
                while (!linkFirst(node))
                    notFull.await();
            } finally {
                lock.unlock();
            }
        }
    複製代碼
    判空處理而後獲取鎖,調用linkFirst()入隊
    private boolean linkFirst(Node node) {
            // assert lock.isHeldByCurrentThread();
            // 若當前元素個數超過指定容量,返回false
            if (count >= capacity)
                return false;
            // 獲取首節點    
            Node f = first;
            // 新節點後繼指向首節點
            node.next = f;
            // 新節點置爲首節點
            first = node;
            // 若隊列爲空則新節點置爲尾節點
            if (last == null)
                last = node;
            // 若不爲空,新節點置爲首節點的前驅節點    
            else
                f.prev = node;
            // 元素個數+1    
            ++count;
            // 喚醒出隊(消費者)等待隊列中線程
            notEmpty.signal();
            return true;
        }
    複製代碼
  • 尾插
  • public void putLast(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            Node node = new Node(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                while (!linkLast(node))
                    notFull.await();
            } finally {
                lock.unlock();
            }
        }
    複製代碼
    判空處理而後獲取鎖,調用linkLast()入隊
    private boolean linkLast(Node node) {
            // assert lock.isHeldByCurrentThread();
            // 若當前元素個數超過指定容量,返回false
            if (count >= capacity)
                return false;
            // 獲取尾節點
            Node l = last;
            // 將新節點的前驅節點置爲原尾節點
            node.prev = l;
            // 新節點置爲尾節點
            last = node;
            // 若隊列爲空,首結點置爲頭節點
            if (first == null)
                first = node;
            // 不然將新節點置爲原未節點的後繼節點
            else
                l.next = node;
            // 元素個數+1    
            ++count;
            // 喚醒出隊(消費者)等待隊列中線程
            notEmpty.signal();
            return true;
        }
    複製代碼

    出隊

  • 頭出
  • public E takeFirst() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E x;
                while ( (x = unlinkFirst()) == null)
                    notEmpty.await();
                return x;
            } finally {
                lock.unlock();
            }
        }
    複製代碼
    unlinkFirst()方法
    private E unlinkFirst() {
            // assert lock.isHeldByCurrentThread();
            // 獲取頭節點
            Node f = first;
            // 若first爲null即隊列爲空,返回null
            if (f == null)
                return null;
            // 獲取頭節點的後繼節點
            Node n = f.next;
            E item = f.item;
            // 刪除頭節點
            f.item = null;
            f.next = f; // help GC
            // 將原頭節點的後繼節點置爲頭節點
            first = n;
            // 若原隊列僅一個節點,則尾節點置空
            if (n == null)
                last = null;
            // 不然原頭節點的後繼節點的前驅置爲null
            else
                n.prev = null;
            // 元素個數-1    
            --count;
            // 喚醒入隊(生產者)等待隊列中線程
            notFull.signal();
            return item;
        }
    複製代碼
  • 尾出
  • public E takeLast() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E x;
                while ( (x = unlinkLast()) == null)
                    notEmpty.await();
                return x;
            } finally {
                lock.unlock();
            }
        }
    複製代碼
    unlinkLast
    private E unlinkLast() {
            // assert lock.isHeldByCurrentThread();
            // 獲取尾節點
            Node l = last;
            // 尾節點爲null即隊列爲空,返回null
            if (l == null)
                return null;
            // 獲取原尾節點的前驅節點    
            Node p = l.prev;
            E item = l.item;
            // 刪除尾節點
            l.item = null;
            l.prev = l; // help GC
            // 將原尾節點的前驅節點置爲尾節點
            last = p;
            // 若原隊列僅一個節點,則頭節點置空
            if (p == null)
                first = null;
            // 不然原尾節點的前驅節點的後繼置爲null    
            else
                p.next = null;
            // 元素個數-1    
            --count;
            notFull.signal();
            return item;
        }
    複製代碼
    邏輯就很少說了,看過LinkedList源碼的應該不會陌生,除了多了喚醒阻塞獲取鎖操做,基本邏輯相似

    總結

  • ArrayBlockingQueue
  • 數組實現的有界FIFO阻塞隊列,初始化時 必須指定容量。內部經過ReentrantLock(一把鎖)保證出入隊線程安全、支持公平與非公平,由於公平性一般會下降吞吐量因此默認非公平策略;putIndex、takeIndex屬性維護入隊、出隊位置;notEmpty、notFull兩個Condition隊列,利用Condition的等待喚醒機制實現可阻塞式的入隊和出隊

  • LinkedBlockingQueue
  • 鏈表實現的有界FIFO阻塞隊列,默認容量Integer.MAX_VALUE。內部經過takeLock、putLock兩把ReentrantLock鎖保證出入隊線程安全, 兩個鎖下降線程因爲線程沒法獲取單個lock而進入WAITING狀態的可能性提升了線程併發執行的效率,也所以其count屬性用原子操做類(可能兩個線程一個出隊一個入隊同時操做count須要原子操做)。notEmpty、notFull兩個Condition隊列,利用Condition的等待喚醒機制實現可阻塞式的入隊和出隊

  • PriorityBlockingQueue
  • 支持優先級的無界阻塞隊列(就像無限流量同樣內部仍是定義了最大容量Integer.MAX_VALUE - 8),默認狀況元素採起天然順序升序排序,但不能保證同優先級元素的順序。由於無界其插入始終成功,因此內部只維護了一個notEmpty(出隊)Condition隊列。經過ReentrantLock以及CAS一同維護線程安全且儘量地縮小了鎖的範圍以此減小鎖競爭提升性能,底層結構採用基於數組的堆實現

  • DelayQueue
  • 支持優先級、延時獲取元素的無界阻塞隊列,隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,只有在延遲期滿時才能從隊列中提取元素。雖然PriorityQueue線程不安全,但在調用同時須要獲取ReentrantLock鎖,也是利用Condition的等待喚醒機制實現可阻塞式的入隊和出隊。屬性leader不爲null時,代表有線程佔用其做用在於減小沒必要要的競爭

  • SynchronousQueue
  • 不存儲元素的阻塞隊列沒有數據緩存空間,每一個線程的put操做必須等待一個take操做,不然不能繼續添加元素。兩個內部類TransferQueue隊列、TransferStack棧實現公平與非公平策略,其大多方法基於transfer()方法實現

  • LinkedTransferQueue
  • 鏈表結構組成的無界阻塞FIFO隊列,核心方法xfer()。是SynchronousQueue和LinkedBlockingQueues的合體,相對於SynchronousQueue,LinkedTransferQueue能夠存儲元素且可支持不阻塞形式的操做,而相對於LinkedBlockingQueue維護了入隊鎖和出隊鎖,LinkedTransferQueue經過CAS保證線程安全更提升了效率

  • LinkedBlockingDeque
  • LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列,雙向隊列就意味着能夠從對頭、對尾兩端插入和移除元素。默認構造容量Integer.MAX_VALUE,出隊入隊和LinkedList有點相似,LinkedBlockingDeque多了ReentrantLock鎖機制實現線程安全以及notEmpty、notFull兩個Condition隊列,利用Condition的等待喚醒機制實現可阻塞式的入隊和出隊

    感謝

    《java併發編程的藝術》

    相關文章
    相關標籤/搜索