併發容器學習—SynchronousQueue

1、SynchronousQueue併發容器
1.Synchronous Queue的底層實現
    Synchronous Queue稱爲同步隊列,是一種 容量爲0的阻塞隊列。每一個插入操做必須等待另外一個線程的對應移除操做 ,反之亦然。也就是說同步隊列面向的是操做,而不是數據元素,即同步隊列是入隊操做而不是入隊數據,這個操做能夠是take或者put,put操做與take操做相對應。再簡單點說,同步隊列能夠看作是一個只有入隊沒有出隊的面向操做的隊列,沒執行一次take或put,本質上都是在執行一次操做的入隊;只是在隊列的內部執行時,會根據每次入隊的操做類型進行匹配,每配對成功一對take和put操做,就會將這兩個操做從隊列中移除並反饋給對應的調用線程。
public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

    上面是SynchronousQueue的兩個構造方法,能夠看出SynchronousQueue的底層實際是建立一個轉移隊列,而且這個隊列有公平和非公平兩種模式,下面咱們先來看看公平模式的轉移隊列是怎麼實現的。java

 

2.SynchronousQueue的公平模式node

    不管是非公平的TransferStack隊列仍是公平的TransferQueue隊列,都是對父類Transfer的實現:安全

//Transferer是SynchronousQueue中的一個內部類,定義了一個轉移方法
//TransferStack和TransferQueue都要實現該轉移方法
abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}

    TransferQueue的是一個底層由鏈表實現的FIFO隊列,其結點的定義以下:數據結構

static final class QNode {
    volatile QNode next;          // 結點的後繼
    volatile Object item;         // 結點中存儲的數據
    volatile Thread waiter;       // 等待的線程

    //是不是數據,此標識要與判斷入隊的操做是什麼操做
    //即判斷相鄰兩次入隊的操做是否相同,如果相鄰兩次入隊
    //的是不相同的操做(一次put一次take)那麼就要進行配對後移除出隊。
    //不然,將操做入隊便可
    final boolean isData;    

