聊聊Java中的生產者消費者模型——BlockingQueue

前言

生產者/消費者模型相信各位都不陌生,是一種很常見的分佈式資源調度模型。在這個模型中,至少有兩個對象:生產者和消費者。生產者只負責建立資源,消費者只負責使用資源。若是本身實現一個簡單的生產者/消費者模型也很容易,無非就是經過一個隊列來作,可是這種方式有不少隱藏的缺陷:java

  1. 須要保證資源的線程可見性,同時要手動實現線程同步
  2. 須要考慮各類臨界狀況和拒絕策略
  3. 須要在吞吐量與線程安全之間保持平衡

因此Java已經提早爲咱們封裝好了接口和實現,接下來咱們就要針對BlockingQueue接口和它的經常使用的實現類LinkedBlockingQueue進行簡要的分析node

阻塞隊列

概念

BlockingQueue,含義爲阻塞隊列,咱們能夠從類定義看出,其繼承了Queue接口,因此能夠看成隊列來使用:編程

圖1

既然叫作阻塞隊列,也就是說這個隊列的操做是以阻塞方式進行的,體如今以下兩個方面:安全

  • 插入元素的操做是阻塞的:當隊列滿時,執行插入操做的線程被阻塞
  • 移除元素的操做時阻塞的:當隊列空時,執行移除操做的線程被阻塞

經過這種方式,能夠方便地協調生產者和消費者之間的關係併發

接口方法

在BlockingQueue中,定義瞭如下6個接口:分佈式

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);

    boolean offer(E e);

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}
複製代碼

這些接口方法按功能能夠分爲三類:post

  • 添加元素:包括add、offer、put
  • 移除元素:包括remove、poll、take、drainTo
  • 獲取/檢查元素:包括contains、remainingCapacity

通常地,咱們也將添加元素叫作put操做(即便使用的是offer方法而不是put方法),移除元素的叫作take操做this

對於前兩類,能夠按照異常處理方式再次分爲如下幾類:spa

  • 拋出異常:add、remove
  • 返回特殊值:offer(e)、poll
  • 阻塞:put(e)、take
  • 超時退出:offer(e, time, unit)、poll(time, unit)

這幾種處理方式我就很少解釋了,字面意義已經很顯然了線程

阻塞隊列的實現

JDK8提供瞭如下BlockQueue的實現類:

圖2

咱們經常使用的基本有如下幾種:

  • ArrayBlockingQueue:基於ArrayList實現的阻塞隊列,有界
  • LinkedBlockingQueue:基於LinkedList實現的阻塞隊列,有界
  • PriorityBlockingQueue:優先隊列,無界
  • DelayQueue:支持延時獲取元素的優先隊列,無界

其他的實現感興趣的能夠自行了解,咱們這裏就以LinkedBlockingQueue爲例,介紹一下Java是如何實現阻塞隊列的

接口方法

除了BlockingQueue提供的接口方法以外,LinkedBlockingQueue還提供了一個方法peek,用於獲取隊首節點

至此,咱們經常使用的阻塞隊列方法都已說明完畢,這裏用一張表來總結一下[1]

方法/處理方式 拋出異常 返回特殊值 阻塞 超時退出
插入元素 add(e) offer(e) put(e) offer(e, timeout, unit)
移除元素 remove() poll() take() poll(timeout, unit)
獲取元素 element() peek() / /

其中element方法和peek方法功能是相同的

屬性

BlockingQueue僅僅定義了接口規範,真正的實現仍是由具體的實現類來完成,咱們暫且略過中間的AbstractQueue,直接來研究LinkedBlockingQueue,其中定義了幾個重要的域對象:

