什麼是阻塞隊列 BlockingQueuejava
隊列是一種數據結構,它的特色是先進先出(First In First Out),它有兩個基本操做:在隊列尾部加入一個元素,從隊列頭部移除一個元素。隊列在多線程應用中,經常使用於生產-消費場景。編程
BlockingQueue 是 Java util.concurrent 包下重要的數據結構,BlockingQueue 提供了線程安全的隊列訪問方式:當阻塞隊列進行插入數據時,若是隊列已滿,線程將會阻塞等待直到隊列非滿;從阻塞隊列取數據時,若是隊列已空,線程將會阻塞等待直到隊列非空。併發包下不少高級同步類的實現都是基於 BlockingQueue 實現的。數組
BlockingQueue 具備 4 組不一樣的方法用於插入、移除以及對隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下: 安全
BlockingQueue 是個接口,你須要使用它的實現之一來使用 BlockingQueue,Java.util.concurrent 包下具備如下 BlockingQueue 接口的實現類:數據結構
ArrayBlockingQueue:ArrayBlockingQueue 是一個有界的阻塞隊列,其內部實現是將對象放到一個數組裏。有界也就意味着,它不可以存儲無限多數量的元素。它有一個同一時間可以存儲元素數量的上限。你能夠在對其初始化的時候設定這個上限,但以後就沒法對這個上限進行修改了。多線程
DelayQueue:DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 接口。併發
LinkedBlockingQueue:LinkedBlockingQueue 內部以一個鏈式結構對其元素進行存儲。若是須要的話,這一鏈式結構能夠選擇一個上限。若是沒有定義上限,將使用 Integer.MAX_VALUE 做爲上限。this
PriorityBlockingQueue:PriorityBlockingQueue 是一個無界的併發隊列。它使用了和類 java.util.PriorityQueue 同樣的排序規則。你沒法向這個隊列中插入 null 值。全部插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 接口。所以該隊列中元素的排序就取決於你本身的 Comparable 實現。線程
SynchronousQueue:SynchronousQueue 是一個特殊的隊列,它的內部同時只可以容納單個元素。若是該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另外一個線程將該元素從隊列中抽走。一樣,若是該隊列爲空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另外一個線程向隊列中插入了一條新的元素。據此,把這個類稱做一個隊列顯然是誇大其詞了。它更多像是一個匯合點。3d
下面用 BlockQueue 技術來實現一下:點擊連接加入羣【Java併發編程交流組】:https://jq.qq.com/?_wv=1027&k=5mOvK7L
/** 定義一個盤子類,能夠放雞蛋和取雞蛋 */ public class BigPlate { /** 裝雞蛋的盤子,大小爲5 */ private BlockingQueue<Object> eggs = new ArrayBlockingQueue<Object>(5); /** 放雞蛋 */ public void putEgg(Object egg) { try { eggs.put(egg);// 向盤子末尾放一個雞蛋,若是盤子滿了,當前線程阻塞 } catch (InterruptedException e) { e.printStackTrace(); } // 下面輸出有時不許確,由於與put操做不是一個原子操做 System.out.println("放入雞蛋"); } /** 取雞蛋 */ public Object getEgg() { Object egg = null; try { egg = eggs.take();// 從盤子開始取一個雞蛋,若是盤子空了,當前線程阻塞 } catch (InterruptedException e) { e.printStackTrace(); } // 下面輸出有時不許確,由於與take操做不是一個原子操做 System.out.println("拿到雞蛋"); return egg; } /** 放雞蛋線程 */ static class AddThread extends Thread { private BigPlate plate; private Object egg = new Object(); public AddThread(BigPlate plate) { this.plate = plate; } public void run() { plate.putEgg(egg); } } /** 取雞蛋線程 */ static class GetThread extends Thread { private BigPlate plate; public GetThread(BigPlate plate) { this.plate = plate; } public void run() { plate.getEgg(); } } public static void main(String[] args) { BigPlate plate = new BigPlate(); // 先啓動10個放雞蛋線程 for(int i = 0; i < 10; i++) { new Thread(new AddThread(plate)).start(); } // 再啓動10個取雞蛋線程 for(int i = 0; i < 10; i++) { new Thread(new GetThread(plate)).start(); } } }
利用 Condition 來實現阻塞隊列
Java 1.5 以後新增了顯式鎖的接口 java.util.concurrent.locks.Lock 接口,一樣提供了顯式的條件接口 Condition,並對條件隊列進行了加強。
Condition 對象能夠提供和 Object 的 wait 和 notify 同樣的行爲,可是後者必須使用 synchronized 這個內置的monitor鎖,而 Condition 使用的是 RenentranceLock 。這兩種方式在阻塞等待時都會將相應的鎖釋放掉,可是 Condition 的等待能夠中斷,這是兩者惟一的區別。
下面就用 Condition 技術來實現一下:點擊連接加入羣【Java併發編程交流組】:https://jq.qq.com/?_wv=1027&k=5mOvK7L
class Buffer { final Lock lock = new ReentrantLock(); //定義一個鎖 final Condition notFull = lock.newCondition(); //定義阻塞隊列滿了的Condition final Condition notEmpty = lock.newCondition();//定義阻塞隊列空了的Condition final Object[] items = new Object[10]; //爲了下面模擬,設置阻塞隊列的大小爲10,不要設太大 int putptr, takeptr, count; //數組下標,用來標定位置的 //往隊列中存數據 public void put(Object x) throws InterruptedException { lock.lock(); //上鎖 try { while (count == items.length) { System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時沒法存數據!"); notFull.await(); //若是隊列滿了,那麼阻塞存數據這個線程,等待被喚醒 } //若是沒滿,按順序往數組中存 items[putptr] = x; if (++putptr == items.length) //這是到達數組末端的判斷,若是到了,再回到始端 putptr = 0; ++count; //消息數量 System.out.println(Thread.currentThread().getName() + " 存好了值: " + x); notEmpty.signal(); //好了,如今隊列中有數據了,喚醒隊列空的那個線程,能夠取數據啦 } finally { lock.unlock(); //放鎖 } } //從隊列中取數據 public Object take() throws InterruptedException { lock.lock(); //上鎖 try { while (count == 0) { System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時沒法取數據!"); notEmpty.await(); //若是隊列是空,那麼阻塞取數據這個線程,等待被喚醒 } //若是沒空,按順序從數組中取 Object x = items[takeptr]; if (++takeptr == items.length) //判斷是否到達末端,若是到了,再回到始端 takeptr = 0; --count; //消息數量 System.out.println(Thread.currentThread().getName() + " 取出了值: " + x); notFull.signal(); //好了,如今隊列中有位置了,喚醒隊列滿的那個線程,能夠存數據啦 return x; } finally { lock.unlock(); //放鎖 } } }