面試【JAVA基礎】阻塞隊列

一、五種阻塞隊列介紹

  • ArrayBlockingQueue 有界隊列,底層使用數組實現,併發控制使用ReentrantLock控制,不論是插入操做仍是讀取操做,都須要獲取鎖以後才能執行。
  • LinkedBlockingQueue 底層基於單向鏈表實現,既能夠當作有界隊列,也能夠當作無界隊列使用。使用兩個ReentrantLock實現併發控制:takelock和putlock。
  • SynchronousQueue 底層使用單向鏈表實現,只有一個元素,同步的意思是一個寫操做必須等到一個讀操做以後才返回,指的是讀寫線程的同步。
  • PriorityBlockingQueue 帶排序的阻塞隊列的實現,使用數組進行實現。併發控制使用ReentrantLock,隊列爲無界隊列。 有初始化參數指定隊列大小,可是會自動擴容。使用最小堆來實現排序。
  • DelayedQueue DelayedQueue是使用PriorityBlockingQueue和Delayed實現的,內部定義了一個優先級隊列,當調用offer的時候,把Delayed對象加入隊列中,使用take先把first對象拿出來(peek),若是沒有到達閾值,進行await處理。

二、poll和peek的區別

都用於取隊列的頭結點,poll會刪除頭結點,peek不會刪除頭結點。java

三、LinkedBlockingQueue

  • 是先進先出隊列FIFO。
  • 採用ReentrantLock保證線程安全

3.一、功能

3.1.一、增長

增長有三種方式,前提:隊列滿 | 方式 | put | add | offer | |--------|--------|--------|--------| |特色|一直阻塞|拋異常|返回false|node

3.1.二、刪除

刪除有三種方式,前提:隊列爲空 | 方式 | remove | poll | take | |--------|--------|--------|--------| |特色|NoSuchElementException|返回false|阻塞|數組

3.二、簡單分析

  • LinkedBlockingQueue是一個阻塞隊列,內部由兩個ReentrantLock來實現出入隊列的線程安全,由各自的Condition對象的await和signal來實現等待和喚醒功能。
  • 基於單向鏈表的、範圍任意的(實際上是有界的)、FIFO 阻塞隊列。
  • 頭結點和尾結點一開始老是指向一個哨兵的結點,它不持有實際數據,當隊列中有數據時,頭結點仍然指向這個哨兵,尾結點指向有效數據的最後一個結點。這樣作的好處在於,與計數器 count 結合後,對隊頭、隊尾的訪問能夠獨立進行,而不須要判斷頭結點與尾結點的關係。

LinkedBlockingQueue繼承關係圖.png

3.2.一、節點與屬性
//鏈表節點內部類
static class Node<E> {
    //節點元素
    E item;
    Node<E> next;
    Node(E x) {
         item = x;
    }
}
//容量界限,若是未設定,則爲Integer最大值
 private final int capacity;
//當前元素個數
private final AtomicInteger count = new AtomicInteger();
//鏈表的頭:head.item == null
transient Node<E> head;
//鏈表的尾:last.next == null
private transient Node<E> last;
//take,poll等獲取鎖
private final ReentrantLock takeLock = new ReentrantLock();
//等待任務的等待隊列
private final Condition notEmpty = takeLock.newCondition();
//put,offer等插入鎖
private final ReentrantLock putLock = new ReentrantLock();
//等待插入的等待隊列
private final Condition notFull = putLock.newCondition();
3.2.二、插入線程與獲取線程的相互通知

signalNotEmpty()方法,在插入線程發現隊列爲空時調用,告知獲取線程須要等待。 signalNotFull()方法,在獲取線程發現隊列已滿時調用,告知插入線程須要等待。安全

//表示等待take。put/offer調用,不然一般不會鎖定takeLock。
private void signalNotEmpty() {
    //獲取takeLock
    final ReentrantLock takeLock = this.takeLock;
    //鎖定takeLock
    takeLock.lock();
    try {
        //喚醒take線程等待隊列
        notEmpty.signal();
    } finally {
       //釋放鎖
       takeLock.unlock();
    }
}
//表示等待put,take/poll 調用
private void signalNotFull() {
    //獲取putLock
    final ReentrantLock putLock = this.putLock;
    //鎖定putLock
    putLock.lock();
    try {
        //喚醒插入線程等待隊列
        notFull.signal();
    } finally {
        //釋放鎖
        putLock.unlock();
    }
}
3.2.三、入隊與出隊操做