/** 元素個數 */
    private final AtomicInteger count = new AtomicInteger();
    
    /** 隊首節點 */
    transient Node<E> head;
    /** 隊尾節點 */
    private transient Node<E> last;

    /** take、poll等方法持有的鎖,這裏叫作take鎖或出鎖 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** take方法的等待隊列 */
    private final Condition notEmpty = takeLock.newCondition();

    /** put、offer等方法持有的鎖,這裏叫作put鎖或入鎖 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** put方法的等待隊列 */
    private final Condition notFull = putLock.newCondition();
複製代碼

Node節點就是普通的隊列節點,和LinkedList同樣,咱們主要關注後面的4個域對象,能夠分爲兩類:用於插入元素的,和用於移除元素的。其中每類都有兩個屬性:ReentranLockCondition。其中ReentranLock是基於AQS[2]實現的一個可重入鎖(不理解可重入概念的能夠看成普通的鎖),Condition是一個等待/通知模式的具體實現(能夠理解爲一種提供了功能更強大的waitnotify的類)

count屬性天然不用說,headlast很顯然是用於維護存儲元素的隊列,相信也不用細說了。阻塞隊列和普通隊列的區分點是在於後面的ReentrantLockCondition類型的四個屬性,關於這四個屬性的意義,在接下來的幾個模塊會進行深刻的分析

不過咱們爲了接下來說解方便,先來簡單介紹一下Condition這個類。實際上,Condition是一個接口,具體的實現類是在AQS中。對於本篇文章來講,你只須要清楚3個方法:await()signal(),還有singalAll()。這三個方法徹底就能夠類比wait()notify()notifyAll(),它們之間的區別能夠模糊地理解爲,wait/notify這些方法管理的是對象鎖和類鎖,它們操控的是等待這些鎖的線程隊列;而await/signal這些方法管理的是基於AQS的鎖,操控的天然也是AQS中的線程等待隊列

因此這裏的notEmpty維護了等待take鎖的線程隊列,notFull維護了等待put鎖的線程隊列。從字面意義上也很好理解,notEmpty表示「隊列還沒空」,因此能夠取元素,同理,notFull就表示「隊列還沒滿」,能夠往裏插入元素

插入元素

offer(e)

先來看offer(e)方法,源碼以下:

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // 若是容量達到上限會返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        // 獲取put鎖
        putLock.lock();
        try {
            if (count.get() < capacity) {
                // 入隊並自增元素個數
                enqueue(node);
                // 注意,這裏c返回的是增長前的值
                c = count.getAndIncrement();
                // 若是容量沒到上限,就喚醒一個put操做
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            // 解鎖
            putLock.unlock();
        }
        if (c == 0)
            // 若是隊列以前爲空,會喚醒一個take操做
            signalNotEmpty();
        return c >= 0;
    }
複製代碼

這個方法大部分操做都很好理解,當添加元素的操做不容許時,offer方法會給用戶返回false,相似於非阻塞的通訊方式。offer方法的線程安全性是經過put鎖來保證的

這裏有一個頗有意思的地方,咱們看最後判斷若是c == 0,那麼就會喚醒一個take操做。可能不少人疑惑這裏爲何要加一條判斷,是這樣的,整個方法中,c的初值是-1,修改其值的惟一地方就是c = count.getAndIncrement()這條語句。也就是說,若是斷定c == 0,那麼這條語句的返回值就是0,即在插入元素以前,隊列是空的。因此,若是一開始隊列爲空,當插入第一個元素以後,會馬上喚醒一個take操做[3]

至此,整個方法流程能夠概括爲:

  1. 獲取put鎖
  2. 元素入隊,並自增count
  3. 若是容量未到上限,則喚醒一個put操做
  4. 若是在插入元素以前隊列爲空,則在最後喚醒一個take操做

offer(e, timeout, unit)

