阻塞隊列

概念

阻塞隊列(BlockingQueue):支持2個附加操做的隊列。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列中添加元素的線程,消費者是從隊列中獲取元素的線程。java

附加操做數組

1)隊列爲空時,獲取元素的線程會等待隊列變爲非空ide

2)隊列爲滿時,存儲元素的線程會等待隊列可用。spa

 

種類

ArrayBlockingQueue:由數組結構組成的 有界 阻塞隊列線程

LinkedBlockingQueue:由鏈表結構組成的 有界(但默認大小爲Integer.MAX_VALUE)阻塞隊列code

PriorityBlockingQueue:支持優先級排序的 無界 阻塞隊列blog

DelayQueue:使用優先級隊列實現的 延遲 無界 阻塞隊列排序

SynchronousQueue不存儲元素 的阻塞隊列,也即單個元素的隊列隊列

LinkedTransferQueue:由鏈表結構組成的 無界 阻塞隊列element

LinkedBlockingDeque:由鏈表結構組成的 雙向 阻塞隊列

 

方法

方法 拋出異常 返回特殊值 一直阻塞 超時退出
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
檢查 element() peek() 不可用 不可用

插入

add

1)合法的add:add元素數量與規定元素數量一致

import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 構造方法傳參是規定其長度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         System.out.println(blockingQueue.add("1"));
         System.out.println(blockingQueue.add("2"));
         System.out.println(blockingQueue.add("3"));
         // true true true
     }
 ​
 }

 

2)異常( IllegalStateException )

當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常

 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 構造方法傳參是規定其長度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         System.out.println(blockingQueue.add("1"));
         System.out.println(blockingQueue.add("2"));
         System.out.println(blockingQueue.add("3"));
 ​
         System.out.println(blockingQueue.add("4"));
         // java.lang.IllegalStateException: Queue full
     }
 ​
 }

 

offer

1)若是隊列已滿,再插入元素,則返回false,而不會拋出異常

public static void main(String[] args) {
     // 構造方法傳參是規定其長度
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     System.out.println(blockingQueue.offer("1"));
     System.out.println(blockingQueue.offer("2"));
     System.out.println(blockingQueue.offer("3"));
     // true true true
 ​
     System.out.println(blockingQueue.offer("4"));
     // false
 }

 

2)設置超時時間

private static void offer() throws InterruptedException {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     // 設置超時時間爲2s,TimeUnit是時間顆粒度轉換,TimeUnit.SECOND表明秒
     System.out.println(blockingQueue.offer("1", 2L, TimeUnit.SECONDS));
     System.out.println(blockingQueue.offer("2", 2L, TimeUnit.SECONDS));
     System.out.println(blockingQueue.offer("3", 2L, TimeUnit.SECONDS));
     // true true true
     System.out.println(blockingQueue.offer("4", 2L, TimeUnit.SECONDS));
     // false
 }

 

put

若是隊列已滿,再插入元素,就會阻塞,直至能夠添加進去爲止

 public static void main(String[] args) {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     try {
         blockingQueue.put("1");
         blockingQueue.put("2");
         blockingQueue.put("3");
         blockingQueue.put("4");
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
 }

 

移除

remove

合法的remove

 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 構造方法傳參是規定其長度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         blockingQueue.add("1");
         blockingQueue.add("2");
         blockingQueue.add("3");
 ​
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         // 1 2 3
         
         System.out.println(blockingQueue.remove());
     }
 ​
 }

 

異常(NoSuchElementException)

當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常

 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 構造方法傳參是規定其長度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         blockingQueue.add("1");
         blockingQueue.add("2");
         blockingQueue.add("3");
 ​
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         
         System.out.println(blockingQueue.remove());
         // java.util.NoSuchElementException
     }
 ​
 }

 

poll

當隊列爲空時,從隊列裏獲取元素時直接返回null,不拋出異常

 public static void main(String[] args) {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     blockingQueue.offer("1");
     blockingQueue.offer("2");
     blockingQueue.offer("3");
 ​
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     // 1 2 3
     
     System.out.println(blockingQueue.poll());
     // null
 }

 

take

當隊列爲空時,從隊列裏獲取元素時阻塞,直至其中存在元素

public static void main(String[] args) throws Exception {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     blockingQueue.put("1");
     blockingQueue.put("2");
     blockingQueue.put("3");
 ​
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
 }

 

檢查

element

若是隊列爲空,則拋異常NoSuchElementException

import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 構造方法傳參是規定其長度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         blockingQueue.add("1");
         blockingQueue.add("2");
         blockingQueue.add("3");
 ​
         System.out.println(blockingQueue.element());
         // 1
     }
 ​
 }

 

peek

若是隊列爲空,則返回null

 public static void main(String[] args) {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     blockingQueue.offer("1");
     blockingQueue.offer("2");
     blockingQueue.offer("3");
 ​
     System.out.println(blockingQueue.peek());
 }

 

SynchronousQueue

SynchronousQueue(同步隊列)模擬生產消費

import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 ​
 public class SynchronousQueueDemo {
 ​
     public static void main(String[] args) {
         // 同步隊列不存儲元素
         BlockingQueue queue = new SynchronousQueue();
 ​
         // 生產者線程
         new Thread(() -> {
             try {
                 System.out.println(Thread.currentThread().getName() + "\t put 1");
                 queue.put("1");
 ​
                 System.out.println(Thread.currentThread().getName() + "\t put 2");
                 queue.put("2");
 ​
                 System.out.println(Thread.currentThread().getName() + "\t put 3");
                 queue.put("3");
 ​
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }, "provider").start();
 ​
         // 消費者線程
         new Thread(() -> {
             try {
                 // 每次消費前先睡眠2秒鐘
                 TimeUnit.SECONDS.sleep(2);
                 System.out.println(Thread.currentThread().getName() + "\t" + queue.take());
 ​
                 TimeUnit.SECONDS.sleep(2);
                 System.out.println(Thread.currentThread().getName() + "\t" + queue.take());
 ​
                 TimeUnit.SECONDS.sleep(2);
                 System.out.println(Thread.currentThread().getName() + "\t" + queue.take());
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }, "consumer").start();
         
         // 程序執行結果:
         // provider     put 1
         // consumer    1
         // provider     put 2
         // consumer    2
         // provider     put 3
         // consumer    3
     }
 }
相關文章
相關標籤/搜索