    //構造方法
    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }

    //CAS更新後繼結點
    boolean casNext(QNode cmp, QNode val) {
        return next == cmp &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    //CAS更新結點中的數據
    boolean casItem(Object cmp, Object val) {
        return item == cmp &&
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    //取消操做,即嘗試將item更新爲結點自己
    void tryCancel(Object cmp) {
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }

    //判斷節點是否應該取消,即item是否是結點自己
    boolean isCancelled() {
        return item == this;
    }

    //判斷當前結點是否已再也不隊列之中
    boolean isOffList() {
        return next == this;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;


    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = QNode.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

    知道了TransferQueue的底層實現,在看看TransferQueue中重要的屬性及構造方法:併發

static final class TransferQueue<E> extends Transferer<E> {

    //隊列的隊首結點
    transient volatile QNode head;
    //隊列的隊尾結點
    transient volatile QNode tail;
    //清除標記
    transient volatile QNode cleanMe;

    TransferQueue() {
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long cleanMeOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TransferQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
            cleanMeOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("cleanMe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

3.公平模式下的take和put操做app

//將指定元素添加到此隊列,若有必要則等待另外一個線程接收它
public void put(E e) throws InterruptedException {
    //判斷轉移元素是否爲null,也就是說同步隊列中不能對null元素進行轉移
    //由於e元素是否爲null,是transfer方法中判斷入隊操做是put仍是take
    //的一個依據
    if (e == null) throw new NullPointerException();

    //put方法中傳入transfer的e不爲null
    //調用transfer 默認不進行超時等待,若是發生中斷則拋出中斷異常
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

//獲取並移除此隊列的頭,若有必要則等待另外一個線程插入它
public E take() throws InterruptedException {

    //take方法中傳入transfer方法的e爲null
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

 

    由take方法和put方法的源碼可知,在公平模式下,這兩個方法本質都是對TransferQueue中transfer方法的調用,下面來看看TransferQueue中的transfer方法是如何實現的?oop

E transfer(E e, boolean timed, long nanos) {

    QNode s = null; 

    //根據e是否爲null,標記本次入隊的是什麼操做
    //e爲null,isData爲false,不然isData爲true
    boolean isData = (e != null);


    for (;;) {    //自旋
        QNode t = tail;    //獲取隊尾
        QNode h = head;    //獲取隊首

        //判斷隊首或隊尾是否爲null,實際上h和t不會爲null(構造方法中已經初始化過)
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        //判斷是不是空隊列,或者入隊操做與隊尾相同(即與隊尾都是put或者take操做)
        //不管是空隊列仍是與隊尾相同操做,說明只能將操做入隊,而不能進行
        //匹配(操做相同,沒法配對,配對要操做不一樣才行)
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;    //獲取隊尾的後繼結點

            //判斷隊尾是否已經改變,便是否有其餘線程搶先進行入隊或者配對操做
            if (t != tail)                  // inconsistent read
                continue;    //從新進行入隊或者配對判斷

            //判斷是否是有新增的與隊尾相同的入隊操做,如果,更新成新的隊尾
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);    //嘗試更新隊尾結點,失敗也沒有關係
                continue;
            }

            //判斷是否超時(timed爲是否設置了超時等待操做,nanos爲剩餘的等待時間)
            if (timed && nanos <= 0)        // can't wait
                return null;    //超時後直接返回null

            //判斷s是否爲null,如果則以e爲item建立一個結點
            if (s == null)
                s = new QNode(e, isData);
            
            //嘗試更新隊尾的後繼結點爲s結點,失敗的話就循環繼續嘗試直到成功
            if (!t.casNext(null, s))        // failed to link in
                continue;

            //在隊尾新增了一個後繼結點,那麼隊尾就應該是這個後繼結點了
            //所以須要將s更新爲新的隊尾結點
            advanceTail(t, s);              // swing tail and wait

            //空旋或者阻塞直到匹配的操做到來
            Object x = awaitFulfill(s, e, timed, nanos);

            //到這一步,說明阻塞的操做已經配對成功或者操做已經被取消了,線程被喚醒了
            //判斷是配對成功了,仍是操做被取消了
            //如果操做被取消,會設置s.item=s
            if (x == s) {                   // wait was cancelled
                clean(t, s);    //清除結點s
                return null;    //操做已經被取消,直接返回null
            }

            //判斷結點s是否還在隊列中
            //如果還在隊列中,且又配對操做成功,說明s結點應該是新的head
            //而且若s.item不爲null還須要將s.item設爲s自身,等待線程賦null
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                //判斷是否
                if (x != null)              
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        // 入隊的操做與以前的隊尾的操做不一樣,能夠進行配對(take配put,或
        // put配take)
        } else {    
            //獲取隊首的後繼結點
            QNode m = h.next;               
            
            //出現t與隊尾不一樣,m爲null,h與隊首不一樣,說明隊列發生了改變
            //即隊列出現了其餘線程搶先執行了入隊或者配對的操做
            if (t != tail || m == null || h != head)
                continue;    //循環從新來

            //獲取後繼結點的item
            Object x = m.item;

            //isData == (x != null)是判斷m結點對應的操做與當前操做是否相同
            //x == m 則是判斷m結點是否被取消
            //!m.casItem(x, e) 則是判斷嘗試更新m結點的item爲e是否成功
            //以上三個判斷有一個爲真,那就說明m結點已經不在隊列中或是被取消或是匹配過了
            if (isData == (x != null) || x == m || !m.casItem(x, e)) {         
                advanceHead(h, m);          //舊隊首已通過時,更新隊首
                continue;
            }

            //更新head,到此處說明配對操做已經成功,應該將m結點變爲head
            //而h結點則須要移除出列
            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);    //喚醒m結點對應的線程

            //
            return (x != null) ? (E)x : e;
        }
    }
}

//嘗試更新隊尾結點
void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}

//等待結點被對應的操做匹配
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    //根據timed標識即超時時間計算截止時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();    //湖區當前線程的引用

    //計算出自旋時間
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);

    //死循環,確保操做能成功
    for (;;) {

        //判斷當前線程是否被中斷,如果被中斷那麼取消
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item;    //獲取結點對應的item對象

        //判斷結點的item是否仍是對象e
        //生成s節點的時候,s.item是等於e的,當取消操做(item變爲s)或者
        //匹配了操做的時候會進行更改
        if (x != e)
            return x;

        //判斷是否設置了超時等待功能
        if (timed) {
            nanos = deadline - System.nanoTime();    //計算剩餘的等待時間
            if (nanos <= 0L) {    //判斷是否應超時,若超時直接嘗試取消操做
                s.tryCancel(e);
                continue;
            }
        }
        //自旋時間控制
        if (spins > 0)
            --spins;    //自旋時間減小

        //判斷是否須要設置等待線程
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)    //沒有設置超時等待功能,直接讓讓線程一直掛起
            LockSupport.park(this);

        //設置了超時等待功能,就判斷等待時間是否大於自旋的最大時間
        //若大於自旋最大時間那就讓線程阻塞一段時間
        //不然讓線程自旋一段時間
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

//嘗試取消當前結點對應的操做,即將結點中item值更新成結點自身(this)
void tryCancel(Object cmp) {
    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}

//更新隊首head指針,並將原隊首結點的next指向其自身,方便GC
void advanceHead(QNode h, QNode nh) {
    if (h == head &&
        UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
        h.next = h; // forget old next
}


//如果某個結點對應的操做被取消,那麼這個操做對應的結點須要移除出隊
//也就是清理無效結點。
/**
* clean方法中若是刪除的節點不是尾節點,那麼能夠直接進行刪除,
* 若是刪除的節點是尾節點,那麼用cleanMe標記須要刪除的節點的前驅,
* 這樣在下一輪的clean的過程將會清除打了標記的節點。
*/
void clean(QNode pred, QNode s) {
    s.waiter = null;     //操做已經被取消,那麼就不會有等待線程了

    //判斷s是不是pred的後繼結點
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;    //隊首引用
        QNode hn = h.next;   //隊首的後繼結點

        //判斷hn是否被取消,若被取消,那麼更新head
        if (hn != null && hn.isCancelled()) {
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // 隊尾結點

        //判斷是不是空隊列
        if (t == h)
            return;
        QNode tn = t.next;    //隊尾的後繼

        //隊尾改變,說明隊列被其餘線程修改過了
        if (t != tail)
            continue;

        //t有後繼結點,說明隊尾該更新了
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }

        //判斷s結點是不是隊尾結點
        //若不是隊尾結點,則只需將s的前驅pred結點的next指向s結點的後繼結點sn
        //即成功將s結點從隊列中清除
        if (s != t) {        // If not tail, try to unsplice
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn))
                return;
        }

        //到此處,說明要清除的結點就是隊列的隊尾,而隊尾不能直接刪除
        QNode dp = cleanMe;    //獲取標識有要刪除的結點的前驅結點

        //判斷是否有須要刪除的結點,dp不爲null,即cleanMe存在
        //說明隊列以前有隊尾結點須要刪除
        if (dp != null) {    // Try unlinking previous cancelled node
            QNode d = dp.next;
            QNode dn;

            //當cleanMe處於如下四種情形時,cleanMe失效
            //(1)cleanMe的後繼而空(cleanMe 標記的是須要刪除節點的前驅)
            //(2)cleanMe的後繼等於自身,
            //(3)須要刪除節點的操做沒有被取消,
            //(4)被刪除的節點不是尾節點且其後繼節點有效
            if (d == null ||               // d is gone or
                d == dp ||                 // d is off list or
                !d.isCancelled() ||        // d not cancelled or
                (d != t &&                 // d not tail and
                 (dn = d.next) != null &&  //   has successor
                 dn != d &&                //   that is on list
                 dp.casNext(d, dn)))       // d unspliced
                casCleanMe(dp, null);    //cleanMe失效,賦值爲null
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred))    //將隊尾的前驅結點標記成cleanMe
            return;          // Postpone cleaning s
    }
}

