SynchronousQueue原理詳解-公平模式

1、介紹

SynchronousQueue是一個雙棧雙隊列算法,無空間的隊列或棧,任何一個對SynchronousQueue寫須要等到一個對SynchronousQueue的讀操做,反之亦然。一個讀操做須要等待一個寫操做,至關因而交換通道,提供者和消費者是須要組隊完成工做,缺乏一個將會阻塞線程,知道等到配對爲止。java

SynchronousQueue是一個隊列和棧算法實現,在SynchronousQueue中雙隊列FIFO提供公平模式,而雙棧LIFO提供的則是非公平模式。node

對於SynchronousQueue來講,他的put方法和take方法都被抽象成統一方法來進行操做,經過抽象出內部類Transferer,來實現不一樣的操做。算法

注意事項:本文分析主要是針對jdk1.8的版本進行分析,下面的代碼中的線程執行順序可能並不能徹底保證順序性,執行時間比較短,因此暫且認定有序執行。app

約定:圖片中以Reference-開頭的表明對象的引用地址,經過箭頭方式進行引用對象。函數

Transferer.transfer方法主要介紹以下所示:oop

abstract static class Transferer<E> {
    /** * 執行put和take方法. * * @param e 非空時,表示這個元素要傳遞給消費者(提供者-put); * 爲空時, 則表示當前操做要請求消費一個數據(消費者-take)。 * offered by producer. * @param timed 決定是否存在timeout時間。 * @param nanos 超時時長。 * @return 若是返回非空, 表明數據已經被消費或者正常提供; 若是爲空, * 則表示因爲超時或中斷致使失敗。可經過Thread.interrupted來檢查是那種。 */
    abstract E transfer(E e, boolean timed, long nanos);
}
複製代碼

接下來看一下SynchronousQueue的字段信息:源碼分析

/** CPU數量 */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

/** * 自旋次數,若是transfer指定了timeout時間,則使用maxTimeSpins,若是CPU數量小於2則自旋次數爲0,不然爲32 * 此值爲經驗值,不隨CPU數量增長而變化,這裏只是個常量。 */
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

/** * 自旋次數,若是沒有指定時間設置,則使用maxUntimedSpins。若是NCPUS數量大於等於2則設定爲爲32*16,不然爲0; */
static final int maxUntimedSpins = maxTimedSpins * 16;

/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices. */
static final long spinForTimeoutThreshold = 1000L;
複製代碼
  • NCPUS:表明CPU的數量
  • maxTimedSpins:自旋次數,若是transfer指定了timeout時間,則使用maxTimeSpins,若是CPU數量小於2則自旋次數爲0,不然爲32,此值爲經驗值,不隨CPU數量增長而變化,這裏只是個常量。
  • maxUntimedSpins:自旋次數,若是沒有指定時間設置,則使用maxUntimedSpins。若是NCPUS數量大於等於2則設定爲爲32*16,不然爲0;
  • spinForTimeoutThreshold:爲了防止自定義的時間限過長,而設置的,若是設置的時間限長於這個值則取這個spinForTimeoutThreshold 爲時間限。這是爲了優化而考慮的。這個的單位爲納秒。

公平模式-TransferQueue

TransferQueue內部是如何進行工做的,這裏先大體講解下,隊列採用了互補模式進行等待,QNode中有一個字段是isData,若是模式相同或空隊列時進行等待操做,互補的狀況下就進行消費操做。優化

入隊操做相同模式ui

不一樣模式時進行出隊列操做:

這時候來了一個isData=false的互補模式,隊列就會變成以下狀態: this

TransferQueue繼承自Transferer抽象類,而且實現了transfer方法,它主要包含如下內容:

QNode

表明隊列中的節點元素,它內部包含如下字段信息:

  1. 字段信息描述
字段 描述 類型
next 下一個節點 QNode
item 元素信息 Object
waiter 當前等待的線程 Thread
isData 是不是數據 boolean
  1. 方法信息描述
