併發容器學習—DelayQueue與PriorityBlockingQueue

1、DelayQueue併發容器
1.Delay Queue的底層實現
    Delay Queue是一個線程安全且無界的阻塞隊列,只有在延遲時間知足後才能獲取隊列中的元素,所以隊列中的元素必須實現Delay接口,在建立元素時指定多久時間後才能從隊列中獲取該元素。Delay Queue的底層實現是使用了PriorityQueue+ReentrantLock來實現延遲獲取功能。
 
2.PriorityQueue分析
    其中PriorityQueue是種優先級隊列,線程不安全,隊列中的元素會按照優先級來排序。該隊列底層實現是使用二叉堆,而且元素按照其天然順序進行排序,或者根據構造隊列時提供的Comparator進行排序。由於PriorityQueue中的元素都要進行比較,因此優先級隊列中不能擁有null元素,也不能有不能比較的元素。
    PriorityQueue的繼承關係以下圖:
    PriorityQueue中的屬性及構造方法:
public class PriorityQueue<E> extends AbstractQueue<E>
    implements java.io.Serializable {
    
    //隊列的默認容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    //底層用於存放數據的數組
    transient Object[] queue; // non-private to simplify nested class access

    //隊列中的元素數量計數
    private int size = 0;

    //比較器
    private final Comparator<? super E> comparator;

    //快速失敗機制使用的變量
    transient int modCount = 0; 

    //建立一個默認容量的隊列
    public PriorityQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    //建立一個指定容量的隊列
    public PriorityQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    //建立一個指定比較器的默認容量隊列
    public PriorityQueue(Comparator<? super E> comparator) {
        this(DEFAULT_INITIAL_CAPACITY, comparator);
    }

    //建立一個指定比較器且指定容量隊列
    public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
        //判斷指定的容量值是否合法
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];    //初始化底層數組
        this.comparator = comparator;    //比較器初始化
    }

    //建立一個帶有指定集合中的元素的隊列
    @SuppressWarnings("unchecked")
    public PriorityQueue(Collection<? extends E> c) {

        //判斷c是不是有序集合
        //如果有序集合,那麼就以其比較器做爲隊列的比較器
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            initElementsFromCollection(ss);
        }

        //判斷集合是不是優先級隊列
        //如果的話,直接使用該隊列的比較器,
        else if (c instanceof PriorityQueue<?>) {
            PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            initFromPriorityQueue(pq);
        }
        else {
            this.comparator = null;
            initFromCollection(c);
        }
    }

    //將容器c中的元素添加到優先級隊列中
    private void initElementsFromCollection(Collection<? extends E> c) {
        Object[] a = c.toArray();
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, a.length, Object[].class);
        int len = a.length;
        if (len == 1 || this.comparator != null)
            for (int i = 0; i < len; i++)
                if (a[i] == null)
                    throw new NullPointerException();
        this.queue = a;
        this.size = a.length;
    }

    //將優先級隊列c中的元素添加到當前優先級隊列中
    private void initFromPriorityQueue(PriorityQueue<? extends E> c) {
        if (c.getClass() == PriorityQueue.class) {
            this.queue = c.toArray();
            this.size = c.size();
        } else {
            initFromCollection(c);
        }
    }

    //將容器c中的元素添加到優先級隊列中
    private void initFromCollection(Collection<? extends E> c) {
        initElementsFromCollection(c);
        heapify();
    }

    //建立包含優先級隊列c中元素的隊列,且使用同一個比較器
    @SuppressWarnings("unchecked")
    public PriorityQueue(PriorityQueue<? extends E> c) {
        this.comparator = (Comparator<? super E>) c.comparator();
        initFromPriorityQueue(c);
    }

    //建立包含排序集合c中元素的優先級隊列,且使用同一個比較器
    @SuppressWarnings("unchecked")
    public PriorityQueue(SortedSet<? extends E> c) {
        this.comparator = (Comparator<? super E>) c.comparator();
        initElementsFromCollection(c);
    }
}

    PriorityQueue中的入隊方法分析:java

//add與offer沒有區別
public boolean add(E e) {
    return offer(e);
}


public boolean offer(E e) {
    //隊列中不容許有null元素
    if (e == null)
        throw new NullPointerException();
    modCount++;    //快速失敗機制
    int i = size;    //獲取當前隊列中元素個數
    //判斷數組是否須要擴容
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;    //元素計數+1
    
    //新增元素的插入位置
    //若隊列本來爲空,則直接放到0位置
    //若隊列本來不爲空
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);    //插入數組
    return true;
}

