public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable { //隊列的默認容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //底層用於存放數據的數組 transient Object[] queue; // non-private to simplify nested class access //隊列中的元素數量計數 private int size = 0; //比較器 private final Comparator<? super E> comparator; //快速失敗機制使用的變量 transient int modCount = 0; //建立一個默認容量的隊列 public PriorityQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } //建立一個指定容量的隊列 public PriorityQueue(int initialCapacity) { this(initialCapacity, null); } //建立一個指定比較器的默認容量隊列 public PriorityQueue(Comparator<? super E> comparator) { this(DEFAULT_INITIAL_CAPACITY, comparator); } //建立一個指定比較器且指定容量隊列 public PriorityQueue(int initialCapacity, Comparator<? super E> comparator) { //判斷指定的容量值是否合法 if (initialCapacity < 1) throw new IllegalArgumentException(); this.queue = new Object[initialCapacity]; //初始化底層數組 this.comparator = comparator; //比較器初始化 } //建立一個帶有指定集合中的元素的隊列 @SuppressWarnings("unchecked") public PriorityQueue(Collection<? extends E> c) { //判斷c是不是有序集合 //如果有序集合,那麼就以其比較器做爲隊列的比較器 if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); initElementsFromCollection(ss); } //判斷集合是不是優先級隊列 //如果的話,直接使用該隊列的比較器, else if (c instanceof PriorityQueue<?>) { PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); initFromPriorityQueue(pq); } else { this.comparator = null; initFromCollection(c); } } //將容器c中的元素添加到優先級隊列中 private void initElementsFromCollection(Collection<? extends E> c) { Object[] a = c.toArray(); // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, a.length, Object[].class); int len = a.length; if (len == 1 || this.comparator != null) for (int i = 0; i < len; i++) if (a[i] == null) throw new NullPointerException(); this.queue = a; this.size = a.length; } //將優先級隊列c中的元素添加到當前優先級隊列中 private void initFromPriorityQueue(PriorityQueue<? extends E> c) { if (c.getClass() == PriorityQueue.class) { this.queue = c.toArray(); this.size = c.size(); } else { initFromCollection(c); } } //將容器c中的元素添加到優先級隊列中 private void initFromCollection(Collection<? extends E> c) { initElementsFromCollection(c); heapify(); } //建立包含優先級隊列c中元素的隊列,且使用同一個比較器 @SuppressWarnings("unchecked") public PriorityQueue(PriorityQueue<? extends E> c) { this.comparator = (Comparator<? super E>) c.comparator(); initFromPriorityQueue(c); } //建立包含排序集合c中元素的優先級隊列,且使用同一個比較器 @SuppressWarnings("unchecked") public PriorityQueue(SortedSet<? extends E> c) { this.comparator = (Comparator<? super E>) c.comparator(); initElementsFromCollection(c); } }
PriorityQueue中的入隊方法分析:java
//add與offer沒有區別 public boolean add(E e) { return offer(e); } public boolean offer(E e) { //隊列中不容許有null元素 if (e == null) throw new NullPointerException(); modCount++; //快速失敗機制 int i = size; //獲取當前隊列中元素個數 //判斷數組是否須要擴容 if (i >= queue.length) grow(i + 1); size = i + 1; //元素計數+1 //新增元素的插入位置 //若隊列本來爲空,則直接放到0位置 //若隊列本來不爲空 if (i == 0) queue[0] = e; else siftUp(i, e); //插入數組 return true; } //擴容 private void grow(int minCapacity) { int oldCapacity = queue.length; //隊列舊容量 //擴容機制,隊列原容量小於64時,擴容爲原來的2倍再加2 //大於64,則擴大1.5倍 int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1)); if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity); queue = Arrays.copyOf(queue, newCapacity); } //上浮 /** * 上浮過程 * 假設已有一個有序堆(升序)以下所示: * 10 * / \ * 20 40 * / \ / * 60 70 90 * 如今要將元素30插入堆中,則有 * 1.將要插入的30先放在二叉堆的末尾 * 2.再將其與父結點進行比較,判斷是否要上浮(小於父結點就上浮) * 3.若小於父結點則交換位置,再重複第2步驟繼續上浮 * 4.若大於則直接結束上浮 * 10 10 * / \ / \ * 20 40 ——> 20 30 * / \ / \ / \ / \ * 60 70 90 30 60 70 90 40 */ private void siftUp(int k, E x) { //判斷隊列是天然排序仍是比較器排序 if (comparator != null) siftUpUsingComparator(k, x); //比較器排序 else siftUpComparable(k, x); //天然排序 } //入隊操做本質是一個堆排序中的一個上浮的過程 private void siftUpUsingComparator(int k, E x) { //判斷索引位置是否大於0,便是否到達堆頂 while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (comparator.compare(x, (E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = x; } //另外一個上浮方法,使用的天然排序 private void siftUpComparable(int k, E x) { Comparable<? super E> key = (Comparable<? super E>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (key.compareTo((E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = key; }
PriorityQueue中的出隊方法分析:算法
public E poll() { if (size == 0) //判斷隊列是不是空隊列 return null; int s = --size; modCount++; E result = (E) queue[0]; //取出隊首元素 E x = (E) queue[s]; //獲取隊尾元素 queue[s] = null; //隊尾賦null //將本來的隊尾元素放到堆頂,再對整個堆進行排序整理 //即下沉 if (s != 0) siftDown(0, x); //下沉方法 return result; } //下沉 /** * 下沉過程 * 假設已有一個有序堆(升序)以下所示: * 10 * / \ * 20 30 * / \ / \ * 60 70 90 40 * 如今要將元素10出隊,則有 * 1.將要出隊的10移除出二叉堆,並將隊尾40放到堆頂 * 2.將堆頂元素與兩個子結點中較小的元素相比較,選擇小的元素做爲新的堆頂元素 * 3.重複對堆中前一半結點進行將第2步的比較交換 * 40 20 * / \ / \ * 20 30 ——> 40 30 * / \ / / \ / * 60 70 90 60 70 90 */ private void siftDown(int k, E x) { if (comparator != null) siftDownUsingComparator(k, x); //比較器下沉 else siftDownComparable(k, x); //天然排序下沉 } //使用天然排序下沉 private void siftDownComparable(int k, E x) { Comparable<? super E> key = (Comparable<? super E>)x; int half = size >>> 1; //下沉要對堆中前一半的結點都進行 while (k < half) { int child = (k << 1) + 1; Object c = queue[child]; //獲取當前結點的左孩子 int right = child + 1; //右孩子索引 //若存在右孩子,那麼左右孩子先比較大小,取小再與父結點比較 if (right < size && ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0) c = queue[child = right]; //父結點與子結點比較 //若父結點小於子結點,則直接結束下沉的過程 //不然,交互位置後繼續下沉操做 if (key.compareTo((E) c) <= 0) break; queue[k] = c; k = child; } queue[k] = key; } //使用比較器下沉 @SuppressWarnings("unchecked") private void siftDownUsingComparator(int k, E x) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = queue[child]; int right = child + 1; if (right < size && comparator.compare((E) c, (E) queue[right]) > 0) c = queue[child = right]; if (comparator.compare(x, (E) c) <= 0) break; queue[k] = c; k = child; } queue[k] = x; }
3.DelayQueue的繼續體系api
瞭解了DelayQueue的底層實際是經過PriorityQueue實現,再來看看DelayQueue的繼承關係,以下圖所示,父類及接口以前的學習中都已分析過,不在贅言。數組
4.Delay接口安全
DelayQueue隊列與其餘隊列最明顯的不一樣之處,就是它的延時功能,也正由於這個延時特色,DelayQueue中的對象都必需要實現Delay接口,接下來就看看這個Delay接口是幹什麼的。數據結構
//用來標記那些應該在給定延遲時間以後執行的對象 public interface Delayed extends Comparable<Delayed> { //檢查延遲是否結束,該方法返回一個延遲時間,時間到後在檢查還有沒有 //延遲,若沒有延遲執行下一步,若還有延遲,繼續等待 long getDelay(TimeUnit unit); }
DelayQueue的使用示例:多線程
/** * 延遲隊列的使用示例 * 主線程建立三個延遲任務放到queue中,其餘三個線程 * 在任務可用時取出 * Created by bzhang on 2019/4/1. */ public class TestDelayed implements Delayed { private String name; private Date takeTime; //延遲時間 public TestDelayed(String name, Date takeTime) { this.name = name; this.takeTime = takeTime; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getTakeTime() { return takeTime; } public void setTakeTime(Date takeTime) { this.takeTime = takeTime; } @Override public long getDelay(TimeUnit unit) { long convert = unit.convert(takeTime.getTime()-System.currentTimeMillis(), TimeUnit.MILLISECONDS); return convert; } @Override public int compareTo(Delayed o) { TestDelayed t = (TestDelayed)o; long l = this.takeTime.getTime() - t.getTakeTime().getTime(); if (l==0){ return 0; } return l > 0 ? 1 : -1; } @Override public String toString() { return "TestDelayed{" + "name='" + name + '\'' + ", takeTime=" + takeTime + '}'; } public static void main(String[] args) { DelayQueue queue = new DelayQueue(); long l = System.currentTimeMillis(); queue.put(new TestDelayed("A",new Date(l+5000))); queue.put(new TestDelayed("B",new Date(l+2000))); queue.put(new TestDelayed("C",new Date(l+7000))); System.out.println(new Date()); int t = 0; for (int i = 0;i < 3;i++){ new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()+queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } } //結果 Tue Apr 02 11:03:33 CST 2019 Thread-1TestDelayed{name='B', takeTime=Tue Apr 02 11:03:35 CST 2019} Thread-0TestDelayed{name='A', takeTime=Tue Apr 02 11:03:38 CST 2019} Thread-2TestDelayed{name='C', takeTime=Tue Apr 02 11:03:40 CST 2019}
5.DelayQueue中的重要屬性及構造方法併發
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { //重入鎖,用於保證併發安全 private final transient ReentrantLock lock = new ReentrantLock(); //底層優先級隊列,實際元素都存儲與該隊列中,底層是數組構成的二叉堆 private final PriorityQueue<E> q = new PriorityQueue<E>(); //下一個等待獲取元素的線程,可減小沒必要要的等待 private Thread leader = null; //條件控制,表示是否能夠從隊列中取數據 private final Condition available = lock.newCondition(); public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); } }
6.DelayQueue的入隊方法ide
//add方法本質就是調用offer方法,將元素新增到隊列 public boolean add(E e) { return offer(e); } //同上 public void put(E e) { offer(e); } //延遲隊列是無界隊列,指定超時時間放入元素沒有意義,與直接入隊是同樣的 public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } //向隊列中新增元素,元素位置以比較結果(compareTo方法)來肯定 public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); //調用底層優先級隊列的offer方法來存儲元素 //判斷底層優先級隊列的隊首是不是新增元素 if (q.peek() == e) { leader = null; //喚醒條件等待隊列的某一個線程,即說明隊列中有元素了, //能夠從隊列中獲取到元素了 available.signal(); } return true; } finally { lock.unlock(); } }
7.DelayQueue的出隊方法oop
//返回延遲時間已到的第一個元素,或返回null(沒有元素或元素延遲時間都未到) public E poll() { final ReentrantLock lock = this.lock; //重入鎖 lock.lock(); //加鎖同步 try { E first = q.peek(); //獲取優先級隊列中的隊首元素 //判斷隊列是否爲空,若不爲空那麼隊首延遲時間是否到達,若都不知足 //說明隊首元素可用,返回隊首 //不然返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } //如有延遲時間已到的元素就當即返回,若無則一直等待 //隊列中無元素那麼也一直等待 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //可被中斷鎖 try { for (;;) { //自旋 E first = q.peek(); //獲取隊首元素 //若隊列爲空,直接進入條件隊列等待喚醒 //隊列不爲空,則判斷隊首的延時是否到達 if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); //獲取剩餘延遲時間(單位是ns) if (delay <= 0) //沒有剩餘延遲時間,則將隊首元素返回 return q.poll(); first = null; //判斷是否已經有其餘線程在等待取元素 //如有,那麼就讓當前線程直接等待 //若沒有,那就說明當前只有本線程在等待獲取隊首元素 if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); //獲取當前線程 leader = thisThread; //將單籤線程設爲等待獲取隊首的線程 try { //等待隊首元素的延遲時間後,在嘗試獲取隊首元素 available.awaitNanos(delay); } finally { //將等待獲取的線程設爲null,由於當前線程正在獲取,所以不該該有leader //即leader爲null,說明要麼有線程正在執行獲取操做,要麼沒有出隊操做在進行 if (leader == thisThread) leader = null; } } } } } finally { //當前線程已經取完元素了,能夠喚醒其餘線程獲取隊首元素了 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } //指定時間內獲取延遲的隊首元素,若在指定等待時間內隊首延遲時間未到達或隊列爲空 //就返回null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); //隊列是否爲空,若爲空隊列,那麼在指定等待是否到達,若等待時間也已到達 //那就返回null,若未到達等待時間,就繼續等待 if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); //當前線程進入等待時間nanos納秒 } else { long delay = first.getDelay(NANOSECONDS); //獲取隊首元素的延遲時間 //判斷延遲時間是否到達,到達就直接將隊首元素返回 if (delay <= 0) return q.poll(); //延遲時間未到,但等待時間已經達到,那麼就返回null if (nanos <= 0) return null; first = null; // don't retain ref while waiting //延遲時間小於等待時間,說明能夠在等待時間內獲取到隊首元素 //那麼就在等待延遲時間到達的時間內,能夠再次嘗試將隊首元素獲取返回 //這裏僅是再次嘗試,由於可能在等待期間內有新的元素入隊,且延遲時間最小成爲新隊首 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { //等待時間 > 延遲時間 而且沒有其它線程在等待, //那麼當前元素成爲leader,表示當前線程最先正在等待獲取元素 Thread thisThread = Thread.currentThread(); leader = thisThread; try { //讓等待時間到達 long timeLeft = available.awaitNanos(delay); //繼續等待的時間 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
8.peek方法
//peek方法僅僅就是爲底層的優先級隊列的peek方法加上鎖 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } }
1.PriorityBlockingQueue的底層實現
PriorityBlockingQueue是一個線程安全的無界阻塞隊列,能夠看對是PriorityQueue的多線程版本,其底層數據結構與PriorityQueue相同,都是數組實現的利用二叉堆結構。前文已經分析過,這裏再也不多說
2.PriorityBlockingQueue的繼承體系
PriorityBlockingQueue的繼承關係以下圖所示,均是以前學習過的父類或接口。這裏再也不展開。
3.PriorityBlockingQueue中的重要屬性及構造方法
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //未指定隊列初始容量時使用的默認容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //隊列雖說是無界的,但實際隊列是不能超過Integer.MAX_VALUE - 8這個值的 //如果超過報OOM異常 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //底層存放數據的數組 private transient Object[] queue; //隊列中元素的個數,計數器 private transient int size; //用於判斷優先級的比較器,若爲null則使用天然排序 private transient Comparator<? super E> comparator; //重入鎖,保證併發安全 private final ReentrantLock lock; //隊列非空條件,用於出隊操做 private final Condition notEmpty; //用於隊列顯示是否處於擴容狀態,0表示沒有在擴容 //而1表示處於擴容狀態,將該值更新成1的線程會進行數組擴容 //其餘要進行擴容的線程檢查該值發現爲1,則直接暫停線程讓出CPU private transient volatile int allocationSpinLock; //將隊列轉換成線程不安全的優先級隊列,用於序列化 private PriorityQueue<E> q; //建立一個默認初始容量的隊列 public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } //建立一個指定初始容量的隊列 public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } //建立一個指定容量和比較器的隊列 public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } //以集合c爲底,建立一個隊列 public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls //根據集合c是哪種容器來決定建立怎樣的初始隊列 if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) heapify(); } }
4.入隊方法
//PriorityBlockingQueue全部的入隊方法,都同樣,由於隊列是無界隊列 //不存在加入隊列失敗的可能,所以最終都是調用offer方法 public boolean add(E e) { return offer(e); } public void put(E e) { offer(e); // never need to block } public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); // never need to block } public boolean offer(E e) { //優先級隊列中不容許存在null元素,所以null元素沒法肯定優先級 if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; //n爲當前隊列中的元素個數,cap爲當前隊列的容量 Object[] array; //判斷底層數組是否須要擴容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //判斷是使用比較器進行上浮操做,仍是使用天然排序進行上浮操做 if (cmp == null) siftUpComparable(n, e, array); //天然排序上浮 else siftUpUsingComparator(n, e, array, cmp); //比較器上浮 size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; } //天然上浮,與PriorityQueue中同樣 private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; //獲取父結點索引 Object e = array[parent]; //父結點 //比較插入的值與父結點的大小,若比父結點小,那麼交換位置後,在繼續比較 //若比父結點大,說明排序正確,無需在繼續比較 if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } //比較器比較上浮 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; } //數組擴容 private void tryGrow(Object[] array, int oldCap) { // 擴容時不須要加鎖,由於擴容是經過CAS方式來實現的, //這樣不只能夠提高效率,而且不影響出隊操做 lock.unlock(); Object[] newArray = null; //將allocationSpinLock更新成1的線程進行數組擴容操做,其他要擴容的線程暫停 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { //擴容規則,容量小於64,擴大2倍+2,容量不小於64,則擴大1.5倍 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); //判斷擴大後的容量是否越界 //如果會越界,則擴容規則改成舊容量+1,若仍越界,報OOM異常 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; //建立新數組 } finally { allocationSpinLock = 0; //恢復爲0,表示沒有在擴容狀態 } } if (newArray == null) //未競爭到擴容操做的線程暫停 Thread.yield(); lock.lock(); /從新上鎖 if (newArray != null && queue == array) { queue = newArray; //將舊數組中的數據轉移到新數組中 System.arraycopy(array, 0, newArray, 0, oldCap); } }
5.出隊方法
//獲取並移除隊首元素,若隊列爲空,返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); //真正出隊的方法 } finally { lock.unlock(); } } //真正執行獲取並移除隊首元素的方法 private E dequeue() { int n = size - 1; //移除隊首後隊列中的元素個數 ,同時也是隊尾元素的索引 //判斷隊列是否爲空隊列,空隊列直接返回null if (n < 0) return null; else { Object[] array = queue; //獲取底層數組引用 E result = (E) array[0]; //獲取隊首元素 E x = (E) array[n]; //獲取隊尾元素 array[n] = null; //隊尾置爲null Comparator<? super E> cmp = comparator; //將原來隊列的隊尾放到隊首位置,而後進行下沉操做(即二叉堆從新排序的操做) if (cmp == null) siftDownComparable(0, x, array, n); //使用天然排序下沉 else siftDownUsingComparator(0, x, array, n, cmp); //使用比較器下沉 size = n; return result; } } //下沉操做與PriorityQueue中相同,這裏不在多作分析 //天然排序下沉 private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } } //比較器下沉 private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } } //獲取並移除隊首元素,若隊列已空,則等待 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { //若返回的元素爲null,說明隊列中沒有元素 //那麼讓當前線程進入條件隊列中等待,當前隊列有元素時,則 //會喚醒線程,在嘗試獲取並移除隊首 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } //在必定時間內嘗試獲取並移除隊首元素,若在指定時間內未成功, //返回null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { //嘗試獲取並移除隊首,若失敗但超時時間未到,則進入條件等待 //一段時間後在進行嘗試,若超時時間已過仍爲成功獲取並移除隊首 //則返回null while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result; } //獲取但不移除隊首元素 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (size == 0) ? null : (E) queue[0]; } finally { lock.unlock(); } }