聊聊併發(七)——Java中的阻塞隊列

1. 什麼是阻塞隊列?

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。
java

阻塞隊列提供了四種處理方法:node

方法\處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 拋出異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException(「Queue full」)異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,若是沒有則返回null
  • 一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,若是超過必定的時間,生產者線程就會退出。

2. Java裏的阻塞隊列

JDK7提供了7個阻塞隊列。分別是linux

  • ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
  • PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

ArrayBlockingQueue

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認狀況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的全部生產者線程或消費者線程,當隊列可用時,能夠按照阻塞的前後順序訪問隊列,即先阻塞的生產者線程,能夠先往隊列裏插入元素,先阻塞的消費者線程,能夠先從隊列裏獲取元素。一般狀況下爲了保證公平性會下降吞吐量。咱們可使用如下代碼建立一個公平的阻塞隊列:windows

1 ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

訪問者的公平性是使用可重入鎖實現的,代碼以下:數組

1 public ArrayBlockingQueue(int capacity, boolean fair) {
2         if (capacity <= 0)
3             throw new IllegalArgumentException();
4         this.items = new Object[capacity];
5         lock = new ReentrantLock(fair);
6         notEmpty = lock.newCondition();
7         notFull =  lock.newCondition();
8 }

LinkedBlockingQueue

LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。緩存

PriorityBlockingQueue

PriorityBlockingQueue是一個支持優先級的無界隊列。默認狀況下元素採起天然順序排列,也能夠經過比較器comparator來指定元素的排序規則。元素按照升序排列。多線程

DelayQueue

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。咱們能夠將DelayQueue運用在如下應用場景:函數

  • 緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
  • 定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從好比TimerQueue就是使用DelayQueue實現的。

隊列中的Delayed必須實現compareTo來指定元素的順序。好比讓延時時間最長的放在隊列的末尾。實現代碼以下:ui

01 public int compareTo(Delayed other) {
02            if (other == this// compare zero ONLY if same object
03                 return 0;
04             if (other instanceof ScheduledFutureTask) {
05                 ScheduledFutureTask x = (ScheduledFutureTask)other;
06                 long diff = time - x.time;
07                 if (diff < 0)
08                     return -1;
09                 else if (diff > 0)
10                     return 1;
11        else if (sequenceNumber < x.sequenceNumber)
12                     return -1;
13                 else
14                     return 1;
15             }
16             long d = (getDelay(TimeUnit.NANOSECONDS) -
17                       other.getDelay(TimeUnit.NANOSECONDS));
18             return (d == 0) ? 0 : ((d < 0) ? -1 1);
19         }

如何實現Delayed接口this

咱們能夠參考ScheduledThreadPoolExecutor裏ScheduledFutureTask類。這個類實現了Delayed接口。首先:在對象建立的時候,使用time記錄前對象何時可使用,代碼以下:

1 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
2             super(r, result);
3             this.time = ns;
4             this.period = period;
5             this.sequenceNumber = sequencer.getAndIncrement();
6 }

而後使用getDelay能夠查詢當前元素還須要延時多久,代碼以下:

public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

經過構造函數能夠看出延遲時間參數ns的單位是納秒,本身設計的時候最好使用納秒,由於getDelay時能夠指定任意單位,一旦以納秒做爲單位,而延時的時間又精確不到納秒就麻煩了。使用時請注意當time小於當前時間時,getDelay會返回負數。

如何實現延時隊列

延時隊列的實現很簡單,當消費者從隊列裏獲取元素時,若是元素沒有達到延時時間,就阻塞當前線程。

1 long delay = first.getDelay(TimeUnit.NANOSECONDS);
2                     if (delay <= 0)
3                         return q.poll();
4                     else if (leader != null)
5                         available.await();

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。SynchronousQueue能夠當作是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素,很是適合於傳遞性場景,好比在一個線程中使用的數據,傳遞給另一個線程使用,SynchronousQueue的吞吐量高於LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue

LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其餘阻塞隊列LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法。若是當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法能夠把生產者傳入的元素馬上transfer(傳輸)給消費者。若是沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返回。transfer方法的關鍵代碼以下:

1 Node pred = tryAppend(s, haveData);
2 return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代碼是試圖把存放當前元素的s節點做爲tail節點。第二行代碼是讓CPU自旋等待消費者消費元素。由於自旋會消耗CPU,因此自旋必定的次數後使用Thread.yield()方法來暫停當前正在執行的線程,並執行其餘線程。

tryTransfer方法。則是用來試探下生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法不管消費者是否接收,方法當即返回。而transfer方法是必須等到消費者消費了才返回。

對於帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間再返回,若是超時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回true。

LinkedBlockingDeque

LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你能夠從隊列的兩端插入和移出元素。雙端隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。相比其餘的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入,獲取或移除雙端隊列的最後一個元素。另外插入方法add等同於addLast,移除方法remove等效於removeFirst。可是take方法卻等同於takeFirst,不知道是否是Jdk的bug,使用時仍是用帶有First和Last後綴的方法更清楚。在初始化LinkedBlockingDeque時能夠初始化隊列的容量,用來防止其再擴容時過渡膨脹。另外雙向阻塞隊列能夠運用在「工做竊取」模式中。

