閱讀ArrayBlockingQueue源碼瞭解如何利用鎖實現BlockingQueue

BlockingQueue是多線程裏面一個很是重要的數據結構。在面試的時候,也常會被問到怎麼實現BlockingQueue。本篇根據Java7裏ArrayBlockingQueue的源碼,簡單介紹一下如何實現一個BlockingQueue。html

 

要實現BlockingQueue,首先得了解最主要的方法java

 

add()和remove()是最原始的方法,也是最不經常使用的。緣由是,當隊列滿了或者空了的時候,會拋出IllegalStateException("Queue full")/NoSuchElementException(),並不符合咱們對阻塞隊列的要求;所以,ArrayBlockingQueue裏,這兩個方法的實現,直接繼承自java.util.AbstractQueue:面試

 1    public boolean add(E e) {
 2         if (offer(e))
 3             return true;
 4         else
 5             throw new IllegalStateException("Queue full");
 6     }
 7 
 8     public E remove() {
 9         E x = poll();
10         if (x != null)
11             return x;
12         else
13             throw new NoSuchElementException();
14     }

 

有上述源碼可知,add()和remove()實現的關鍵,是來自java.util.Queue接口的offer()和poll()方法。api

offer():在隊列尾插入一個元素。若成功便返回true,若隊列已滿則返回false。(This method is generally preferable to method add(java.lang.Object), which can fail to insert an element only by throwing an exception.數組

poll():同理,取出並刪除隊列頭的一個元素。若成功便返回true,若隊列爲空則返回false。數據結構

這裏使用的是ReentrantLock,在插入或者取出前,都必須得到隊列的鎖,以保證同步。多線程

 1     public boolean offer(E e) {
 2         checkNotNull(e);
 3         final ReentrantLock lock = this.lock;
 4         lock.lock();
 5         try {
 6             if (count == items.length)
 7                 return false;
 8             else {
 9                 insert(e);
10                 return true;
11             }
12         } finally {
13             lock.unlock();
14         }
15     }
16 
17     public E poll() {
18         final ReentrantLock lock = this.lock;
19         lock.lock();
20         try {
21             return (count == 0) ? null : extract();
22         } finally {
23             lock.unlock();
24         }
25     }

 

 

因爲offer()/poll()是非阻塞方法,一旦隊列已滿或者已空,均會立刻返回結果,也不能達到阻塞隊列的目的。所以有了put()/take()這兩個阻塞方法:oracle

 1     public void put(E e) throws InterruptedException {
 2         checkNotNull(e);
 3         final ReentrantLock lock = this.lock;
 4         lock.lockInterruptibly();
 5         try {
 6             while (count == items.length)
 7                 notFull.await();
 8             insert(e);
 9         } finally {
10             lock.unlock();
11         }
12     }
13 
14     public E take() throws InterruptedException {
15         final ReentrantLock lock = this.lock;
16         lock.lockInterruptibly();
17         try {
18             while (count == 0)
19                 notEmpty.await();
20             return extract();
21         } finally {
22             lock.unlock();
23         }
24     }

 

 

put()/take()的實現,比起offer()/poll()複雜了一些,尤爲有兩個地方值得注意:this

1. 取得鎖之後,循環判斷隊列是否已滿或者已空,並加上Condition的await()方法將當前正在調用put()的線程掛起,直至notFull.signal()喚起。spa

2. 這裏使用的是lock.lockInterruptibly()而不是lock.lock()。緣由在這裏。lockInterruptibly()這個方法,優先考慮響應中斷,而不是響應普通得到鎖或重入得到鎖。簡單來講就是,因爲put()/take()是阻塞方法,一旦有interruption發生,必須立刻作出反應,不然可能會一直阻塞。

 

 

最後,不管是offer()/poll()仍是put()/take(),都要靠insert()/extract()這個私有方法去完成真正的工做:

 1     private void insert(E x) {
 2         items[putIndex] = x;
 3         putIndex = inc(putIndex);
 4         ++count;
 5         notEmpty.signal();
 6     }
 7 
 8     final int inc(int i) {
 9         return (++i == items.length) ? 0 : i;
10     }
11 
12     private E extract() {
13         final Object[] items = this.items;
14         E x = this.<E>cast(items[takeIndex]);
15         items[takeIndex] = null;
16         takeIndex = inc(takeIndex);
17         --count;
18         notFull.signal();
19         return x;
20     }
21 
22     final int dec(int i) {
23         return ((i == 0) ? items.length : i) - 1;
24     }

 

insert()/extract(),是真正將元素放進數組或者將元素從數組取出並刪除的方法。因爲ArrayBlockingQueue是有界限的隊列(Bounded Queue),所以inc()/dec()方法保證元素不超出隊列的界限。另外,每當insert()後,要使用notEmpty.signal()喚原由隊列空而等待取出的線程;每當extract()後,同理要使用notFull.signal()喚原由隊列滿而等待插入的線程。

 

 

到此,便將ArrayBlockingQueue的主要的方法粗略介紹了一遍。假設面試時,須要咱們本身實現BlockingQueue時,可參考以上的作法,重點放在put()/take()和insert()/extract()方法上,也可將其結合在一塊兒:

 1 class BoundedBuffer {  
 2   final Lock lock = new ReentrantLock();  
 3   final Condition notFull  = lock.newCondition();   
 4   final Condition notEmpty = lock.newCondition();   
 5   
 6   final Object[] items = new Object[100];  
 7   int putptr, takeptr, count;  
 8   
 9   public void put(Object x) throws InterruptedException {  
10     lock.lock();  
11     try {  
12       while (count == items.length)   
13         notFull.await();  
14       items[putptr] = x;   
15       if (++putptr == items.length) putptr = 0;  
16       ++count;  
17       notEmpty.signal();  
18     } finally {  
19       lock.unlock();  
20     }  
21   }  
22   
23   public Object take() throws InterruptedException {  
24     lock.lock();  
25     try {  
26       while (count == 0)   
27         notEmpty.await();  
28       Object x = items[takeptr];   
29       if (++takeptr == items.length) takeptr = 0;  
30       --count;  
31       notFull.signal();  
32       return x;  
33     } finally {  
34       lock.unlock();  
35     }  
36   }   
37 }

 

 

最後,因爲此文的啓示,列舉一些使用隊列時的錯誤作法:

1. 忽略offer()的返回值。offer()做爲有返回值的方法,能夠在判斷的時候十分有做用(例如add()的實現)。所以,千萬不要忽略offer()方法的返回值。

 

2. 在循環裏使用isEmpty()和阻塞方法:

1 while(!queue.isEmpty())
2 {
3    T element = queue.take();
4    
5    //Process element.
6 }

take()是阻塞方法,無需作isEmpty()的判斷,直接使用便可。而這種狀況頗有可能致使死鎖,由於因爲不斷循環,鎖會一直被isEmpty()取得(由於size()方法會取得鎖),而生產者沒法得到鎖。

 

3. 頻繁使用size()方法去記錄。size()方法是要取得鎖的,意味着這不是一個廉價的方法。能夠使用原子變量代替。

 

 

本文完

相關文章
相關標籤/搜索