方法 描述
casNext 替換當前節點的next節點
casItem 替換當前節點的item數據
tryCancel 取消當前操做,將當前item賦值爲this(當前QNode節點)
isCancelled 若是item是this(當前QNode節點)的話就返回true,反之返回false
isOffList 若是已知此節點離隊列,判斷next節點是否是爲this,則返回true,由於因爲* advanceHead操做而忘記了其下一個指針。
E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be * fulfilled (or cancelled) and return matching item. * * 2. If queue apparently contains waiting items, and this * call is of complementary mode, try to fulfill by CAS'ing * item field of waiting node and dequeuing it, and then * returning matching item. * * In each case, along the way, check for and try to help * advance head and tail on behalf of other stalled/slow * threads. * * The loop starts off with a null check guarding against * seeing uninitialized head or tail values. This never * happens in current SynchronousQueue, but could if * callers held non-volatile/final ref to the * transferer. The check is here anyway because it places * null checks at top of loop, which is usually faster * than having them implicitly interspersed. */

    QNode s = null; // constructed/reused as needed
  	// 分爲兩種狀態1.有數據=true 2.無數據=false
    boolean isData = (e != null); 
		// 循環內容
    for (;;) {
      	// 尾部節點。
        QNode t = tail;
      	// 頭部節點。
        QNode h = head;
      	// 判斷頭部和尾部若是有一個爲null則自旋轉。
        if (t == null || h == null)         // 還未進行初始化的值。
            continue;                       // 自旋
				// 頭結點和尾節點相同或者尾節點的模式和當前節點模式相同。
        if (h == t || t.isData == isData) { // 空或同模式。
          	// tn爲尾節點的下一個節點信息。
            QNode tn = t.next;
          	// 這裏我認爲是閱讀不一致,緣由是當前線程尚未阻塞的時候其餘線程已經修改了尾節點tail會致使當前線程的tail節點不一致。
            if (t != tail)                  // inconsistent read
                continue;
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // 這裏若是指定timed判斷時間小於等於0直接返回。
                return null;
          	// 判斷新增節點是否爲null,爲null直接構建新節點。
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // 若是next節點不爲null說明已經有其餘線程進行tail操做
                continue;
						// 將t節點替換爲s節點
            advanceTail(t, s);             
          	// 等待有消費者消費線程。
            Object x = awaitFulfill(s, e, timed, nanos);
          	// 若是返回的x,指的是s.item,若是s.item指向本身的話清除操做。
            if (x == s) {
                clean(t, s);
                return null;
            }
						// 若是沒有取消聯繫
            if (!s.isOffList()) {          
              	// 將當前節點替換頭結點
                advanceHead(t, s);          // unlink if head
                if (x != null)              // 取消item值,這裏是take方法時會進行item賦值爲this
                    s.item = s;
              	// 將等待線程設置爲null
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
          	// 獲取頭結點下一個節點
            QNode m = h.next;               // node to fulfill
          	// 若是當前線程尾節點和全局尾節點不一致,從新開始
          	// 頭結點的next節點爲空,表明無下一個節點,則從新開始,
          	// 當前線程頭結點和全局頭結點不相等,則從新開始
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
    				if (isData == (x != null) ||    // 若是x=null說明已經被讀取了。
        				x == m ||                   // x節點和m節點相等說明被中斷操做,被取消操做了。
        				!m.casItem(x, e)) {         // 這裏是將item值設置爲null
        				advanceHead(h, m);          // 移動頭結點到頭結點的下一個節點
        				continue;
    				}
          
            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}
複製代碼

咱們來看一下awaitFulfill方法內容:

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
   	// 若是指定了timed則爲System.nanoTime() + nanos,反之爲0。
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
  	// 獲取當前線程。
    Thread w = Thread.currentThread();
  	// 若是頭節點下一個節點是當前s節點(以防止其餘線程已經修改了head節點)
  	// 則運算(timed ? maxTimedSpins : maxUntimedSpins),不然直接返回。
  	// 指定了timed則使用maxTimedSpins,反之使用maxUntimedSpins
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
  	// 自旋
    for (;;) {
      	// 判斷是否已經被中斷。
        if (w.isInterrupted())
          	//嘗試取消,將當前節點的item修改成當前節點(this)。
            s.tryCancel(e);
      	// 獲取當前節點內容。
        Object x = s.item;
      	// 判斷當前值和節點值不相同是返回,由於彈出時會將item值賦值爲null。
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}
複製代碼
  1. 首先先判斷有沒有被中斷,若是被中斷則取消本次操做,將當前節點的item內容賦值爲當前節點。
  2. 判斷當前節點和節點值不相同是返回
  3. 將當前線程賦值給當前節點
  4. 自旋,若是指定了timed則使用LockSupport.parkNanos(this, nanos);,若是沒有指定則使用LockSupport.park(this);
  5. 中斷相應是在下次才能被執行。

經過上面源碼分析咱們這裏作出簡單的示例代碼演示一下put操做和take操做是如何進行運做的,首先看一下示例代碼,以下所示:

/** * SynchronousQueue進行put和take操做。 * * @author battleheart */
public class SynchronousQueueDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
      
        thread1.start();
        Thread.sleep(2000);
        Thread thread2 = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread2.start();
        Thread.sleep(10000);
      
        Thread thread3 = new Thread(() -> {
            try {
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread3.start();
    }
}
複製代碼

首先上來以後進行的是兩次put操做,而後再take操做,默認隊列上來會進行初始化,初始化的內容以下代碼所示:

TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}
複製代碼

初始化後隊列的狀態以下圖所示:

當線程1執行put操做時,來分析下代碼:

QNode t = tail;
QNode h = head;
if (t == null || h == null)         // saw uninitialized value
    continue;      
複製代碼

首先執行局部變量t表明隊尾指針,h表明隊頭指針,判斷隊頭和隊尾不爲空則進行下面的操做,接下來是if…else語句這裏是分水嶺,當相同模式操做的時候執行if語句,當進行不一樣模式操做時執行的是else語句,程序是如何控制這樣的操做的呢?接下來咱們慢慢分析一下:

if (h == t || t.isData == isData) { // 隊列爲空或者模式相同時進行if語句
    QNode tn = t.next;
    if (t != tail)                  // 判斷t是不是隊尾,不是則從新循環。
        continue;
    if (tn != null) {               // tn是隊尾的下個節點,若是tn有內容則將隊尾更換爲tn,而且從新循環操做。
        advanceTail(t, tn);
        continue;
    }
    if (timed && nanos <= 0)        // 若是指定了timed而且延時時間用盡則直接返回空,這裏操做主要是offer操做時,由於隊列無存儲空間的當offer時不容許插入。
        return null;
    if (s == null)									// 這裏是新節點生成。
        s = new QNode(e, isData);
    if (!t.casNext(null, s))        // 將尾節點的next節點修改成當前節點。
        continue;

    advanceTail(t, s);              // 隊尾移動
    Object x = awaitFulfill(s, e, timed, nanos);	//自旋而且設置線程。
    if (x == s) {                   // wait was cancelled
        clean(t, s);
        return null;
    }

    if (!s.isOffList()) {           // not already unlinked
        advanceHead(t, s);          // unlink if head
        if (x != null)              // and forget fields
            s.item = s;
        s.waiter = null;
    }
    return (x != null) ? (E)x : e;

}
複製代碼

上面代碼是if語句中的內容,進入到if語句中的判斷是若是頭結點和尾節點相等表明隊列爲空,並無元素全部要進行插入隊列的操做,或者是隊尾的節點的isData標誌和當前操做的節點的類型同樣時,會進行入隊操做,isData標識當前元素是不是數據,若是爲true表明是數據,若是爲false則表明不是數據,換句話說只有模式相同的時候纔會往隊列中存放,若是不是模式相同的時候則表明互補模式,就不走if語句了,而是走了else語句,上面代碼中作有註釋講解,下面看一下這裏:

if (s == null)									// 這裏是新節點生成。
    s = new QNode(e, isData);
if (!t.casNext(null, s))        // 將尾節點的next節點修改成當前節點。
    continue;	
複製代碼

當執行上面代碼後,隊列的狀況以下圖所示:(這裏視爲插入第一個元素圖,方便下面的引用)

接下來執行這段代碼:

advanceTail(t, s);              // 隊尾移動
複製代碼

修改了tail節點後,這時候就須要進行自旋操做,而且設置QNode的waiter等待線程,而且將線程等待,等到喚醒線程進行喚醒操做

Object x = awaitFulfill(s, e, timed, nanos);	//自旋而且設置線程。
複製代碼

方法內部分析局部內容,上面已經所有內容的分析:

if (spins > 0)
    --spins;
else if (s.waiter == null)
    s.waiter = w;
else if (!timed)
    LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
    LockSupport.parkNanos(this, nanos);
複製代碼

若是自旋時間spins還有則進行循環遞減操做,接下來判斷若是當前節點的waiter是空則價格當前線程賦值給waiter,上圖中顯然是爲空的因此會把當前線程進行賦值給我waiter,接下來就是等待操做了。

上面線程則處於等待狀態,接下來是線程二進行操做,這裏不進行重複進行,插入第二個元素隊列的情況,此時線程二也處於等待狀態。
上面的主要是put了兩次操做後隊列的狀況,接下來分析一下take操做時又是如何進行操做的,當take操做時,isData爲false,而隊尾的isData爲true兩個不相等,因此不會進入到if語句,而是進入到了else語句

} else {                            // 互補模式
    QNode m = h.next;               // 獲取頭結點的下一個節點,進行互補操做。
    if (t != tail || m == null || h != head)
        continue;                   // 這裏就是爲了防止閱讀不一致的問題

    Object x = m.item;
    if (isData == (x != null) ||    // 若是x=null說明已經被讀取了。
        x == m ||                   // x節點和m節點相等說明被中斷操做,被取消操做了。
        !m.casItem(x, e)) {         // 這裏是將item值設置爲null
        advanceHead(h, m);          // 移動頭結點到頭結點的下一個節點
        continue;
    }

    advanceHead(h, m);              // successfully fulfilled
    LockSupport.unpark(m.waiter);
    return (x != null) ? (E)x : e;
}
複製代碼

首先獲取頭結點的下一個節點用於互補操做,也就是take操做,接下來進行閱讀不一致的判斷,防止其餘線程進行了閱讀操做,接下來獲取須要彈出內容x=1,首先進行判斷節點內容是否是已經被消費了,節點內容爲null時則表明被消費了,接下來判斷節點的item值是否是和自己相等若是相等話說明節點被取消了或者被中斷了,而後移動頭結點到下一個節點上,而後將refenrence-715的item值修改成null,至於爲何修改成null這裏留下一個懸念,這裏仍是比較重要的,你們看到這裏的時候須要注意下,顯然這些都不會成立,因此if語句中內容不會被執行,接下來的隊列的狀態是是這個樣子的:

OK,接下來就開始移動隊頭head了,將head移動到m節點上,執行代碼以下所示:

advanceHead(h, m);          
複製代碼

此時隊列的狀態是這個樣子的:

LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
複製代碼

接下來將執行喚醒被等待的線程,也就是thread-0,而後返回獲取item值1,take方法結束,可是這裏並無結束,由於喚醒了put的線程,此時會切換到put方法中,這時候線程喚醒後會執行awaitFulfill方法,此時循環時,有與item值修改成null則直接返回內容。

Object x = s.item;
if (x != e)
    return x;
複製代碼

這裏的代碼咱們能夠對照插入第一個元素圖,s節點也就是當前m節點,獲取值得時候已經修改成null,可是當時插入的值時1,因此兩個不想等了,則直接返回null值。

Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) {                   // wait was cancelled
    clean(t, s);
    return null;
}