3. 阻塞隊列的實現原理

若是隊列是空的,消費者會一直等待,當生產者添加元素時候,消費者是如何知道當前隊列有元素的呢?若是讓你來設計阻塞隊列你會如何設計,讓生產者和消費者可以高效率的進行通信呢?讓咱們先來看看JDK是如何實現的。

使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。經過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現,代碼以下:

01 private final Condition notFull;
02 private final Condition notEmpty;
03  
04 public ArrayBlockingQueue(int capacity, boolean fair) {
05         //省略其餘代碼
06         notEmpty = lock.newCondition();
07         notFull =  lock.newCondition();
08     }
09  
10 public void put(E e) throws InterruptedException {
11         checkNotNull(e);
12         final ReentrantLock lock = this.lock;
13         lock.lockInterruptibly();
14         try {
15             while (count == items.length)
16                 notFull.await();
17             insert(e);
18         finally {
19             lock.unlock();
20         }
21 }
22  
23 public E take() throws InterruptedException {
24         final ReentrantLock lock = this.lock;
25         lock.lockInterruptibly();
26         try {
27             while (count == 0)
28                 notEmpty.await();
29             return extract();
30   finally {
31             lock.unlock();
32         }
33 }
34  
35 private void insert(E x) {
36         items[putIndex] = x;
37         putIndex = inc(putIndex);
38         ++count;
39         notEmpty.signal();
40     }

當咱們往隊列裏插入一個元素時,若是隊列不可用,阻塞生產者主要經過LockSupport.park(this);來實現

01 public final void await() throws InterruptedException {
02             if (Thread.interrupted())
03                 throw new InterruptedException();
04             Node node = addConditionWaiter();
05             int savedState = fullyRelease(node);
06             int interruptMode = 0;
07             while (!isOnSyncQueue(node)) {
08                 LockSupport.park(this);
09                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
10                     break;
11             }
12             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
13                 interruptMode = REINTERRUPT;
14             if (node.nextWaiter != null// clean up if cancelled
15                 unlinkCancelledWaiters();
16             if (interruptMode != 0)
17  
18 reportInterruptAfterWait(interruptMode);
19         }

繼續進入源碼,發現調用setBlocker先保存下將要阻塞的線程,而後調用unsafe.park阻塞當前線程。

1 public static void park(Object blocker) {
2         Thread t = Thread.currentThread();
3         setBlocker(t, blocker);
4         unsafe.park(false, 0L);
5         setBlocker(t, null);
6     }

unsafe.park是個native方法,代碼以下:

1 public native void park(boolean isAbsolute, long time);

park這個方法會阻塞當前線程,只有如下四種狀況中的一種發生時,該方法纔會返回。

  • 與park對應的unpark執行或已經執行時。注意:已經執行是指unpark先執行,而後再執行的park。
  • 線程被中斷時。
  • 若是參數中的time不是零,等待了指定的毫秒數時。
  • 發生異常現象時。這些異常事先沒法肯定。

咱們繼續看一下JVM是如何實現park方法的,park在不一樣的操做系統使用不一樣的方式實現,在linux下是使用的是系統方法pthread_cond_wait實現。實現代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp裏的 os::PlatformEvent::park方法,代碼以下:

01 void os::PlatformEvent::park() {
02              int v ;
03          for (;;) {
04         v = _Event ;
05          if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
06          }
07          guarantee (v >= 0, "invariant") ;
08          if (v == 0) {
09          // Do this the hard way by blocking ...
10          int status = pthread_mutex_lock(_mutex);
11          assert_status(status == 0, status, "mutex_lock");
12          guarantee (_nParked == 0, "invariant") ;
13          ++ _nParked ;
14          while (_Event < 0) {
15          status = pthread_cond_wait(_cond, _mutex);
16          // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
17          // Treat this the same as if the wait was interrupted
18          if (status == ETIME) { status = EINTR; }
19          assert_status(status == 0 || status == EINTR, status, "cond_wait");
20          }
21          -- _nParked ;
22  
23          // In theory we could move the ST of 0 into _Event past the unlock(),
24          // but then we'd need a MEMBAR after the ST.
25          _Event = 0 ;
26          status = pthread_mutex_unlock(_mutex);
27          assert_status(status == 0, status, "mutex_unlock");
28          }
29          guarantee (_Event >= 0, "invariant") ;
30          }
31  
32      }

pthread_cond_wait是一個多線程的條件變量函數,cond是condition的縮寫,字面意思能夠理解爲線程在等待一個條件發生,這個條件是一個全局變量。這個方法接收兩個參數,一個共享變量_cond,一個互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal實現的。park 在windows下則是使用WaitForSingleObject實現的。

當隊列滿時,生產者往阻塞隊列裏插入一個元素,生產者線程會進入WAITING (parking)狀態。咱們可使用jstack dump阻塞的生產者線程看到這點:

1 "main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
2    java.lang.Thread.State: WAITING (parking)
3         at sun.misc.Unsafe.park(Native Method)
4         - parking to wait for  <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
5         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
6         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
7         at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
8         at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)
相關文章
相關標籤/搜索