Java阻塞隊列的簡單實現

來源:html

https://blog.biezhi.me/2019/01/simple-blocking-queue.htmljava

Java 併發經常使用的組件中有一種隊列叫阻塞隊列(BlockingQueue),當隊列爲空時,獲取元素的線程會阻塞等待直到隊列有數據;當隊列滿時,想要存儲元素的線程會阻塞等待直到隊列有空間。咱們常常會用這種數據結構能夠實現生產者、消費者模型。面試

本文會經過兩種方式來實現簡單的有界阻塞隊列,在最後分別測試不一樣實現的性能差別。數組

Monitor 和 Condition

看過 Java 併發相關書籍的同窗應該都見過 Monitor 這個詞,有人稱爲監視器也有人叫它管程,不過都是一個意思:一個同步工具,至關於操做系統中的互斥量(mutex),即值爲 1 的信號量。安全

synchronized 關鍵詞的背後就是靠 monitor 實現的,monitor 的重要特色是,同一個時刻,只有一個線程能進入 monitor 中定義的臨界區,這使得 monitor 可以達到互斥的效果。但僅僅有互斥的做用是不夠的,沒法進入 monitor 臨界區的線程應該被阻塞,在必要的時候能夠被喚醒,因此 Java 提供了 waitnotifynotifyAll 的 API 給咱們使用。數據結構

  • wait(): 讓當前線程進入等待隊列,同時會釋放鎖,直到被喚醒。
  • notify(): 從條件隊列中隨機喚醒一個線程,讓它去參與鎖競爭
  • notifyAll(): 喚醒條件隊列中的全部線程,讓它們去參與鎖競爭

實現同步的一種方式是使用 synchronized 關鍵字,還可使用 Lock 接口下的實現來完成,好比 ReentrantLock,它是一把重入鎖(synchronized 也是),基於 AQS 併發框架實現。咱們可使用它來進行加鎖和釋放鎖,若是遇到有條件須要阻塞可使用 Condition API。併發

  • Lock#newCondition(): 建立一個新的條件
  • Condition#await(): 讓當前線程等待
  • Condition#signal(): 喚醒一個等待線程

條件和鎖老是息息相關,在沒有 Lock 接口的時候你會發現 monitor 機制有一個嚴重的問題:一把鎖只能對應一個條件(也就是隻能夠作一次 wait),那麼在喚醒的時候就可能出現喚醒丟失。舉個例子,在兩個方法上有不一樣的條件會致使阻塞,它們持有一把鎖,喚醒時候若是用 notify 只會從條件隊列選擇一個,使用 notifyAll 會帶來大量的 CPU 上下文切換和鎖競爭,僞代碼以下:框架

1synchronized void foo() {
 2    while(CONDITION1){
 3        wait();
 4    }
 5    notifyAll();
 6}
 7synchronized void bar() {
 8    while(CONDITION2){
 9        wait();
10    }
11    notifyAll();
12}

具體實現

實現思路

咱們經過定義一個 Queue 接口來實現兩種隊列,該隊列是有界隊列,使用數組的方式實現,若是你有興趣也可使用鏈表或棧來實現這個隊列。提供 put 方法添加元素(滿了則阻塞),take 方法彈出元素(沒有元素則阻塞)。ide

定義接口

1public interface Queue<E> {
 2
 3    // 添加新元素,當隊列滿則阻塞
 4    void put(E e) throws InterruptedException;
 5
 6    // 彈出隊頭元素,當隊列空則阻塞
 7    E take() throws InterruptedException;
 8
 9    // 隊列元素個數
10    int size();
11
12    // 隊列是否爲空
13    boolean isEmpty();
14
15}

基於 synchronized 的實現

核心思路:工具

  • 添加元素時隊列滿則阻塞
  • 彈出元素時隊列空則阻塞
  • 添加元素後喚醒消費者
  • 彈出元素後喚醒生產者
1public class BlockingQueueWithSync<E> implements Queue<E> {
 2
 3    private E[] array;
 4    private int head;  // 隊頭指針
 5    private int tail;  // 隊尾指針
 6
 7    private volatile int size; // 隊列元素個數
 8
 9    public BlockingQueueWithSync(int capacity) {
10        array = (E[]) new Object[capacity];
11    }
12
13    @Override
14    public synchronized void put(E e) throws InterruptedException {
15        // 當隊列滿的時候阻塞
16        while (size == array.length) {
17            this.wait();
18        }
19
20        array[tail] = e;
21        // 隊列裝滿後索引歸零
22        if (++tail == array.length) {
23            tail = 0;
24        }
25        ++size;
26        // 通知其餘消費端有數據了
27        this.notifyAll();
28    }
29
30    @Override
31    public synchronized E take() throws InterruptedException {
32        // 當隊列空的時候阻塞
33        while (isEmpty()) {
34            this.wait();
35        }
36
37        E element = array[head];
38        // 消費完後從0開始
39        if (++head == array.length) {
40            head = 0;
41        }
42        --size;
43        // 通知其餘生產者能夠生產了
44        this.notifyAll();
45        return element;
46    }
47
48    @Override
49    public synchronized boolean isEmpty() {
50        return size == 0;
51    }
52
53    @Override
54    public synchronized int size() {
55        return size;
56    }
57
58}

基於 ReentrantLock 的實現

1public class BlockingQueueWithLock<E> implements Queue<E> {
 2
 3    private E[] array;
 4    private int head;
 5    private int tail;
 6
 7    private volatile int size;
 8
 9    private Lock      lock     = new ReentrantLock();
10    private Condition notFull  = lock.newCondition();
11    private Condition notEmpty = lock.newCondition();
12
13    public BlockingQueueWithLock(int capacity) {
14        array = (E[]) new Object[capacity];
15    }
16
17    @Override
18    public void put(E e) throws InterruptedException {
19        lock.lockInterruptibly();
20        try {
21            // 隊列滿,阻塞
22            while (size == array.length) {
23                notFull.await();
24            }
25            array[tail] = e;
26            if (++tail == array.length) {
27                tail = 0;
28            }
29            ++size;
30            notEmpty.signal();
31        } finally {
32            lock.unlock();
33        }
34    }
35
36    @Override
37    public E take() throws InterruptedException {
38        lock.lockInterruptibly();
39        try {
40            // 隊列空,阻塞
41            while (isEmpty()) {
42                notEmpty.await();
43            }
44            E element = array[head];
45            if (++head == array.length) {
46                head = 0;
47            }
48            --size;
49            // 通知isFull條件隊列有元素出去
50            notFull.signal();
51            return element;
52        } finally {
53            lock.unlock();
54        }
55    }
56
57    @Override
58    public boolean isEmpty() {
59        lock.lock();
60        try {
61            return size == 0;
62        } finally {
63            lock.unlock();
64        }
65    }
66
67    @Override
68    public int size() {
69        lock.lock();
70        try {
71            return size;
72        } finally {
73            lock.unlock();
74        }
75    }
76
77}

對比性能

1public class Benchmark {
 2
 3    @Test
 4    public void testWithMonitor() {
 5        Queue<Integer> queue = new BlockingQueueWithSync<>(5);
 6        execute(queue);
 7    }
 8
 9    @Test
10    public void testWithCondition() {
11        Queue<Integer> queue = new BlockingQueueWithLock<>(5);
12        execute(queue);
13    }
14
15    private void execute(Queue<Integer> queue) {
16        ExecutorService executorService = Executors.newCachedThreadPool();
17        for (int i = 1; i <= 1000; i++) {
18            final int finalNum = i;
19            executorService.execute(() -> {
20                try {
21                    queue.put(finalNum);
22                    Integer take = queue.take();
23                    System.out.println("item: " + take);
24                } catch (InterruptedException e) {
25                    e.printStackTrace();
26                }
27            });
28        }
29        executorService.shutdown();
30    }
31
32}

這個測試程序讓 2 個隊列的可存儲的元素數都爲 5,開啓 1000 個線程進行 puttake 操做,運行後查看總耗時。

能夠看出,使用 synchronized 的方式性能較差。

img

java匠人手法-優雅的處理空值

REST API 的安全基礎

深刻淺出 CAS

Spring Boot Devtools熱部署

Spring Boot AOP記錄用戶操做日誌

Spring Boot整合Mongo DB

【圖文講解】你必定能看懂的HTTPS原理剖析!

基礎面試,爲何面試官總喜歡問String?

Spring Boot Admin 2.2.0發佈,支持最新Spring Boot/Cloud以外,新增中文展現!

你應該知道的 @ConfigurationProperties 註解的使用姿式,這一篇就夠了

img

若有收穫,請幫忙轉發,您的鼓勵是做者最大的動力,謝謝!

本文由博客一文多發平臺 OpenWrite 發佈!

相關文章
相關標籤/搜索