參考:http://www.jianshu.com/p/cc2281b1a6bcnode
本文須要關注的地方設計模式
阻塞隊列主要用於「生產者-消費者」 設計模式。
BlockingQueue 方法以四種形式出現,對於不能當即知足但可能在未來某一時刻能夠知足的操做,這四種形式的處理方式不一樣:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操做),第三種是在操做能夠成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。下表中總結了這些方法:
數組
LinkedBlockingQueue在BlockingQueue的實現類中使用最多(若是知道隊列的大小,能夠考慮使用ArrayBlockIngQueue,它使用循環數組實現。可是若是不知道隊列將來的大小,那麼使用ArrayBlockingQueue就必然會致使數組的來回複製,下降效率)。咱們主要關心可阻塞的put和take方法,以及支持定時的offer和poll方法。若是隊列已經滿了,那麼put方法將阻塞直到有空間可用;若是隊列爲空,那麼take方法將會阻塞直到有元素可用。隊列能夠是有界的也能夠是無界的,無界隊列永遠都不會充滿,所以無界隊列上 的put方法也永遠不會阻塞(若是沒有定義上限,將使用 Integer.MAX_VALUE 做爲上限)。安全
LinkedBlockingQueue採用的是單鏈表結構,包含了頭結點和尾節點,last入隊,head出隊。
入隊:last.next=node;last = node;
,即last中保存了最後一個有效元素;
出隊:Node<E> h = head; Node<E> first = h.next; head = first; E x = first.item; first.item = null; return x;
,即head並無存放有效的值(爲null),將head指向的下一個節點的值返回,並將下一個節點的設爲新的head。併發
// 全部的元素都經過Node這個靜態內部類來進行存儲,這與LinkedList的處理方式徹底同樣 static class Node<E> { //使用item來保存元素自己 E item; //保存當前節點的後繼節點 Node<E> next; Node(E x) { item = x; } } /** 阻塞隊列所能存儲的最大容量 用戶能夠在建立時手動指定最大容量,若是用戶沒有指定最大容量 那麼最默認的最大容量爲Integer.MAX_VALUE. */ private final int capacity; /** 當前阻塞隊列中的元素數量,因爲它的入隊列和出隊列使用的是兩個 不一樣的lock對象,所以不管是在入隊列仍是出隊列,都會涉及對元素數 量的併發修改,所以這裏使用了一個原子操做類來解決對同一個變量進行併發修改的線程安全問題。 */ private final AtomicInteger count = new AtomicInteger(0); /** * 鏈表的頭部 * LinkedBlockingQueue的頭部具備一個不變性: * 頭部的元素老是爲null,head.item==null */ private transient Node<E> head; /** * 鏈表的尾部 * LinkedBlockingQueue的尾部也具備一個不變性: * 即last.next==null */ private transient Node<E> last; /** 元素出隊列時線程所獲取的鎖 當執行take、poll等操做時線程須要獲取的鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 當隊列爲空時,經過該Condition讓從隊列中獲取元素的線程處於等待狀態 */ private final Condition notEmpty = takeLock.newCondition(); /** 元素入隊列時線程所獲取的鎖 當執行add、put、offer等操做時線程須要獲取鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 當隊列的元素已經達到capactiy,經過該Condition讓元素入隊列的線程處於等待狀態 */ private final Condition notFull = putLock.newCondition();
經過上面的分析,咱們能夠發現LinkedBlockingQueue在入隊列和出隊列時使用的不是同一個Lock,這也意味着它們之間的操做不會存在互斥操做。在多個CPU的狀況下,它們能夠作到真正的在同一時刻既消費、又生產,可以作到並行處理。函數
/** * 其實下面的代碼等價於以下內容: * last.next=node; * last = node; */ private void enqueue(Node<E> node) { last = last.next = node; }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node(e); /* 在這裏首先獲取到putLock,以及當前隊列的元素數量 */ final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; /* 執行可中斷的鎖獲取操做 */ putLock.lockInterruptibly(); try { /* 當隊列的容量到達最大容量時,此時線程將處於等待狀態,直到隊列有空閒的位置才繼續執行。使用while判 斷依舊是爲了防止線程被"僞喚醒」而出現的狀況,即當線程被喚醒時而隊列的大小依舊等於capacity時,線程應該繼續等待。 */ while (count.get() == capacity) { notFull.await(); } //讓元素進行隊列的末尾 enqueue(node); c = count.getAndIncrement(); /*注:c+1獲得的結果是新元素入隊列以後隊列元素的總和。 當前隊列中的總元素個數小於最大容量時,此時喚醒其餘執行入隊列的線程 讓它們能夠放入元素,若是新加入元素以後,隊列的大小等於capacity, 那麼就意味着此時隊列已經滿了,也就沒有必需要喚醒其餘正在等待入隊列的線程,由於喚醒它們以後,它們也仍是繼續等待。 */ if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } /*當c=0時,即意味着以前的隊列是空隊列,出隊列的線程都處於等待狀態, 如今新添加了一個新的元素,即隊列再也不爲空,所以它會喚醒正在等待獲取元素的線程。 */ if (c == 0) signalNotEmpty(); } /* 喚醒正在等待獲取元素的線程,告訴它們如今隊列中有元素了 */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //經過notEmpty喚醒獲取元素的線程 notEmpty.signal(); } finally { takeLock.unlock(); } }
/** 該方法會返回一個boolean值,當入隊列成功返回true,入隊列失敗返回false */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; /* 當隊列已經滿了,它不會繼續等待,而是直接返回。所以該方法是非阻塞的。 */ if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { /* 當獲取到鎖時,須要進行二次的檢查,由於獲取鎖和判斷不是原子操做 */ if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //經過takeLock獲取鎖,而且支持線程中斷 takeLock.lockInterruptibly(); try { //當隊列爲空時,則讓當前線程處於等待 while (count.get() == 0) { notEmpty.await(); } //完成元素的出隊列 x = dequeue(); /* 隊列元素個數完成原子化操做-1,能夠看到count元素會在插入元素的線程和獲取元素的線程進行併發修改操做。 */ c = count.getAndDecrement(); /* 當一個元素出隊列以後,隊列的大小依舊大於1時當前線程會喚醒其餘執行元素出隊列的線程,讓它們也能夠執行元素的獲取 */ if (c > 1) notEmpty.signal(); } finally { //完成鎖的釋放 takeLock.unlock(); } /* 當c==capaitcy時,即在獲取當前元素以前,隊列已經滿了,而此時獲取元素以後,隊列就會空出一個位置,故當前線程會喚醒執行插入操做的線程通知其餘中的一個能夠進行插入操做。 */ if (c == capacity) signalNotFull(); return x; } /** * 讓頭部元素出隊列的過程 * 其最終的目的是讓原來的head被GC回收,讓其的next成爲head * 而且新的head的item爲null. * 由於LinkedBlockingQueue的頭部具備一致性:即元素爲null。 */ private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
peek類操做其實比較簡單。由於有一個head節點去維護當前的隊首元素。只有判斷先first(head的後繼)是否爲空就好。高併發
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
public boolean remove(Object o) { if (o == null) return false; fullyLock();//獲取存元素鎖和取元素鎖(不容許存或取元素),由於有可能同時涉及到頭尾結點的訪問問題 try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {// 遍歷整個鏈表 if (o.equals(p.item)) {// 結點的值與指定值相等 unlink(p, trail); // 斷開結點 return true; } } return false; } finally { fullyUnlock(); } } void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next;// 斷開p結點 if (last == p) // 尾節點爲p結點 last = trail; // 從新賦值尾節點 if (count.getAndDecrement() == capacity) notFull.signal(); } void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }