在新增的Concurrent包中,BlockingQueue很好地解決了多線程中如何高效安全「傳輸」數據的問題。通過這些高效而且線程安全的隊列類,爲我們快速搭建高質量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家族中的所有成員,包括它們各自的功能以及常見使用場景。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** * @author jackyuj */ public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { // 聲明一個容量爲10的緩存隊列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 藉助Executors ExecutorService service = Executors.newCachedThreadPool(); // 啓動線程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 執行10s Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown(); } }
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * 消費者線程 * * @author jackyuj */ public class Consumer implements Runnable { public Consumer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { System.out.println("啓動消費者線程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println("正從隊列獲取數據..."); String data = queue.poll(2, TimeUnit.SECONDS); if (null != data) { System.out.println("拿到數據:" + data); System.out.println("正在消費數據:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超過2s還沒數據,認爲全部生產線程都已經退出,自動退出消費線程。 isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消費者線程!"); } } private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; } import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 生產者線程 * * @author jackyuj */ public class Producer implements Runnable { public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { String data = null; Random r = new Random(); System.out.println("啓動生產者線程!"); try { while (isRunning) { System.out.println("正在生產數據..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet(); System.out.println("將數據:" + data + "放入隊列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println("放入數據失敗:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生產者線程!"); } } public void stop() { isRunning = false; } private volatile boolean isRunning = true; private BlockingQueue queue; private static AtomicInteger count = new AtomicInteger(); private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; }