迴歸Java基礎:LinkedBlockingQueue阻塞隊列解析

前言

整理了阻塞隊列LinkedBlockingQueue的學習筆記,但願對你們有幫助。有哪裏不正確,歡迎指出,感謝。html

LinkedBlockingQueue的概述

LinkedBlockingQueue的繼承體系圖

咱們先來看看LinkedBlockingQueue的繼承體系。使用IntelliJ IDEA查看類的繼承關係圖形 node

  • 藍色實線箭頭是指類繼承關係
  • 綠色箭頭實線箭頭是指接口繼承關係
  • 綠色虛線箭頭是指接口實現關係。

LinkedBlockingQueue實現了序列化接口 Serializable,所以它有序列化的特性。 LinkedBlockingQueue實現了BlockingQueue接口,BlockingQueue繼承了Queue接口,所以它擁有了隊列Queue相關方法的操做。編程

LinkedBlockingQueue的類圖

類圖來自Java併發編程之美 安全

LinkedBlockingQueue主要特性:bash

  1. LinkedBlockingQueue底層數據結構爲單向鏈表。
  2. LinkedBlockingQueue 有兩個Node節點,一個head節點,一個tail節點,只能從head取元素,從tail添加元素。
  3. LinkedBlockingQueue 容量是一個原子變量count,它的初始值爲0。
  4. LinkedBlockingQueue有兩把ReentrantLock的鎖,一把控制元素入隊,一把控制出隊,保證在併發狀況下的線程安全。
  5. LinkedBlockingQueue 有兩個條件變量,notEmpty 和 notFull。它們內部均有一個條件隊列,存放着出入隊列被阻塞的線程,這實際上是生產者-消費者模型。

LinkedBlockingQueue的重要成員變量

//容量範圍,默認值爲 Integer.MAX_VALUE
private final int capacity;

//當前隊列元素個數
private final AtomicInteger count = new AtomicInteger();

//頭結點
transient Node<E> head;

//尾節點
private transient Node<E> last;

//take, poll等方法的可重入鎖
private final ReentrantLock takeLock = new ReentrantLock();

//當隊列爲空時,執行出隊操做(好比take )的線程會被放入這個條件隊列進行等待
private final Condition notEmpty = takeLock.newCondition();

//put, offer等方法的可重入鎖
private final ReentrantLock putLock = new ReentrantLock();

//當隊列滿時, 執行進隊操做( 好比put)的線程會被放入這個條件隊列進行等待
private final Condition notFull = putLock.newCondition();
複製代碼

LinkedBlockingQueue的構造函數

LinkedBlockingQueue有三個構造函數:數據結構

  1. 無參構造函數,容量爲Integer.MAX
public LinkedBlockingQueue() {
   this(Integer.MAX_VALUE);
}
複製代碼
  1. 設置指定容量的構造器
public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
   //設置隊列大小
   this.capacity = capacity;
   //new一個null節點,head、tail節點指向該節點
   last = head = new Node<E>(null);
}
複製代碼
  1. 傳入集合,若是調用該構造器,容量默認也是Integer.MAX_VALUE
public LinkedBlockingQueue(Collection<? extends E> c) {
        //調用指定容量的構造器
        this(Integer.MAX_VALUE);
        //獲取put, offer的可重入鎖
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); 
        try {
            int n = 0;
            //循環向隊列中添加集合中的元素
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                //將隊列的last節點指向該節點
                enqueue(new Node<E>(e));
                ++n;
            }
            //更新容量值
            count.set(n);
        } finally {
            //釋放鎖
            putLock.unlock();
        }
    }
複製代碼

LinkedBlockingQueue底層Node類

Node源碼

static class Node<E> {
    // 當前節點的元素值
    E item;
    // 下一個節點的索引
    Node<E> next;
    //節點構造器
    Node(E x) { 
     item = x;
   }
 }
複製代碼

LinkedBlockingQueue的節點符合單向鏈表的數據結構要求:併發

  • 一個成員變量爲當前節點的元素值
  • 一個成員變量是下一節點的索引
  • 構造方法的惟一參數節點元素值。

Node節點圖

item表示當前節點的元素值,next表示指向下一節點的指針函數

LinkedBlockingQueue經常使用操做

offer操做

入隊方法,其實就是向隊列的尾部插入一個元素。若是元素爲空,拋出空指針異常。若是隊列已滿,則丟棄當前元素,返回false,它是非阻塞的。若是隊列空閒則插入成功返回true。學習

offer源代碼

offer方法源碼以下:ui

public boolean offer(E e) {
        //爲空直接拋空指針
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        //若是當前隊列滿了的話,直接返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        //構造新節點
        Node<E> node = new Node<E>(e);
        獲取put獨佔鎖
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //判斷隊列是否已滿
            if (count.get() < capacity) {
                //進隊列
                enqueue(node);
                //遞增元素計數
                c = count.getAndIncrement();
                //若是元素入隊,還有空閒,則喚醒notFull條件隊列裏被阻塞的線程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            //釋放鎖
            putLock.unlock();
        }
        //若是容量爲0,則
        if (c == 0)
            //激活 notEmpty 的條件隊列,喚醒被阻塞的線程
            signalNotEmpty();
        return c >= 0;
    }
