JAVA concurrency -- 阻塞隊列ArrayBlockingQueue源碼詳解

概述

ArrayBlockingQueue顧名思義,使用數組實現的阻塞隊列。今天咱們就來詳細講述下他的代碼實現數組

阻塞隊列

什麼是阻塞隊列?緩存

阻塞隊列是一種特殊的隊列,使用場景爲併發環境下。在某種狀況下(當線程沒法獲取鎖的時候)線程會被掛起而且在隊列中等待,若是條件具有(鎖被釋放)那麼就會喚醒掛起的線程。併發

通俗點來說的話,阻塞隊列相似於理髮店的等待區,當沒有理髮師空閒的時候,客人會在等待區等待,一旦有了空閒,就會有人自動遞補。app

類的繼承關係

ArrayBlockingQueue繼承了抽象隊列,而且實現了阻塞隊列,所以它具有隊列的全部基本特性。ide

基本實現原理

ArrayBlockingQueue的實現是基於ReentrantLock以及AQS內部實現的鎖機制以及Condition機制。 ArrayBlockingQueue內部聲明瞭兩個Condition變量,一個叫notEmpty,一個叫notFull,當有數據加入隊列時嘗試喚醒notEmpty,當有數據移除隊列時則喚醒notFull,從而實現一個相似於生產者消費者模型的機制。函數

源碼分析

類成員變量

    // 隊列的存儲對象數組
    final Object[] items;    // 下一個取出的序號
    int takeIndex;    // 下一個放入隊列的序號
    int putIndex;    // 隊列中的元素數目
    int count;    // 鎖以及用來控制隊列的兩個條件變量
    final ReentrantLock lock;    private final Condition notEmpty;    private final Condition notFull;    transient Itrs itrs = null;