//用於判斷結點是否還在隊列中,若後繼結點是自身,說明已經再也不隊列中
//不然還在隊列中
boolean isOffList() {
    return next == this;
}

由上面對公平的SynchronousQueue的分析可知,底層是使用隊列來實現公平模式的,而且線程安全是經過CAS方式實現的。公平的TransferQueue隊列中會將連續相同的操做入隊,而不一樣的操做則會進行配對,即TransferQueue隊列中要麼沒有存放操做,要麼存放都是相同的操做(要麼都是take,要麼都是put),當有一個與隊列中的操做不相同的操做時,隊列會自動將隊首操做與之進行匹配。大體流程(操做被取消未顯示,方便理解)以下圖所示:學習

4.SynchronousQueue的非公平模式this

    SynchronousQueue的非公平模式是基於棧來實現的,咱們知道棧是後進先出的(LIFO),也就是說這裏的非公平模式與ReentrantLock中的非公平模式區別巨大,後來先服務這太不公平了。spa

    先來看看非公平模式的具體實現TransferStack的底層數據結構鏈表中結點的定義:

static final class SNode {
    volatile SNode next;        // 鏈表的後繼結點
    volatile SNode match;       // 匹配的結點
    volatile Thread waiter;     // 等待的線程
    Object item;                // 數據
    int mode;        //結點的模式