if (!s.isOffList()) {           // not already unlinked
    advanceHead(t, s);          // unlink if head
    if (x != null)              // and forget fields
        s.item = s;
    s.waiter = null;
}
return (x != null) ? (E)x : e;
複製代碼

又返回到了transfer方法的if語句中,此時x和s並不相等因此不用進行clean操做,首先判斷s節點是否已經離隊了,顯然並無進行離隊操做,advanceHead(t, s);操做不會被執行由於上面已近將頭節點修改了,可是第一次插入的時候頭結點仍是reference-716,此時已是reference-715,而t節點的引用地址是reference-716,因此不會操做,接下來就是將waiter設置爲null,也就是忘記掉等待的線程。

分析了正常的take和put操做,接下來分析下中斷操做,因爲中斷相應後,會被執行 if(w.isInterrupted())這段代碼,它會執行 s.tryCancel(e)方法,這個方法的做用的是將QNode節點的item節點賦值爲當前QNode,這時候x和e值就不相等了( if (x != e)),x的值是s.item,則爲當前QNode,而e的值是用戶指定的值,這時候返回x(s.item)。返回到函數調用地方 transfer中,這時候要執行下面語句:

if (x == s) {
	clean(t, s);
	return null;
}
複製代碼

進入到clean方法執行清理當前節點,下面是方法clean代碼:

