阻塞隊列是一個隊列,它最大的特色就是阻塞的線程知足條件就會被自動喚醒,不須要咱們人爲的判斷。java
以前總結的線程間通訊,須要判斷對應的值,一個生產者與一個消費者,在判斷狀態的時候須要加一個標誌類,還須要控制線程。而阻塞隊列在某些狀況會掛起<暫停>線程(阻塞),知足條件,就會被自動的喚起數組
java中阻塞隊列的方法以下:安全
BlockQueue的源碼:ide
public interface BlockingQueue<E> extends Queue<E> { //增長一個元索 若是隊列已滿,則拋出一個IIIegaISlabEepeplian異常 boolean add(E e); //添加一個元素並返回true 若是隊列已滿,則返回false boolean offer(E e); //添加一個元素 若是隊列滿,則阻塞 void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //移除並返回隊列頭部的元素 若是隊列爲空,則阻塞 E take() throws InterruptedException; //移除並返問隊列頭部的元素 若是隊列爲空,則返回null E poll(long timeout, TimeUnit unit) throws InterruptedException; //剩餘容量 int remainingCapacity(); //移除並返回隊列頭部的元素 若是隊列爲空,則拋出一個NoSuchElementException異常 boolean remove(Object o); public boolean contains(Object o); //一次性從BlockingQueue獲取全部可用的數據對象並轉移到參數集合中 int drainTo(Collection<? super E> c); int drainTo(Collection<? super E> c, int maxElements); }
能夠看到,BlockQueue提供了不少不一樣於其餘集合的方法。下面是它的子類:this
咱們隨便選一個ArrayBlockQueue來探索一下它是怎麼作到阻塞的。先看看它的三個構造方法:spa
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) throw new IllegalArgumentException();
//初始化一個數組 this.items = new Object[capacity];
//重入鎖 lock = new ReentrantLock(fair);
//下面初始化的是兩個隊列 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
咱們關注的重點固然是第三個構造方法,此處用到了lock鎖來把一個普通的集合轉移到ArrayBlockQueue中。ArrayBlockQueue的初始化是在第二個構造方法中完成的。須要注意的是,ArrayBlockQueue內部存儲對象的方式是經過Object數組實現的。線程
不難想象,構造方法就已經用lock鎖來達到安全的目的了,那麼,其餘的阻塞相關方法也確定離不開lock鎖的影子了。咱們帶着這個flag繼續往下走。先來看看offer()方法和put()方法,發現和咱們猜測的同樣:對象
該方法在ArrayBlockQueue中有兩個重載方法offer(E e, long timeout, TimeUnit unit)和offer(E e)。
將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量),在成功時返回 true,若是此隊列已滿,則返回 false。前者與後者的主要區別在於,若是隊列中沒有可用空間,能夠設置必定的等待時間,等待可用空間。blog
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try {
//若是長度等於數組長度表示已經滿了 if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
將指定的元素插入到隊列的尾部,若是有可用空間直接插入,若是沒有可用空間,調用condition.await()方法等待,直到被喚醒,而後插入元素。 隊列
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock;
//這種鎖能夠中斷 lock.lockInterruptibly(); try { while (count == items.length) notFull.await();
//能夠跟進 enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items;
//此處putIndex能夠當成遊標 items[putIndex] = x;
//當數據滿了,遊標會恢復爲0 if (++putIndex == items.length) putIndex = 0;
//隊列中元素個數 count++;
//喚醒 notEmpty.signal(); }
若是插入元素成功,返回true,若是插入失敗拋出異常IllegalStateException(「Queue full」)。
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
出隊列方法:
該方法也有兩個重載方法poll(long timeout, TimeUnit unit)和poll(),從隊列頭部移除一個元素,前者與後者的區別在於,若是隊列中沒有能夠移除的元素,前者會等待必定時間,而後執行移除方法。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue();//若是沒有能夠移出元素,返回null,不然執行dequeue()方法 } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos);//若是沒有能夠移出元素,調用condition的線程等待的方法,等待必定時間 } return dequeue(); } finally { lock.unlock();//最後釋放鎖lock } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal();//最後喚醒其餘等待的線程 return x; }
獲取並移除此隊列的頭部。take()和poll()的區別在於,若是隊列中沒有可移除元素,take()會一直等待,而poll()可設置直接返回null或者等待必定時間。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//若是隊列中沒有元素,該線程一直處於阻塞狀態 return dequeue(); } finally { lock.unlock(); } }
分析完了上面的源碼,咱們以一個小Demo來結束上面的話題,咱們以積分分發和消費爲例來隨便搞個例子
public class User { private String name; public User(String name) { this.name = name; } @Override public String toString() { return "User{" + "name='" + name + '\'' + '}'; } }
public class UserService { private final ExecutorService executorService= Executors.newSingleThreadExecutor(); ArrayBlockingQueue<User> arrayBlockingQueue=new ArrayBlockingQueue(10); { init(); } public void init(){ //不斷消費隊列的線程 executorService.execute(()->{ while(true){ try { User user=arrayBlockingQueue.take(); //阻塞式 System.out.println("發送優惠券給:"+user); } catch (InterruptedException e) { e.printStackTrace(); } } }); } public boolean register(){ User user=new User("用戶A"); addUser(user); //發送積分. try { arrayBlockingQueue.put(user); } catch (InterruptedException e) { e.printStackTrace(); } return true; } private void addUser(User user){ System.out.println("添加用戶:"+user); } public static void main(String[] args) { new UserService().register(); } }