複製代碼

enqueue方法源碼以下:

private void enqueue(Node<E> node) {
 //從尾節點加進去
 last = last.next = node;
 }
複製代碼

爲了形象生動,咱們用一張圖來看看往隊列裏依次放入元素A和元素B。圖片參考來源【細談Java併發】談談LinkedBlockingQueue

signalNotEmpty方法源碼以下

private void signalNotEmpty() {
    //獲取take獨佔鎖
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
       //喚醒notEmpty條件隊列裏被阻塞的線程
       notEmpty.signal();
     } finally {
       //釋放鎖
       takeLock.unlock();
        }
    }
複製代碼

offer執行流程圖

基本流程:

  • 判斷元素是否爲空,若是是,就拋出空指針異常。
  • 判讀隊列是否已滿,若是是,添加失敗,返回false。
  • 若是隊列沒滿,構造Node節點,上鎖。
  • 判斷隊列是否已滿,若是隊列沒滿,Node節點在隊尾加入隊列待。
  • 加入隊列後,判斷隊列是否還有空閒,若是是,喚醒notFull的阻塞線程。
  • 釋放完鎖後,判斷容量是否爲空,若是是,喚醒notEmpty的阻塞線程。

put操做

put方法也是向隊列尾部插入一個元素。若是元素爲null,拋出空指針異常。若是隊列己滿則阻塞當前線程,直到隊列有空閒插入成功爲止。若是隊列空閒則插入成功,直接返回。若是在阻塞時被其餘線程設置了中斷標誌, 則被阻塞線程會拋出 InterruptedException 異常而返回。

put源代碼

public void put(E e) throws InterruptedException {
        ////爲空直接拋空指針異常
        if (e == null) throw new NullPointerException();
        int c = -1;
        // 構造新節點
        Node<E> node = new Node<E>(e);
        //獲取putLock獨佔鎖
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //獲取獨佔鎖,它跟lock的區別,是能夠被中斷
        putLock.lockInterruptibly();
        try {
            //隊列已滿線程掛起等待
            while (count.get() == capacity) {
                notFull.await();
            }
            //進隊列
            enqueue(node);
            //遞增元素計數
            c = count.getAndIncrement();
            //若是元素入隊,還有空閒,則喚醒notFull條件隊列裏被阻塞的線程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //釋放鎖
            putLock.unlock();
        }
        //若是容量爲0,則
        if (c == 0)
            //激活 notEmpty 的條件隊列,喚醒被阻塞的線程
            signalNotEmpty();
    }
複製代碼

put流程圖

基本流程:

  • 判斷元素是否爲空,若是是就拋出空指針異常。
  • 構造Node節點,上鎖(可中斷鎖)
  • 判斷隊列是否已滿,若是是,阻塞當前線程,一直等待。
  • 若是隊列沒滿,Node節點在隊尾加入隊列。
  • 加入隊列後,判斷隊列是否還有空閒,若是是,喚醒notFull的阻塞線程。
  • 釋放完鎖後,判斷容量是否爲空,若是是,喚醒notEmpty的阻塞線程。

poll操做

從隊列頭部獲取並移除一個元素, 若是隊列爲空則返回 null, 該方法是不阻塞的。

poll源代碼

poll方法源代碼

public E poll() {
        final AtomicInteger count = this.count;
        //若是隊列爲空,返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        //獲取takeLock獨佔鎖
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //若是隊列不爲空,則出隊,並遞減計數
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                ////容量大於1,則激活 notEmpty 的條件隊列,喚醒被阻塞的線程
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
           //釋放鎖
            takeLock.unlock();
        }
        if (c == capacity)
            //喚醒notFull條件隊列裏被阻塞的線程
            signalNotFull();
        return x;
    }
複製代碼

dequeue方法源代碼

//出隊列
  private E dequeue() {
        //獲取head節點
        Node<E> h = head;
        //獲取到head節點指向的下一個節點
        Node<E> first = h.next;
        //head節點原來指向的節點的next指向本身,等待下次gc回收
        h.next = h; // help GC
        // head節點指向新的節點
        head = first;
        // 獲取到新的head節點的item值
        E x = first.item;
        // 新head節點的item值設置爲null
        first.item = null;
        return x;
    }
複製代碼

爲了形象生動,咱們用一張圖來描述出隊過程。圖片參考來源【細談Java併發】談談LinkedBlockingQueue

signalNotFull方法源碼

private void signalNotFull() {
        //獲取put獨佔鎖
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            ////喚醒notFull條件隊列裏被阻塞的線程
            notFull.signal();
        } finally {
            //釋放鎖
            putLock.unlock();
        }
    }
複製代碼

poll流程圖