enqueue()方法只能在持有 putLock 鎖下執行,dequeue()在持有 takeLock 鎖下執行。併發

//在隊列尾部插入
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    //last.next指向當前node
    //尾指針後移
    last = last.next = node;
}
//移除隊列頭
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    //保存頭指針
    Node<E> h = head;
   //獲取當前鏈表第一個元素
   Node<E> first = h.next;
   //頭指針的next指向本身
   h.next = h; // help GC
   //頭指針指向第一個元素
   head = first;
   //獲取第一個元素的值
   E x = first.item;
   //將第一個元素的值置空
   first.item = null;
   //返回第一個元素的值
   return x;
}
3.2.四、對兩把鎖的加鎖與釋放

在須要對兩把鎖同時加鎖時,把加鎖的順序與釋放的順序封裝成方法,確保全部地方都是一致的。並且獲取鎖時都是不響應中斷的,一直獲取直到加鎖成功,這就避免了第一把鎖加鎖成功,而第二把鎖加鎖失敗致使鎖不釋放的風險。ide

//鎖定putLock和takeLock
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}
//與fullyLock的加鎖順序相反,先解鎖takeLock,再解鎖putLock
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

3.三、源碼解讀

簡單介紹一下LinkedBlockingQueue中API的源碼,如構造方法,新增,獲取,刪除,drainTo。函數

3.3.一、構造函數

LinkedBlockingQueue有三個構造方法,其中無參構造儘可能少用,由於容量爲Integer的最大值,操做不當會出現內存溢出。this

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
    //參數校驗
    if (capacity <= 0) throw new IllegalArgumentException();
    //設置容量
    this.capacity = capacity;
    //首尾節點指向一個空節點
    last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    //獲取putLock
    final ReentrantLock putLock = this.putLock;
    //鎖定
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}
3.3.二、offer(E e)

將給定的元素設置到隊列中,若是設置成功返回true, 不然返回false。 e的值不能爲空,不然拋出空指針異常。線程

//若是能夠在不超過隊列容量的狀況下當即插入指定的元素到隊列的尾部,成功後返回true,若是隊列已滿,返回false。當使用容量受限的隊列時,此方法一般比方法BlockingQueue#add更可取,後者只能經過拋出異常才能插入元素。
public boolean offer(E e) {
    //非空判斷
    if (e == null) throw new NullPointerException();
    //計數器
    final AtomicInteger count = this.count;
    //若是隊列已滿,直接返回插入失敗
    if (count.get() == capacity)
        return false;
    int c = -1;
    //新建節點
    Node<E> node = new Node<E>(e);
    //獲取插入鎖
    final ReentrantLock putLock = this.putLock;
    //鎖定
    putLock.lock();
    try {
        //若是隊列未滿
        if (count.get() < capacity) {
        //插入隊列
        enqueue(node);
        //計數
        c = count.getAndIncrement();
        //還有空餘空間
        if (c + 1 < capacity)
             //喚醒插入線程
             notFull.signal();
        }
    } finally {
        //解鎖
        putLock.unlock();
    }
    //若是隊列爲空
    if (c == 0)
        //通知獲取線程阻塞
        signalNotEmpty();
    //返回成功或者插入失敗
    return c >= 0;
}
3.3.三、put(E e)

將元素設置到隊列中,若是隊列中沒有多餘的空間,該方法會一直阻塞,直到隊列中有多餘的空間。指針

