聊聊 JDK 阻塞隊列源碼(ReentrantLock實現)

項目中用到了一個叫作 Disruptor 的隊列,今天樓主並非要介紹 Disruptor 而是想鞏固一下基礎扒一下 JDK 中的阻塞隊列,聽到隊列相信你們對其並不陌生,在咱們現實生活中隊列隨處可見,最經典的就是去銀行辦理業務等。

固然在計算機世界中,隊列是屬於一種數據結構,隊列採用的FIFO(first in firstout),新元素(等待進入隊列的元素)老是被插入到尾部,而讀取的時候老是從頭部開始讀取。在計算中隊列通常用來作排隊(如線程池的等待排隊,鎖的等待排隊),用來作解耦(生產者消費者模式),異步等等。java

JDK 中的隊列

JDK中的隊列都實現了 java.util.Queue 接口,在隊列中又分爲兩類,一類是線程不安全的,ArrayDeque,LinkedList等等,還有一類都在java.util.concurrent包下屬於線程安全,而在咱們真實的環境中,咱們的機器都是屬於多線程,當多線程對同一個隊列進行操做的時,若是使用線程不安全會出現數據丟失等沒法預測的事情,因此咱們這個時候只能選擇線程安全的隊列。下面是咱們今天要探討的兩個隊列node

隊列名字 是否加鎖 數據結構
ArrayBlockingQueue 數組array
LinkedBlockingQueue 鏈表

ArrayBlockingQueue 源碼分析

ArrayBlockingQueue 的原理就是使用一個可重入鎖(ReentrantLock )和這個鎖生成的兩個條件對象進行併發控制,ArrayBlockingQueue是一個有界的阻塞隊列,初始化的時候必需要指定隊列長度,且指定長度以後不容許進行修改。
數組

成員變量屬性

/** The queued items item的集合 */
    final Object[] items;

    /** items index for next take, poll, peek or remove 取出數據的索引 */
    int takeIndex;

    /** items index for next put, offer, or add 添加數據的索引 */
    int putIndex;

    /** Number of elements in the queue 隊列元素的個數 */
    int count;

    /** Main lock guarding all access 可重入的鎖 */
    final ReentrantLock lock;

    /** Condition for waiting takes 隊列爲空條件等待對象 */
    private final Condition notEmpty;

    /** Condition for waiting puts 隊列滿條件等待對象 */
    private final Condition notFull;

主要方法源碼實現

  1. add:添加元素到隊列裏,添加成功返回true,因爲容量滿了添加失敗會拋出IllegalStateException異常;
  2. offer:添加元素到隊列裏,添加成功返回true,添加失敗返回false;
  3. put:添加元素到隊列裏,若是容量滿了會阻塞直到容量不滿;
  4. poll:刪除隊列頭部元素,若是隊列爲空,返回null。不然返回元素;
  5. remove:基於對象找到對應的元素,並刪除。刪除成功返回true,不然返回false;
  6. take:刪除隊列頭部元素,若是隊列爲空,一直阻塞到隊列有元素並刪除。

add方法:
安全

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

offer方法:
數據結構

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

put方法:
多線程

public void put(E e) {
        xfer(e, true, ASYNC, 0);
}

咱們能夠看到,若是隊列滿了則返回false,若是沒有滿調用insert。整個方法是經過可重入鎖來鎖住的,而且最終釋放。併發

接着看一下insert方法:
異步

private void insert(E x) {
    items[putIndex] = x; // 元素添加到數組裏
    putIndex = inc(putIndex); // 放數據索引+1,當索引滿了變成0
    ++count; // 元素個數+1
    notEmpty.signal(); // 使用條件對象notEmpty通知
}

這裏insert被調用的時候就會喚醒notEmpty上等待的線程進行take操做。
源碼分析

再看一下put方法:
post

public void put(E e) throws InterruptedException {
    checkNotNull(e); // 不容許元素爲空
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加鎖,保證調用put方法的時候只有1個線程
    try {
        while (count == items.length) // 若是隊列滿了,阻塞當前線程,while用來防止假喚醒
            notFull.await(); // 線程阻塞並被掛起,同時釋放鎖
        insert(e); // 調用insert方法
    } finally {
        lock.unlock(); // 釋放鎖,讓其餘線程能夠調用put方法
    }
}

經過上面代碼咱們能夠知道,add方法和offer方法不會阻塞線程,put方法若是隊列滿了會阻塞線程,直到有線程消費了隊列裏的數據纔有可能被喚醒。

緊接着咱們看一下poll方法:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖,保證調用poll方法的時候只有1個線程
    try {
        return (count == 0) ? null : extract(); // 若是隊列裏沒元素了,返回null,不然調用extract方法
    } finally {
        lock.unlock(); // 釋放鎖,讓其餘線程能夠調用poll方法
    }
}

