JDK容器學習之Queue:LinkedBlockingQueue

基於鏈表阻塞隊列LinkedBlockingQueue

基於鏈表的無邊界阻塞隊列,經常使用與線程池建立中做爲任務緩衝隊列使用java

I. 底層數據結構

先看一下內部定義,與 ArrayBlockingQueue作一下對比,順帶看下這二者的區別及不一樣的應用場景node

/** 隊列的容量, or Integer.MAX_VALUE if none */
private final int capacity;

/** 隊列中實際的個數 */
private final AtomicInteger count = new AtomicInteger();

/**
 * 隊列頭,但其中沒有有效數據,它的下一個才保存實際的數據
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * 隊列尾,其內包含有效的數據
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** 出隊的鎖, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** 進隊的鎖, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();


static class Node<E> {
  // 存放在隊列中的數據; 隊列頭的item爲null
  E item;
  
  // 隊列中,該節點的下一個節點,隊列尾的next爲null
  Node<E> next;

  Node(E x) { item = x; }
}

說明數組

  1. 底層結構爲單向鏈表,其中隊列頭不包含有效數據;
  2. 隊列長度有界,爲初始化時指定的容量大小;沒指定時,默認爲int最大值
  3. count實時表示隊列中元素的個數,採用原子進行+/-1
  4. 進隊和出隊是兩個鎖,也就是說出隊和進隊能夠併發進行

對比下ArrayBlockingQueue,主要區別爲兩個地方數據結構

  1. LinkedBlockingQueue底層爲鏈表;ArrayBlockingQueue底層爲數組(貌似有點多餘,命名上就能夠看出)
  2. LinkedBlockingQueue出隊和入隊是兩個鎖,而ArrayBlockingQueue是一個鎖進行控制;即前者出隊和入隊能夠併發執行;然後者會出現鎖的競爭

II. 阻塞實現原理

0. Prefer

分析阻塞原理以前,先經過註釋解釋下LinkedBlockingQueue的使用場景併發

  • 先進先出隊列(隊列頭的是最早進隊的元素;隊列尾的是最後進隊的元素)
  • 有界隊列(即初始化時指定的容量,就是隊列最大的容量,不會出現擴容,容量滿,則阻塞進隊操做;容量空,則阻塞出隊操做)
  • 隊列不支持空元素

1. 進隊

public void put(E e) throws InterruptedException {
    // 隊列中不能存在null
    if (e == null) throw new NullPointerException();
    
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 若隊列已滿,則等待`notFull(出隊後,隊列未滿時).signal()`喚醒
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node); // 進隊
        c = count.getAndIncrement(); // 計數+1,並獲取隊列的實際元素個數
        if (c + 1 < capacity) // 若進隊後,隊列依然沒有滿,則釋放一個信號 (why?)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0) // 表示隊列從空到有一個數據,喚醒由於隊列爲空被阻塞的線程
        signalNotEmpty();
}
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
// 喚醒阻塞的出隊線程,注意使用姿式,Condition的使用必須放在對應的鎖中間,不然會報錯
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

進隊邏輯:this

  1. null不容許入隊
  2. 加入隊鎖
  3. 判斷隊列是否已滿,如果,則阻塞線程
  4. 待其餘線程出隊時被喚醒,將元素掛在隊列尾
  5. 若是隊列以前爲空,此時入隊成功以後,須要執行 notEmpty.singal(),喚醒由於隊列空被阻塞的出隊線程

2. 出隊

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 若是隊列爲空,則阻塞,等待入隊以後,被喚醒
        while (count.get() == 0) {     notEmpty.await(); }
        x = dequeue(); // 進隊
        c = count.getAndDecrement();
        if (c > 1) // 若是隊列依然非空,則喚醒其餘由於隊列爲空被阻塞的線程
            notEmpty.signal();
    } finally { takeLock.unlock();}
    if (c == capacity) 
    // 原來隊列爲滿的,此時出隊一個後,正好非滿,喚醒由於隊列滿被阻塞的線程
        signalNotFull();
    return x;
}
// 出隊邏輯,實現邏輯是把出隊Node節點設置爲新的head,釋放老的head節點 
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

// 喚醒阻塞的出隊線程,注意使用姿式,Condition的使用必須放在對應的鎖中間,不然會報錯
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

出隊邏輯.net

  1. 出隊鎖
  2. 判斷隊列是否非空,爲空時阻塞出隊線程
  3. 其餘線程入隊成功,喚醒因隊列爲空被阻塞的線程
  4. 若出隊以前,隊列爲滿的,則喚醒由於隊列滿沒法入隊而阻塞的線程

查看上面的源碼時,還發現一個很是有意思的地方,出隊成功以後,會判斷若是以前的隊列中元素的個數大於1(即出隊以後,還有元素),會執行notEmpty.signal();,喚醒被阻塞的出隊線程,爲何要這麼幹?線程

假設一種場景,一個空隊列,兩個線程(A,B)都執行出隊,被阻塞;code

此時線程C執行入隊,入隊完成,由於隊列由空到非空,會喚醒一個被阻塞的出隊線程(假設爲A);

由於出隊和入隊是能夠併發的,如今在線程A執行`c = count.getAndDecrement();`以前,若線程D又入隊成功一個,由於此時隊列非空,因此不會調用`signalNotEmpty`

如今若是線程A執行出隊以後,獲取到的c應該爲2,若是不執行`notEmpty.signal();`,就會致使線程B一直被阻塞,顯然不符合咱們的預期

3. 其餘方法

除了出隊和入隊的方法以外,還有幾個有意思的方法,如隊列中元素以數組形式輸出,判斷隊列是否有元素,這兩個操做,都會競爭出隊和入隊鎖,確保在執行這個方法時,隊列不會被其餘線程修改對象

public boolean contains(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> p = head.next; p != null; p = p.next)
            if (o.equals(p.item))
                return true;
        return false;
    } finally {
        fullyUnlock();
    }
}

public Object[] toArray() {
    fullyLock();
    try {
        int size = count.get();
        Object[] a = new Object[size];
        int k = 0;
        for (Node<E> p = head.next; p != null; p = p.next)
            a[k++] = p.item;
        return a;
    } finally {
        fullyUnlock();
    }
}

III. 經典case

鏈表阻塞隊列的經典使用case,基本上用過線程池就會用到這個了,如jdk中自帶的

// java.util.concurrent.Executors
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

IV. 對比小結

1. 底層結構

ArrayBlockingQueue : 底層存儲結構爲數組,直接將數據存入數組中

LinkedBlockingQueue : 底層存儲結構爲單向鏈表,會將數據封裝到Node對象做爲鏈表的節點,且鏈表頭中不包含實際的元素信息

2. 鎖的分離

ArrayBlockingQueue : 出隊入隊公用一把鎖,即二者沒法併發

LinkedBlockingQueue: 出隊和入隊各一把鎖,所以出隊和入隊可併發執行

所以在線程池的建立中,通常是使用LinkedBlockingQueue,至少線程在進入等待隊列中時,出隊和進隊不會相互阻塞,可是二者之間有關聯

  • 出隊時,若隊列以前爲滿隊列時,會喚醒由於隊列滿被阻塞的入隊線程
  • 進隊時,若隊列以前爲空隊列時,會喚醒由於隊列空被阻塞的出隊線程

3. 出隊和進隊的操做不一樣

ArrayBlockingQueue : 是直接將對象插入或移除

LinkedBlockingQueue: 須要把枚舉對象轉換爲Node<E>進行插入或移除,其中會將出隊的Node節點做爲新的隊列頭,返回並置空Node的item元素

4. 隊列大小初始化

ArrayBlockingQueue : 必須指定隊列的容量

LinkedBlockingQueue: 能夠指定隊列的容量,不指定時,容量爲 Integer.MAX_VALUE

掃描關注,java分享

https://static.oschina.net/uploads/img/201710/13203703_6IVg.jpg

相關文章
相關標籤/搜索