public void put(E e) throws InterruptedException {
    //不能夠插入空元素
    if (e == null) throw new NullPointerException();

    //全部put/take/etc中的約定都是預先設置本地var
    //除非設置,不然保持計數爲負數表示失敗。
    int c = -1;
    //新建節點
    Node<E> node = new Node<E>(e);
    //獲取putLock
    final ReentrantLock putLock = this.putLock;
    //獲取計數器
    final AtomicInteger count = this.count;
    //可中斷加鎖,即在鎖獲取過程當中不處理中斷狀態,而是直接拋出中斷異常,由上層調用者處理中斷。
    putLock.lockInterruptibly();
    try {
        /*
        * 注意count在wait守衛線程中使用,即便它沒有被鎖保護。
        * 這是由於count只能在此時減小(全部其餘put都被鎖定關閉),
        * 若是它從容量更改,咱們(或其餘一些等待put)將收到信號。
        * 相似地,count在其餘等待守衛線程中的全部其餘用途也是如此。
        */
        //只要當前隊列已滿
        while (count.get() == capacity) {
            //通知插入線程等待
            notFull.await();
        }
        //插入隊列
        enqueue(node);
        //數量加1
        c = count.getAndIncrement();
        //若是隊列增長1個元素還未滿
        if (c + 1 < capacity)
            //喚醒插入進程
            notFull.signal();
    } finally {
        //解鎖
        putLock.unlock();
    }
    //若是隊列中沒有元素了
    if (c == 0)
        //通知獲取線程等待
        signalNotEmpty();
}
3.3.四、peek()

非阻塞的獲取隊列中的第一個元素,不出隊列。

public E peek() {
    //隊列爲空,直接返回
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //獲取第一個元素,非哨兵
        Node<E> first = head.next;
        //元素爲空,返回null
        if (first == null)
            return null;
        else
            //返回第一個元素值
            return first.item;
    } finally {
        takeLock.unlock();
    }
}
3.3.五、poll()

非阻塞的獲取隊列中的值,未獲取到返回null。

public E poll() {
    final AtomicInteger count = this.count;
    //隊列爲空,直接返回
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //隊列非空,獲取隊列中元素
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
3.3.六、remove(Object o)

從隊列中移除指定的值。將兩把鎖都鎖定。

public boolean remove(Object o) {
    //不支持null
    if (o == null) return false;
    //鎖定兩個鎖
    fullyLock();
    try {
        //迭代隊列
        for (Node<E> trail = head, p = trail.next;
            p != null;
            trail = p, p = p.next) {
            //經過equals方法匹配待刪除元素
            if (o.equals(p.item)) {
                //移除p節點
                unlink(p, trail);
                //成功
                return true;
            }
        }
        //失敗
        return false;
    } finally {
        //解鎖
        fullyUnlock();
    }
}
// 將內部節點p與前一個跟蹤斷開鏈接
void unlink(Node<E> p, Node<E> trail) {
    // assert isFullyLocked();
    // p.next is not changed, to allow iterators that are
    // traversing p to maintain their weak-consistency guarantee.
    //p節點內容置空
    p.item = null;
    //trail節點的next指向p的next
    trail.next = p.next;
    //若是p是隊尾
    if (last == p)
        //trail變爲隊尾
        last = trail;
    //若是隊列已滿
    if (count.getAndDecrement() == capacity)
        //通知插入線程阻塞
        notFull.signal();
}
3.3.七、clear()

清空隊列。

//原子性地從隊列中刪除全部元素。此調用返回後,隊列將爲空。
public void clear() {
    //鎖定
    fullyLock();
    try {
        //清空數據,幫助垃圾回收
        for (Node<E> p, h = head; (p = h.next) != null; h = p) {
             h.next = h;
             p.item = null;
        }
        head = last;
        // assert head.item == null && head.next == null;
        //若是容量爲0
        if (count.getAndSet(0) == capacity)
            //喚醒插入線程
            notFull.signal();
    } finally {
        //解鎖
        fullyUnlock();
    }
}
3.3.八、drainTo(Collection c)

將隊列中值,所有移除,併發設置到給定的集合中。

public int drainTo(Collection<? super E> c, int maxElements) {
    //各類判斷
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    boolean signalNotFull = false;
    //鎖
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //獲取要轉移的數量
        int n = Math.min(maxElements, count.get());
        // count.get provides visibility to first n Nodes
         Node<E> h = head;
        int i = 0;
        try {
            //組裝集合
            while (i < n) {
                Node<E> p = h.next;
                c.add(p.item);
                p.item = null;
                h.next = h;
                h = p;
                ++i;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                // assert h.item == null;
                head = h;
                signalNotFull = (count.getAndAdd(-i) == capacity);
            }
        }
    } finally {
        takeLock.unlock();
        if (signalNotFull)
            signalNotFull();
    }
}

tencent.jpg

相關文章
相關標籤/搜索