LinkedBlockingQueue是基於鏈表的阻塞FIFO隊列,能夠指定一個最大的長度限制以防止過分擴展,未指定狀況下其大小爲Integer.MAX_VALUE。java
putlock只負責添加,takelock只負責刪除。這樣添加和刪除操做就能夠分開作,這樣也是其吞吐量高於ArrayBlockingQueue的緣由,而對於count則使用一個AtomicInteger來進行同步。可是對於迭代器操做和remove(Object)操做則是不一樣的,須要同時加上兩種鎖才行。對於notFull和notEmpty的條件變量這裏實際也是分開的。數組
讀者和寫者之間的可見性問題,當元素是隊尾元素時,獲取put鎖,而且count更新,隨後的讀者經過fullLock得到put鎖或得到take鎖來保證其對於隊尾元素的可見性。而後讀取n=count.get(),這個保證得到對第n個元素的可見性。併發
LinkedBlockingQueue 類中定義的變量有:高併發
/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(0); /** Head of linked list */ private transient Node<E> head; /** Tail of linked list */ private transient Node<E> last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
該類中定義了兩個ReentrantLock鎖:putLock和takeLock,分別用於put端和take端。也就是說,生成端和消費端各自獨立擁有一把鎖,避免了讀(take)寫(put)時互相競爭鎖的狀況。性能
public boolean offer(E e)this
原理:在隊尾插入一個元素, 若是隊列沒滿,當即返回true; 若是隊列滿了,當即返回false。spa
/** * 在隊尾插入一個元素, 容量沒滿,能夠當即插入,返回true; 隊列滿了,直接返回false * 注:若是使用了限制了容量的隊列,這個方法比add()好,由於add()插入失敗就會拋出異常 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count;// 獲取隊列中的元素個數 if (count.get() == capacity)// 隊列滿了 return false; int c = -1; final ReentrantLock putLock = this.putLock; putLock.lock();// 獲取入隊鎖 try { if (count.get() < capacity) {// 容量沒滿 enqueue(e);// 入隊 c = count.getAndIncrement();// 容量+1,返回舊值(注意) if (c + 1 < capacity)// 若是添加元素後的容量,還小於指定容量(說明在插入當前元素後,至少還能夠再插一個元素) notFull.signal();// 喚醒等待notFull條件的其中一個線程 } } finally { putLock.unlock();// 釋放入隊鎖 } if (c == 0)// 若是c==0,這是什麼狀況?一開始若是是個空隊列,就會是這樣的值,要注意的是,上邊的c返回的是舊值 signalNotEmpty(); return c >= 0; } /** * 建立一個節點,並加入鏈表尾部 * @param x */ private void enqueue(E x) { /* * 封裝新節點,並賦給當前的最後一個節點的下一個節點,而後在將這個節點設爲最後一個節點 */ last = last.next = new Node<E>(x); } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock();//獲取出隊鎖 try { notEmpty.signal();//喚醒等待notEmpty條件的線程中的一個 } finally { takeLock.unlock();//釋放出隊鎖 } }
public void put(E e)線程
原理:在隊尾插入一個元素,若是隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷指針
/** * 在隊尾插一個元素 * 若是隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; final ReentrantLock putLock = this.putLock;//入隊鎖 final AtomicInteger count = this.count;//當前隊列中的元素個數 putLock.lockInterruptibly();//加鎖 try { while (count.get() == capacity) {//若是隊列滿了 /* * 加入notFull等待隊列,直到隊列元素不滿了, * 被其餘線程使用notFull.signal()喚醒 */ notFull.await(); } enqueue(e);//入隊 c = count.getAndIncrement();//入隊數量+1 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
public E take() code
原理:將隊頭元素出隊,若是隊列空了,一直阻塞,直到隊列不爲空或者線程被中斷
/** * 出隊: * 若是隊列空了,一直阻塞,直到隊列不爲空或者線程被中斷 */ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count;//獲取隊列中的元素總量 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//獲取出隊鎖 try { while (count.get() == 0) {//若是沒有元素,一直阻塞 /* * 加入等待隊列, 一直等待條件notEmpty(即被其餘線程喚醒) * (喚醒其實就是,有線程將一個元素入隊了,而後調用notEmpty.signal()喚醒其餘等待這個條件的線程,同時隊列也不空了) */ notEmpty.await(); } x = dequeue();//出隊 c = count.getAndDecrement();//元素數量-1 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } /** * 從隊列頭部移除一個節點 */ private E dequeue() { Node<E> h = head;//獲取頭節點:x==null Node<E> first = h.next;//將頭節點的下一個節點賦值給first h.next = h; // 將當前將要出隊的節點置null(爲了使其作head節點作準備) head = first;//將當前將要出隊的節點做爲了頭節點 E x = first.item;//獲取出隊節點的值 first.item = null;//將出隊節點的值置空 return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
public E poll()
原理:若是沒有元素,直接返回null;若是有元素,出隊
/** * 出隊: * 一、若是沒有元素,直接返回null * 二、若是有元素,出隊 */ public E poll() { final AtomicInteger count = this.count;// 獲取元素數量 if (count.get() == 0)// 沒有元素 return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock();// 獲取出隊鎖 try { if (count.get() > 0) {// 有元素 x = dequeue();// 出隊 // 元素個數-1(注意:該方法是一個無限循環,直到減1成功爲止,且返回舊值) c = count.getAndDecrement(); if (c > 1)// 還有元素(若是舊值c==1的話,那麼經過上邊的操做以後,隊列就空了) notEmpty.signal();// 喚醒等待在notEmpty隊列中的其中一條線程 } } finally { takeLock.unlock();// 釋放出隊鎖 } if (c == capacity)// c == capacity是怎麼發生的?若是隊列是一個滿隊列,注意:上邊的c返回的是舊值 signalNotFull(); return x; }
一、具體入隊與出隊的原理圖:
圖中每個節點前半部分表示封裝的數據x,後邊的表示指向的下一個引用。
1.一、初始化
初始化以後,初始化一個數據爲null,且head和last節點都是這個節點。
1.二、入隊兩個元素事後
1.三、出隊一個元素後
表面上看,只是將頭節點的next指針指向了要刪除的x1.next,事實上這樣我覺的就徹底能夠,可是jdk其實是將原來的head節點刪除了,而上邊看到的這個head節點,正是剛剛出隊的x1節點,只是其值被置空了。
二、三種入隊對比:
三、三種出隊對比:
四、ArrayBlockingQueue與LinkedBlockingQueue對比