趁熱打鐵,咱們接着來看帶有超時機制的offer方法:

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 可被中斷地獲取put鎖
        putLock.lockInterruptibly();
        try {
            // 重複執行while循環體,直到隊列不滿,或到了超時時間
            while (count.get() == capacity) {
                // 到了超時時間後就返回false
                if (nanos <= 0)
                    return false;
                // 會將當前線程添加到notFull等待隊列中,
                // 返回的是剩餘可用的等待時間
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
複製代碼

整個方法大致上和offer(e)方法相同,不一樣的點有兩處:

  1. 獲取鎖採用的是可中斷的形式,即putLock.lockInterruptibly()
  2. 若是隊列一直是滿的,則會循環執行notFull.awaitNanos(nanos)操做來將當前線程添加到notFull等待隊列中(等待put操做的執行)

其他部分和offer(e)徹底一致,在這裏就不贅述了

add(e)

add方法與offer方法相比,當操做不容許時,會拋出異常而不是返回一個特殊值,以下:

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
複製代碼

單純地就是對offer(e)作了二次封裝,沒什麼好說的,須要提一點的就是這個方法的實現是在AbstractQueue

put(e)

put(e)方法當操做不容許時會阻塞線程,咱們來看其是如何實現的:

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        
        // 以可中斷的形式獲取put鎖
        putLock.lockInterruptibly();
        try {
            // 與offer(e, timeout, unit)相比,採用了無限等待的方式
            while (count.get() == capacity) {
                // 當執行了移除元素操做後,會經過signal操做來喚醒notFull隊列中的一個線程
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
複製代碼

果真方法之間都是大同小異的,put(e)操做能夠類比咱們以前講的offer(e, timeout, unit),只有一個不一樣的地方,就是當隊列滿時,await操做再也不有超時時間,也就是說,只能等待take操做[4]來調用signal方法喚醒該線程

移除元素

poll()

poll()方法用於移除並返回隊首節點,下面是方法的具體實現:

public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        // 獲取take鎖
        takeLock.lock();
        try {
            if (count.get() > 0) {
                // 出隊,並自減
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    // 只要隊列還有元素,就喚醒一個take操做
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // 若是在隊列滿的狀況下移除一個元素,會喚醒一個put操做
        if (c == capacity)
            signalNotFull();
        return x;
    }
複製代碼

若是你認真看了offer(e)方法以後,poll()方法就沒什麼好講的了,徹底就是offer(e)的翻版(我也想講點東西,可是poll()方法徹底和offer(e)流程如出一轍...)

其餘

poll(timeout, unit)/take()/remove()方法分別是offer(e, timeout, unit)/put()/add()方法的翻版,沒有什麼特殊的地方,這裏就一筆略過了

獲取元素

peek()

peek()方法是用於獲取隊首元素,其實現以下:

public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        // 獲取take鎖
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
複製代碼

流程沒什麼好說的,須要注意的是該方法須要獲取take鎖,也就是說在peek()方法執行時,是不能執行移除元素的操做的

element()

element()方法的實現是在AbstractQueue中:

public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
複製代碼

仍是一樣的二次封裝操做

總結

原本說的是BlockingQueue,結果說了半天LinkedBlockingQueue。不過做爲阻塞隊列的一種經典實現,LinkedBlockingQueue中的方法實現思路也是對於理解阻塞隊列來講也是很重要的。想要理解阻塞隊列的理念,最重要的就是理解鎖的概念,好比LinkedBlockingQueue經過生產者鎖/put鎖消費者鎖/take鎖,以及鎖對應的Condition對象來實現線程安全。理解了這一點,才能理解整個生產者/消費者模型


  1. 這裏參考了《Java併發編程的藝術》 ↩︎

  2. 參見淺談AQS(抽象隊列同步器)一文 ↩︎

  3. 這裏描述爲「喚醒一個take操做」有些不許確,實際應描述爲「喚醒一個等待take鎖的線程」,不過我認爲前者更有助於讀者理解,因此沿用前者的描述方式 ↩︎

  4. 指的是和take功能相似的一組方法,包含有take/poll/removeput操做同理 ↩︎

相關文章
相關標籤/搜索