LinkedBlockingQueue底層是由節點鏈表實現的定長阻塞隊列(阻塞表示若是沒有原始那麼獲取元素會阻塞當前線程)html
LinkedBlockingQueue通常用於生產者消費者模型業務(排隊機制,先進先出)java
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; /** * Linked list node class */ static class Node<E> {//存儲數據的節點 E item; Node<E> next; Node(E x) { item = x; } } /** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity;//鏈表的最大長度,若是不設置值默認爲Integer.MAX_VALUE /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(0);//統計數量線程安全 /** * Head of linked list. * Invariant: head.item == null */ private transient Node<E> head;//頭節點 /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last;//尾節點 /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock();//tackLock /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition();//tackLock條件不爲空 /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();//putLock /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();//putLock條件沒滿 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//默認last=head=空節點 }
addnode
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();//不能存儲空元素 int c = -1; Node<E> node = new Node(e);//建立節點 final ReentrantLock putLock = this.putLock;//得到putLock final AtomicInteger count = this.count;//獲取當前數量 putLock.lockInterruptibly();//獲取鎖,若是有調用Thread.Interrupted()直接拋出異常 try { while (count.get() == capacity) {//若是當前隊列以滿,進入等待狀態 notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public boolean offer(E e, long timeout, TimeUnit unit) offer(e)相似 throws InterruptedException { if (e == null) throw new NullPointerException();//不能存儲空元素 long nanos = unit.toNanos(timeout);//裝換爲納秒 int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos);//等待一段時間 } enqueue(new Node<E>(e)); c = count.getAndIncrement();//遞增 if (c + 1 < capacity)//若是未滿喚醒notFull.awit notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty();//喚醒notEmpty.await() return true; } private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; //拆分爲兩步 last.next = node,last = node //每次head.next=當前的last而後last.next指向node last = last.next = node; }
remove安全
public boolean remove(Object o) { if (o == null) return false; fullyLock();//刪除數據時所有lock try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next;//先後元素執行,大年元素設置爲空 if (last == p) last = trail; if (count.getAndDecrement() == capacity)//count獲取數量同時遞減(獲取數量爲遞減錢數量) notFull.signal();//喚醒 notFull.await() }
getoracle
//獲取元素,消費,可能被中斷 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//若是有調用Thread.Interrupted()拋出異常 try { while (count.get() == 0) { notEmpty.await();//元素爲空進入等待狀態 } x = dequeue();// c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //獲取元素,消費 public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //查看元素 public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } } [null,aaa,bbb] queue [null,bbb] delete after queue 去掉頭部null元素獲取aaa元素修改aaa元素的item=null private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next;//first第一個有值的節點 h.next = h; // help GC head = first; E x = first.item;//獲取元素 first.item = null;//設置爲空 return x; }
定長鏈表不支持擴容this
線程安全線程