/** * Gets rid of cancelled node s with original predecessor pred. */
void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread
    /* * At any given time, exactly one node on list cannot be * deleted -- the last inserted node. To accommodate this, * if we cannot delete s, we save its predecessor as * "cleanMe", deleting the previously saved version * first. At least one of node s or the node previously * saved can always be deleted, so this always terminates. */
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        if (hn != null && hn.isCancelled()) {
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        if (t == h)
            return;
        QNode tn = t.next;
      	// 判斷如今的t是否是末尾節點,可能其餘線程插入了內容致使不是最後的節點。
        if (t != tail)
            continue;
      	// 若是不是最後節點的話將其如今t.next節點做爲tail尾節點。
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
      	// 若是當前節點不是尾節點進入到這裏面。
        if (s != t) {        // If not tail, try to unsplice
          	// 獲取當前節點(被取消的節點)的下一個節點。
            QNode sn = s.next;
          	// 修改上一個節點的next(下一個)元素爲下下個節點。
            if (sn == s || pred.casNext(s, sn))
              	//返回。
                return;
        }
        QNode dp = cleanMe;
        if (dp != null) {    // 嘗試清除上一個標記爲清除的節點。
            QNode d = dp.next;	//1.獲取要被清除的節點
            QNode dn;
            if (d == null ||               // 被清除節點不爲空
                d == dp ||                 // 被清除節點已經離隊
                !d.isCancelled() ||        // 被清除節點是標記爲Cancel狀態的。
                (d != t &&                 // 被清除節點不是尾節點
                 (dn = d.next) != null &&  // 被清除節點下一個節點不爲null
                 dn != d &&                // that is on list
                 dp.casNext(d, dn)))       // 將被清除的節點的前一個節點的下一個節點修改成被清除節點的下一個節點。
                casCleanMe(dp, null);			 // 清空cleanMe節點。
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred)) // 這裏將上一個節點標記爲被清除操做,可是其實要操做的是下一個節點。
            return;          // Postpone cleaning s
    }
}
複製代碼
  1. 若是節點中取消的頭結點的下一個節點,只須要移動當前head節點到下一個節點便可。
  2. 若是取消的是中間的節點,則將當前節點next節點修改成下下個節點。
  3. 若是修改成末尾的節點,則將當前節點放入到QNode的clearMe中,等待有內容進來以後下一次進行清除操做。

