BlockingQueue是多線程裏面一個很是重要的數據結構。在面試的時候,也常會被問到怎麼實現BlockingQueue。本篇根據Java7裏ArrayBlockingQueue的源碼,簡單介紹一下如何實現一個BlockingQueue。html
要實現BlockingQueue,首先得了解最主要的方法:java
add()和remove()是最原始的方法,也是最不經常使用的。緣由是,當隊列滿了或者空了的時候,會拋出IllegalStateException("Queue full")/NoSuchElementException(),並不符合咱們對阻塞隊列的要求;所以,ArrayBlockingQueue裏,這兩個方法的實現,直接繼承自java.util.AbstractQueue:面試
1 public boolean add(E e) { 2 if (offer(e)) 3 return true; 4 else 5 throw new IllegalStateException("Queue full"); 6 } 7 8 public E remove() { 9 E x = poll(); 10 if (x != null) 11 return x; 12 else 13 throw new NoSuchElementException(); 14 }
有上述源碼可知,add()和remove()實現的關鍵,是來自java.util.Queue接口的offer()和poll()方法。api
offer():在隊列尾插入一個元素。若成功便返回true,若隊列已滿則返回false。(This method is generally preferable to method
, which can fail to insert an element only by throwing an exception.)數組add(java.lang.Object)
poll():同理,取出並刪除隊列頭的一個元素。若成功便返回true,若隊列爲空則返回false。數據結構
這裏使用的是ReentrantLock,在插入或者取出前,都必須得到隊列的鎖,以保證同步。多線程
1 public boolean offer(E e) { 2 checkNotNull(e); 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 if (count == items.length) 7 return false; 8 else { 9 insert(e); 10 return true; 11 } 12 } finally { 13 lock.unlock(); 14 } 15 } 16 17 public E poll() { 18 final ReentrantLock lock = this.lock; 19 lock.lock(); 20 try { 21 return (count == 0) ? null : extract(); 22 } finally { 23 lock.unlock(); 24 } 25 }
因爲offer()/poll()是非阻塞方法,一旦隊列已滿或者已空,均會立刻返回結果,也不能達到阻塞隊列的目的。所以有了put()/take()這兩個阻塞方法:oracle
1 public void put(E e) throws InterruptedException { 2 checkNotNull(e); 3 final ReentrantLock lock = this.lock; 4 lock.lockInterruptibly(); 5 try { 6 while (count == items.length) 7 notFull.await(); 8 insert(e); 9 } finally { 10 lock.unlock(); 11 } 12 } 13 14 public E take() throws InterruptedException { 15 final ReentrantLock lock = this.lock; 16 lock.lockInterruptibly(); 17 try { 18 while (count == 0) 19 notEmpty.await(); 20 return extract(); 21 } finally { 22 lock.unlock(); 23 } 24 }
put()/take()的實現,比起offer()/poll()複雜了一些,尤爲有兩個地方值得注意:this
1. 取得鎖之後,循環判斷隊列是否已滿或者已空,並加上Condition的await()方法將當前正在調用put()的線程掛起,直至notFull.signal()喚起。spa
2. 這裏使用的是lock.lockInterruptibly()而不是lock.lock()。緣由在這裏。lockInterruptibly()這個方法,優先考慮響應中斷,而不是響應普通得到鎖或重入得到鎖。簡單來講就是,因爲put()/take()是阻塞方法,一旦有interruption發生,必須立刻作出反應,不然可能會一直阻塞。
最後,不管是offer()/poll()仍是put()/take(),都要靠insert()/extract()這個私有方法去完成真正的工做:
1 private void insert(E x) { 2 items[putIndex] = x; 3 putIndex = inc(putIndex); 4 ++count; 5 notEmpty.signal(); 6 } 7 8 final int inc(int i) { 9 return (++i == items.length) ? 0 : i; 10 } 11 12 private E extract() { 13 final Object[] items = this.items; 14 E x = this.<E>cast(items[takeIndex]); 15 items[takeIndex] = null; 16 takeIndex = inc(takeIndex); 17 --count; 18 notFull.signal(); 19 return x; 20 } 21 22 final int dec(int i) { 23 return ((i == 0) ? items.length : i) - 1; 24 }
insert()/extract(),是真正將元素放進數組或者將元素從數組取出並刪除的方法。因爲ArrayBlockingQueue是有界限的隊列(Bounded Queue),所以inc()/dec()方法保證元素不超出隊列的界限。另外,每當insert()後,要使用notEmpty.signal()喚原由隊列空而等待取出的線程;每當extract()後,同理要使用notFull.signal()喚原由隊列滿而等待插入的線程。
到此,便將ArrayBlockingQueue的主要的方法粗略介紹了一遍。假設面試時,須要咱們本身實現BlockingQueue時,可參考以上的作法,重點放在put()/take()和insert()/extract()方法上,也可將其結合在一塊兒:
1 class BoundedBuffer { 2 final Lock lock = new ReentrantLock(); 3 final Condition notFull = lock.newCondition(); 4 final Condition notEmpty = lock.newCondition(); 5 6 final Object[] items = new Object[100]; 7 int putptr, takeptr, count; 8 9 public void put(Object x) throws InterruptedException { 10 lock.lock(); 11 try { 12 while (count == items.length) 13 notFull.await(); 14 items[putptr] = x; 15 if (++putptr == items.length) putptr = 0; 16 ++count; 17 notEmpty.signal(); 18 } finally { 19 lock.unlock(); 20 } 21 } 22 23 public Object take() throws InterruptedException { 24 lock.lock(); 25 try { 26 while (count == 0) 27 notEmpty.await(); 28 Object x = items[takeptr]; 29 if (++takeptr == items.length) takeptr = 0; 30 --count; 31 notFull.signal(); 32 return x; 33 } finally { 34 lock.unlock(); 35 } 36 } 37 }
最後,因爲此文的啓示,列舉一些使用隊列時的錯誤作法:
1. 忽略offer()的返回值。offer()做爲有返回值的方法,能夠在判斷的時候十分有做用(例如add()的實現)。所以,千萬不要忽略offer()方法的返回值。
2. 在循環裏使用isEmpty()和阻塞方法:
1 while(!queue.isEmpty()) 2 { 3 T element = queue.take(); 4 5 //Process element. 6 }
take()是阻塞方法,無需作isEmpty()的判斷,直接使用便可。而這種狀況頗有可能致使死鎖,由於因爲不斷循環,鎖會一直被isEmpty()取得(由於size()方法會取得鎖),而生產者沒法得到鎖。
3. 頻繁使用size()方法去記錄。size()方法是要取得鎖的,意味着這不是一個廉價的方法。能夠使用原子變量代替。
本文完