1.相關知識的瞭解java
阻塞隊列:當隊列爲空時,去隊列中取數據會被阻塞。當隊列滿時,往隊列中放數據會被阻塞。數組
非阻塞隊列:當隊列爲空時,去隊列取數據會直接返回失敗,隊列滿時,往隊列中放數據會直接返回失敗。併發
2.經常使用的阻塞隊列ide
LinkedBlockingQueue:基於鏈表實現的FIFO的阻塞隊列,建立是能夠指定容量大小,不指定則是默認值Integer.MAX_VALUE。this
ArrayBlockingQueue:基於數組實現的FIFO的阻塞隊列,再建立是必須指定大小atom
3.LinkedBlockingQueue阻塞隊列模擬生產者-消費者模式spa
1 package com.test; 2 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.LinkedBlockingQueue; 5 import java.util.concurrent.TimeUnit; 6 import java.util.concurrent.atomic.AtomicInteger; 7 8 /** 9 * 10 * @Title: Test01.java 11 * @Package com.test 12 * @Description: 使用併發包下 LinkedBlockingQueue 阻塞隊列模擬生產者消費者問題 13 * @author Mr.Chen 14 * @date 2019年4月9日 15 * @version V1.0 16 * 版權聲明:本文爲博主原創文章,轉載請附上博文連接 17 */ 18 public class Test01 { 19 public static void main(String[] args) { 20 BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3); 21 ProducerThread producerThread = new ProducerThread(blockingQueue); 22 ConsumerThread consumerThread = new ConsumerThread(blockingQueue); 23 Thread t1 = new Thread(producerThread); 24 Thread t2 = new Thread(consumerThread); 25 t1.start(); 26 t2.start(); 27 //10秒後 中止線程.. 28 try { 29 Thread.sleep(10*1000); 30 producerThread.stop(); 31 } catch (Exception e) { 32 // TODO: handle exception 33 } 34 35 } 36 } 37 38 /** 39 * 40 *生存者類 41 */ 42 class ProducerThread implements Runnable { 43 //定義變量接收LinkedBlockingQueue 44 BlockingQueue<String> queue = null; 45 46 //定義一個自增的變量,用來做爲隊列裏面的消息 47 AtomicInteger data = new AtomicInteger(0); 48 49 //定義循環的結束條件 50 boolean flag = true; 51 52 public ProducerThread(BlockingQueue<String> queue) { 53 this.queue = queue; 54 } 55 56 @Override 57 public void run() { 58 try { 59 //循環往隊列裏面放值,若是放不進去,設置兩秒的等待時間。每一個循環設置1秒的等待時間,以便打印的時候方便查看 60 System.out.println(Thread.currentThread().getName() + " 生產者啓動-----"); 61 while (flag) { 62 //獲取data自增的值 63 String message = data.incrementAndGet() + ""; 64 boolean offer = queue.offer(message, 2, TimeUnit.SECONDS); 65 if (offer) { 66 System.out.println(Thread.currentThread().getName() + " " + message + " 放入隊列成功"); 67 } else { 68 System.out.println(Thread.currentThread().getName() + " " + message + " 放入隊列失敗"); 69 } 70 Thread.sleep(1000); 71 } 72 } catch (InterruptedException e) { 73 System.out.println(Thread.currentThread().getName() + " 生產者中止-----"); 74 } finally { 75 System.out.println(Thread.currentThread().getName() + " 生產者中止-----"); 76 } 77 } 78 79 80 public void stop() { 81 this.flag = false; 82 } 83 84 } 85 86 class ConsumerThread implements Runnable { 87 //定義變量接收LinkedBlockingQueue 88 BlockingQueue<String> queue = null; 89 90 //定義循環的結束條件 91 boolean flag = true; 92 93 public ConsumerThread(BlockingQueue<String> queue) { 94 this.queue = queue; 95 } 96 97 @Override 98 public void run() { 99 //使用queue 去取隊列中的消息 100 System.out.println(Thread.currentThread().getName() + " 消費者啓動-----"); 101 try { 102 while (flag) { 103 String poll = queue.poll(2, TimeUnit.SECONDS); 104 if (poll == null) { 105 flag = false; 106 System.out.println("消費者超過2秒時間未獲取到消息."); 107 return; 108 } 109 System.out.println(Thread.currentThread().getName() + " 消費者拿到 " + poll ); 110 Thread.sleep(2000); 111 } 112 } catch (InterruptedException e) { 113 // TODO Auto-generated catch block 114 e.printStackTrace(); 115 } 116 117 } 118 119 }