實例一:清除頭結點下一個節點,下面是實例代碼進行講解:

/** * 清除頭結點的下一個節點實例代碼。 * * @author battleheart */
public class SynchronousQueueDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
        AtomicInteger atomicInteger = new AtomicInteger(0);

        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread1.start();
				Thread.sleep(200);
      	
        Thread thread2 = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread2.start();
      	Thread.sleep(2000);
        thread1.interrupt();

    }
}
複製代碼

上面例子說明咱們啓動了兩個線程,分別向SynchronousQueue隊列中添加了元素1和元素2,添加成功以後的,讓主線程休眠一會,而後將第一個線程進行中斷操做,添加兩個元素後節點所處在的狀態爲下圖所示:

當咱們調用 thread1.interrupt時,此時線程1等待的消費操做將被終止,會相應上面 awaitFulfill方法,該方法會運行下面代碼:

if (w.isInterrupted())
	//嘗試取消,將當前節點的item修改成當前節點(this)。
	s.tryCancel(e);
// 獲取當前節點內容。
Object x = s.item;
// 判斷當前值和節點值不相同是返回,由於彈出時會將item值賦值爲null。
if (x != e)
	return x;
複製代碼

首先上來現將s節點(上圖中的Reference-715引用對象)的item節點設置爲當前節點引用(Reference-715引用對象),因此s節點和e=1不相等則直接返回,此時節點的狀態變化以下所示:

退出 awaitFulfill而且返回的是s節點內容(實際上返回的就是s節點),接下來返回到調用 awaitFulfill的方法 transfer方法中

Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) {                   // 是不是被取消了
    clean(t, s);
    return null;
}
複製代碼

首先判斷的事x節點和s節點是否相等,上面咱們也說了明顯是相等的因此這裏會進入到clean方法中,clean(QNode pred, QNode s)clean方法一個是前節點,一個是當前被取消的節點,也就是當前s節點的前節點是head節點,接下來咱們一步一步的分析代碼:

s.waiter = null; // 刪除等待的線程。
複製代碼

進入到方法體以後首先先進行的是將當前節點的等待線程刪除,以下圖所示:

接下來進入while循環,循環內容時 pred.next == s若是不是則表示已經移除了節點,反之還在隊列中,則進行下面的操做:

