Java併發包源碼學習系列:阻塞隊列實現之LinkedTransferQueue源碼解析

系列傳送門:html

LinkedTransferQueue概述

LinkedTransferQueue在JDK1.7版本誕生,是由鏈表組成的無界TransferQueue,相對於其餘阻塞隊列,多了tryTransfer和transfer方法。java

TransferQueue:生產者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費(不只僅是添加到隊列裏就完事)。新添加的transfer方法用來實現這種約束。顧名思義,阻塞就是發生在元素從一個線程transfer到另外一個線程的過程當中,它有效地實現了元素在線程之間的傳遞(以創建Java內存模型中的happens-before關係的方式)。node

http://cs.oswego.edu/pipermail/concurrency-interest/2009-February/005888.html算法

Doug Lea評價TransferQueue是ConcurrentLinkedQueue、SynchronousQueue(在公平模式下)、無界的LinkedBlockingQueue等的超集,功能十分強大,最重要的是,它的實現也更加的高效。編程

總結:基於無鎖CAS方式實現的無界FIFO隊列安全

TransferQueue

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    //...
}

LinkedTransferQueue不一樣於其餘的阻塞隊列,它實現了TransferQueue接口,這必定是核心所在,咱們直接來看看接口定義的方法規範:併發

// 繼承了BlockingQueue接口,並增長若干新方法
public interface TransferQueue<E> extends BlockingQueue<E> {
    /**
     * 將元素 傳給等待的消費者【若是有的話】, 返回true, 若是不存在,返回false,不入隊。
     */
    boolean tryTransfer(E e);

    /**
     * 將元素傳遞給等待的消費者【若是有的話】, 若是沒有,則將e插入隊列尾部,
     * 會一直等待,直到它被消費者接收
     */
    void transfer(E e) throws InterruptedException;

    /**
     * 在transfer的基礎上,增長了超時操做,時間到了尚未被消費的話,返回false,並移除元素
     */
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 若是存在消費者線程,返回true
     */
    boolean hasWaitingConsumer();

    /**
     * 獲得等待獲取元素的消費者線程的數量
     */
    int getWaitingConsumerCount();
}

類圖結構及重要字段

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /** 是否爲多核處理器 */
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    /**
     * 當一個節點目前是隊列的第一個waiter時,阻塞前的自旋次數
     */
    private static final int FRONT_SPINS   = 1 << 7;

    /**
     * 前驅節點正在處理,當前節點須要自旋的次數
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * 
     */
    static final int SWEEP_THRESHOLD = 32;
    
    // 隊列中的節點
    static final class Node {...}
    
    // 頭節點
    transient volatile Node head;

    /** 尾指針,注意可能不是最後一個節點,初始化爲null */
    private transient volatile Node tail;

    /** 刪除節點失敗的次數 */
    private transient volatile int sweepVotes;
    
    /*
     * xfer方法中使用,定義how,解釋很清楚了,每一個變量對應不一樣的方法
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

有耐心的同窗其實能夠看一下javadoc的介紹,LinkedTransferQueue使用的隊列結構實際上是這樣的:是slack dual queue,他和普通的M&S dual queue的區別在於,它不會每次操做的時候都更新head或tail,而是保持有針對性的slack懈怠,因此它的結構多是下面這樣,tail指針指向的節點未必就是最後一個節點。app

head           tail
         |              |
         v              v
         M -> M -> U -> U -> U -> U

Node節點

Node節點的結構其實和SynchronousQueue公平模式差不太多,這一次看起來就比較清晰了,這邊再總結一下,主要包含幾個部分:幾個重要字段,以及一些CAS方法。less

static final class Node {
        final boolean isData;   // isData == true表示存數據,不然爲獲取數據
        volatile Object item;   // 存數據,item非null, 獲取數據,匹配後,item爲null
        volatile Node next; // next域
        volatile Thread waiter; // 等待線程

        // CAS操做next域 若是next爲cmp,則變爲val
        final boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
		// CAS操做item域,若是item爲cmp,變爲val
        final boolean casItem(Object cmp, Object val) {
            // assert cmp == null || cmp.getClass() != Node.class;
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        // 構造器
        Node(Object item, boolean isData) {
            UNSAFE.putObject(this, itemOffset, item); // relaxed write
            this.isData = isData;
        }

        // 將next指向自身this
        final void forgetNext() {
            UNSAFE.putObject(this, nextOffset, this);
        }
		// 匹配或取消節點調用
        final void forgetContents() {
            UNSAFE.putObject(this, itemOffset, this);
            UNSAFE.putObject(this, waiterOffset, null);
        }

        /**
         * 判斷節點是否已經匹配,匹配取消也爲true
         */ 
        final boolean isMatched() {
            Object x = item;
            return (x == this) || ((x == null) == isData);
        }

        /**
         * 是否爲一個未匹配的請求 item爲null表示未匹配
         */
        final boolean isUnmatchedRequest() {
            return !isData && item == null;
        }

        /**
         * 若是給定的節點不能掛到當前節點後面,則返回true
         */
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

        /**
         * 嘗試去匹配一個數據節點
         */
        final boolean tryMatchData() {
            // assert isData;
            Object x = item;
            if (x != null && x != this && casItem(x, null)) {
                LockSupport.unpark(waiter);
                return true;
            }
            return false;
        }

        private static final long serialVersionUID = -3375979862319811754L;

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
        private static final long waiterOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
                waiterOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiter"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