//擴容
private void grow(int minCapacity) {
    int oldCapacity = queue.length;    //隊列舊容量
    
    //擴容機制,隊列原容量小於64時,擴容爲原來的2倍再加2
    //大於64,則擴大1.5倍
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                     (oldCapacity + 2) :
                                     (oldCapacity >> 1));
    
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    queue = Arrays.copyOf(queue, newCapacity);
}

//上浮
/**
* 上浮過程
* 假設已有一個有序堆(升序)以下所示:
*          10
*      /       \
*    20         40
*   /  \      /
*  60   70   90
* 如今要將元素30插入堆中,則有
* 1.將要插入的30先放在二叉堆的末尾
* 2.再將其與父結點進行比較,判斷是否要上浮(小於父結點就上浮)
* 3.若小於父結點則交換位置,再重複第2步驟繼續上浮
* 4.若大於則直接結束上浮
*          10                         10
*      /       \                  /       \
*    20         40      ——>     20         30
*   /  \      /   \            /  \      /   \
*  60   70   90    30        60   70   90    40
*/
private void siftUp(int k, E x) {
    //判斷隊列是天然排序仍是比較器排序
    if (comparator != null)
        siftUpUsingComparator(k, x);    //比較器排序
    else
        siftUpComparable(k, x);    //天然排序
}

