都用於取隊列的頭結點,poll會刪除頭結點,peek不會刪除頭結點。java
增長有三種方式,前提:隊列滿 | 方式 | put | add | offer | |--------|--------|--------|--------| |特色|一直阻塞|拋異常|返回false|node
刪除有三種方式,前提:隊列爲空 | 方式 | remove | poll | take | |--------|--------|--------|--------| |特色|NoSuchElementException|返回false|阻塞|數組
//鏈表節點內部類 static class Node<E> { //節點元素 E item; Node<E> next; Node(E x) { item = x; } } //容量界限,若是未設定,則爲Integer最大值 private final int capacity; //當前元素個數 private final AtomicInteger count = new AtomicInteger(); //鏈表的頭:head.item == null transient Node<E> head; //鏈表的尾:last.next == null private transient Node<E> last; //take,poll等獲取鎖 private final ReentrantLock takeLock = new ReentrantLock(); //等待任務的等待隊列 private final Condition notEmpty = takeLock.newCondition(); //put,offer等插入鎖 private final ReentrantLock putLock = new ReentrantLock(); //等待插入的等待隊列 private final Condition notFull = putLock.newCondition();
signalNotEmpty()方法,在插入線程發現隊列爲空時調用,告知獲取線程須要等待。 signalNotFull()方法,在獲取線程發現隊列已滿時調用,告知插入線程須要等待。安全
//表示等待take。put/offer調用,不然一般不會鎖定takeLock。 private void signalNotEmpty() { //獲取takeLock final ReentrantLock takeLock = this.takeLock; //鎖定takeLock takeLock.lock(); try { //喚醒take線程等待隊列 notEmpty.signal(); } finally { //釋放鎖 takeLock.unlock(); } } //表示等待put,take/poll 調用 private void signalNotFull() { //獲取putLock final ReentrantLock putLock = this.putLock; //鎖定putLock putLock.lock(); try { //喚醒插入線程等待隊列 notFull.signal(); } finally { //釋放鎖 putLock.unlock(); } }
enqueue()方法只能在持有 putLock 鎖下執行,dequeue()在持有 takeLock 鎖下執行。併發
//在隊列尾部插入 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; //last.next指向當前node //尾指針後移 last = last.next = node; } //移除隊列頭 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; //保存頭指針 Node<E> h = head; //獲取當前鏈表第一個元素 Node<E> first = h.next; //頭指針的next指向本身 h.next = h; // help GC //頭指針指向第一個元素 head = first; //獲取第一個元素的值 E x = first.item; //將第一個元素的值置空 first.item = null; //返回第一個元素的值 return x; }
在須要對兩把鎖同時加鎖時,把加鎖的順序與釋放的順序封裝成方法,確保全部地方都是一致的。並且獲取鎖時都是不響應中斷的,一直獲取直到加鎖成功,這就避免了第一把鎖加鎖成功,而第二把鎖加鎖失敗致使鎖不釋放的風險。ide
//鎖定putLock和takeLock void fullyLock() { putLock.lock(); takeLock.lock(); } //與fullyLock的加鎖順序相反,先解鎖takeLock,再解鎖putLock void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
簡單介紹一下LinkedBlockingQueue中API的源碼,如構造方法,新增,獲取,刪除,drainTo。函數
LinkedBlockingQueue有三個構造方法,其中無參構造儘可能少用,由於容量爲Integer的最大值,操做不當會出現內存溢出。this
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { //參數校驗 if (capacity <= 0) throw new IllegalArgumentException(); //設置容量 this.capacity = capacity; //首尾節點指向一個空節點 last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); //獲取putLock final ReentrantLock putLock = this.putLock; //鎖定 putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
將給定的元素設置到隊列中,若是設置成功返回true, 不然返回false。 e的值不能爲空,不然拋出空指針異常。線程
//若是能夠在不超過隊列容量的狀況下當即插入指定的元素到隊列的尾部,成功後返回true,若是隊列已滿,返回false。當使用容量受限的隊列時,此方法一般比方法BlockingQueue#add更可取,後者只能經過拋出異常才能插入元素。 public boolean offer(E e) { //非空判斷 if (e == null) throw new NullPointerException(); //計數器 final AtomicInteger count = this.count; //若是隊列已滿,直接返回插入失敗 if (count.get() == capacity) return false; int c = -1; //新建節點 Node<E> node = new Node<E>(e); //獲取插入鎖 final ReentrantLock putLock = this.putLock; //鎖定 putLock.lock(); try { //若是隊列未滿 if (count.get() < capacity) { //插入隊列 enqueue(node); //計數 c = count.getAndIncrement(); //還有空餘空間 if (c + 1 < capacity) //喚醒插入線程 notFull.signal(); } } finally { //解鎖 putLock.unlock(); } //若是隊列爲空 if (c == 0) //通知獲取線程阻塞 signalNotEmpty(); //返回成功或者插入失敗 return c >= 0; }
將元素設置到隊列中,若是隊列中沒有多餘的空間,該方法會一直阻塞,直到隊列中有多餘的空間。指針
public void put(E e) throws InterruptedException { //不能夠插入空元素 if (e == null) throw new NullPointerException(); //全部put/take/etc中的約定都是預先設置本地var //除非設置,不然保持計數爲負數表示失敗。 int c = -1; //新建節點 Node<E> node = new Node<E>(e); //獲取putLock final ReentrantLock putLock = this.putLock; //獲取計數器 final AtomicInteger count = this.count; //可中斷加鎖,即在鎖獲取過程當中不處理中斷狀態,而是直接拋出中斷異常,由上層調用者處理中斷。 putLock.lockInterruptibly(); try { /* * 注意count在wait守衛線程中使用,即便它沒有被鎖保護。 * 這是由於count只能在此時減小(全部其餘put都被鎖定關閉), * 若是它從容量更改,咱們(或其餘一些等待put)將收到信號。 * 相似地,count在其餘等待守衛線程中的全部其餘用途也是如此。 */ //只要當前隊列已滿 while (count.get() == capacity) { //通知插入線程等待 notFull.await(); } //插入隊列 enqueue(node); //數量加1 c = count.getAndIncrement(); //若是隊列增長1個元素還未滿 if (c + 1 < capacity) //喚醒插入進程 notFull.signal(); } finally { //解鎖 putLock.unlock(); } //若是隊列中沒有元素了 if (c == 0) //通知獲取線程等待 signalNotEmpty(); }
非阻塞的獲取隊列中的第一個元素,不出隊列。
public E peek() { //隊列爲空,直接返回 if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //獲取第一個元素,非哨兵 Node<E> first = head.next; //元素爲空,返回null if (first == null) return null; else //返回第一個元素值 return first.item; } finally { takeLock.unlock(); } }
非阻塞的獲取隊列中的值,未獲取到返回null。
public E poll() { final AtomicInteger count = this.count; //隊列爲空,直接返回 if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //隊列非空,獲取隊列中元素 if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
從隊列中移除指定的值。將兩把鎖都鎖定。
public boolean remove(Object o) { //不支持null if (o == null) return false; //鎖定兩個鎖 fullyLock(); try { //迭代隊列 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //經過equals方法匹配待刪除元素 if (o.equals(p.item)) { //移除p節點 unlink(p, trail); //成功 return true; } } //失敗 return false; } finally { //解鎖 fullyUnlock(); } } // 將內部節點p與前一個跟蹤斷開鏈接 void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. //p節點內容置空 p.item = null; //trail節點的next指向p的next trail.next = p.next; //若是p是隊尾 if (last == p) //trail變爲隊尾 last = trail; //若是隊列已滿 if (count.getAndDecrement() == capacity) //通知插入線程阻塞 notFull.signal(); }
清空隊列。
//原子性地從隊列中刪除全部元素。此調用返回後,隊列將爲空。 public void clear() { //鎖定 fullyLock(); try { //清空數據,幫助垃圾回收 for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; } head = last; // assert head.item == null && head.next == null; //若是容量爲0 if (count.getAndSet(0) == capacity) //喚醒插入線程 notFull.signal(); } finally { //解鎖 fullyUnlock(); } }
將隊列中值,所有移除,併發設置到給定的集合中。
public int drainTo(Collection<? super E> c, int maxElements) { //各類判斷 if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; boolean signalNotFull = false; //鎖 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //獲取要轉移的數量 int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { //組裝集合 while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null; h.next = h; h = p; ++i; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { // assert h.item == null; head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }