ArrayBlockQueue源碼解析

清明節和朋友去被抖音帶火的一個餐廳,下午兩點鐘取晚上的號,前面已經有十幾桌了,四點半餐廳開始正式營業,等輪到咱們已經近八點了。餐廳分爲幾個區域,只有最火的區域(在小船上)須要排號,其餘區域基本上是隨到隨吃的,最冷清的區域幾乎都沒什麼人。菜的價格異常的貴,味道也並很差。最後送出兩張圖: 數組

image.png
image.png

好了,進入今天的正題,今天要講的是ArrayBlockQueue,ArrayBlockQueue是JUC提供的線程安全的有界的阻塞隊列,一看到Array,第一反應:這貨確定和數組有關,既然是數組,那天然是有界的了,咱們先來看看ArrayBlockQueue的基本使用方法,而後再看看ArrayBlockQueue的源碼。安全

ArrayBlockQueue基本使用

public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> arrayBlockingQueue=new ArrayBlockingQueue(5);
        arrayBlockingQueue.offer(10);
        arrayBlockingQueue.offer(50);
        arrayBlockingQueue.add(20);
        arrayBlockingQueue.add(60);
        System.out.println(arrayBlockingQueue);

        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue);

        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue);

        System.out.println(arrayBlockingQueue.peek());
        System.out.println(arrayBlockingQueue);
    }
複製代碼

運行結果: bash

image.png

  1. 建立了一個長度爲5的ArrayBlockQueue。
  2. 用offer方法,向ArrayBlockQueue添加了兩個元素,分別是10,50。
  3. 用put方法,向ArrayBlockQueue添加了兩個元素,分別是20,60。
  4. 打印出ArrayBlockQueue,結果是10,50,20,60。
  5. 用poll方法,彈出ArrayBlockQueue第一個元素,而且打印出來:10。
  6. 打印出ArrayBlockQueue,結果是50,20,60。
  7. 用take方法,彈出ArrayBlockQueue第一個元素,而且打印出來:50。
  8. 打印出ArrayBlockQueue,結果是20,60。
  9. 用peek方法,彈出ArrayBlockQueue第一個元素,而且打印出來:20。
  10. 打印出ArrayBlockQueue,結果是20,60。

代碼比較簡單,可是你確定會有疑問性能

  • offer/add(在上面的代碼中沒有演示)/put都是往隊列裏面添加元素,區別是什麼?
  • poll/take/peek都是彈出隊列的元素,區別是什麼?
  • 底層代碼是如何保證線程安全的?
  • 數據保存在哪裏?

要解決上面幾個疑問,最好的辦法固然是看下源碼,經過親自閱讀源碼所產生的印象遠遠要比看視頻,看博客,死記硬背最後的結論要深入的多。就算真的忘記了,只要再看看源碼,瞬間能夠回憶起來。ui

ArrayBlockQueue源碼解析

構造方法

ArrayBlockQueue提供了三個構造方法,以下圖所示: this

image.png

ArrayBlockingQueue(int capacity)
public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
複製代碼

這是最經常使用的構造方法,傳入capacity,capacity是容量的意思,也就是ArrayBlockingQueue的最大長度,方法內部直接調用了第二個構造方法,傳入的第二個參數爲false。spa

ArrayBlockingQueue(int capacity, boolean fair)
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
複製代碼

這個構造方法接受兩個參數,分別是capacity和fair,fair是boolean類型的,表明是公平鎖,仍是非公平鎖,能夠看出若是咱們用第一個構造方法來建立ArrayBlockingQueue的話,採用的是非公平鎖,由於公平鎖會損失必定的性能,在沒有充足的理由的狀況下,是沒有必要採用公平鎖的。線程

方法內部作了幾件事情:3d

  1. 建立Object類型的數組,容量爲capacity,而且賦值給當前類對象的items。
  2. 建立排他鎖。
  3. 建立條件變量notEmpty 。
  4. 建立條件變量notFull。

至於排他鎖和兩個條件變量是作什麼用的,看到後面就明白了。code

ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c)
public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        //調用第二個構造方法,方法內部就是初始化數組,排他鎖,兩個條件變量
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // 開啓排他鎖
        try {
            int i = 0;
            try {
                // 循環傳入的集合,把集合中的元素賦值給items數組,其中i會自增
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;//把i賦值給count 
            //若是i==capacity,也就是到了最大容量,把0賦值給putIndex,不然把i賦值給putIndex
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();//釋放排他鎖
        }
    }
複製代碼
  1. 調用第二個構造方法,方法內部就是初始化數組items,排他鎖lock,以及兩個條件變量。
  2. 開啓排他鎖。
  3. 循環傳入的集合,將集合中的元素賦值給items數組,其中i會自增。
  4. 把i賦值給count。
  5. 若是i==capacity,說明到了最大的容量,就把0賦值給putIndex,不然把i賦值給putIndex。
  6. 在finally中釋放排他鎖。

看到這裏,咱們應該明白這個構造方法的做用是什麼了,就是把傳入的集合做爲ArrayBlockingQueuede初始化數據,可是咱們又會有一個新的疑問:count,putIndex 是作什麼用的。

offer(E e)

public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();//開啓排他鎖
        try {
            if (count == items.length)//若是count==items.length,返回false
                return false;
            else {
                enqueue(e);//入隊
                return true;//返回true
            }
        } finally {
            lock.unlock();//釋放鎖
        }
    }
複製代碼
  1. 開啓排他鎖。
  2. 若是count==items.length,也就是到了最大的容量,返回false。
  3. 若是count<items.length,執行入隊方法,而且返回true。
  4. 釋放排他鎖。

看到這裏,咱們應該能夠明白了,ArrayBlockQueue是如何保證線程安全的,仍是利用了ReentrantLock排他鎖,count就是用來保存數組的當前大小的。咱們再來看看enqueue方法。

private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
複製代碼

這方法比較簡單,在代碼裏面就不寫註釋了,作了以下的操做:

  1. 把x賦值給items[putIndex] 。
  2. 將putIndex進行自增,若是自增後的值 == items.length,把0賦值給putIndex 。
  3. 執行count++操做。
  4. 調用條件變量notEmpty的signal方法,說明在某個地方,一定調用了notEmpty的await方法,這裏就是喚醒由於調用notEmpty的await方法而被阻塞的線程。

這裏就解答了一個疑問:putIndex是作什麼的,就是入隊元素的下標。

add(E e)

public boolean add(E e) {
        return super.add(e);
    }
複製代碼
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
複製代碼

這個方法內部最終仍是調用的offer方法。

put(E e)

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//開啓響應中斷的排他鎖
        try {
            while (count == items.length)//若是隊列滿了,調用notFull的await
                notFull.await();
            enqueue(e);//入隊
        } finally {
            lock.unlock();//釋放排他鎖
        }
    }
複製代碼
  1. 開啓響應中斷的排他鎖,若是在獲取鎖的過程當中,當前的線程被中斷,會拋出異常。
  2. 若是隊列滿了,調用notFull的await方法,說明在某個地方,一定調用了notFull的signal方法來喚醒當前線程,這裏用while循環是爲了防止虛假喚醒。
  3. 執行入隊操做。
  4. 釋放排他鎖。

能夠看到put方法和 offer/add方法的區別了:

  • offer/add:若是隊列滿了,直接返回false。
  • put:若是隊列滿了,當前線程被阻塞,等待喚醒。

poll()

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
複製代碼
  1. 開啓排他鎖。
  2. 若是count==0,直接返回null,不然執行dequeue出隊操做。
  3. 釋放排他鎖。

咱們來看dequeue方法:

private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];//得到元素的值
        items[takeIndex] = null;//把null賦值給items[takeIndex] 
        if (++takeIndex == items.length)//若是takeIndex自增後的值== items.length,就把0賦值給takeIndex
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//喚醒由於調用notFull的await方法而被阻塞的線程
        return x;
    }
複製代碼
  1. 獲取元素的值,takeIndex保存的是出隊的下標。
  2. 把null賦值給items[takeIndex],也就是清空被彈出的元素。
  3. 若是takeIndex自增後的值== items.length,就把0賦值給takeIndex。
  4. count--。
  5. 喚醒由於調用notFull的await方法而被阻塞的線程。

這裏調用了notFull的signal方法來喚醒由於調用notFull的await方法而被阻塞的線程,那到底在哪裏調用了notFull的await方法呢,還記不記得在put方法中調用了notFull的await方法,咱們再看看:

while (count == items.length)
                notFull.await();
複製代碼

當隊列滿了,就調用 notFull.await()來等待,在出隊操做中,又調用了notFull.signal()來喚醒。

take()

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
複製代碼
  1. 開啓排他鎖。
  2. 若是count==0,表明隊列是空的,則調用notEmpty的await方法,用while循環是爲了防止虛假喚醒。
  3. 執行出隊操做。
  4. 釋放排他鎖。

這裏調用了notEmpty的await方法,那麼哪裏調用了notEmpty的signal方法呢?在enqueue入隊方法裏。

咱們能夠看到take和poll的區別:

  • take:若是隊列爲空,會阻塞,直到被喚醒了。
  • poll: 若是隊列爲空,直接返回null。

peek()

public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); 
        } finally {
            lock.unlock();
        }
    }
複製代碼
final E itemAt(int i) {
        return (E) items[i];
    }
複製代碼
  1. 開啓排他鎖。
  2. 得到元素。
  3. 釋放排他鎖。

咱們能夠看到peek和poll/take的區別:

  • peek,只是獲取元素,不會清空元素。
  • poll/take,獲取並清空元素。

size()

public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
複製代碼
  1. 開啓排他鎖。
  2. 返回count。
  3. 釋放排他鎖。

總結

至此,ArrayBlockQueue的核心源碼就分析完畢了,咱們來作一個總結:

  • ArrayBlockQueue有幾個比較重要的字段,分別是items,保存的是隊列的數據,putIndex保存的是入隊的下標,takeIndex保存的是出隊的下標,count用來統計隊列元素的個數,lock用來保證線程的安全性,notEmpty和notFull兩個條件變量實現喚醒和阻塞。
  • offer和add是同樣的,其中add方法內部調用的就是offer方法,若是隊列滿了,直接返回false。
  • put,若是隊列滿了,會被阻塞。
  • peek,只是彈出元素,不會清空元素。
  • poll,彈出並清空元素,若是隊列爲空,直接返回null。
  • take,彈出並清空元素,若是隊列爲空,會被阻塞。
相關文章
相關標籤/搜索