JUC阻塞隊列之DelayQueue源碼分析

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。而且隊列中的元素必須實現Delayed接口。在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中獲取到元素。DelayQueue的應用範圍很是廣闊,如能夠用它來保存緩存中元素的有效期,也可用它來實現定時任務。緩存

Delayed接口

在分析DelayQueue源碼以前,咱們先來看看Delayd接口,其源碼定義以下:bash

public interface Delayed extends Comparable < Delayed > {

    /**
     * 指定返回對象的延時時間
     * @param  unit [時間單位]
     * @return      [延時的剩餘,0或者-1表示延時已通過期]
     */
    long getDelay(TimeUnit unit);
}
複製代碼

咱們看到,Delayed接口繼承了Comparable接口,即實現Delayed接口的對象必須實現**getDelay(TimeUnit unit)方法和compareTo(T o)方法。這裏compareTo(T o)**方法能夠用來實現元素的排序,能夠將延時時間長的放到隊列的末尾。併發

DelayQueue構造函數

上面分析了Delayed接口,接下來咱們分析DelayQueue的構造函數。DelayQueue提供了2種構造函數,一個是無參構造函數,一個是給定集合爲參數的構造函數。其源碼以下:函數

/**
 * 構建一個空的DelayQueue
 */
public DelayQueue() {}

/**
 * 給定集合c爲參數的構造函數
 * 將集合c中的元素所有放入到DelayQueue中
 */
public DelayQueue(Collection < ? extends E > c) {
    this.addAll(c);
}
複製代碼

addAll方法是AbstractQueue抽象類中的方法,其源碼以下:ui

public boolean addAll(Collection < ? extends E > c) {
    // 參數檢測
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    boolean modified = false;
    //遍歷集合c中的元素
    for (E e: c)
        // 調用DelayQueue中的add方法
        if (add(e))
            modified = true;
    return modified;
}
複製代碼

從上面的源碼中,咱們能夠看到,AbstractQueue抽象類中addAll方法實際是調用DelayQueue類中的add方法來實現的。this

DelayQueue 入列操做

DelayQueue提供了4中入列操做,分別是:spa

  • **add(E e):**阻塞的將制定元素添加到延時隊列中去,由於隊列是無界的所以此方法永不阻塞。
  • **offer(E e):**阻塞的將制定元素添加到延時隊列中去,由於隊列是無界的所以此方法永不阻塞。
  • **put(E e):**阻塞的將制定元素添加到延時隊列中去,由於隊列是無界的所以此方法永不阻塞。
  • **offer(E e, long timeout, TimeUnit unit):**阻塞的將制定元素添加到延時隊列中去,由於隊列是無界的所以此方法永不阻塞。

這裏你們可能會奇怪,爲何這些入列方法的解釋都是同樣的?這個問題先等下回答,咱們先來看看這幾個入列方法的源碼定義:線程

public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    //獲取可重入鎖
    final ReentrantLock lock = this.lock;
    //加鎖
    lock.lock();
    try {
        //調用PriorityQueue中的offer方法
        q.offer(e);
        //調用PriorityQueue中的peek方法
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        //釋放鎖
        lock.unlock();
    }
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}
複製代碼

這裏咱們從源碼中能夠看到,**add(E e)**方法、**put(E e)方法和offer(E e,long timeout,TimeUnit unit)方法都是調用offer(E e)方法來實現的,這也是爲何這幾個方法的解釋都是同樣的緣由。其中offer(E e)方法的核心又是調用了PriorityQueue中的offer(E e)**方法,PriorityQueue和PriorityBlockingQueue都是以二叉堆的無界隊列,只不過PriorityQueue不是阻塞的而PriorityBlockingQueue是阻塞的。code

DelayQueue出列操做

DelayQueue提供了3中出列操做方法,它們分別是:對象

  • **poll():**檢索並刪除此隊列的開頭,若是此隊列沒有延遲延遲的元素,則返回null
  • **take():**檢索併除去此隊列的頭,若有必要,請等待直到該隊列上具備過時延遲的元素可用。
  • **poll(long timeout, TimeUnit unit):**檢索並刪除此隊列的頭,若有必要,請等待直到該隊列上具備過時延遲的元素可用,或者或指定的等待時間到期。

下面咱們來一個一個分析出列操做的原來。

poll():

poll操做的源碼定義以下:

public E poll() {
   //獲取可重入鎖
    final ReentrantLock lock = this.lock;
    //加鎖
    lock.lock();
    try {
       //獲取隊列中的第一個元素
        E first = q.peek();
        //若果元素爲null,或者頭元素還未過時,則返回false
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
           //調用PriorityQueue中的出列方法
            return q.poll();
    } finally {
        lock.unlock();
    }
}
複製代碼

該方法與PriorityQueue的poll方法惟一的區別就是多了**if (first == null || first.getDelay(NANOSECONDS) > 0)**這個條件判斷,該條件是表示若是隊列中沒有元素或者隊列中的元素未過時,則返回null。

take

take操做源碼定義以下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //加鎖
    lock.lockInterruptibly();
    try {
    	//西循環
        for (;;) {
        	//查看隊列頭元素
            E first = q.peek();
            //若是隊列頭元素爲null,則表示隊列中沒有數據,線程進入等待隊列
            if (first == null)
                available.await();
            else {
            	// 獲取first元素剩餘的延時時間
                long delay = first.getDelay(NANOSECONDS);
                //若果剩餘延時時間<=0 表示元素已通過期,能夠從隊列中獲取元素
                if (delay <= 0)
                	//直接返回頭部元素
                    return q.poll();
                //若果剩餘延時時間>0,表示元素還未過時,則將first置爲null,防止內存溢出
                first = null; // don't retain ref while waiting //若是leader不爲null,則直接進入等待隊列中等待 if (leader != null) available.await(); else { //若果leader爲null,則把當前線程賦值給leader,並超時等待delay納秒 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) //喚醒線程 available.signal(); lock.unlock(); } } 複製代碼

take操做比poll操做稍微要複雜些,可是邏輯仍是相對比較簡單。只是在獲取元素的時候先檢查元素的剩餘延時時間,若是剩餘延時時間<=0,則直接返回隊列頭元素。若是剩餘延時時間>0,則判斷leader是否爲null,若果leader不爲null,則表示已經有線程在等待獲取隊列的頭部元素,所以直接進入等待隊列中等待。若果leader爲null,則表示這是第一個獲取頭部元素的線程,把當前線程賦值給leader,而後超時等待剩餘延時時間。在take操做中須要注意的一點是fist=null,由於若是first不置爲null的話會引發內存溢出的異常,這是由於在併發的時候,每一個線程都會持有一份first,所以first不會被釋放,若果線程數過多,就會致使內存溢出的異常。

poll(long timeout, TimeUnit unit)

超時等待獲取隊列元素的源碼以下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } 複製代碼

這個出列操做的邏輯和take出列操做的邏輯幾乎同樣,惟一不一樣的在於take是無時間限制等待,而改操做是超時等待。

總結

DelayQueue的入列和出列操做邏輯相對比較簡單,就是在獲取元素的時候,判斷元素是否已通過期,若果過時就能夠直接獲取,沒有過時的話poll操做是直接返回null,take操做是進入等待隊列中等待。

相關文章
相關標籤/搜索