QNode h = head;
QNode hn = h.next;   // 若是取消的是第一個節點則進入下面語句
if (hn != null && hn.isCancelled()) {
    advanceHead(h, hn);
    continue;
}
複製代碼

能夠看到首先h節點爲head節點,hn爲頭結點的下一個節點,在進行判斷頭結點的下一個節點不爲空而且頭結點下一個節點是被中斷的節點(取消的節點),則進入到if語句中,if語句其實也很簡單就是將頭結點修改成頭結點的下一個節點(s節點,別取消節點,而且將前節點的next節點修改成本身,也就是移除了以前的節點,咱們看下advanceHead方法:

void advanceHead(QNode h, QNode nh) {
    if (h == head &&
        UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
        h.next = h; // forget old next
}
複製代碼

首先上來先進行CAS移動頭結點,再講原來頭結點h的next節點修改成本身(h),爲何這樣作呢?由於上面進行advanceHead以後並無退出循環,是進行continue操做,也就是它並無跳出while循環,他還會循環一次prev.next此時已經不能等於s因此退出循環,以下圖所示:

實例二:清除中間的節點

/** * SynchronousQueue實例二,清除中間的節點。 * * @author battleheart */
public class SynchronousQueueDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
        AtomicInteger atomicInteger = new AtomicInteger(0);

        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread1.start();
      	//休眠一會。
				Thread.sleep(200);
        Thread thread2 = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread2.start();
      	//休眠一會。
				Thread.sleep(200);
        Thread thread3 = new Thread(() -> {
            try {
                queue.put(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread3.start();
        //休眠一會。
        Thread.sleep(10000);
        thread2.interrupt();


    }
}
複製代碼

看上面例子,首先先進行put操做三次,也就是入隊3條數據,分別是整型值1,整型值2,整型值3,而後將當前線程休眠一下,對中間線程進行中斷操做,經過讓主線程休眠一會保證線程執行順序性(固然上面線程不必定能保證執行順序,由於put操做一會兒就執行完了因此這點時間是能夠的),此時隊列所處的狀態來看一下下圖:

當休眠一會以後,進入到threa2進行中斷操做,目前上圖中表示 Reference-723被中斷操做,此時也會進入到 awaitFulfill方法中,將 Reference-723的item節點修改成當前節點,以下圖所示:

進入到clear方法中此時的prev節點爲Reference-715,s節點是被清除節點,仍是首先進入clear方法中先將waiter設置爲null,取消當前線程內容,以下圖所示:

接下來進入到循環中,進行下面處理

QNode h = head;
QNode hn = h.next;   // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
    advanceHead(h, hn);
    continue;
}
QNode t = tail;      // Ensure consistent read for tail
if (t == h)
    return;
QNode tn = t.next;
if (t != tail)
    continue;
if (tn != null) {
    advanceTail(t, tn);
    continue;
}
if (s != t) {        // If not tail, try to unsplice
    QNode sn = s.next;
    if (sn == s || pred.casNext(s, sn))
        return;
}
複製代碼

第一個if語句已經分析過了因此說這裏不會進入到裏面去,接下來是進行尾節點t是不是等於head節點若是相等則表明沒有元素,在判斷當前方法的t尾節點是否是真正的尾節點tail若是不是則進行修改尾節點,先來看一下如今的狀態:

tn != null判斷若是tn不是尾節點,則將tn做爲尾節點處理,若是處理以後還不是尾節點還會進行處理直到tail是尾節點未知,咱們如今這個是尾節點因此跳過這段代碼。s != t經過上圖能夠看到s節點是被清除節點,並非尾節點因此進入到循環中:

if (s != t) {        // If not tail, try to unsplice
    QNode sn = s.next;
    if (sn == s || pred.casNext(s, sn))
        return;
}
複製代碼

首先獲取的s節點的下一個節點,上圖中表示Reference-725節點,判斷sn是都等於當前節點顯然這一條不成立,pred節點爲Reference-715節點,將715節點的next節點變成Reference-725節點,這裏就將原來的節點清理出去了,如今的狀態以下所示:

實例三:刪除的節點是尾節點

/** * SynchronousQueue實例三,刪除的節點爲尾節點 * * @author battleheart */
public class SynchronousQueueDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
        AtomicInteger atomicInteger = new AtomicInteger(0);

        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread1.start();

        Thread thread2 = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread2.start();

        Thread.sleep(10000);
        thread2.interrupt();

        Thread.sleep(10000);

        Thread thread3 = new Thread(() -> {
            try {
                queue.put(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread3.start();

        Thread.sleep(10000);
        thread3.interrupt();
    }
}
複製代碼

該例子主要說明一個問題就是刪除的節點若是是末尾節點的話,clear方法又是如何處理的,首先啓動了三個線程其中主線程休眠了一會,爲了能讓插入的順序保持線程1,線程2,線程3這樣子,啓動第二個線程後,又將第二個線程中斷,這是第二個線程插入的節點爲尾節點,而後再啓動第三個節點插入值,再中斷了第三個節點末尾節點,說一下爲啥這樣操做,由於當清除尾節點時,並非直接移除當前節點,而是將被清除的節點的前節點設置到QNode的CleanMe中,等待下次clear方法時進行清除上次保存在CleanMe的節點,而後再處理當前被中斷節點,將新的被清理的節點prev設置爲cleanMe當中,等待下次進行處理,接下來一步一步分析,首先咱們先來看一下第二個線程啓動後節點的狀態。

此時運行 thread2.interrupt();將第二個線程中斷,這時候會進入到clear方法中,前面的代碼都不會被返回,會執行下面的語句:

QNode dp = cleanMe;
if (dp != null) {    // Try unlinking previous cancelled node
    QNode d = dp.next;
    QNode dn;
    if (d == null ||               // d is gone or
        d == dp ||                 // d is off list or
        !d.isCancelled() ||        // d not cancelled or
        (d != t &&                 // d not tail and
         (dn = d.next) != null &&  // has successor
         dn != d &&                // that is on list
         dp.casNext(d, dn)))       // d unspliced
        casCleanMe(dp, null);
    if (dp == pred)
        return;      // s is already saved node
} else if (casCleanMe(null, pred))
    return;   
複製代碼

首先得到TransferQueue當中cleanMe節點,此時獲取的爲null,當判斷dp!=null時就會被跳過,直接執行

casCleanMe(null, pred)此時pred傳入的值時t節點指向的內容,也就是當前節點的上一個節點,它會被標記爲清除操做節點(其實並不清楚它而是清除它下一個節點,也就是說item=this的節點),此時看一下節點狀態爲下圖所示:

接下來第三個線程啓動了這時候又往隊列中添加了元素3,此時隊列的情況以下圖所示:

此時thread3也被中斷操做了,這時候仍是運行上面的代碼,可是此次不一樣的點在於cleanMe已經不是空值,是有內容的,首先獲取的是cleanMe的下一個節點(d),然我來把變量標記在圖上而後看起來好分析一些,以下圖所示:

dp表示d節點的前一個pred節點,dn表示d節點的next節點,主要邏輯在這裏:

if (d == null ||               // d is gone or
    d == dp ||                 // d is off list or
    !d.isCancelled() ||        // d not cancelled or
    (d != t &&                 // d not tail and
     (dn = d.next) != null &&  // has successor
     dn != d &&                // that is on list
     dp.casNext(d, dn)))       // d unspliced
    casCleanMe(dp, null);
if (dp == pred)
    return;      // s
複製代碼

首先判斷d節點是否是爲null,若是d節點爲null表明已經清除掉了,若是cleanMe節點的下一個節點和本身相等,說明須要清除的節點已經離隊了,判斷下個節點是否是須要被清除的節點,目前看d節點是被清除的節點,而後就將被清除的節點的下一個節點賦值給dn而且判斷d節點是否是末尾節點,若是不是末尾節點則進行dp.casNext方法,這個地方是關鍵點,它將被清除節點d的前節點的next節點修改成被清除節點d的後面節點dn,而後調用caseCleanMe將TransferQueue中的cleanMe節點清空,此時節點的內容以下所示:

能夠看出將上一次標記爲清除的節點清除了隊列中,清除完了就完事兒?那此次的怎麼弄呢?由於如今運行的是thread3的中斷程序,因此上面並無退出,而是再次進入循環,循環以後發現dp爲null則會運行 casCleanMe(null, pred),此時當前節點s的前一個節點已經被清除隊列,可是並不影響後續的清除操做,由於前節點的next節點還在維護中,也是前節點的next指向仍是 reference-725,以下圖所示:
就此分析完畢若是有不正確的地方請指正。
相關文章
相關標籤/搜索