來源:html
https://blog.biezhi.me/2019/01/simple-blocking-queue.htmljava
Java 併發經常使用的組件中有一種隊列叫阻塞隊列(BlockingQueue),當隊列爲空時,獲取元素的線程會阻塞等待直到隊列有數據;當隊列滿時,想要存儲元素的線程會阻塞等待直到隊列有空間。咱們常常會用這種數據結構能夠實現生產者、消費者模型。面試
本文會經過兩種方式來實現簡單的有界阻塞隊列,在最後分別測試不一樣實現的性能差別。數組
看過 Java 併發相關書籍的同窗應該都見過 Monitor 這個詞,有人稱爲監視器也有人叫它管程,不過都是一個意思:一個同步工具,至關於操做系統中的互斥量(mutex),即值爲 1 的信號量。安全
synchronized
關鍵詞的背後就是靠 monitor 實現的,monitor 的重要特色是,同一個時刻,只有一個線程能進入 monitor 中定義的臨界區,這使得 monitor 可以達到互斥的效果。但僅僅有互斥的做用是不夠的,沒法進入 monitor 臨界區的線程應該被阻塞,在必要的時候能夠被喚醒,因此 Java 提供了 wait
和 notify
、notifyAll
的 API 給咱們使用。數據結構
wait()
: 讓當前線程進入等待隊列,同時會釋放鎖,直到被喚醒。notify()
: 從條件隊列中隨機喚醒一個線程,讓它去參與鎖競爭notifyAll()
: 喚醒條件隊列中的全部線程,讓它們去參與鎖競爭實現同步的一種方式是使用 synchronized
關鍵字,還可使用 Lock
接口下的實現來完成,好比 ReentrantLock
,它是一把重入鎖(synchronized 也是),基於 AQS 併發框架實現。咱們可使用它來進行加鎖和釋放鎖,若是遇到有條件須要阻塞可使用 Condition
API。併發
Lock#newCondition()
: 建立一個新的條件Condition#await()
: 讓當前線程等待Condition#signal()
: 喚醒一個等待線程條件和鎖老是息息相關,在沒有 Lock 接口的時候你會發現 monitor 機制有一個嚴重的問題:一把鎖只能對應一個條件(也就是隻能夠作一次 wait),那麼在喚醒的時候就可能出現喚醒丟失。舉個例子,在兩個方法上有不一樣的條件會致使阻塞,它們持有一把鎖,喚醒時候若是用 notify
只會從條件隊列選擇一個,使用 notifyAll
會帶來大量的 CPU 上下文切換和鎖競爭,僞代碼以下:框架
1synchronized void foo() { 2 while(CONDITION1){ 3 wait(); 4 } 5 notifyAll(); 6} 7synchronized void bar() { 8 while(CONDITION2){ 9 wait(); 10 } 11 notifyAll(); 12}
咱們經過定義一個 Queue
接口來實現兩種隊列,該隊列是有界隊列,使用數組的方式實現,若是你有興趣也可使用鏈表或棧來實現這個隊列。提供 put
方法添加元素(滿了則阻塞),take
方法彈出元素(沒有元素則阻塞)。ide
1public interface Queue<E> { 2 3 // 添加新元素,當隊列滿則阻塞 4 void put(E e) throws InterruptedException; 5 6 // 彈出隊頭元素,當隊列空則阻塞 7 E take() throws InterruptedException; 8 9 // 隊列元素個數 10 int size(); 11 12 // 隊列是否爲空 13 boolean isEmpty(); 14 15}
核心思路:工具
1public class BlockingQueueWithSync<E> implements Queue<E> { 2 3 private E[] array; 4 private int head; // 隊頭指針 5 private int tail; // 隊尾指針 6 7 private volatile int size; // 隊列元素個數 8 9 public BlockingQueueWithSync(int capacity) { 10 array = (E[]) new Object[capacity]; 11 } 12 13 @Override 14 public synchronized void put(E e) throws InterruptedException { 15 // 當隊列滿的時候阻塞 16 while (size == array.length) { 17 this.wait(); 18 } 19 20 array[tail] = e; 21 // 隊列裝滿後索引歸零 22 if (++tail == array.length) { 23 tail = 0; 24 } 25 ++size; 26 // 通知其餘消費端有數據了 27 this.notifyAll(); 28 } 29 30 @Override 31 public synchronized E take() throws InterruptedException { 32 // 當隊列空的時候阻塞 33 while (isEmpty()) { 34 this.wait(); 35 } 36 37 E element = array[head]; 38 // 消費完後從0開始 39 if (++head == array.length) { 40 head = 0; 41 } 42 --size; 43 // 通知其餘生產者能夠生產了 44 this.notifyAll(); 45 return element; 46 } 47 48 @Override 49 public synchronized boolean isEmpty() { 50 return size == 0; 51 } 52 53 @Override 54 public synchronized int size() { 55 return size; 56 } 57 58}
1public class BlockingQueueWithLock<E> implements Queue<E> { 2 3 private E[] array; 4 private int head; 5 private int tail; 6 7 private volatile int size; 8 9 private Lock lock = new ReentrantLock(); 10 private Condition notFull = lock.newCondition(); 11 private Condition notEmpty = lock.newCondition(); 12 13 public BlockingQueueWithLock(int capacity) { 14 array = (E[]) new Object[capacity]; 15 } 16 17 @Override 18 public void put(E e) throws InterruptedException { 19 lock.lockInterruptibly(); 20 try { 21 // 隊列滿,阻塞 22 while (size == array.length) { 23 notFull.await(); 24 } 25 array[tail] = e; 26 if (++tail == array.length) { 27 tail = 0; 28 } 29 ++size; 30 notEmpty.signal(); 31 } finally { 32 lock.unlock(); 33 } 34 } 35 36 @Override 37 public E take() throws InterruptedException { 38 lock.lockInterruptibly(); 39 try { 40 // 隊列空,阻塞 41 while (isEmpty()) { 42 notEmpty.await(); 43 } 44 E element = array[head]; 45 if (++head == array.length) { 46 head = 0; 47 } 48 --size; 49 // 通知isFull條件隊列有元素出去 50 notFull.signal(); 51 return element; 52 } finally { 53 lock.unlock(); 54 } 55 } 56 57 @Override 58 public boolean isEmpty() { 59 lock.lock(); 60 try { 61 return size == 0; 62 } finally { 63 lock.unlock(); 64 } 65 } 66 67 @Override 68 public int size() { 69 lock.lock(); 70 try { 71 return size; 72 } finally { 73 lock.unlock(); 74 } 75 } 76 77}
1public class Benchmark { 2 3 @Test 4 public void testWithMonitor() { 5 Queue<Integer> queue = new BlockingQueueWithSync<>(5); 6 execute(queue); 7 } 8 9 @Test 10 public void testWithCondition() { 11 Queue<Integer> queue = new BlockingQueueWithLock<>(5); 12 execute(queue); 13 } 14 15 private void execute(Queue<Integer> queue) { 16 ExecutorService executorService = Executors.newCachedThreadPool(); 17 for (int i = 1; i <= 1000; i++) { 18 final int finalNum = i; 19 executorService.execute(() -> { 20 try { 21 queue.put(finalNum); 22 Integer take = queue.take(); 23 System.out.println("item: " + take); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 }); 28 } 29 executorService.shutdown(); 30 } 31 32}
這個測試程序讓 2 個隊列的可存儲的元素數都爲 5,開啓 1000 個線程進行 put
和 take
操做,運行後查看總耗時。
能夠看出,使用 synchronized
的方式性能較差。
● 深刻淺出 CAS
● Spring Boot Admin 2.2.0發佈,支持最新Spring Boot/Cloud以外,新增中文展現!
● 你應該知道的 @ConfigurationProperties 註解的使用姿式,這一篇就夠了
若有收穫,請幫忙轉發,您的鼓勵是做者最大的動力,謝謝!
本文由博客一文多發平臺 OpenWrite 發佈!