ArrayBlockingQueue的內部實現

區別

  1. ArrayBlockingQueue 、ArrayList 的內部存儲結構爲數組;LinkedBlockingQueue 是單向鏈表, LinkedBlockingDeque 、 LinkedList 是雙向鏈表 (後三者都維護鏈表的head與tail引用node)
  2. ArrayBlockingQueue初始化時是必定要指定容量大小;ArrayList的默認大小爲0;而LinkedBlockingQueue、LinkedBlockingDeque提供默認容量Integer.MAX_VALUE。
  3. ArrayBlockingQueue & LinkedBlockingDeque 經過同一把鎖的不一樣Condition來控制;在操做時,生產與消費須要競爭同一把鎖。
    LinkedBlockingQueue經過鎖分離方式來控制(生產與消費競爭的資源是屬於獨立的鎖。隊列空時:有生產後須要喚醒等待的消費者;隊列滿時:有消費後須要喚醒等待的生產者;固然對於同屬生產(消費)者的不一樣線程相互間也須要競爭生產(消費)鎖)。

實現方案   

二者都是先加鎖ReentrantLock, 依賴於這個鎖的 [出隊]等待隊列  (Condition -> notEmpty)與 【入隊】等待隊列(Condition -> notFull) 來控制。java

鎖默認用的是非公平模式:NonfairSyncnode

能夠這麼理解:數組

  • notEmpty: 標識:當前隊列不是空的能夠消費; 因此出隊時,斷定隊列爲空,則await這個信號;隊列不爲空則進行出隊操做,成功後喚醒 notFull ;
  • notFull : 標識:當前隊列不是滿的能夠生產;  因此入隊時,斷定隊列已滿,則await這個信號。隊列沒滿則進行入隊操做,成功後喚醒notEmpty;

出隊

拿ArrayBlockingQueue 的出隊來分析:
this

  1. 先加鎖
  2. 斷定當前的數量是否爲空:
    1. 若是爲空,則notEmpty(「隊列不是空的」信號) 等待
    2. 若是不爲空,則出隊。 出隊成功後, 喚醒 notFull ( "隊列不是滿的" 信號)
  3. 最終釋放鎖

入隊

拿LinkedBlockingDeque 的入隊來分析
spa

  1. 先加鎖
  2. 嘗試出隊操做:
    1. 在unlinkFirst方法中, 從首節點進行出隊,出隊完畢後,設置好新的First-Node,  喚醒 notFull ( "隊列不是滿的" 信號)
    2. 若是出隊返回是null,則 notEmpty(「隊列不是空的」信號) 等待

模擬ArrayBlockingQueue的生產和消費方式

package com.noob.learn.netty;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Storage {

    private int             capacity = 10;
    private Queue<Object>   queue    = new LinkedList<Object>();
    final ReentrantLock     lock     = new ReentrantLock();

    /** 當前隊列不是空的,能夠消費 */
    private final Condition notEmpty = lock.newCondition();

    /** 當前隊列不是滿的, 能夠生產 */
    private final Condition notFull  = lock.newCondition();

    public void produce() {
        final ReentrantLock lock = this.lock;
        try {
            lock.lockInterruptibly();

            while (queue.size() > capacity - 1) {
                System.out.println("庫存量:" + queue.size() + "已滿, 暫時不能執行生產任務!");
                notFull.await();
            }

            queue.add(new Object());
            System.out.println("生產了, 現倉儲量爲:" + queue.size());
            notEmpty.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();

        }
    }

    public void consume() {
        final ReentrantLock lock = this.lock;
        try {
            lock.lockInterruptibly();
            while (queue.size() == 0) {
                System.out.println("庫存量" + queue.size() + "暫時不能執行消費任務!");
                notEmpty.wait();
            }
            queue.remove();
            System.out.println("消費了, 現倉儲量爲:" + queue.size());
            notFull.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();

        }
    }

    public static void main(String[] args) throws InterruptedException {
        Storage storage = new Storage();
        new Thread(() -> {
            while (true) {
                storage.produce();
                try {
                    Thread.sleep(1000); // 執行太快了降速
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }   ).start();

        new Thread(() -> {
            while (true) {
                storage.consume();
                try {
                    Thread.sleep(1000);// 執行太快了降速
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }   ).start();
    }
}

執行結果:
.net

相關文章
相關標籤/搜索