前置:xfer方法的定義

咱們接下來將會介紹LinkedTransferQueue提供的各類操做,他們都會調用一個方法:xfer。dom

這裏咱們暫且不談具體的實現,咱們只須要知道一下這個方法的四個入參分別是什麼意思。

/**
     * xfer方法實現了全部的隊列方法
     *
     * @param e take操做傳入null, 不然傳入具體元素
     * @param haveData put操做爲true, take操做爲false
     * @param how NOW, ASYNC, SYNC, or TIMED  不一樣字段,先從名稱上猜想一下他們的大意
     * @param nanos 若是是TIMED模式,也就是具備超時機制的方法啦,具體超時的時間
     * @return an item if matched, else e  返回匹配的元素,不然返回e
     * @throws NullPointerException 插入null值拋出空指針異常: haveData==true && e == null
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
    	//     
    }

接下來咱們將分幾類來分別看一下各類操做的定義。

隊列操做三大類

插入元素put、add、offer

LinkedTransferQueue是無界的,下面三個插入方法不會阻塞,他們都調用了xfer方法,傳入元素e,havaData爲true,how字段類型都爲SYNC

public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }

    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

獲取元素take、poll

// take
	public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

	// timed poll
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

	// untimed poll
    public E poll() {
        return xfer(null, false, NOW, 0);
    }

一樣的,獲取元素的方法也都調用了xfer方法,他們都傳入null,havaData都爲false,可是傳入的how字段類型不一樣:

  • take方法傳入SYNC。
  • 超時機制的poll傳入TIMED,所以須要設置nanos。
  • 普通的poll傳入NOW。

transfer、tryTransfer

public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }

    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }

    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

xfer三大流程

xfer方法的實現,做者已經在註釋中說的十分清楚啦,這邊簡單看下三個核心步驟,細節部分下面會學習。

一、Try to match an existing node 嘗試去匹配一個節點

二、Try to append a new node (method tryAppend) 嘗試將一個節點入隊,對應tryAppend方法

三、Await match or cancellation (method awaitMatch) 阻塞等待一個節點被匹配或取消,對應awaitMatch方法

xfer

這個方法必然是核心方法了,畢竟它能夠實現隊列中提供的全部操做。

private E xfer(E e, boolean haveData, int how, long nanos) {
        // 若是 是插入的數據爲null, 則NPE
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race
			// 第一次插入數據的時候,不會進入這個循環,由於p == null
            // 不然進入這個循環,從head首節點開始
            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                // 找到還未匹配的節點:  isData的item應該是爲非null, 若是是null代表用過了
                if (item != p && (item != null) == isData) { // unmatched
                    // 節點類型和當前類型一致,沒法匹配
                    if (isData == haveData)   // can't match
                        break;
                    // 將參數加入到item域,
                    if (p.casItem(item, e)) { // match
                        // 下面這個for循環,是匹配item以後進行的額外操做,
                        // 好比將head更新爲當前這個點
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        // 阻塞線程
                        LockSupport.unpark(p.waiter);
                        // 返回item值
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                // 若是節點已經匹配過了,向後
                Node n = p.next;
                // p != n的狀況很簡單,將p移到n的位置, p==n表示什麼呢?
                // 其實若是p.next == p 說明p節點已經被其餘線程處理,那麼p就從head開始
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }
			// 尚未找到能夠匹配的點的話,會走到這
            // 這裏 若是 how 字段傳入爲 NOW ,便不會走裏面的邏輯,
            // 也就是說untimed poll、 tryTransfer 不須要將元素入隊
            if (how != NOW) {                 // No matches available
                // 這裏構造一個節點
                if (s == null)
                    s = new Node(e, haveData);
                // 初始化以後,調用tryAppend入隊, 返回前驅節點
                Node pred = tryAppend(s, haveData);
                // pred == null表示競爭失敗,返回到retry的地方
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                // 若是是ASYNC會跳過這裏,馬上返回e,不須要阻塞
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }

核心流程:

  1. 從頭開始日後找,跳過已經匹配過的節點,直到找到mode相反的節點,進行匹配並返回。若是須要的話,能夠額外改變head的指向。
  2. 若是沒有找到能夠匹配的點呢? 那就判斷是否是NOW,若是是NOW的話,直接返回【untimed poll, tryTransfer】。
  3. 若是不是NOW,那就構建一個節點,入隊,若是是ASYNC就直接返回【offer, put, add】,其餘狀況須要阻塞等待匹配。

直接上圖吧:

tryAppend

tryAppend包含入隊的邏輯,返回前驅節點。代碼充分考慮到併發狀況,仍是比較難懂的,若是要看明白,能夠在圖上畫一畫節點的變化。

private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            // p == null && head == null  表示此時隊頭還未初始化
            if (p == null && (p = head) == null) {
                // cas設置s爲隊頭
                if (casHead(null, s))
                    return s;                 // initialize
            }
            // 這裏檢測到異常狀況,返回null,以後會continue retry;
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            // 這裏就是p一直找到tail的位置,
            else if ((n = p.next) != null)    // not last; keep traversing
                // 這段... 吐槽一下
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            // 嘗試將s插到隊尾,若是失敗,說明其餘線程先插了,那麼p向後移,重新開始
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                if (p != t) {                 // update if slack now >= 2
                    // 這裏會設置s爲tail,若是成功的話,就退出循環了,
                    // 若是失敗的話,會進行後面的判斷,一開始tail其實都是null的
                    // 
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                // 返回加入節點的前驅節點
                return p;
            }
        }
    }

該方法從當前的tail開始,找到實際的最後一個節點【前面說了,tail可能不是最後一個節點】,並嘗試追加一個新的節點【若是head爲null,則創建第一個節點】。

成功追加後,若是how爲ASYNC,則返回。

注意:僅當它的前面節點都已經匹配或mode相同時,才能夠追加節點。若是檢測到其餘的狀況,咱們須要直接返回null,從新啓動retry。

awaitMatch

awaitMatch方法其實和SynchronousQueue的awaitFulfill邏輯差很少,線程會有三種狀況:spins/yield/blocks,直到node s被匹配或取消。

On multiprocessors, we use front-of-queue spinning: If a node appears to be the first unmatched node in the queue, it spins a bit before blocking.

若是一個節點可能會優先被匹配呢,它會優先選擇自旋而不是阻塞,自旋次數到了才阻塞,主要是考慮到阻塞、喚醒須要消耗更多的資源。這邊簡單過一下:

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        // 自旋次數
        int spins = -1; // initialized after first item and cancel checks
        // 這裏是線程安全的Random
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            // 
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                return LinkedTransferQueue.<E>cast(item);
            }
            // 若是中斷或超時 ,就cas設置s的item爲e
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                    s.casItem(e, s)) {        // cancel
                // 斷開
                unsplice(pred, s);
                return e;
            }
			// 計算自旋次數
            if (spins < 0) {                  // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            else if (spins > 0) {             // spin
                --spins;
                // 這裏做者提示:雖然偶爾執行yield的收益不是很明顯
                // 但仍限制了 自旋對busy system 的影響
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            // 設置一下waiter線程,標記一下誰在等
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            // 超時阻塞
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
            }
            // 自旋完仍是沒有匹配,就park住
            else {
                LockSupport.park(this);
            }
        }
    }

LinkedTransferQueue使用案例

最後,來看個簡單的案例吧。

/**
 * @author Summerday
 */
