項目中用到了一個叫作 Disruptor 的隊列,今天樓主並非要介紹 Disruptor 而是想鞏固一下基礎扒一下 JDK 中的阻塞隊列,聽到隊列相信你們對其並不陌生,在咱們現實生活中隊列隨處可見,最經典的就是去銀行辦理業務等。
固然在計算機世界中,隊列是屬於一種數據結構,隊列採用的FIFO(first in firstout),新元素(等待進入隊列的元素)老是被插入到尾部,而讀取的時候老是從頭部開始讀取。在計算中隊列通常用來作排隊(如線程池的等待排隊,鎖的等待排隊),用來作解耦(生產者消費者模式),異步等等。java
在JDK中的隊列都實現了 java.util.Queue 接口,在隊列中又分爲兩類,一類是線程不安全的,ArrayDeque,LinkedList等等,還有一類都在java.util.concurrent包下屬於線程安全,而在咱們真實的環境中,咱們的機器都是屬於多線程,當多線程對同一個隊列進行操做的時,若是使用線程不安全會出現數據丟失等沒法預測的事情,因此咱們這個時候只能選擇線程安全的隊列。下面是咱們今天要探討的兩個隊列node
隊列名字 | 是否加鎖 | 數據結構 |
---|---|---|
ArrayBlockingQueue | 是 | 數組array |
LinkedBlockingQueue | 是 | 鏈表 |
ArrayBlockingQueue 的原理就是使用一個可重入鎖(ReentrantLock )和這個鎖生成的兩個條件對象進行併發控制,ArrayBlockingQueue是一個有界的阻塞隊列,初始化的時候必需要指定隊列長度,且指定長度以後不容許進行修改。
數組
/** The queued items item的集合 */ final Object[] items; /** items index for next take, poll, peek or remove 取出數據的索引 */ int takeIndex; /** items index for next put, offer, or add 添加數據的索引 */ int putIndex; /** Number of elements in the queue 隊列元素的個數 */ int count; /** Main lock guarding all access 可重入的鎖 */ final ReentrantLock lock; /** Condition for waiting takes 隊列爲空條件等待對象 */ private final Condition notEmpty; /** Condition for waiting puts 隊列滿條件等待對象 */ private final Condition notFull;
IllegalStateException
異常;add
方法:
安全
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
offer
方法:
數據結構
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } }
put
方法:
多線程
public void put(E e) { xfer(e, true, ASYNC, 0); }
咱們能夠看到,若是隊列滿了則返回false,若是沒有滿調用insert。整個方法是經過可重入鎖來鎖住的,而且最終釋放。併發
接着看一下insert
方法:
異步
private void insert(E x) { items[putIndex] = x; // 元素添加到數組裏 putIndex = inc(putIndex); // 放數據索引+1,當索引滿了變成0 ++count; // 元素個數+1 notEmpty.signal(); // 使用條件對象notEmpty通知 }
這裏insert
被調用的時候就會喚醒notEmpty
上等待的線程進行take
操做。
源碼分析
再看一下put
方法:
post
public void put(E e) throws InterruptedException { checkNotNull(e); // 不容許元素爲空 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 加鎖,保證調用put方法的時候只有1個線程 try { while (count == items.length) // 若是隊列滿了,阻塞當前線程,while用來防止假喚醒 notFull.await(); // 線程阻塞並被掛起,同時釋放鎖 insert(e); // 調用insert方法 } finally { lock.unlock(); // 釋放鎖,讓其餘線程能夠調用put方法 } }
經過上面代碼咱們能夠知道,add
方法和offer
方法不會阻塞線程,put
方法若是隊列滿了會阻塞線程,直到有線程消費了隊列裏的數據纔有可能被喚醒。
緊接着咱們看一下poll
方法:
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); // 加鎖,保證調用poll方法的時候只有1個線程 try { return (count == 0) ? null : extract(); // 若是隊列裏沒元素了,返回null,不然調用extract方法 } finally { lock.unlock(); // 釋放鎖,讓其餘線程能夠調用poll方法 } }
看看這個extract
方法,extract的翻譯過來就是提取的意思:
private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); // 獲得取索引位置上的元素 items[takeIndex] = null; // 對應取索引上的數據清空 takeIndex = inc(takeIndex); // 取數據索引+1,當索引滿了變成0 --count; // 元素個數-1 notFull.signal(); // 使用條件對象notFull通知,原理同上面的insert中 return x; // 返回元素 }
看一下take
方法:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 加鎖,保證調用take方法的時候只有1個線程 try { while (count == 0) // 若是隊列空,阻塞當前線程,並加入到條件對象notEmpty的等待隊列裏 notEmpty.await(); // 線程阻塞並被掛起,同時釋放鎖 return extract(); // 調用extract方法 } finally { lock.unlock(); // 釋放鎖,讓其餘線程能夠調用take方法 } }
remove
方法:
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); // 加鎖,保證調用remove方法的時候只有1個線程 try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素 if (o.equals(items[i])) { // 兩個對象相等的話 removeAt(i); // 調用removeAt方法 return true; // 刪除成功,返回true } } return false; // 刪除成功,返回false } finally { lock.unlock(); // 釋放鎖,讓其餘線程能夠調用remove方法 } }
再看一下removeAt
方法:
private void removeAt(int i) { final Object[] items = this.items; if (i == takeIndex) { // 若是要刪除數據的索引是取索引位置,直接刪除取索引位置上的數據,而後取索引+1便可 items[takeIndex] = null; takeIndex = inc(takeIndex); } else { // 若是要刪除數據的索引不是取索引位置,移動元素元素,更新取索引和放索引的值 for (;;) { int nexti = inc(i); if (nexti != putIndex) { items[i] = items[nexti]; i = nexti; } else { items[i] = null; putIndex = i; break; } } } --count; // 元素個數-1 notFull.signal(); // 使用條件對象notFull通知 }
LinkedBlockingQueue
是一個使用鏈表完成隊列操做的阻塞隊列。鏈表是單向鏈表,而不是雙向鏈表。
/** The capacity bound, or Integer.MAX_VALUE if none 容量大小 */ private final int capacity; /** Current number of elements 元素個數,由於有2個鎖,存在競態條件,使用AtomicInteger */ private final AtomicInteger count = new AtomicInteger(0); /** * Head of linked list. * Invariant: head.item == null * 頭結點 */ private transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null * 尾節點 */ private transient Node<E> last; /** Lock held by take, poll, etc 獲取元素的鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes 取元素的條件對象 */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc 放元素的鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts 添加元素的條件對象 */ private final Condition notFull = putLock.newCondition();
因爲文章篇幅問題對於LinkedBlockingQueue
咱們主要分析如下幾個方法:
offer
方法:
public boolean offer(E e) { if (e == null) throw new NullPointerException(); // 不容許空元素 final AtomicInteger count = this.count; if (count.get() == capacity) // 若是容量滿了,返回false return false; int c = -1; Node<E> node = new Node(e); // 容量沒滿,以新元素構造節點 final ReentrantLock putLock = this.putLock; putLock.lock(); // 放鎖加鎖,保證調用offer方法的時候只有1個線程 try { // 再次判斷容量是否已滿,由於可能取元素鎖在進行消費數據,沒滿的話繼續執行 if (count.get() < capacity) { enqueue(node); // 節點添加到鏈表尾部 c = count.getAndIncrement(); // 元素個數+1 if (c + 1 < capacity) // 若是容量還沒滿 notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示能夠再次往隊列裏面加數據 } } finally { putLock.unlock(); // 釋放放鎖,讓其餘線程能夠調用offer方法 } // 因爲存在放元素鎖和取元素鎖,這裏可能取元素鎖一直在消費數據,count會變化。這裏的if條件表示若是隊列中還有1條數據 if (c == 0) // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列裏還有1條數據,能夠進行消費 signalNotEmpty(); return c >= 0; // 添加成功返回true,不然返回false }
put
方法:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 不容許空元素 int c = -1; Node<E> node = new Node(e); // 以新元素構造節點 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 放鎖加鎖,保證調用put方法的時候只有1個線程 try { while (count.get() == capacity) { // 若是容量滿了 notFull.await(); // 阻塞並掛起當前線程 } enqueue(node); // 節點添加到鏈表尾部 c = count.getAndIncrement(); // 元素個數+1 if (c + 1 < capacity) // 若是容量還沒滿 // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示能夠再次往隊列裏面加數據了,隊列還沒滿 notFull.signal(); } finally { putLock.unlock(); // 釋放放鎖,讓其餘線程能夠調用put方法 } // 因爲存在放鎖和拿鎖,這裏可能拿鎖一直在消費數據,count會變化。這裏的if條件表示若是隊列中還有1條數據 if (c == 0) // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列裏還有1條數據,能夠進行消費 signalNotEmpty(); }
poll
方法:
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) // 若是元素個數爲0 return null; // 返回null E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); // 拿鎖加鎖,保證調用poll方法的時候只有1個線程 try { if (count.get() > 0) { // 判斷隊列裏是否還有數據 x = dequeue(); // 刪除頭結點 c = count.getAndDecrement(); // 元素個數-1 if (c > 1) // 若是隊列裏還有元素 // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列裏還有數據,能夠再次消費 notEmpty.signal(); } } finally { takeLock.unlock(); // 釋放拿鎖,讓其餘線程能夠調用poll方法 } // 因爲存在放鎖和拿鎖,這裏可能放鎖一直在添加數據,count會變化。這裏的if條件表示若是隊列中還能夠再插入數據 if (c == capacity) // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列裏還能再次添加數據 signalNotFull(); return x; }
take
方法:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); // 拿鎖加鎖,保證調用take方法的時候只有1個線程 try { while (count.get() == 0) { // 若是隊列裏已經沒有元素了 notEmpty.await(); // 阻塞並掛起當前線程 } x = dequeue(); // 刪除頭結點 c = count.getAndDecrement(); // 元素個數-1 if (c > 1) // 若是隊列裏還有元素 // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列裏還有數據,能夠再次消費 notEmpty.signal(); } finally { takeLock.unlock(); // 釋放拿鎖,讓其餘線程能夠調用take方法 } // 因爲存在放鎖和拿鎖,這裏可能放鎖一直在添加數據,count會變化。這裏的if條件表示若是隊列中還能夠再插入數據 if (c == capacity) // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列裏還能再次添加數據 signalNotFull(); return x; }
remove
方法:
public boolean remove(Object o) { if (o == null) return false; fullyLock(); // remove操做要移動的位置不固定,對讀鎖寫鎖都進行加鎖 try { for (Node<E> trail = head, p = trail.next; // 從鏈表頭結點開始遍歷 p != null; trail = p, p = p.next) { if (o.equals(p.item)) { // 判斷是否找到對象 unlink(p, trail); // 修改節點的連接信息,同時調用notFull的signal方法 return true; } } return false; } finally { fullyUnlock(); // 2個鎖解鎖 } }
緊接着來看一下 fullyLock
與fullyUnlock
方法:
/** * Locks to prevent both puts and takes. */ void fullyLock() { putLock.lock(); takeLock.lock(); } /** * Unlocks to allow both puts and takes. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
LinkedBlockingQueue
的take方法對於沒數據的狀況下會阻塞,poll
方法刪除鏈表頭結點,remove方法刪除指定的對象。
須要注意的是remove
方法因爲要刪除的數據的位置不肯定,須要2個鎖同時加鎖。
文章有點長,JDK中的阻塞隊列線程安全的主要有ArrayBlockingQueue
,LinkedBlockingQueue
,LinkedTransferQueue
,DelayQueue
四種,今天樓主把ArrayBlockingQueue
,LinkedBlockingQueue
放在一塊兒介紹主要緣由是這二者都是使用可重入鎖 ReentrantLock
實現的線程安全。
固然兩者也有很大的不一樣,主要是:
1,ArrayBlockingQueue
只有1個鎖,添加數據和刪除數據的時候只能有1個被執行,不容許並行執行。
而LinkedBlockingQueue
有2個鎖,放元素鎖和取元素鎖,添加數據和刪除數據是能夠並行進行的,固然添加數據和刪除數據的時候只能有1個線程各自執行。
2,ArrayBlockingQueue
中放入數據阻塞的時候,須要消費數據才能喚醒。
而LinkedBlockingQueue
中放入數據阻塞的時候,由於它內部有2個鎖,能夠並行執行放入數據和消費數據,不只在消費數據的時候進行喚醒插入阻塞的線程,同時在插入的時候若是容量還沒滿,也會喚醒插入阻塞的線程。