Java中的隊列

最近在看數據結構的時候,看到了隊列這裏,在實際的開發中咱們不多會手動的去實現一個隊列,甚至不多直接用到隊列,可是在Java的包中有一些具備特殊屬性的隊列應用的比較普遍,例如:阻塞隊列&併發隊列.java

阻塞隊列

阻塞隊列(BlockingQueue)是一個額外支持兩種操做的隊列。這兩個附加的操做是:
1、在隊列爲空時,獲取元素的線程會等待隊列變爲非空。
2、當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。segmentfault


阻塞隊列提供了四種處理方法:數組

  • 拋出異常數據結構

    add(e):在添加元素的時候若是隊列已滿,那麼直接拋出異常。
    remove(e):移除元素,若是隊列爲空,那麼拋出異常。
    element():檢查方法。
    
    public static void test() {
       ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
       for (int i=0; i<blockingQueue.size() + 1; i++) {
           blockingQueue.add("string"+i);
       }
    }
    結果:Exception in thread "main" java.lang.IllegalStateException: Queue full
    源碼以下:
    public boolean add(E var1) {
       return super.add(var1);
    }
    public boolean add(E var1) {
       //調用offer方法
       if (this.offer(var1)) {
           return true;
       } else {
           throw new IllegalStateException("Queue full");
       }
    }
    
    /**offer(var1)在ArrayBlockingQueue中的實現源碼以下:*/
    public boolean offer(E var1) {
       checkNotNull(var1);
       ReentrantLock var2 = this.lock;
       var2.lock();
    
       boolean var3;
       try {
           //若是隊列已滿,返回false
           if (this.count == this.items.length) {
               var3 = false;
               return var3;
           }
           //進行入隊操做
           this.enqueue(var1);
           var3 = true;
       } finally {
           var2.unlock();
       }
       return var3;
    }
  • 返回特殊值併發

    一、offer(e) 入隊的時候返回特殊值,在不一樣的阻塞隊列中實現有必定的差異
    2、poll() 出隊的時候返回特殊的值
    3、peek() 測試出隊可否成功
  • 一直阻塞函數

    1、put(e) 若是隊列已滿,那麼會一直阻塞,直到成功
    2、take()  若是隊列爲空,那麼出隊會一直阻塞,直到成功
  • 阻塞,超時退出測試

    1、offer(e,time,unit)
    2、poll(time,unit)

Java中的阻塞隊列
JDK7提供了7個阻塞隊列。分別是ui

  1. ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  2. LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
  3. PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
  4. DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  5. SynchronousQueue:一個不存儲元素的阻塞隊列。
  6. LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  7. LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

下面具體看一下每一種阻塞隊列的實現方式以及使用場景:this

1. ArrayBlockingQueue