基本流程:

  • 判斷元素是否爲空,若是是,就返回null。
  • 加鎖
  • 判斷隊列是否有元素,若是沒有,釋放鎖
  • 若是隊列有元素,則出隊列,獲取數據,容量計數器減一。
  • 判斷此時容量是否大於1,若是是,喚醒notEmpty的阻塞線程。
  • 釋放完鎖後,判斷容量是否滿,若是是,喚醒notFull的阻塞線程。

peek操做

獲取隊列頭部元素可是不從隊列裏面移除它,若是隊列爲空則返回 null。 該方法是不 阻塞的。

peek源代碼

public E peek() {
        //隊列容量爲0,返回null
        if (count.get() == 0)
            return null;
        //獲取takeLock獨佔鎖
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            //判斷first是否爲null,若是是直接返回
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            //釋放鎖
            takeLock.unlock();
        }
    }
複製代碼

peek流程圖

基本流程:

  • 判斷隊列容量大小是否爲0,若是是,就返回null。
  • 加鎖
  • 獲取隊列頭部節點first
  • 判斷節點first是否爲null,是的話,返回null。
  • 若是fist不爲null,返回節點first的元素。
  • 釋放鎖。

take操做

獲取當前隊列頭部元素並從隊列裏面移除它。 若是隊列爲空則阻塞當前線程直到隊列 不爲空而後返回元素,若是在阻塞時被其餘線程設置了中斷標誌, 則被阻塞線程會拋出 InterruptedException 異常而返回。

take源代碼

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        //獲取takeLock獨佔鎖
        final ReentrantLock takeLock = this.takeLock;
        //獲取獨佔鎖,它跟lock的區別,是能夠被中斷
        takeLock.lockInterruptibly();
        try {
            //當前隊列爲空,則阻塞掛起
            while (count.get() == 0) {
                notEmpty.await();
            }
            //)出隊並遞減計數
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
               //激活 notEmpty 的條件隊列,喚醒被阻塞的線程
                notEmpty.signal();
        } finally {
            //釋放鎖
            takeLock.unlock();
        }
        if (c == capacity)
            //激活 notFull 的條件隊列,喚醒被阻塞的線程
            signalNotFull();
        return x;
    }
複製代碼

take流程圖

基本流程:

  • 加鎖
  • 判斷隊列容量大小是否爲0,若是是,阻塞當前線程,直到隊列不爲空。
  • 若是隊列容量大小大於0,節點出隊列,獲取元素x,計數器減一。
  • 判斷隊列容量大小是否大於1,若是是,喚醒notEmpty的阻塞線程。
  • 釋放鎖。
  • 判斷隊列容量是否已滿,若是是,喚醒notFull的阻塞線程。
  • 返回出隊元素x

remove操做

刪除隊列裏面指定的元素,有則刪除並返回 true,沒有則返回 false。

remove方法源代碼

public boolean remove(Object o) {
         //爲空直接返回false
        if (o == null) return false;
        //雙重加鎖
        fullyLock();
        try {
            //邊歷隊列,找到元素則刪除並返回true
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    //執行unlink操做
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            //解鎖
            fullyUnlock();
        }
    }
複製代碼

雙重加鎖,fullyLock方法源代碼

void fullyLock() {
        //putLock獨佔鎖加鎖
        putLock.lock();
        //takeLock獨佔鎖加鎖
        takeLock.lock();
    }
複製代碼

unlink方法源代碼

void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        //若是當前隊列滿 ,則刪除後,也不忘記喚醒等待的線程 
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
複製代碼

fullyUnlock方法源代碼

void fullyUnlock() {
        //與雙重加鎖順序相反,先解takeLock獨佔鎖
        takeLock.unlock();
        putLock.unlock();
    }
複製代碼

remove流程圖

基本流程

  • 判斷要刪除的元素是否爲空,是就返回false。
  • 若是要刪除的元素不爲空,加雙重鎖
  • 遍歷隊列,找到要刪除的元素,若是找不到,返回false。
  • 若是找到,刪除該節點,返回true。
  • 釋放鎖

size操做

獲取當前隊列元素個數。

public int size() {
        return count.get();
    }
複製代碼

因爲進行出隊、入隊操做時的 count是加了鎖的,因此結果相比ConcurrentLinkedQueue 的 size 方法比較準確。

總結

  • LinkedBlockingQueue底層經過單向鏈表實現。
  • 它有頭尾兩個節點,入隊操做是從尾節點添加元素,出隊操做是對頭節點進行操做。
  • 它的容量是原子變量count,保證szie獲取的準確性。
  • 它有兩把獨佔鎖,保證了隊列操做原子性。
  • 它的兩把鎖都配備了一個條件隊列,用來存放阻塞線程,結合入隊、出隊操做實現了一個生產消費模型。

Java併發編程之美中,有一張圖唯妙唯肖描述了它,以下圖:

參看與感謝

我的公衆號

  • 若是你是個愛學習的好孩子,能夠關注我公衆號,一塊兒學習討論。
  • 若是你以爲本文有哪些不正確的地方,能夠評論,也能夠關注我公衆號,私聊我,你們一塊兒學習進步哈。
相關文章
相關標籤/搜索