併發容器學習—LinkedTransferQueue

1、LinkedTransferQueue併發容器java

1.LinkedTransferQueue的底層實現node

    LinkedTransferQueue是一個底層數據結構由鏈表實現的無界阻塞隊列,它與SynchronousQueue中公平模式的實現TransferQueue及其類似,LinkedTransferQueue中存儲的也是操做而不是數據元素。能夠對比着學習,更容易理解。先來看看LinkedTransferQueue結點的定義:數據結構

static final class Node {

    //用於標識結點的操做類型,true表示put操做,false表示take操做
    final boolean isData;  

    //結點的數據域,take類型的操做,該值爲null,配對後則爲put中的數據
    //put類型的操做,該值爲要轉移的數據
    volatile Object item;   

    //當前結點的後繼結點
    volatile Node next;

    //等待的線程
    volatile Thread waiter; // null until waiting


    //CAS方式更新後繼結點
    final boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    //CAS方式更新數據域
    final boolean casItem(Object cmp, Object val) {
        // assert cmp == null || cmp.getClass() != Node.class;
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }


    //構造方法
    Node(Object item, boolean isData) {
        UNSAFE.putObject(this, itemOffset, item); // relaxed write
        this.isData = isData;
    }


    //移除出隊列,方便GC
    final void forgetNext() {
        UNSAFE.putObject(this, nextOffset, this);
    }


    //取消結點,就是本次取消操做
    final void forgetContents() {
        UNSAFE.putObject(this, itemOffset, this);
        UNSAFE.putObject(this, waiterOffset, null);
    }


    //判斷結點的操做是否已經被匹配了,結點的操做被取消也包含在匹配當中
    //也就是說這個操做如果被取消了,也認爲是匹配過的
    final boolean isMatched() {
        Object x = item;
        return (x == this) || ((x == null) == isData);
    }

    //判斷當前結點的操做是否是未匹配過的REQUEST類型(take)的結點,true表明是
    final boolean isUnmatchedRequest() {
        return !isData && item == null;
    }

    //判斷結點的操做類型與其數據(item的值)是否相符合,
    //例如take操做item值應該是null,put操做item則應該是數據
    final boolean cannotPrecede(boolean haveData) {
        boolean d = isData;
        Object x;
        return d != haveData && (x = item) != this && (x != null) == d;
    }

    //嘗試匹配一個數據結點,用在移除結點的方法中
    final boolean tryMatchData() {
        // assert isData;
        Object x = item;
        if (x != null && x != this && casItem(x, null)) {
            LockSupport.unpark(waiter);
            return true;
        }
        return false;
    }


    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;
    private static final long waiterOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
            waiterOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiter"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

2.LinkedTransferQueue的繼承關係併發

    LinkedTransferQueue的繼承關係以下圖所示,這麼多的父類及接口中,只有一個TransferQueue接口未接觸過,下面咱們先來看看這個接口是幹什麼的。app

public interface TransferQueue<E> extends BlockingQueue<E> {
    //嘗試轉移一個數據給一個正在等待消費者,若是沒有等待的消費者當即返回false
    //轉移失敗的話,這個操做是不會入隊等待被匹配的
    boolean tryTransfer(E e);

    //轉移一個數據給一個消費者,若沒有正在等待的消費者,那麼該轉移操做會阻塞
    //等待,或發生異常
    void transfer(E e) throws InterruptedException;

    //在必定時間內嘗試轉移數據給一個消費者,若是沒有正在等待的消費者,那就
    //一直嘗試到超時爲止
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //是否有消費者在等待
    boolean hasWaitingConsumer();

    //獲取等待的消費者數量
    int getWaitingConsumerCount();
}

3.LinkedTransferQueue中重要的屬性及構造方法less

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    //當前計算機CPU核心數是否大於1
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    //結點爲隊列中第一個waiter時的自旋次數
    private static final int FRONT_SPINS   = 1 << 7;

    //前驅結點正在處理,當前結點須要自旋的次數
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    //隊列進行清理的閾值
    static final int SWEEP_THRESHOLD = 32;

    //隊列頭結點
    transient volatile Node head;

