java多線程系列之synchronousQueue

synchronousQueue詳解

在使用cachedThreadPool的時候,沒有對原理去很好的理解,因此致使使用起來有些不放心,主要是對synchronousQueue的原理不太瞭解,因此有此文的分析java

本文從兩個方面分析synchronousQueue:node

  1. synchronousQueue的使用,主要是經過Executors框架提供的線程池cachedThreadPool來說,由於synchronousQueue是它的workQueue
  2. synchronousQueue的原理,主要是從實現角度,分析一下數據結構

synchronousQueue基本使用

首先說說api吧,關於隊列有幾套api,核心是下面的兩套:算法

take() & put() //這是阻塞的,會阻塞操做線程
poll() & offer() //這是非阻塞的(在不設置超時時間的前提下),當操做不能達成的時候會立馬返回boolean複製代碼

synchronousQueue是一個沒有數據緩衝的阻塞隊列,生產者線程對其的插入操做put()必須等待消費者的移除操做take(),反過來也同樣。api

可是poll()和offer()就不會阻塞,舉例來講就是offer的時候若是有消費者在等待那麼就會立馬知足返回true,若是沒有就會返回false,不會等待消費者到來。數據結構

下面咱們分析一下cachedThreadPool的使用流程,經過這個過程咱們來了解synchronousQueue的使用方式:先看代碼併發

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//1
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }複製代碼
  1. 對於使用synchronousQueue的線程池,在第一次execute任務的時候會在1處返回false,由於線程池中尚未線程,因此沒有消費者在等待,因此就會直接建立線程進行執行任務
  2. 在上篇線程池的分析中咱們提到:建立的線程在執行完畢任務後會去循環的getTask,在getTask的過程當中會調用take去獲取任務。因此當咱們再次調用execute提交任務的時候1就會返回成功(前提是先前建立的線程已經執行完畢,正在執行gettask方法進行等待),由於這個時候已經有一個線程在等待task了,因此offer直接返回成功!
  3. 這就達到了cachedThreadPool線程複用的目的,也就是說:在提交任務的時候,若是全部工做線程都處於忙碌的狀態就會新建線程來執行,若是有工做線程處於空閒狀態則把任務交給空閒線程來執行!而這其中的黑科技就是經過synchronousQueue來進行的。

synchronousQueue內部數據結構

根據上面咱們介紹的synchronousQueue的隊列語義,咱們其實能夠很容易的經過鎖或者信號量等一系列的同步機制來實現一個synchronousQueue的結構,可是咱們知道有鎖的通常效率都不會過高,因此java爲咱們提供了下面一種無鎖的算法。框架

無鎖在java裏面通常就是cas和spin來實現的!具體會在以後介紹java併發包的時候來分析TODO。下面的分析的核心在於:cas和spin,通常把cas和spin能夠組合起來使用,spin就是不斷循環重試cas操做,確保操做可以成功。這些就不詳細介紹,網上有不少相關文章!這也是java併發的基礎!ide

稍微跟蹤一下代碼,就會發現synchronousQueue內部是經過Transferer來實現的,具體分爲兩個Transferer,分別是TransferStack和TransferQueue,二者差異在因而否公平:下面咱們只分析TransferQueue的實現。this