    SNode(Object item) {
        this.item = item;
    }

    //CAS方式更新後繼結點
    boolean casNext(SNode cmp, SNode val) {
        return cmp == next &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    //嘗試匹配結點,匹配成功就將等待匹配結點對應的線程喚醒繼續後續操做
    boolean tryMatch(SNode s) {
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                LockSupport.unpark(w);
            }
            return true;
        }
        return match == s;
    }

    //嘗試取消結點,將match更新成結點自身即標誌着結點已處於取消狀態
    void tryCancel() {
        UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    }

    //判斷是否被取消
    boolean isCancelled() {
        return match == this;
    }


    private static final sun.misc.Unsafe UNSAFE;
    private static final long matchOffset;
    private static final long nextOffset;


    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = SNode.class;
            matchOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("match"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

 

    知道告終點的基本結構,在來看看TransferStack中的重要屬性及構造方法:

//TransferStack中沒有構造方法,所以只有一個空構造
/**
* TransferStack中定義了三個標記:REQUEST表示消費者,DATA表示生產者,
* FULFILLING表示操做匹配狀態。任何線程對TransferStack的操做都屬於
* 上述3種狀態中的一種
*/
static final class TransferStack<E> extends Transferer<E> {
    //用於標記結點的類型,表明結點是消費者(對應take操做)
    static final int REQUEST    = 0;
    //用於標記結點的類型,表明結點是生產者(對應put操做)
    static final int DATA       = 1;
    //用於標記結點的狀態,表明結點正處於匹配狀態
    static final int FULFILLING = 2;

    //棧頂結點
    volatile SNode head;

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TransferStack.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

5.非公平模式下的take和put操做

    有上文對公平模式的學習,咱們知道take和put操做最終調用的都是transfer方法,只不過公平模式調用的是TransferQueue中的轉移方法,非公平模式則是調用TransferStack中的轉移方法

/**
* transfer的大體過程:將一個操做與棧頂的操做進行配對
* 如果配對不成功(take對應put,或put對應take),那麼直接將該操做
* 入棧;如果配對成功,即此時應該將棧頂操做出棧,可是不能直接出棧(
* 若此時其餘線程進行入棧,那麼直接出棧會出問題),而是先將匹配的操做
* 標記成FULFILLING狀態(匹配狀態)而後入棧;當其餘線程檢查到這個匹配
* 的過程,就會先幫助配對,在去執行自身的操做
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {

    SNode s = null; // constructed/reused as needed

    //判斷當前操做是何種模式,REQUEST對應take,DATA對應put
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {    //死循環
        SNode h = head;    //獲取棧頂

        //判斷棧是否爲空棧,若不爲空棧,那麼棧頂操做模式與當前操做模式是否相同
        //如果棧頂操做與當前操做模式相同,那麼就須要入棧該操做
        if (h == null || h.mode == mode) {  // empty or same-mode

            //判斷是否設置了超時等待,若設置了超時等待,那等待時間是否還有剩餘
            if (timed && nanos <= 0) {      

                //判斷棧頂是否爲null,且棧頂是否被取消
                //若不爲null,且棧頂操做被取消,那麼就嘗試更新棧頂操做爲其後繼
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else    //棧頂爲null或者沒被取消
                    return null;

            //沒有設置超時等待,或超時等待時間還未到,則嘗試將當前操做入棧
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                //s入棧成功,那麼就自旋或掛起等待匹配的操做到來在被喚醒
                SNode m = awaitFulfill(s, timed, nanos);    //操做自旋或掛起

                //到這一步說明操做已經被取消,或者匹配到了對應的操做
                //匹配結點m爲結點本身自己,說明操做被取消
                if (m == s) {               // wait was cancelled
                    clean(s);    //清除被取消的結點
                    return null;
                }
                //如果棧不是空棧且棧頂的後繼是s,說明操做s和其匹配的操做m,都還在棧中
                //須要將其移除出棧,即更新棧頂爲s的後繼
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller

                //根據操做類型返回對應的數據,最後返回的其實都是put進去的數據
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }

        //當前的操做與棧頂操做相匹配,進行匹配,將操做的狀態更新成FULFILLING併入棧
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // 判斷棧頂是否被取消
                casHead(h, h.next);         // 棧頂被取消更新棧頂

            //入棧新結點s,s的後繼爲h,而且s處於FULFILLING狀態(匹配狀態)
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // s的後繼m,由於s處於匹配狀態,m多是其配對的結點

                    //配對結點爲null
                    //這裏有些疑問,沒搞懂什麼情形下會出現m爲null(按理不該該出現的)
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }

                    //m的後繼結點
                    SNode mn = m.next;
                    //判斷m和s配對是否成功,配對成功嘗試更新棧頂爲m的後繼
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        //沒有匹配成功,說明m結點不是s的配對結點,繼續向後尋找
                        s.casNext(m, mn);   // help unlink
                }
            }

        //到這說明棧頂處於匹配狀態,那麼幫助其匹配
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

//將結點對應的線程自旋或掛起以等待匹配的操做到來
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    //計算截止時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    //計算自旋時間
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel();    //嘗試取消操做
        SNode m = s.match;    //獲取匹配的結點

        //匹配結點不爲null,說明要麼匹配到了,要麼被取消,均可以結束掛起了
        if (m != null)
            return m;

        //判斷是否設置了超時等待
        if (timed) {
            //計算剩餘時間
            nanos = deadline - System.nanoTime();
            //設置了超時等待,若截止時間已到,那麼操做要被取消
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        //自旋時間遞減
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;

        //設置等待線程
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter

        //沒有設置超時等待,線程一直掛起,知道被配對操做喚醒
        else if (!timed)
            LockSupport.park(this);    //線程掛起

        //判斷超時等待時間是否大於自旋最大時間
        //如果大於,就直接將線程掛起一段時間
        //不然只自旋不掛起
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

//須要自旋的情形
//當前結點爲棧頂
//棧頂爲null
//棧頂正在匹配中
boolean shouldSpin(SNode s) {
    SNode h = head;
    return (h == s || h == null || isFulfilling(h.mode));
}

//嘗試配對
boolean tryMatch(SNode s) {
    //嘗試將當前結點的配對結點更新爲s,更新成功就喚醒當前結對應的線程
    if (match == null &&
        UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        Thread w = waiter;
        if (w != null) {    // waiters need at most one unpark
            waiter = null;
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;
}

    經過上面對非公平模式下TransferStack中transfer方法的分析,可知非公平模式實際上能夠說是很是的不公平,由於TransferStack是利用棧的後進先出性質來進行配對的,也就說基本上都是後來先服務。其大體過程能夠簡化成以下圖所示:

相關文章
相關標籤/搜索