//入隊操做本質是一個堆排序中的一個上浮的過程
private void siftUpUsingComparator(int k, E x) {
    //判斷索引位置是否大於0,便是否到達堆頂
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (comparator.compare(x, (E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = x;
}

//另外一個上浮方法,使用的天然排序
private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (key.compareTo((E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = key;
}

    PriorityQueue中的出隊方法分析:算法

public E poll() {
    if (size == 0)    //判斷隊列是不是空隊列
        return null;
    int s = --size;
    modCount++;
    E result = (E) queue[0];    //取出隊首元素
    E x = (E) queue[s];    //獲取隊尾元素
    queue[s] = null;    //隊尾賦null

    //將本來的隊尾元素放到堆頂,再對整個堆進行排序整理
    //即下沉
    if (s != 0)
        siftDown(0, x);    //下沉方法
    return result;
}

//下沉
/**
* 下沉過程
* 假設已有一個有序堆(升序)以下所示:
*          10
*      /       \
*    20         30
*   /  \      /    \
*  60   70   90     40
* 如今要將元素10出隊,則有
* 1.將要出隊的10移除出二叉堆,並將隊尾40放到堆頂
* 2.將堆頂元素與兩個子結點中較小的元素相比較,選擇小的元素做爲新的堆頂元素
* 3.重複對堆中前一半結點進行將第2步的比較交換
*          40                         20
*      /       \                  /       \
*    20         30      ——>     40         30
*   /  \      /               /  \       /   
*  60   70   90             60   70    90 
*/
private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);    //比較器下沉
    else
        siftDownComparable(k, x);    //天然排序下沉
}

//使用天然排序下沉
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    int half = size >>> 1;       //下沉要對堆中前一半的結點都進行
    while (k < half) {
        int child = (k << 1) + 1;     
        Object c = queue[child];    //獲取當前結點的左孩子
        int right = child + 1;    //右孩子索引
    
        //若存在右孩子,那麼左右孩子先比較大小,取小再與父結點比較
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];

        //父結點與子結點比較
        //若父結點小於子結點,則直接結束下沉的過程
        //不然,交互位置後繼續下沉操做
        if (key.compareTo((E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = key;
}

//使用比較器下沉
@SuppressWarnings("unchecked")
private void siftDownUsingComparator(int k, E x) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = queue[child];
        int right = child + 1;
        if (right < size && comparator.compare((E) c, (E) queue[right]) > 0)
            c = queue[child = right];
        if (comparator.compare(x, (E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = x;
}

3.DelayQueue的繼續體系api

    瞭解了DelayQueue的底層實際是經過PriorityQueue實現,再來看看DelayQueue的繼承關係,以下圖所示,父類及接口以前的學習中都已分析過,不在贅言。數組

 

4.Delay接口安全

    DelayQueue隊列與其餘隊列最明顯的不一樣之處,就是它的延時功能,也正由於這個延時特色,DelayQueue中的對象都必需要實現Delay接口,接下來就看看這個Delay接口是幹什麼的。數據結構

//用來標記那些應該在給定延遲時間以後執行的對象
public interface Delayed extends Comparable<Delayed> {
    //檢查延遲是否結束,該方法返回一個延遲時間,時間到後在檢查還有沒有
    //延遲,若沒有延遲執行下一步,若還有延遲,繼續等待
    long getDelay(TimeUnit unit);
}

    DelayQueue的使用示例:多線程

/**
* 延遲隊列的使用示例
* 主線程建立三個延遲任務放到queue中,其餘三個線程
* 在任務可用時取出
* Created by bzhang on 2019/4/1.
*/
public class TestDelayed implements Delayed {
      private String name;
      private Date takeTime;  //延遲時間

      public TestDelayed(String name, Date takeTime) {
            this.name = name;
            this.takeTime = takeTime;
      }

      public String getName() {
            return name;
      }

      public void setName(String name) {
            this.name = name;
      }

      public Date getTakeTime() {
            return takeTime;
      }

      public void setTakeTime(Date takeTime) {
            this.takeTime = takeTime;
      }

      @Override
      public long getDelay(TimeUnit unit) {
            long convert = unit.convert(takeTime.getTime()-System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            return convert;
      }

      @Override
      public int compareTo(Delayed o) {
            TestDelayed t = (TestDelayed)o;
            long l = this.takeTime.getTime() - t.getTakeTime().getTime();
            if (l==0){
                  return 0;
            }
            return l > 0 ? 1 : -1;
      }

      @Override
      public String toString() {
            return "TestDelayed{" +
                    "name='" + name + '\'' +
                    ", takeTime=" + takeTime +
                    '}';
      }

      public static void main(String[] args) {
            DelayQueue queue = new DelayQueue();
            long l = System.currentTimeMillis();
            queue.put(new TestDelayed("A",new Date(l+5000)));
            queue.put(new TestDelayed("B",new Date(l+2000)));
            queue.put(new TestDelayed("C",new Date(l+7000)));

            System.out.println(new Date());
            int t = 0;
            for (int i = 0;i < 3;i++){
                  new Thread(new Runnable() {
                        @Override
                        public void run() {
                              try {
                                    System.out.println(Thread.currentThread().getName()+queue.take());
                              } catch (InterruptedException e) {
                                    e.printStackTrace();
                              }
                        }
                  }).start();
            }


      }
}

//結果
Tue Apr 02 11:03:33 CST 2019
Thread-1TestDelayed{name='B', takeTime=Tue Apr 02 11:03:35 CST 2019}
Thread-0TestDelayed{name='A', takeTime=Tue Apr 02 11:03:38 CST 2019}
Thread-2TestDelayed{name='C', takeTime=Tue Apr 02 11:03:40 CST 2019}

5.DelayQueue中的重要屬性及構造方法併發

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    //重入鎖,用於保證併發安全
    private final transient ReentrantLock lock = new ReentrantLock();

    //底層優先級隊列,實際元素都存儲與該隊列中,底層是數組構成的二叉堆
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    //下一個等待獲取元素的線程,可減小沒必要要的等待
    private Thread leader = null;

    //條件控制,表示是否能夠從隊列中取數據
    private final Condition available = lock.newCondition();


    public DelayQueue() {}

    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

}

6.DelayQueue的入隊方法ide

//add方法本質就是調用offer方法,將元素新增到隊列
public boolean add(E e) {
    return offer(e);
}

//同上
public void put(E e) {
    offer(e);
}

//延遲隊列是無界隊列,指定超時時間放入元素沒有意義,與直接入隊是同樣的
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

//向隊列中新增元素,元素位置以比較結果(compareTo方法)來肯定
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);    //調用底層優先級隊列的offer方法來存儲元素

        //判斷底層優先級隊列的隊首是不是新增元素
        if (q.peek() == e) {
            leader = null;

            //喚醒條件等待隊列的某一個線程,即說明隊列中有元素了,
            //能夠從隊列中獲取到元素了
            available.signal();    
        }
        return true;
    } finally {
        lock.unlock();
    }
}

7.DelayQueue的出隊方法oop

//返回延遲時間已到的第一個元素,或返回null(沒有元素或元素延遲時間都未到)
public E poll() {
    final ReentrantLock lock = this.lock;    //重入鎖
    lock.lock();    //加鎖同步
    try {
        E first = q.peek();    //獲取優先級隊列中的隊首元素

        //判斷隊列是否爲空,若不爲空那麼隊首延遲時間是否到達,若都不知足
        //說明隊首元素可用,返回隊首
        //不然返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

//如有延遲時間已到的元素就當即返回,若無則一直等待
//隊列中無元素那麼也一直等待
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();    //可被中斷鎖
    try {
        for (;;) {    //自旋
            E first = q.peek();    //獲取隊首元素

            //若隊列爲空,直接進入條件隊列等待喚醒
            //隊列不爲空,則判斷隊首的延時是否到達
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);    //獲取剩餘延遲時間(單位是ns)
                if (delay <= 0)    //沒有剩餘延遲時間,則將隊首元素返回
                    return q.poll();
                first = null; 
    
                //判斷是否已經有其餘線程在等待取元素
                //如有,那麼就讓當前線程直接等待
                //若沒有,那就說明當前只有本線程在等待獲取隊首元素
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();    //獲取當前線程
                    leader = thisThread;    //將單籤線程設爲等待獲取隊首的線程
                    try {
                        //等待隊首元素的延遲時間後,在嘗試獲取隊首元素
                        available.awaitNanos(delay);    
                    } finally {

                        //將等待獲取的線程設爲null,由於當前線程正在獲取,所以不該該有leader
                        //即leader爲null,說明要麼有線程正在執行獲取操做,要麼沒有出隊操做在進行
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //當前線程已經取完元素了,能夠喚醒其餘線程獲取隊首元素了
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

//指定時間內獲取延遲的隊首元素,若在指定等待時間內隊首延遲時間未到達或隊列爲空
//就返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();

            //隊列是否爲空,若爲空隊列,那麼在指定等待是否到達,若等待時間也已到達
            //那就返回null,若未到達等待時間,就繼續等待
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);    //當前線程進入等待時間nanos納秒
            } else {
                long delay = first.getDelay(NANOSECONDS);    //獲取隊首元素的延遲時間

                //判斷延遲時間是否到達,到達就直接將隊首元素返回
                if (delay <= 0)
                    return q.poll();

                //延遲時間未到,但等待時間已經達到,那麼就返回null
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting

                //延遲時間小於等待時間,說明能夠在等待時間內獲取到隊首元素
                //那麼就在等待延遲時間到達的時間內,能夠再次嘗試將隊首元素獲取返回
                //這裏僅是再次嘗試,由於可能在等待期間內有新的元素入隊,且延遲時間最小成爲新隊首
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);

                else {
                    //等待時間 > 延遲時間 而且沒有其它線程在等待,
                    //那麼當前元素成爲leader,表示當前線程最先正在等待獲取元素
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //讓等待時間到達
                        long timeLeft = available.awaitNanos(delay);
                        //繼續等待的時間
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

8.peek方法

//peek方法僅僅就是爲底層的優先級隊列的peek方法加上鎖
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.peek();
    } finally {
        lock.unlock();
    }
}

2、PriorityBlockingQueue併發容器

1.PriorityBlockingQueue的底層實現

    PriorityBlockingQueue是一個線程安全的無界阻塞隊列,能夠看對是PriorityQueue的多線程版本,其底層數據結構與PriorityQueue相同,都是數組實現的利用二叉堆結構。前文已經分析過,這裏再也不多說

 

2.PriorityBlockingQueue的繼承體系

    PriorityBlockingQueue的繼承關係以下圖所示,均是以前學習過的父類或接口。這裏再也不展開。

3.PriorityBlockingQueue中的重要屬性及構造方法

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    //未指定隊列初始容量時使用的默認容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    //隊列雖說是無界的,但實際隊列是不能超過Integer.MAX_VALUE - 8這個值的
    //如果超過報OOM異常
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    //底層存放數據的數組
    private transient Object[] queue;

    //隊列中元素的個數,計數器
    private transient int size;

    //用於判斷優先級的比較器,若爲null則使用天然排序
    private transient Comparator<? super E> comparator;

    //重入鎖,保證併發安全
    private final ReentrantLock lock;

    //隊列非空條件,用於出隊操做
    private final Condition notEmpty;

    //用於隊列顯示是否處於擴容狀態,0表示沒有在擴容
    //而1表示處於擴容狀態,將該值更新成1的線程會進行數組擴容
    //其餘要進行擴容的線程檢查該值發現爲1,則直接暫停線程讓出CPU
    private transient volatile int allocationSpinLock;

    //將隊列轉換成線程不安全的優先級隊列,用於序列化
    private PriorityQueue<E> q;

    //建立一個默認初始容量的隊列
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    //建立一個指定初始容量的隊列
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    //建立一個指定容量和比較器的隊列
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

    //以集合c爲底,建立一個隊列
    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls

        //根據集合c是哪種容器來決定建立怎樣的初始隊列
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }
}

4.入隊方法

//PriorityBlockingQueue全部的入隊方法,都同樣,由於隊列是無界隊列
//不存在加入隊列失敗的可能,所以最終都是調用offer方法
public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e); // never need to block
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block
}

public boolean offer(E e) {

    //優先級隊列中不容許存在null元素,所以null元素沒法肯定優先級
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;    //n爲當前隊列中的元素個數,cap爲當前隊列的容量
    Object[] array;

    //判斷底層數組是否須要擴容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;

        //判斷是使用比較器進行上浮操做,仍是使用天然排序進行上浮操做
        if (cmp == null)
            siftUpComparable(n, e, array);    //天然排序上浮
        else
            siftUpUsingComparator(n, e, array, cmp);    //比較器上浮
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

//天然上浮,與PriorityQueue中同樣
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;    //獲取父結點索引
        Object e = array[parent];    //父結點

        //比較插入的值與父結點的大小,若比父結點小,那麼交換位置後,在繼續比較
        //若比父結點大,說明排序正確,無需在繼續比較
        if (key.compareTo((T) e) >= 0)    
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

//比較器比較上浮
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                   Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}

//數組擴容
private void tryGrow(Object[] array, int oldCap) {
    // 擴容時不須要加鎖,由於擴容是經過CAS方式來實現的,
    //這樣不只能夠提高效率,而且不影響出隊操做
    lock.unlock();     
    Object[] newArray = null;

    //將allocationSpinLock更新成1的線程進行數組擴容操做,其他要擴容的線程暫停
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //擴容規則,容量小於64,擴大2倍+2,容量不小於64,則擴大1.5倍
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            //判斷擴大後的容量是否越界
            //如果會越界,則擴容規則改成舊容量+1,若仍越界,報OOM異常
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];    //建立新數組
        } finally {
            allocationSpinLock = 0;    //恢復爲0,表示沒有在擴容狀態
        }
    }
    if (newArray == null)     //未競爭到擴容操做的線程暫停
        Thread.yield();
    lock.lock();    /從新上鎖
    if (newArray != null && queue == array) {
        queue = newArray;
        //將舊數組中的數據轉移到新數組中
        System.arraycopy(array, 0, newArray, 0, oldCap);    
    }
}

