深度分析:面試阿里,字節跳動,美團幾乎都會被問到的阻塞隊列

基本概念

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法。
1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。java

2)支持阻塞的移除方法:意思是在隊列爲空時,獲取元素的線程會等待隊列變爲非空node

阻塞隊列一共有7種,咱們着重講一下
ArrayBlockingQueue ,
LinkedBlockingQueue ,
DelayQueue,
SynchronousQueue
這四種阻塞隊列數組

ArrayBlockingQueue

基於數組實現有界的阻塞隊列(循環數組)緩存

類的繼承

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

主要成員變量

private static final long serialVersionUID = -817911632652898426L;

   final Object[] items; //底層存儲元素的數組

   int takeIndex; //進行取操做時的下標

   int putIndex;//進行放操做時的下標
   
   int count;//隊列中元素的數量
   
   final ReentrantLock lock;//阻塞時用的鎖

   private final Condition notEmpty;//滿時的condition隊列

   private final Condition notFull;//空時的condition隊列

構造器

參數有容量和全局鎖是不是公平鎖併發

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

不用肯定是不是公平鎖,默認是非公平鎖less

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

在第一個構造器的前提下,將整個集合移入阻塞隊列dom

public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

主要方法

put()ide

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

1.首先判斷添加的是否非空,是空的會拋出異常。
2.給put方法上鎖
3.當集合元素數量和集合的長度相等時,調用put方法的線程將會被放入notFull隊列上等待。
4.若是不相等,則enqueue(),也就是讓e進入隊列。
咱們再看看enqueue()方法(入隊方法)性能

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

其實就是讓該元素入隊,而且喚醒由於集合空而等待的線程。this

take()方法同理。

LinkedBlockingQueue

LinkedBlockingQueue底層是基於鏈表實現的,因此其基本成員變量和LinkedList差很少。

類的繼承關係

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

構造器

無參構造器,默認容量爲最大容量

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

手動設定容量

public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

將整個集合挪入隊列中,默認容量一樣是最大容量。

public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

主要成員變量

鏈表就必定會有節點
內部節點類
和ArrayBlockingQueue不一樣的是,它有兩個全局鎖,一個負責放元素,一個負責取元素。

static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

除了節點以外。

private transient Node<E> last;//尾節點

transient Node<E> head;//頭節點

private final AtomicInteger count = new AtomicInteger();//計算當前阻塞隊列中的元素個數 

private final int capacity;//容量
 
 //獲取並移除元素時使用的鎖,如take, poll, etc
private final ReentrantLock takeLock = new ReentrantLock();

//notEmpty條件對象,當隊列沒有數據時用於掛起執行刪除的線程
private final Condition notEmpty = takeLock.newCondition();

//添加元素時使用的鎖如 put, offer, etc
private final ReentrantLock putLock = new ReentrantLock();

//notFull條件對象,當隊列數據已滿時用於掛起執行添加的線程
private final Condition notFull = putLock.newCondition();

主要方法

put()方法

public void put(E e) throws InterruptedException {
       if (e == null) throw new NullPointerException();
       // Note: convention in all put/take/etc is to preset local var
       // holding count negative to indicate failure unless set.
       int c = -1;
       Node<E> node = new Node<E>(e);
       final ReentrantLock putLock = this.putLock;
       final AtomicInteger count = this.count;
       putLock.lockInterruptibly();
       try {
  
           while (count.get() == capacity) {
               notFull.await();
           }
           enqueue(node);
           c = count.getAndIncrement();
           if (c + 1 < capacity)
               notFull.signal();
       } finally {
           putLock.unlock();
       }
       if (c == 0)
           signalNotEmpty();
   }

基本和ArrayBlockingQueue同樣,只是鎖的數量不一樣,致使有一些細微的區別。

代碼示例