    //隊列尾結點
    private transient volatile Node tail;

    //移除結點連接失敗(修改結點的next失敗)的次數,當該值大於SWEEP_THRESHOLD
    //時,會對隊裏進行一次清理,清理掉哪些無效的結點
    private transient volatile int sweepVotes;

    //下面四個值,用於標識xfer方法的類型
    /**
    * NOW:表明不等待消費者,直接返回結果的類型。poll方法和tryTransfer方法中使用
    * ASYNC:表示異步操做,直接添加數據元素到隊尾,不等待匹配,用於offer,add,put方法中
    * SYNC:同步操做,等待數據元素被消費者接受,用於take,transfer方法中
    * TIMED:延時操做,等待必定時間後在返回匹配的結果,用於待超時時間的poll和tryTransfer方法中
    */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

    public LinkedTransferQueue() {
    }


    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c);
    }
}

4.LinkedTransferQueue入隊操做dom

    LinkedTransferQueue中的入隊方法包含有offer,put及add三種,這三個方法本質是都是同樣的,都是調用的同一個方法且參數也都同樣,而且由於LinkedTransferQueue是個無界阻塞隊列,容量沒有限制,所以不會出現入隊等待的現象。異步

public void put(E e) {
    xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

5.xfer方法分析學習

    xfer方法是LinkedTransferQueue種最核心的一個方法,將其理解清楚,那麼LinkedTransferQueue隊列也就明白了。LinkedTransferQueue與SynchronousQueue中公平模式的實現TransferQueue是同樣的,隊列中存放的不是數據,而是操做(取出數據的操做take和放入數據的操做put)。隊列中既能夠存放take操做也能夠存放put操做,可是隊列中不能同時存在兩種不一樣的操做,由於不一樣的操做會觸發隊列進行配對(操做出隊)。this

    知道了這些咱們再來看xfer方法的大體流程(超時等待部分和操做取消部分暫不分析,等分析源碼時在說):當隊列爲空時,若是有一個線程執行take操做,此時對列中是沒有對應的put操做與之匹配的,那麼這個take操做就會入隊,同時阻塞(也多是自旋)執行這個操做的線程以等待匹配操做的到來;同理,空隊列時來的是一個put操做,那麼這個put操做也要入隊阻塞等待匹配的take操做到來。而當隊列不爲空時(假設隊列中都是take操做),某一線程執行put操做,此時隊列檢測到來了一個與隊列中存放的操做相匹配的操做,那麼就會將隊首操做與到來的操做進行匹配,匹配成功,就會喚醒隊首操做所在的線程,同時將已經匹配度額操做移除出隊;而如果某一線程執行的是與隊裏中相同的操做,那麼就將該操做直接添加到隊尾。

//1.當e!=null 且haveData爲true,how爲ASYNC,nanos==0,表示沒有超時設置的當即返回的放入數據的操做(put,add,offer)
//2.當e==null 且haveData爲false,how爲SYNC,nanos==0,表示沒有超時設置的等待匹配到放入數據的操做(take)
//3.當e!=null 且haveData爲true,how爲SYNC,nanos==0,表示沒有超時設置的等待匹配到取出數據的操做(transfer)
//4.當e!=null 且haveData爲true,how爲TIMED,nanos>0,表示設置在超時等待時間內匹配取出數據的
//操做(tryTransfer(E e, long timeout, TimeUnit unit))
//5.當e==null 且haveData爲false,how爲TIMED,nanos>0,表示設置在超時等待時間內匹配放入數據的
//操做(poll(long timeout, TimeUnit unit))
//6.當e!=null 且haveData爲true,how爲NOW,nanos==0,表示當即匹配取出數據的操做(tryTransfer)
//7.當e==null 且haveData爲false,how爲NOW,nanos==0,表示當即匹配放入數據的操做(poll)
private E xfer(E e, boolean haveData, int how, long nanos) {

    //判斷本次操做是否是放入數據的操做類型,如果則e不能爲null
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race

        //從head開始匹配
        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean isData = p.isData;    //獲取隊首結點的操做類型
            Object item = p.item;    //獲取隊首的數據

            //判斷隊首結點是否已經被取消或匹配過了且隊首結點的操做類型與其數據內容是否一致
            //isData爲true對應put操做,則item不能爲null,反之item必須爲null。
            //所以,如果不一致說明p已經不是隊首了,須要從新查找隊首
            if (item != p && (item != null) == isData) { // unmatched

                //此處判斷本次操做應該是入隊操做仍是匹配操做,即判斷與隊首的操做類型是否一致
                //若本次操做與隊列中的操做類型(都是put或都是take)相同,那麼須要將本次操做入隊
                //如果不一樣,那麼須要將隊首結點的操做與本次操做匹配
                if (isData == haveData)   // can't match
                    break;    //操做類型相同,退出當前循環,去執行入隊步驟

                //到此,說明隊首操做與本次操做時相互匹配的,那麼接下來須要作配對以後的工做
                //嘗試修改p中數據item爲e,若修改爲功,說明操做匹配成功
                //如果修改失敗,說明別其餘線程搶先匹配了,那麼就往隊列後繼續查找匹配
                if (p.casItem(item, e)) { // match

                    //本次操做已經與p匹配成功,那麼p以前的結點要麼是被匹配過,要麼已經被取消
                    //都不能再作爲head了,所以,這裏須要將head更新
                    for (Node q = p; q != h;) {
                        //獲取後繼結點
                        Node n = q.next;  // update by 2 unless singleton

                        //判斷head是否仍是h,如果,說明head還沒被其餘線程更新過,那當前線程能夠嘗試更新
                        //如果更新成功,說明h結點已經被移除出隊了,那麼就須要將其後繼指針指向自身表明
                        //這個結點已被移除出隊,方便GC回收
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry

                        //到這說明head已經被更新過,或是當前線程要更新head失敗,那麼就從新獲取head並判斷
                        //head是否爲null(是不是空隊列),如果則直接結束循環;若不是,再繼續判斷其後繼結點
                        //是否爲null(head是不是隊列中最後一個結點),如果也直接結束循環(不須要再繼續嘗試
                        //更新head);若不是,在判斷這個後繼結點是否已經匹配過,若未匹配,那麼也放棄更新head
                        //這裏能夠總結出,head更新的要求:head不會隨着隊首被匹配就當即更新,head的更新會滯後
                        //只有當head及其後繼都被匹配後,纔會對head進行匹配;也就是說隊列中要有至少兩個結點匹配過
                        //會觸發head的更新(即鬆弛度>2才更新head)
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);    //p結點匹配成功,喚醒等待該結點的線程
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            //往p的後繼繼續查找未匹配的結點
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        //到這說明隊列中的操做與本次操做相同,只能將操做入隊
        //判斷本次操做的模式,NOw爲不等待,當即返回的模式
        if (how != NOW) {                 // No matches available

            //s未初始化的話,進行初始化
            if (s == null)
                s = new Node(e, haveData);
            Node pred = tryAppend(s, haveData);    //嘗試添加到隊尾,並返回其前驅
            if (pred == null)    //返回前驅爲null,說明添加失敗,從新開始
                continue retry;           // lost race vs opposite mode

            //添加的結點不爲異步模式,說明是同步或超時模式,那麼要等待匹配
            //若爲異步模式,則不須要等待匹配,由於異步模式必然是add,offer,put
            //三個方法,不須要等待
            if (how != ASYNC)    
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

//結點自環
final void forgetNext() {
    UNSAFE.putObject(this, nextOffset, this);
}

//嘗試添加結點到隊尾
private Node tryAppend(Node s, boolean haveData) {

    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail

        //判斷隊列是否爲空,爲空的話,直接更新head結點爲s後結束,即空隊列直接入隊
        if (p == null && (p = head) == null) {
            if (casHead(null, s))
                return s;                 // initialize
        }

        //判斷結點是否符合入隊要求,即驗證s結點的操做類型與p是否相同
        else if (p.cannotPrecede(haveData))
            return null;                  //不一樣,直接返回null

        //判斷p是否有後繼,新增結點是要添加到隊尾的,而tail是可能滯後於隊尾的,
        //且其餘線程也可能搶先更新隊尾,所以若p的後繼不爲null,說明當前p不是真正的
        //隊尾,須要推動查找隊尾。
        else if ((n = p.next) != null)
            p = p != t && t != (u = tail) ? (t = u) : (p != n) ? n : null;  

        //n爲null,說明找到隊尾了,此時須要將p的後繼更新成s,如果更新失敗說明有其
        //它線程搶先了,那麼就從新獲取隊尾,再嘗試    
        else if (!p.casNext(null, s))
            p = p.next;                   // 繼續查找隊尾

        //成功將s入隊
        else {

            //此時,若p不等t,說明t不是隊尾,能夠看看tail需不須要更新
            if (p != t) {                 // update if slack now >= 2

                /**
                * 判斷是否須要更新tail,如果當前的tail離
                * 真正的隊尾不超過2個結點,那就暫時不更新tail
                * 如果超過的話,就更新tail
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail)   != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}

//阻塞或自旋等待匹配
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {

    //計算截止時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();    //獲取當前線程
    int spins = -1; // initialized after first item and cancel checks
    ThreadLocalRandom randomYields = null; // bound if needed

    //死循環
    for (;;) {
        Object item = s.item;    //s結點的數據

        //判斷s是否被匹配過,未匹配時item==e,匹配過或取消後item就會改變
        if (item != e) {                  // matched
            // assert item != s;

            //s被匹配過,那麼須要將item設爲s自己,且waiter要恢復成null
            s.forgetContents();           // avoid garbage
            return LinkedTransferQueue.<E>cast(item);    //返回數據
        }

        //判斷s所在的線程是否被中斷過或者超時時間是否到了,那麼就須要取消本次s
        //結點對應的操做了(將s.item設爲s)
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            unsplice(pred, s);    //將s移除出隊列
            return e;
        }

        //下面都是進行的超時阻塞或者自旋操做
        //判斷自旋次數是否初始化過    
        if (spins < 0) {                  // establish spins at/near front

            //初始化自旋次數,即計算自旋次數
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            --spins;    //自旋次數遞減
            // 生成隨機數來讓出CPU時間
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            s.waiter = w;                 //設置等待線程
        }
        else if (timed) {
            //計算超時等待時間
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);    //超時阻塞
        }
        else {
            LockSupport.park(this);    //阻塞
        }
    }
}

//將s結點移除出隊列,即解除和前驅結點的連接
final void unsplice(Node pred, Node s) {

    //將s.item設爲s自己,且waiter要恢復成null
    s.forgetContents(); // forget unneeded fields

    //當s的前驅不爲null,且前驅與s不相同的條件下才能進行解除連接
    if (pred != null && pred != s && pred.next == s) {
        Node n = s.next;    //獲取s結點的後繼

        //判斷s是否有後繼(s是否爲隊尾),若s有後繼那麼後繼是否是s自己(s是否已匹配
        //或取消了),若後繼不是s自身,那麼就嘗試將pred的後繼結點更新成s的後繼n,若
        //是更新成功,再判斷pred是否已經被匹配過或取消了
        if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) {

            //更新head
            for (;;) {               // check if at, or could be, head
                Node h = head;    //獲取head

                //h爲pred或s或隊列已空,那就不須要更新了,直接返回
                if (h == pred || h == s || h == null)
                    return;          // at head or list empty

                //如果h未被匹配過,說明不須要更新,退出當前循環
                if (!h.isMatched())
                    break;
                //獲取h的後繼
                Node hn = h.next;
                if (hn == null)
                    return;          // now empty
                //只要h未被匹配或取消,就嘗試更新head
                if (hn != h && casHead(h, hn))
                    //將h結點移除出隊,h.next==h
                    h.forgetNext();  // advance head
            }

            //s節點被移除後,須要記錄刪除的操做次數,若是超過閥值,則須要清理隊列
            if (pred.next != pred && s.next != s) {     // 從新檢查移除是否成功
                for (;;) {           // sweep now if enough votes
                    int v = sweepVotes;    //返回當前刪除次數的記錄
                
                    //判斷是否超過閾值,沒超過就更新記錄,超過就將記錄恢復爲0
                    //而且清理隊列
                    if (v < SWEEP_THRESHOLD) {
                        if (casSweepVotes(v, v + 1))
                            break;
                    }
                    else if (casSweepVotes(v, 0)) {
                        sweep();    //清理隊列
                        break;
                    }
                }
            }
        }
    }
}

//清理隊列
private void sweep() {

    //遍歷隊列
    for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
        //判斷s是否被匹配過,未被匹配過就繼續向後遍歷
        if (!s.isMatched())
            // Unmatched nodes are never self-linked
            p = s;

        //s節點被匹配,可是是尾節點,則退出循環,隊尾就算被匹配了也不能直接
        //移除
        else if ((n = s.next) == null) // trailing node is pinned
            break;

        //判斷s是否已經被移除了,如果,則從新從head開始清理
        else if (s == n)    // stale
            // No need to also check for p == s, since that implies s == n
            p = head;
        else
            p.casNext(s, n);    //移除s出隊列
    }
}

    以上就是xfer的所有過程了,一個xfer方法直接包含了LinkedTransferQueue的全部功能,不只add,put,offer方法是由其實現的,其餘的如poll,take,transfer,tryTransfer方法也均是由其實現的,只不過參數不一樣:

public boolean tryTransfer(E e) {
    return xfer(e, true, NOW, 0) == null;
}


public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}


public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}


public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}


public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}


public E poll() {
    return xfer(null, false, NOW, 0);
}

6.LinkedTransferQueue中主要方法流程

    1.offer,add,put三個異步放入數據的操做的大體過程以下:

 

    2.take同步取出數據的大體流程以下:

 

 

    3.transfer同步放入數據的流程大體以下:

 

    4.tryTransfer帶超時設置的放入數據的流程大體以下:

 

    5.poll帶超時設置的取出數據的流程大體以下:

 

    6.tryTransfer不作任何等待放入數據(只作一次放入數據的嘗試,失敗直接結束)的流程大體以下:

 

    7.poll只進行一次取出數據的操做,失敗直接返回null,大體過程以下:

 

7.其餘方法

//查找隊列中第一個item不爲null或結點自己的結點,將其item返回,不移除出隊列
public E peek() {
    return firstDataItem();
}

//遍歷隊列查找item不爲null也不指向結點自身的結點,返回其item
private E firstDataItem() {

    //遍歷隊列
    for (Node p = head; p != null; p = succ(p)) {
        Object item = p.item;
        
        if (p.isData) {
            if (item != null && item != p)
                return LinkedTransferQueue.<E>cast(item);
        }
        else if (item == null)
            return null;
    }
    return null;
}

//獲取結點的後繼,若後繼爲自身(即已被移除出隊),那麼返回head
final Node succ(Node p) {
    Node next = p.next;
    return (p == next) ? head : next;
}

//返回隊列中的結點數
public int size() {
    return countOfMode(true);
}

//計算結點數
private int countOfMode(boolean data) {
    int count = 0;

    //遍歷隊列計算有效的結點數
    for (Node p = head; p != null; ) {
        if (!p.isMatched()) {
            if (p.isData != data)
                return 0;
            if (++count == Integer.MAX_VALUE) // saturated
                break;
        }
        Node n = p.next;
        if (n != p)
            p = n;
        else {
            count = 0;
            p = head;
        }
    }
    return count;
}

//刪除隊列查詢到的第一個item爲o的結點
public boolean remove(Object o) {
    return findAndRemove(o);
}

private boolean findAndRemove(Object e) {
    if (e != null) {

        //遍歷隊列查找刪除
        for (Node pred = null, p = head; p != null; ) {
            Object item = p.item;
            if (p.isData) {

                //判斷結點是不是須要刪除的數據
                if (item != null && item != p && e.equals(item) &&
                    p.tryMatchData()) {
                    unsplice(pred, p);    //將結點移除出隊列
                    return true;
                }
            }
            else if (item == null)    //item爲null,說明隊列中都是取出數據的操做,不可能有e了
                break;
            pred = p;
            if ((p = p.next) == pred) { // stale
                pred = null;
                p = head;
            }
        }
    }
    return false;
}
相關文章
相關標籤/搜索