/** Node class for TransferQueue. */
        static final class QNode { //1
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            boolean casNext(QNode cmp, QNode val) {//2
                return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            boolean casItem(Object cmp, Object val) {//2
                return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }
         }複製代碼
  1. 既然是隊列,確定有個node是表明隊列的節點的。
  2. 2處表明了典型的兩個cas賦值操做,表明瞭如何設置next和item的值,用於進行併發更新

以後是transfer操做idea

E transfer(E e, boolean timed, long nanos) {//1
            QNode s = null; 
            boolean isData = (e != null);
            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)       
                    continue;                       

                if (h == t || t.isData == isData) { //2
                    QNode tn = t.next;
                    if (t != tail)                 
                        continue;
                    if (tn != null) {               
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);//3
                    if (!t.casNext(null, s))       //4
                        continue;

                    advanceTail(t, s);              // 5
                    Object x = awaitFulfill(s, e, timed, nanos);//6
                    if (x == s) {                  
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {          
                        advanceHead(t, s);        
                        if (x != null)            
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                           //7
                    QNode m = h.next;            //8
                    if (t != tail || m == null || h != head)
                        continue;                   

                    Object x = m.item;
                    if (isData == (x != null) ||   
                        x == m ||                  
                        !m.casItem(x, e)) {         // 9
                        advanceHead(h, m);          
                        continue;
                    }

                    advanceHead(h, m);              // 10
                    LockSupport.unpark(m.waiter);//11
                    return (x != null) ? (E)x : e;
                }
            }
        }複製代碼
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {//6.1
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)//6.2
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);//6.3
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }複製代碼

在上面的代碼中,我把重要的地方分了11步,分別進行解釋:

首先說一下大體的操做。在transfer中,把操做分爲兩種,一種就是入隊put,一種是出隊take,入隊的時候會建立data節點,值爲data。出隊的時候會建立一個request節點,值爲null。

  1. put和take操做都會調用該方法,區別在於,put操做的時候e值爲數據data,take操做的時候e值爲null

  2. 若是h==t也就是隊列爲空,或者當前隊列尾部的數據類型和調用該方法的數據類型一致:好比當前隊列爲空,第一次來了一個入隊請求,這時候隊列就會建立出一個data節點,若是第二次又來了一個入隊請求(和第一次也就是隊列尾部的數據類型一致,都是入隊請求),這時候隊列會建立出第二個data節點,並造成一個鏈表。同理,若是剛開始來了request請求,也會入隊,以後若是繼續來了一個reqeust請求,也會繼續入隊!

  3. 知足2的條件,就會進入3,中間會有一些一致性檢查這也是必須的,避免產生併發衝突。3會建立出一個節點,根據e值的不一樣,多是data節點或者request節點。

  4. 把3中建立的節點經過cas方式設置到隊列尾部去。

  5. 把tail經過cas方式修改爲3中新創建的s節點

  6. 調用方法awaitFulfill進行等待,若是3中建立的是data節點,那麼就會等待來一個reqeust節點,反之亦然!

    1. 放入隊列以後就開始進行循環判斷
    2. 終止條件是節點的值被修改,具體若是是data節點,那麼會被修改爲null,若是是request節點,那麼會被修改爲data值。這個修改是在第9步中由相對的請求(若是建立的是data節點,那麼就由reqeust請求來進行修改,反之亦然)來作的。若是一直沒有相對的請求過來,那麼節點的值就一直不會被修改,這樣就跳不出循環體!
    3. 若是沒有被修改,那麼就須要進入park休眠,等待第9步進行修改後再經過unpark進行喚醒,喚醒以後就會判斷節點值被修改從而返回。
  7. 若是在插入一個節點的時候,不知足2的條件,也就是隊列不爲空而且尾部節點和當前要插入節點的類型不同(這就表明來了一個相對請求),好比上圖中的尾部是data節點,若是來了一個插入reqeust節點的請求,那麼就會走到7這裏
  8. 因爲是隊列,先進先出,因此會取隊列裏面的第一個節點,也就是h.nex
  9. 把8中取出的節點的值經過cas的方式設置成新來節點的e值,這樣就成功的知足了6-2的終止條件
  10. 將head節點日後移動,這樣就把第一個節點成功的出隊。
  11. 每一個節點都保存了對應的操做線程,將8中節點對應的線程進行喚醒,這樣6-3處於休眠的線程就醒來了,而後繼續進行for循環,進而判斷6-2終止條件知足,因而返回

整個過程就是這樣,上文的分析只是分析了正常的工做流程,沒有具體的分析操做中的競態條件,好比兩個線程同時進行入隊的時候如何正確設置鏈表的狀態,都講的話篇幅過大。

相關文章
相關標籤/搜索