5.出隊方法

//獲取並移除隊首元素,若隊列爲空,返回null
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();    //真正出隊的方法
    } finally {
        lock.unlock();
    }
}

//真正執行獲取並移除隊首元素的方法
private E dequeue() {
    int n = size - 1;      //移除隊首後隊列中的元素個數 ,同時也是隊尾元素的索引 

    //判斷隊列是否爲空隊列,空隊列直接返回null
    if (n < 0)
        return null;
    else {
        Object[] array = queue;    //獲取底層數組引用
        E result = (E) array[0];    //獲取隊首元素
        E x = (E) array[n];    //獲取隊尾元素
        array[n] = null;    //隊尾置爲null
        Comparator<? super E> cmp = comparator;

        //將原來隊列的隊尾放到隊首位置,而後進行下沉操做(即二叉堆從新排序的操做)
        if (cmp == null)
            siftDownComparable(0, x, array, n);    //使用天然排序下沉
        else
            siftDownUsingComparator(0, x, array, n, cmp);    //使用比較器下沉
        size = n;
        return result;
    }
}

//下沉操做與PriorityQueue中相同,這裏不在多作分析
//天然排序下沉
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

//比較器下沉
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                int n,
                                                Comparator<? super T> cmp) {
    if (n > 0) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }
}