看看這個extract方法,extract的翻譯過來就是提取的意思:

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]); // 獲得取索引位置上的元素
    items[takeIndex] = null; // 對應取索引上的數據清空
    takeIndex = inc(takeIndex); // 取數據索引+1,當索引滿了變成0
    --count; // 元素個數-1
    notFull.signal(); // 使用條件對象notFull通知,原理同上面的insert中
    return x; // 返回元素
}

看一下take方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加鎖,保證調用take方法的時候只有1個線程
    try {
        while (count == 0) // 若是隊列空,阻塞當前線程,並加入到條件對象notEmpty的等待隊列裏
            notEmpty.await(); // 線程阻塞並被掛起,同時釋放鎖
        return extract(); // 調用extract方法
    } finally {
        lock.unlock(); // 釋放鎖,讓其餘線程能夠調用take方法
    }
}

remove方法:

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖,保證調用remove方法的時候只有1個線程
    try {
        for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素
            if (o.equals(items[i])) { // 兩個對象相等的話
                removeAt(i); // 調用removeAt方法
                return true; // 刪除成功,返回true
            }
        }
        return false; // 刪除成功,返回false
    } finally {
        lock.unlock(); // 釋放鎖,讓其餘線程能夠調用remove方法
    }
}

再看一下removeAt方法:

private void removeAt(int i) {
    final Object[] items = this.items;
    if (i == takeIndex) { 
        // 若是要刪除數據的索引是取索引位置,直接刪除取索引位置上的數據,而後取索引+1便可
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else { 
        // 若是要刪除數據的索引不是取索引位置,移動元素元素,更新取索引和放索引的值
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null;
                putIndex = i;
                break;
            }
        }
    }
    --count; // 元素個數-1
    notFull.signal(); // 使用條件對象notFull通知
}

LinkedBlockingQueue 源碼分析

LinkedBlockingQueue是一個使用鏈表完成隊列操做的阻塞隊列。鏈表是單向鏈表,而不是雙向鏈表

成員變量屬性

/** The capacity bound, or Integer.MAX_VALUE if none 容量大小 */
    private final int capacity;

    /** Current number of elements 元素個數,由於有2個鎖,存在競態條件,使用AtomicInteger */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Head of linked list.
     * Invariant: head.item == null
     * 頭結點
     */
    private transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     * 尾節點
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc 獲取元素的鎖 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes 取元素的條件對象 */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc 放元素的鎖 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts 添加元素的條件對象 */
    private final Condition notFull = putLock.newCondition();

主要方法源碼實現

因爲文章篇幅問題對於LinkedBlockingQueue咱們主要分析如下幾個方法:

  1. offer:添加元素到隊列裏,添加成功返回true,添加失敗返回false;
  2. put:添加元素到隊列裏,若是容量滿了會阻塞直到容量不滿;
  3. poll:刪除隊列頭部元素,若是隊列爲空,返回null。不然返回元素;
  4. remove:基於對象找到對應的元素,並刪除。刪除成功返回true,不然返回false;
  5. take:刪除隊列頭部元素,若是隊列爲空,一直阻塞到隊列有元素並刪除。

offer方法:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException(); // 不容許空元素
    final AtomicInteger count = this.count;
    if (count.get() == capacity) // 若是容量滿了,返回false
        return false;
    int c = -1;
    Node<E> node = new Node(e); // 容量沒滿,以新元素構造節點
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // 放鎖加鎖,保證調用offer方法的時候只有1個線程
    try {
        // 再次判斷容量是否已滿,由於可能取元素鎖在進行消費數據,沒滿的話繼續執行
        if (count.get() < capacity) { 
            enqueue(node); // 節點添加到鏈表尾部
            c = count.getAndIncrement(); // 元素個數+1
            if (c + 1 < capacity) // 若是容量還沒滿
                notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示能夠再次往隊列裏面加數據
        }
    } finally {
        putLock.unlock(); // 釋放放鎖,讓其餘線程能夠調用offer方法
    }
    // 因爲存在放元素鎖和取元素鎖,這裏可能取元素鎖一直在消費數據,count會變化。這裏的if條件表示若是隊列中還有1條數據
    if (c == 0) 
        // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列裏還有1條數據,能夠進行消費
        signalNotEmpty(); 
    return c >= 0; // 添加成功返回true,不然返回false
}

