隊列是一種數據結構.它有兩個基本操做:在隊列尾部加入一個元素,和從隊列頭部移除一個元素(注意不要弄混隊列的頭部和尾部)java
就是說,隊列以一種先進先出的方式管理數據,若是你試圖向一個 已經滿了的阻塞隊列中添加一個元素或者是從一個空的阻塞隊列中移除一個元索,將致使線程阻塞.數組
在多線程進行合做時,阻塞隊列是頗有用的工具。工做者線程能夠按期地把中間結果存到阻塞隊列中而其餘工做者線程把中間結果取出並在未來修改它們。隊列會自動平衡負載。安全
若是第一個線程集運行得比第二個慢,則第二個 線程集在等待結果時就會阻塞。若是第一個線程集運行得快,那麼它將等待第二個線程集遇上來.數據結構
說白了,就是先進先出,線程安全!多線程
java中併發隊列都是在java.util.concurrent併發包下的,Queue接口與List、Set同一級別,都是繼承了Collection接口,最近學習了java中的併發Queue的全部子類應用場景,這裏記錄分享一下:併發
1.1 這裏能夠先用wait與notify(腦忒fai) 模擬一下隊列的增刪數據,簡單瞭解一下隊列:ide
import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 模擬隊列增刪數據 * @author houzheng */ public class MyQueue { //元素集合 private LinkedList<Object> list=new LinkedList<Object>(); //計數器(同步),判斷集合元素數量 private AtomicInteger count=new AtomicInteger(); //集合上限與下限,final必須指定初值 private final int minSize=0; private final int maxSize; //構造器指定最大值 public MyQueue(int maxSize) { this.maxSize = maxSize; } //初始化對象,用於加鎖,也可直接用this private Object lock=new Object(); //put方法:往集合中添加元素,若是集合元素已滿,則此線程阻塞,直到有空間再繼續 public void put(Object obj){ synchronized (lock) { while(count.get()==this.maxSize){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace();} } list.add(obj); //計數器加一 count.incrementAndGet(); System.out.println("放入元素:"+obj); //喚醒另外一個線程,(處理極端狀況:集合一開始就是空,此時take線程會一直等待) lock.notify(); } } //take方法:從元素中取數據,若是集合爲空,則線程阻塞,直到集合不爲空再繼續 public Object take(){ Object result=null; synchronized(lock){ while(count.get()==this.minSize){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace();} } //移除第一個 result=list.removeFirst(); //計數器減一 count.decrementAndGet(); System.out.println("拿走元素:"+result); //喚醒另外一個線程,(處理極端狀況:集合一開始就是滿的,此時put線程會一直等待) lock.notify(); } return result; } public int getSize(){ return this.count.get(); } public static void main(String[] args) { //建立集合容器 MyQueue queue=new MyQueue(5); queue.put("1"); queue.put("2"); queue.put("3"); queue.put("4"); queue.put("5"); System.out.println("當前容器長度爲:"+queue.getSize()); Thread t1=new Thread(()->{ queue.put("6"); queue.put("7"); },"t1"); Thread t2=new Thread(()->{ Object take1 = queue.take(); Object take2 = queue.take(); },"t2"); //測試極端狀況,兩秒鐘後再執行另外一個線程 t1.start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } t2.start(); } }
這裏用線程通訊的方式簡單模擬了隊列的進出,那麼接下來就正式進入java的併發隊列:高併發
JDK中併發隊列提供了兩種實現,一種是高性能隊列ConcurrentLinkedQueue,一種是阻塞隊列BlockingQueue,兩種都繼承自Queue:工具
這是一個使用於高併發場景的隊列(額,各位看這塊博客的小朋友,最好對線程基礎比較熟悉再來看,固然我也在拼命學習啦,哈哈哈),主要是無鎖的方式,他的想能要比BlockingQueue好性能
,是基於連接節點的無界線程安全隊列,先進先出,不容許有null元素,廢話很少說,上demo:
這種queue比較簡單,沒什麼好說的,和ArrayList同樣用就能夠,關鍵是BlockingQUeue
blockingQueue主要有5中實現,我感受都挺有意思的,其中幾種還比較經常使用就都學習了下,這裏都介紹下:
2.1 ArrayBlockingQueue
@Test public void test02() throws Exception{ //必須指定隊列長度 ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(2); abq.add("a"); //add :添加元素,若是BlockingQueue能夠容納,則返回true,不然拋異常,支持添加集合 System.out.println(abq.offer("b"));//容量若是不夠,返回false //offer: 若是可能的話,添加元素,即若是BlockingQueue能夠容納,則返回true,不然返回false,支持設置超時時間 //設置超時,若是超過期間就不添加,返回false, abq.offer("d", 2, TimeUnit.SECONDS);// 添加的元素,時長,單位 //put 添加元素,若是BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續. abq.put("d");//會一直等待 //poll 取走頭部元素,若不能當即取出,則能夠等time參數規定的時間,取不到時返回null,支持設置超時時間 abq.poll(); abq.poll(2,TimeUnit.SECONDS);//兩秒取不到返回null //take() 取走頭部元素,若BlockingQueue爲空,阻斷進入等待狀態直到Blocking有新的對象被加入爲止 abq.take(); //取出頭部元素,但不刪除 abq.element(); //drainTo() //一次性從BlockingQueue獲取全部可用的數據對象(還能夠指定獲取數據的個數),經過該方法,能夠提高獲取數據效率;不須要屢次分批加鎖或釋放鎖。 List list=new ArrayList(); abq.drainTo(list,2);//將隊列中兩個元素取到list中,取走後隊列中就沒有取走的元素 System.out.println(list); //[a,b] System.out.println(abq); //[] }
2.2 LinkedBlockingQueue
@Test public void test03(){ LinkedBlockingQueue lbq=new LinkedBlockingQueue();//可指定容量,也可不指定 lbq.add("a"); lbq.add("b"); lbq.add("c"); //API與ArrayBlockingQueue相同 //是否包含 System.out.println(lbq.contains("a")); //移除頭部元素或者指定元素 remove("a") System.out.println(lbq.remove()); //轉數組 Object[] array = lbq.toArray(); //element 取出頭部元素,但不刪除 System.out.println(lbq.element()); System.out.println(lbq.element()); System.out.println(lbq.element()); }
2.3 SynchronousQueue
public static void main(String[] args) { SynchronousQueue<String> sq=new SynchronousQueue<String>(); // iterator() 永遠返回空,由於裏面沒東西。 // peek() 永遠返回null /** * isEmpty()永遠是true。 * remainingCapacity() 永遠是0。 * remove()和removeAll() 永遠是false。 */ new Thread(()->{ try { //取出而且remove掉queue裏的element(認爲是在queue裏的。。。),取不到東西他會一直等。 System.out.println(sq.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { //offer() 往queue裏放一個element後當即返回, //若是碰巧這個element被另外一個thread取走了,offer方法返回true,認爲offer成功;不然返回false //true ,上面take線程一直在等, ////下面剛offer進去就被拿走了,返回true,若是offer線程先執行,則返回false System.out.println(sq.offer("b")); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { //往queue放進去一個element之後就一直wait直到有其餘thread進來把這個element取走 sq.put("a"); } catch (Exception e) { e.printStackTrace(); } }).start(); }
2.4 PriorityBlockingQueue
@Test public void test04() throws Exception{ //隊列裏元素必須實現Comparable接口,用來決定優先級 PriorityBlockingQueue<String> pbq=new PriorityBlockingQueue<String>(); pbq.add("b"); pbq.add("g"); pbq.add("a"); pbq.add("c"); //獲取的時候會根據優先級取元素,插入的時候不會排序,節省性能 //System.out.println(pbq.take());//a,獲取時會排序,按優先級獲取 System.out.println(pbq.toString());//若是前面沒有取值,直接syso也不會排序 Iterator<String> iterator = pbq.iterator(); while(iterator.hasNext()){ System.out.println(iterator.next()); } } @Test public void test05(){ PriorityBlockingQueue<Person> pbq=new PriorityBlockingQueue<Person>(); Person p2=new Person("姚振",20); Person p1=new Person("侯徵",24); Person p3=new Person("何毅",18); Person p4=new Person("李世彪",22); pbq.add(p1); pbq.add(p2); pbq.add(p3); pbq.add(p4); System.out.println(pbq);//沒有按優先級排序 try { //只要take獲取元素就會按照優先級排序,獲取一次就所有排好序了,後面就會按優先級迭代 pbq.take(); } catch (InterruptedException e) { e.printStackTrace(); } //按年齡排好了序 for (Iterator iterator = pbq.iterator(); iterator.hasNext();) { Person person = (Person) iterator.next(); System.out.println(person); } }
2.5 最後說一下DelayQueue ,這裏用個網上很經典的例子,網吧上網計時
網民實體queue中元素
//網民 public class Netizen implements Delayed { //身份證 private String ID; //名字 private String name; //上網截止時間 private long playTime; //比較優先級,時間最短的優先 @Override public int compareTo(Delayed o) { Netizen netizen=(Netizen) o; return this.getDelay(TimeUnit.SECONDS)-o.getDelay(TimeUnit.SECONDS)>0?1:0; } public Netizen(String iD, String name, long playTime) { ID = iD; this.name = name; this.playTime = playTime; } //獲取上網時長,即延時時長 @Override public long getDelay(TimeUnit unit) { //上網截止時間減去如今當前時間=時長 return this.playTime-System.currentTimeMillis(); }
網吧類:
//網吧 public class InternetBar implements Runnable { //網民隊列,使用延時隊列 private DelayQueue<Netizen> dq=new DelayQueue<Netizen>(); //上網 public void startPlay(String id,String name,Integer money){ //截止時間= 錢數*時間+當前時間(1塊錢1秒) Netizen netizen=new Netizen(id,name,1000*money+System.currentTimeMillis()); System.out.println(name+"開始上網計費......"); dq.add(netizen); } //時間到下機 public void endTime(Netizen netizen){ System.out.println(netizen.getName()+"餘額用完,下機"); } @Override public void run() { //線程,監控每一個網民上網時長 while(true){ try { //除非時間到.不然會一直等待,直到取出這個元素爲止 Netizen netizen=dq.take(); endTime(netizen); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { //新建一個網吧 InternetBar internetBar=new InternetBar(); //來了三個網民上網 internetBar.startPlay("001","侯徵",3); internetBar.startPlay("002","姚振",7); internetBar.startPlay("003","何毅",5); Thread t1=new Thread(internetBar); t1.start(); } }
這樣就能夠完美實現業務需求了,
結果,
這塊東西比較深,還須要不斷增強學習實踐才行!!