//獲取並移除隊首元素,若隊列已空,則等待
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        //若返回的元素爲null,說明隊列中沒有元素
        //那麼讓當前線程進入條件隊列中等待,當前隊列有元素時,則
        //會喚醒線程,在嘗試獲取並移除隊首
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

//在必定時間內嘗試獲取並移除隊首元素,若在指定時間內未成功,
//返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {

        //嘗試獲取並移除隊首,若失敗但超時時間未到,則進入條件等待
        //一段時間後在進行嘗試,若超時時間已過仍爲成功獲取並移除隊首
        //則返回null
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}

//獲取但不移除隊首元素
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (size == 0) ? null : (E) queue[0];
    } finally {
        lock.unlock();
    }
}
6.總結
    1.PriorityBlocking Queue是基於數組實現的二叉堆結構。
    2.PriorityBlocking Queue中涉及到元素之間的比較,所以不能存在null元素。
    3.PriorityBlocking Queue的入隊出隊操做線程安全是經過重入鎖ReentrantLock實現的,但在擴容時是基於CAS算法實現的。
    4.PriorityBlocking Queue是無界隊列,其入隊出隊規則是基於優先級的,雖說是無界隊列,但並非無限大的,容量不能超過 Integer.MAX_VALUE - 8。
相關文章
相關標籤/搜索