put方法:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); // 不容許空元素
    int c = -1;
    Node<E> node = new Node(e); // 以新元素構造節點
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // 放鎖加鎖,保證調用put方法的時候只有1個線程
    try {
        while (count.get() == capacity) { // 若是容量滿了
            notFull.await(); // 阻塞並掛起當前線程
        }
        enqueue(node); // 節點添加到鏈表尾部
        c = count.getAndIncrement(); // 元素個數+1
        if (c + 1 < capacity) // 若是容量還沒滿
            // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示能夠再次往隊列裏面加數據了,隊列還沒滿
            notFull.signal();
    } finally {
        putLock.unlock(); // 釋放放鎖,讓其餘線程能夠調用put方法
    }
    // 因爲存在放鎖和拿鎖,這裏可能拿鎖一直在消費數據,count會變化。這裏的if條件表示若是隊列中還有1條數據
    if (c == 0)
        // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列裏還有1條數據,能夠進行消費
        signalNotEmpty();
}

poll方法:

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0) // 若是元素個數爲0
        return null; // 返回null
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock(); // 拿鎖加鎖,保證調用poll方法的時候只有1個線程
    try {
        if (count.get() > 0) { // 判斷隊列裏是否還有數據
            x = dequeue(); // 刪除頭結點
            c = count.getAndDecrement(); // 元素個數-1
            if (c > 1) // 若是隊列裏還有元素
                // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列裏還有數據,能夠再次消費
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock(); // 釋放拿鎖,讓其餘線程能夠調用poll方法
    }
    // 因爲存在放鎖和拿鎖,這裏可能放鎖一直在添加數據,count會變化。這裏的if條件表示若是隊列中還能夠再插入數據
    if (c == capacity)
        // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列裏還能再次添加數據
        signalNotFull(); 
    return x;
}

take方法:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // 拿鎖加鎖,保證調用take方法的時候只有1個線程
    try {
        while (count.get() == 0) { // 若是隊列裏已經沒有元素了
            notEmpty.await(); // 阻塞並掛起當前線程
        }
        x = dequeue(); // 刪除頭結點
        c = count.getAndDecrement(); // 元素個數-1
        if (c > 1) // 若是隊列裏還有元素
            // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列裏還有數據,能夠再次消費
            notEmpty.signal(); 
    } finally {
        takeLock.unlock(); // 釋放拿鎖,讓其餘線程能夠調用take方法
    }
    // 因爲存在放鎖和拿鎖,這裏可能放鎖一直在添加數據,count會變化。這裏的if條件表示若是隊列中還能夠再插入數據
    if (c == capacity) 
        // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列裏還能再次添加數據
        signalNotFull(); 
    return x;
}

remove方法:

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock(); // remove操做要移動的位置不固定,對讀鎖寫鎖都進行加鎖
    try {
        for (Node<E> trail = head, p = trail.next; // 從鏈表頭結點開始遍歷
            p != null;
            trail = p, p = p.next) {
            if (o.equals(p.item)) { // 判斷是否找到對象
                unlink(p, trail); // 修改節點的連接信息,同時調用notFull的signal方法
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock(); // 2個鎖解鎖
    }
}

緊接着來看一下 fullyLockfullyUnlock方法:

/**
  * Locks to prevent both puts and takes.
  */
  void fullyLock() {
      putLock.lock();
      takeLock.lock();
  }

  /**
   * Unlocks to allow both puts and takes.
   */
  void fullyUnlock() {
      takeLock.unlock();
      putLock.unlock();
  }

LinkedBlockingQueue的take方法對於沒數據的狀況下會阻塞,poll方法刪除鏈表頭結點,remove方法刪除指定的對象。

須要注意的是remove方法因爲要刪除的數據的位置不肯定,須要2個鎖同時加鎖。

小結

文章有點長,JDK中的阻塞隊列線程安全的主要有ArrayBlockingQueueLinkedBlockingQueueLinkedTransferQueueDelayQueue四種,今天樓主把ArrayBlockingQueueLinkedBlockingQueue放在一塊兒介紹主要緣由是這二者都是使用可重入鎖 ReentrantLock實現的線程安全。
固然兩者也有很大的不一樣,主要是:

1,ArrayBlockingQueue只有1個鎖,添加數據和刪除數據的時候只能有1個被執行,不容許並行執行。
LinkedBlockingQueue有2個鎖,放元素鎖和取元素鎖,添加數據和刪除數據是能夠並行進行的,固然添加數據和刪除數據的時候只能有1個線程各自執行。

2,ArrayBlockingQueue中放入數據阻塞的時候,須要消費數據才能喚醒。
LinkedBlockingQueue中放入數據阻塞的時候,由於它內部有2個鎖,能夠並行執行放入數據和消費數據,不只在消費數據的時候進行喚醒插入阻塞的線程,同時在插入的時候若是容量還沒滿,也會喚醒插入阻塞的線程。

參考連接

相關文章
相關標籤/搜索