特性:用數組實現的實現的有界阻塞隊列,默認狀況下不保證線程公平的訪問隊列(按照阻塞的前後順序訪問隊列),隊列可用的時候,阻塞的線程均可以爭奪隊列的訪問資格,固然也可使用如下的構造方法建立一個公平的阻塞隊列。
ArrayBlockingQueue<String> blockingQueue2 = new ArrayBlockingQueue<>(10, true);
下面經過源碼探究如下,這個阻塞隊列是如何實現的?若是實現公平與非公平的控制。
  • 構造過程
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    //基於數組實現
    this.items = new Object[capacity];
    /**公平與非公平是經過可重入鎖來實現的*/
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
/**阻塞隊列的公平與非公平是經過可重入鎖來實現的,關於爲何可重入鎖能夠實現線程訪問的公平非公平特性,咱們晚一點分析一下ReentrantLock的實現原理。

【關於ReentrantLock的實現原理】https://segmentfault.com/a/11...線程

  • add(E) 操做,若是add失敗,那麼拋出異常
public boolean add(E e) {
    return super.add(e);
}
/**AbstractQueue 父類的add方法*/
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
/**經過多態調用本身的offer(e)實現*/
public boolean offer(E e) {
    checkNotNull(e);
    //加鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //若是隊列滿了,那麼返回false
        if (count == items.length)
            return false;
        else {
            //入隊
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
private void enqueue(E var1) {
    Object[] var2 = this.items;
    //putIndex能夠認爲是隊列的隊尾後的一個位置,數據入隊對應的位置,若是隊列滿了,那麼putIndex設置爲0
    var2[this.putIndex] = var1;
    if (++this.putIndex == var2.length) {
        this.putIndex = 0;
    }

    ++this.count;
    //喚醒一個等待在condition上的線程
    this.notEmpty.signal();
}
  • put(E e) put操做,若是隊列已滿,那麼會一直阻塞,直到put成功
public void put(E var1) throws InterruptedException {
    checkNotNull(var1);
    ReentrantLock var2 = this.lock;
    //加鎖,可被線程中斷返回
    var2.lockInterruptibly();
    try {
        //若是隊列已經滿了,那麼阻塞
        while(this.count == this.items.length) {
            this.notFull.await();
        }
        //進行入隊操做
        this.enqueue(var1);
    } finally {
        var2.unlock();
    }
}
/**
 * 在隊列滿的狀況put操做被阻塞,那麼何時該操做能夠被喚醒呢?很顯然是隊列中出現空地的狀況下,纔會被喚醒在notFull這種條件下 
 * 阻塞的操做:
 * 因此在發生如下操做的時候,會被喚醒進行入隊的操做
 * 1、dequeue()操做 2、removeAt(int var1)操做 3、clear() 4、drainTo
 */
  • take() 出隊操做,若是隊列爲空,那麼阻塞,直到隊列中包含對應元素喚醒
/**實現比較容易,和上面的操做殊途同歸*/
public E take() throws InterruptedException {
    ReentrantLock var1 = this.lock;
    var1.lockInterruptibly();

    Object var2;
    try {
        while(this.count == 0) {
            this.notEmpty.await();
        }

        var2 = this.dequeue();
    } finally {
        var1.unlock();
    }

    return var2;
}
我的總結:實現阻塞操做和核心在於線程掛起以及線程的喚醒,在Java中提供了兩種線程等待以及線程喚醒的方式。一是基於對象監視器的wait(),notify()方法。 二是經過Condition.await()和signal()方法。

2. LinkedBlockingQueue

基於鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。這個隊列的實現原理和ArrayBlockingQueue實現基本相同。能夠看一下隊列的定義:
  • 隊列的定義
/**默認的構造函數*/
public LinkedBlockingQueue() {
    this(2147483647);
}

public LinkedBlockingQueue(int var1) {
    this.count = new AtomicInteger();
    this.takeLock = new ReentrantLock();
    this.notEmpty = this.takeLock.newCondition();
    this.putLock = new ReentrantLock();
    this.notFull = this.putLock.newCondition();
    if (var1 <= 0) {
        throw new IllegalArgumentException();
    } else {
        this.capacity = var1;
        //鏈表的頭結點和尾節點,默認是空
        this.last = this.head = new LinkedBlockingQueue.Node((Object)null);
    }
}

三、PriorityBlockingQueue

一個支持優先級的無界隊列。默認狀況下元素採起天然順序排列,也能夠經過比較器comparator來指定元素的排序規則。元素按照升序排列。具體是如何實現的?
  • 隊列的定義以及構造方法
/**定義和一般的阻塞隊列相同,AbstractQueue中定義了隊列的基本操做,BlockingQueue中定義可阻塞隊列的相關操做定義*/
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
/**構造方法,默認的無參構造方法,調用的是另外一個構造方法,默認定義了一個隊列的容量,那爲何說他是無界隊列呢?接着向下*/
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
/**全部的構造方法最後調用的構造方法, comparator是一個比較器,經過比較器能夠肯定隊列中元素的排列順序*/
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    /**隊列是基於數組實現的*/
    this.queue = new Object[initialCapacity];
}
  • add 操做以及offer操做
public boolean add(E e) {
    return offer(e);
}
/**
 * offer操做
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    //若是隊列已滿,那麼嘗試進行擴容(我的感受這裏使用 >= 並非很合理)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            //使用默認的比較方法將e放到隊列中
            siftUpComparable(n, e, array);
        else
            //使用指定的比較順序將數據插入到隊列中
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        //激活一個在notEmpty這個condition上等待的線程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
/**tryGrow()實現*/
private void tryGrow(Object[] array, int oldCap) {
    //這裏先釋放了鎖,最後須要從新獲取鎖,那麼這個時候全部的add操做都會執行下面的代碼段
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

併發隊列

相關文章
相關標籤/搜索