public class TestTransferQueue {

    // 無鎖算法 無界隊列
    static TransferQueue<Integer> queue = new LinkedTransferQueue<>();

    public static void main (String[] args) {

        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "消費 id - " + queue.take());
                    System.out.println("---------------------------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "consumer" + i).start();
        }

        Thread producer = new Thread(() -> {
            while (true) {
                System.out.println("當前隊列中等待的線程" + queue.getWaitingConsumerCount());
                // 若是隊列中有等待的消費者
                if (queue.hasWaitingConsumer()) {
                    int product = new Random().nextInt(500);
                    try {
                        System.out.println(Thread.currentThread().getName() + "生產 id - " + product);
                        queue.tryTransfer(product);
                        TimeUnit.MILLISECONDS.sleep(100); // 等待消費
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "producer");
        producer.setDaemon(true);
        producer.start();
    }
}
// 打印結果:
當前隊列中等待的線程10
producer生產 id - 266
consumer1消費 id - 266
---------------------------------------------
當前隊列中等待的線程9
producer生產 id - 189
consumer2消費 id - 189
---------------------------------------------
//... 省略

總結

LinkedTransferQueue在JDK1.7版本誕生,是由鏈表組成的無界TransferQueue,相對於其餘阻塞隊列,不只多了tryTransfer和transfer方法,並且性能方面也有巨大的提高。

LinkedTransferQueue使用的隊列結構是slack dual queue,不會每次操做的時候都更新head或tail,而是保持有針對性的slack懈怠。

LinkedTransferQueue的全部隊列操做都基於xfer方法,具體狀況根據傳入的how字段決定:NOW節點不入隊,ASYNC節點入隊但會當即返回,SYNC節點入隊且阻塞,TIMED對應超時機制。

xfer的實現分爲三個流程:

  1. Try to match an existing node 嘗試去匹配一個未匹配過的節點。
  2. Try to append a new node (method tryAppend) 嘗試將一個節點入隊,對應tryAppend方法。
  3. Await match or cancellation (method awaitMatch) 阻塞等待一個節點被匹配或取消,對應awaitMatch方法。

最後:具體步驟能夠查看上面的解析,若有不足,望評論區指教。

參考閱讀

相關文章
相關標籤/搜索