構造函數

    public ArrayBlockingQueue(int capacity) {        this(capacity, false);
    }    
    // 通用的構造函數,以容量和是否公平鎖爲參數,餘下兩個構造函數均調用此函數
    public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {        // 調用構造函數
        this(capacity, fair);        // 爲阻塞隊列初始化數據(此操做須要上鎖)
        final ReentrantLock lock = this.lock;
        lock.lock();        try {            int i = 0;            try {                // 將集合中的數據存放到數組中而且進行判空操做
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {                throw new IllegalArgumentException();
            }            // 修改count和putIndex的值
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

這裏有一點疑問,這裏明明是構造函數,是類初始化的地方,照理來講不會產生競爭,爲何要進行加鎖操做呢?此處本來有一句原版的註釋 Lock only for visibility, not mutual exclusion 鎖是爲了可見性而不是互斥。這句話怎麼理解呢?咱們仔細觀察代碼,發現當咱們把集合中的數據所有插入隊列中以後,咱們會修改相應的count以及putIndex的數值,可是若是咱們沒有加鎖,那麼在集合插入完成前count以及putIndex沒有完成初始化操做的時候若是有其餘線程進行了插入等操做的話,會形成數據同步問題從而使得數據不許確,所以這裏的鎖是必要的。源碼分析

隊列操做

基礎隊列操做enqueue和dequeue

    // 隊列的插入操做
    private void enqueue(E x) {        // 本地聲明一個item數組的引用
        final Object[] items = this.items;        // 將元素放入數組中
        items[putIndex] = x;        // 若是此時已經到了數組的末尾了,將putIndex重置爲0
        if (++putIndex == items.length)
            putIndex = 0;        // 元素數目加1
        count++;        // 發出通知告訴全部取數據的線程能夠取數據
        notEmpty.signal();
    }    // 隊列的移除操做
    private E dequeue() {        final Object[] items = this.items;        @SuppressWarnings("unchecked")        // 找到要移除的數據置空
        E x = (E) items[takeIndex];
        items[takeIndex] = null;        // 若是此時已經到了數組的末尾了,將takeIndex重置爲0
        if (++takeIndex == items.length)
            takeIndex = 0;        // 元素數目減1
        count--;        // 迭代器操做,這個以後再說
        if (itrs != null)
            itrs.elementDequeued();        // 發出通知告知插入線程能夠工做
        notFull.signal();        return x;
    }

這兩個方法是隊列操做的基本方法,基本上就是常規的數組數據插入移除,只是有一點很讓人困惑 final Object[] items = this.items; 這段代碼實現將類成員對象在本地建立了一個引用,而後在本地使用引用進行操做,爲何要畫蛇添足呢?除此以外,代碼中大量用到了這種手法,例如: final ReentrantLock lock = this.lock; 這又是爲了什麼呢?對此筆者猜想多是和優化相關,由於jdk7中的實現與之不一樣,是使用的類變量直接操做。在進行了資料查閱後,筆者找到了一個相對靠譜的解釋:鄭州市不孕不育醫院:http://jbk.39.net/yiyuanfengcai/tsyl_zztjyy/3033/優化

這是ArrayBlockingQueue的做者Doug Lea的習慣,他認爲這種書寫習慣是對機器更加友好的書寫this

固然也有一些大神有一些其餘的解釋:spa

final自己是不可變的,可是因爲反射以及序列化操做的存在,final的不可變性就變得捉摸不定,除此以外一些編譯器層面上在final上優化的不夠好,致使會在使用到數據的時候反覆重載致使緩存失效

但願你們能夠本身認真思考下,而後嘗試下,獲得本身的結論。

阻塞隊列的插入操做

    public boolean offer(E e) {
        checkNotNull(e);        final ReentrantLock lock = this.lock;
        lock.lock();        try {            // 若是阻塞隊列已滿,那麼插入失敗
            if (count == items.length)                return false;            else {                // 不然插入成功
                enqueue(e);                return true;
            }
        } finally {
            lock.unlock();
        }
    }    public void put(E e) throws InterruptedException {
        checkNotNull(e);        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();        try {            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }    public boolean add(E e) {        if (offer(e))            return true;        else
            throw new IllegalStateException("Queue full");
    }

阻塞隊列插入操做大體就以上幾種,這幾種的區別在代碼中也體現得比較清楚了:

  1. offer返回的是布爾值,插入成功返回true不然(隊列已滿)返回false

  2. put沒有返回值,假如隊列是滿的,他會一直阻塞直到隊列爲空的時候執行插入操做

  3. add實際上調用的就是offer,只是他在加入失敗後會拋出異常

阻塞隊列的移除操做

    public E poll() {        final ReentrantLock lock = this.lock;
        lock.lock();        try {            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }    public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();        try {            while (count == 0)
                notEmpty.await();            return dequeue();
        } finally {
            lock.unlock();
        }
    }    public E peek() {        final ReentrantLock lock = this.lock;
        lock.lock();        try {            return itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }
  1. poll執行成功會返回隊列元素,若是隊列爲空則直接返回null

  2. take執行成功會返回隊列元素,可是若是隊列爲空他不會返回而是等待有數據插入,而後取出

  3. peek則是直接獲取隊列元素,而且執行後不會將元素從隊列中刪除

迭代器實現

因爲迭代器和內部隊列共享數據,再加上阻塞隊列的特性,致使爲了實現迭代器功能,須要新增一些很複雜的代碼實現。

內部聲明瞭兩個類來實現迭代器,一個是Itr繼承Iterator<E>,一個則是Itrs

Itrs

Itrs是用來管理迭代器的。因爲阻塞隊列內部可能會有多個迭代器在同時工做,在迭代器內部發生刪除或者是一些不常見的操做時可能會產生一些問題,好比他們會丟失本身的數據之類的。因此Itrs內部會維護一個變量用於記錄循環的圈數,而且在刪除操做removeAt的時候會通知全部的迭代器。

    class Itrs {        // 建立一個Node類做爲單向鏈表(節點是弱引用)來管理迭代器
        private class Node extends WeakReference<Itr> {
            Node next;

            Node(Itr iterator, Node next) {                super(iterator);                this.next = next;
            }
        }        // 循環圈數
        int cycles = 0;        // 鏈表頭
        private Node head;        // 清理相關的變量
        private Node sweeper = null;        private static final int SHORT_SWEEP_PROBES = 4;        private static final int LONG_SWEEP_PROBES = 16;

        Itrs(Itr initial) {
            register(initial);
        }        // 清理無效的迭代器(若是sweeper爲空,則從頭開始,不然從sweeper記錄的節點開始)
        void doSomeSweeping(boolean tryHarder) {
            
        }        // 新增長一個迭代器
        void register(Itr itr) {
            head = new Node(itr, head);
        }        // 當takeIndex爲0時調用此方法
        void takeIndexWrapped() {            // cycle數+1,內部實現通知全部迭代器並進行清理(鏈表遍歷)
        }        // 有移除操做的時候調用此方法,並通知全部迭代器進行清理
        void removedAt(int removedIndex) {            // 簡單的鏈表遍歷,內部調用Itr的removedAt方法
        }        // 當發現隊列爲空的時候調用此方法,清理迭代器內的弱引用
        void queueIsEmpty() {
            
        }        // 有元素被取時是調用
        void elementDequeued() {            // 若是數組爲空調用queueIsEmpty進行清理
            if (count == 0)
                queueIsEmpty();            // 若是takeIndex爲0,調用takeIndexWrapped,來進行循環+1操做
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
    }

Itr

Itrs是管理迭代器的,Itr則是迭代器的具體實現

    private class Itr implements Iterator<E> {        // 遊標,用於尋找下一個元素
        private int cursor;        // 下一個元素
        private E nextItem;        // 下一個元素的下標
        private int nextIndex;        // 上一個元素
        private E lastItem;        // 上一個元素的下標
        private int lastRet;        // 上一個take的下標
        private int prevTakeIndex;        // 上一個循環
        private int prevCycles;        // 標記爲空
        private static final int NONE = -1;        // 刪除標記
        private static final int REMOVED = -2;        // DETACH標記專用於prevTakeIndex
        private static final int DETACHED = -3;

        Itr() {            // 這是構造函數,內部實現主要是初始化爲主,
            // 而且在Itrs不爲空的時候進行一波清理操做
        }        boolean isDetached() {            return prevTakeIndex < 0;
        }        private int incCursor(int index) {            // 遊標+1,並從新計算值(判斷是否走完一個循環,是否等於putIndex)
            if (++index == items.length)
                index = 0;            if (index == putIndex)
                index = NONE;            return index;
        }        // 判斷給的刪除數是不是有效值
        private boolean invalidated(int index, int prevTakeIndex,                                    long dequeues, int length) {
            
        }        // 計算在迭代器的上一次操做後全部的刪除(出隊)操做
        private void incorporateDequeues() {            // 主要方法爲經過當前圈數和以前的圈數以及偏移量計算
            // 真實的刪除數,而且和prevTakeIndex以及index的偏移量進行比較
        }        // 進行detach操做並進行清理
        private void detach() {
            
        }        // 判斷是否有下一個節點
        public boolean hasNext() {
            
        }        // 沒有下一個節點(沒有detach的節點將會被執行detach操做)
        private void noNext() {
            
        }        // 找到下個節點
        public E next() {            // 實現不復雜,主要是須要判斷節點是不是detach模式
        }        // 刪除節點
        public void remove() {
            
        }        // 當隊列爲空或者後續很難找到下個節點的時候通知迭代器
        void shutdown() {
            
        }        // 輔助計算遊標和prevTakeIndex之間的距離
        private int distance(int index, int prevTakeIndex, int length) {
            
        }        // 刪除節點
        boolean removedAt(int removedIndex) {
            
        }        // 當takeIndex歸0時調用
        boolean takeIndexWrapped() {
            
        }
    }

總結

ArrayBlockingQueue的實現能夠說是比較的簡單清晰,主要是利用了ReentrantLock內部的Condition,經過設置兩個條件來巧妙地完成阻塞隊列的實現,只要可以理解這兩個條件的工做原理,源碼的理解就沒有太大的難度。ArrayBlockingQueue較難理解的反而是它內部的迭代器,因爲阻塞隊列的特性,他的迭代器可能會有丟失當前數據的風險,所以,做者創做的時候加入了許多複雜的方法來保證可靠性,可是在這裏因爲篇幅限制,以及迭代器在阻塞隊列中的地位和重要性並不高,因此簡單講述,若是有興趣能夠本身找一份源碼閱讀。鄭州不孕不育醫院:http://mobile.03913882333.com/

相關文章
相關標籤/搜索