public class TestDemo16 {
    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    public static void main(String[] args) {
        new Thread("put"){
            @Override
            public void run() {
                //添加元素
                for(int i=0; i<10; i++){
                    System.out.println("put: "+i);
                    try {
                        queue.put(i);
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        new Thread("take"){
            @Override
            public void run() {
                //獲取元素
                for(int i=0; i<10; i++){
                    try {
                        System.out.println("take: "+queue.take());
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }
}

DelayQueue

基於PriorityQueue 延時阻塞隊列,DelayQueue中的元素只有當其延時時間到達,纔可以去當前隊列中獲取到該元素,DelayQueue是一個無界隊列。主要用於緩存系統的設計、定時任務系統的設計。
實現DelayQueue的三個步驟

第一步:繼承Delayed接口

第二步:實現getDelay(TimeUnit unit),該方法返回當前元素還須要延時多長時間,單位是納秒

第三步:實現compareTo方法來指定元素的順序
例如;

class Test implements Delayed {
    private long time; //Test實例延時時間

    public Test(long time, TimeUnit unit){
        this.time = System.currentTimeMillis() + unit.toMillis(time);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return this.time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        long diff = this.time - ((Test)o).time;
        if(diff <= 0){
            return -1;
        }else{
            return 1;
        }
    }
}
DelayQueue<Test> queue = new DelayQueue<>();
        queue.put(new Test(5, TimeUnit.SECONDS));
        queue.put(new Test(10, TimeUnit.SECONDS));
        queue.put(new Test(15, TimeUnit.SECONDS));

        System.out.println("begin time: "+ LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        for(int i=0; i<3; i++){
            try {
                Test test = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("time: "+LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        }

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。它支持公平訪問隊列。默認狀況下線程採用非公平性策略訪問隊列。使用如下構造方法能夠建立公平性訪問的SynchronousQueue,若是設置爲true,則等待的線程會採用先進先出的順序訪問隊列

SynchronousQueue能夠當作是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素,很是適合傳遞性場景。SynchronousQueue的吞吐量高於LinkedBlockingQueue和ArrayBlockingQueue.

public static void main(String[] args) throws InterruptedException {
        SynchronousQueue queue=new SynchronousQueue();
        LinkedBlockingQueue
        new Thread("put"){
            @Override
            public void run() {
                System.out.println("put has started");
                for(int i=0;i<5;i++){
                    System.out.println("put after takeThread");
                    try {
                        queue.put((int)((Math.random() * 100) + 1));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("put has ended");
            }
        }.start();
        new Thread("take"){
            @Override
            public void run() {
                System.out.println("take has started");
                for(int i=0;i<5;i++){
                    try {
                        System.out.println("take from putThread"+queue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("put has ended");
            }
        }.start();
        }

總結

1:ArrayBlockingQueue和LinkedBlockingQueue的區別和聯繫?1)數據存儲容器不同,ArrayBlockingQueue採用數組去存儲數據、LinkedBlockingQueue採用鏈表去存儲數據。2)ArrayBlockingQueue(循環數組)採用數組去存儲數據,不會產生額外的對象實例; LinkedBlockingQueue採用鏈表去存儲數據,在插入和刪除元素只與一個節點有關,須要去生成一個額外的Node對象,這可能長時間內須要併發處理大批量的數據,對於性能和後期GC會產生影響。3)ArrayBlockingQueue是有界的,初始化時必需要指定容量;LinkedBlockingQueue默認是無界的,Integer.MAX_VALUE, 當添加速度大於刪除速度、有可能形成內存溢出。4) ArrayBlockingQueue在讀和寫使用的鎖是同樣的,即添加操做和刪除操做使用的是同一個ReentrantLock,沒有實現鎖分離;LinkedBlockingQueue實現了鎖分離,添加的時候採用putLock、刪除的時候採用takeLock,這樣能提升隊列的吞吐量。2:ArrayBlockingQueue可使用兩把鎖提升效率嗎?不能,主要緣由是ArrayBlockingQueue底層循環數組來存儲數據,LinkedBlockingQueue底層 鏈表來存儲數據,鏈表隊列的添加和刪除,只是和某一個節點有關,爲了防止head和last相互影響,就須要有一個原子性的計數器,每一個添加操做先加入隊列,計數器+1,這樣是爲了保證隊列在移除的時候, 長度是大於等於計數器的,經過原子性的計數器,使得當前添加和移除互不干擾。對於循環數據來講,當咱們走到最後一個位置須要返回到第一個位置,這樣的操做是沒法原子化,只能使用同一把鎖來解決。

相關文章
相關標籤/搜索