Queue和BlockingQueue的使用以及使用BlockingQueue實現生產者-消費者

  Java提供了兩種新的容器類型:Queue和BlockingQueue。前端

  Queue用於保存一組等待處理的元素。它提供了幾種實現,包括:ConcurrentLinkedQueue,這是一個先進先出的併發對列,以及PriorityQueue,這是一個非併發的優先隊列。Queue上的操做不會阻塞,若是隊列爲空,獲取元素的操做將返回空值。雖然能夠用List來模擬一個Queue的行爲----事實上正是經過LinkedList來實現Queue的行爲的,但還須要一個Queue的類,由於它能去掉List的隨機訪問需求,從而實現更高效的併發。java

  BlockingQueue擴展了Queue,增長了可阻塞的插入和獲取操做。若是隊列爲空那麼獲取元素的操做將會一直阻塞,直到隊列出現一個能夠可用的元素。若是隊列已滿(對於有界隊列來講),那麼插入元素的操做將一直阻塞,直到隊列中出現可用的元素。在"生產者-消費者"設計模式中,阻塞隊仍是很是有用的。node

1.Queue的使用

  Queue接口與List、Set同一級別,都是繼承了Collection接口。
  Queue使用時要儘可能避免Collection的add()和remove()方法,而是要使用offer()來加入元素,使用poll()來獲取並移出元素。它們的優點是經過返回值能夠判斷成功與否,add()和remove()方法在失敗的時候會拋出異常。 若是要使用前端而不移出該元素,使用element()或者peek()方法。
  值得注意的是LinkedList類實現了Queue接口,所以咱們能夠把LinkedList當成Queue來用。設計模式

  Queue的方法也很是簡單,就是三組(一個會拋出異常,一個返回特殊值):api

方法 拋出異常 不會拋出異常
插入 boolean add(E e); boolean offer(E e);
移除(返回且移除頭元素)  E remove(); E poll();
檢查(返回頭元素但不刪除) E element(); E peek();

例如:(poll()返回了null,remove()拋出異常了)數組

package cn.qlq.thread.thirteen;

import java.util.LinkedList;
import java.util.Queue;

public class Demo1 {
    public static void main(String[] args) {
        Queue<String> queue = new LinkedList<String>();
        String poll = queue.poll();
        System.out.println(poll);
        String remove = queue.remove();
        System.out.println(remove);
    }
}

結果:安全

null
Exception in thread "main" java.util.NoSuchElementException
  at java.util.LinkedList.removeFirst(LinkedList.java:268)
  at java.util.LinkedList.remove(LinkedList.java:683)
  at cn.qlq.thread.thirteen.Demo1.main(Demo1.java:11)併發

2.BlockingQueue的使用 

  BlockingQueue繼承Queue接口,位於併發包下,對Queue接口進行了擴展。框架

package java.util.concurrent;

import java.util.Collection;
import java.util.Queue;

public interface BlockingQueue<E> extends Queue<E> {
  
    boolean add(E e);

    boolean offer(E e);

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}

 

  阻塞隊列提供了可阻塞的put和take方法,以及支持定時的offer和poll方法。若是隊列已經滿了,那麼put方法將阻塞到有空間可用;若是隊列爲空,那麼take方法將會阻塞到有元素可用。隊列能夠是有界的,也能夠是無界的,無界隊列永遠不會充滿,所以無界隊列的put方法永遠也不會阻塞。(offer方法若是數據項不能添加到隊列中,就會返回一個失敗狀態。這樣就可以建立更多靈活的策略來處理負荷過載的狀況,例如減輕負載,將多餘的工做項序列化並寫入磁盤,減小生產者線程的數量,或者經過某種方式來抑制生產者線程)ide

  BlockingQueue簡化了生產者-消費者模式的設計過程,消費者不須要知道生產者是誰,生產者也不須要知道生產者是誰;並且支持任意數量的生產者與消費者。一種最多見的生產者-消費者設計模式就是線程池與工做隊列的組合,在Executor任務執行框架中就體現了這種模式。

  

  一個經典的例子:以洗盤子爲例子,一我的洗完盤子把盤子放在盤架上,另外一我的負責從盤架上取出盤子並把他們烘乾。在這個例子中,盤架就至關於一個阻塞隊列。若是盤架上沒有盤子,消費者會一直等待,若是盤架滿了,生產者會一直等待。咱們能夠將這種類比擴展爲多個生產者與多個消費者,每一個工人只須要與盤架打交道。人們不須要知道誰是生產者誰是消費者。

  生產者和消費者的角色是相對的,某種環境下的生產者在另外一種不一樣的環境中可能會變爲消費者。好比烘乾盤子的人將"消費"洗乾淨的溼盤子,而產生烘乾的盤子。第三我的把洗乾淨的盤子整理好,在這種狀況下,烘乾盤子的人是生產者也是消費者,從而就有了兩個共享的隊列(每一個對壘對列可能阻塞烘乾工做的運行)。

  

  JDK中有多個BlockingQueue的實現,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO隊列,兩者分別於LinkedList和ArrayList相似,但比同步List擁有更好的同步性能。PriorityBlockingQueue隊列是一個按優先級排列的隊列,這個隊列能夠根據元素的天然順序來比較元素(若是他們實現了Comparable方法),也可使用Comparator來比較。

  還有一個是SynchronousQueue,實際上它不是一個真正的隊列,由於它不會維護隊列中元素的存儲空間,與其餘隊列不一樣的是,它維護一組線程,這些線程在等待把元素加入或移除隊列。若是以洗盤子爲例,那麼久至關於沒有盤架而是直接將洗好的盤子放入下一個空閒的烘乾機中。這種方式看似很奇怪,因爲能夠直接交付工做下降了將數據從生產者移到消費者的延遲。由於SynchronousQueue沒有存儲功能,所以put和take會一直阻塞,直到有另外一個線程準備好參與到交付過程,僅當有足夠多的消費者,而且老是有一個消費者準備獲取交付工做時,才適合使用同步隊列。

 

BlockingQueue中的方法:

BlockingQueue既然是Queue的子接口,必然有Queue中的方法,上面已經列了。看一下BlockingQueue中特有的方法:

(1)void put(E e) throws InterruptedException

  把e添加進BlockingQueue中,若是BlockingQueue中沒有空間,則調用線程被阻塞,進入等待狀態,直到BlockingQueue中有空間再繼續

(2)void take() throws InterruptedException

  取走BlockingQueue裏面排在首位的對象,若是BlockingQueue爲空,則調用線程被阻塞,進入等待狀態,直到BlockingQueue有新的數據被加入

(3)int drainTo(Collection<? super E> c, int maxElements)

  一次性取走BlockingQueue中的數據到c中,能夠指定取的個數。經過該方法能夠提高獲取數據效率,不須要屢次分批加鎖或釋放鎖

2.1   ArrayBlockingQueue的簡單使用

  基於數組的阻塞隊列,必須指定隊列大小。比較簡單。ArrayBlockingQueue中只有一個ReentrantLock對象,這意味着生產者和消費者沒法並行運行。建立ArrayBlockingQueue能夠指定鎖的公平性,默認是非公平鎖,以下源碼:

    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

 

例如:基於ArrayBlockingQueue的單生產單消費模式:

package cn.qlq.thread.thirteen;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Demo2 {
    private static int num ;
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);
    
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> strings = new ArrayBlockingQueue<>(1);//必須指定容量(指定容器最多爲1)
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for  (int i=0;i<5;i++) {
                        String ele = "ele"+(++num);
                        strings.put(ele);
                        LOGGER.info("ThreadName ->{} put ele->{}",Thread.currentThread().getName(),ele);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"producer");
        producer.start();
        
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i=0;i<5;i++) {
                        Thread.sleep(1*1000);
                        String take = strings.take();
                        LOGGER.info("ThreadName ->{} take ele->{}",Thread.currentThread().getName(),take);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"consumer");
        consumer.start();
    }
}

結果:(能夠看到生產者放進元素以後會等元素被拿走以後纔會繼續生成元素)

11:00:04 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele1
11:00:05 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele1
11:00:05 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele2
11:00:06 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele2
11:00:06 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele3
11:00:07 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele4
11:00:07 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele3
11:00:08 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele5
11:00:08 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele4
11:00:09 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele5

2.2  LinkedBlockingQueue 簡單使用

  相似於LinkedList,基於鏈表的阻塞隊列。此隊列若是不指定容量大小,默認採用Integer.MAX_VALUE(能夠理解爲無限隊列)。此外LinkedBlockingList有兩個鎖,意味着生產者和消費者都有本身的鎖。以下源碼:

    private transient Node<E> head;

    private transient Node<E> last;

    private final ReentrantLock takeLock = new ReentrantLock();

    private final Condition notEmpty = takeLock.newCondition();

    private final ReentrantLock putLock = new ReentrantLock();

    private final Condition notFull = putLock.newCondition();

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

 

例如:基於LinkedBlockingQueue的多生產多消費模式:

package cn.qlq.thread.thirteen;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Demo3 {
    private static int num ;
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class);
    
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> strings = new LinkedBlockingQueue<>(3);
        Runnable producerRun = new Runnable() {
            @Override
            public synchronized void  run() {//加同步避免出現線程非安全
                try {
                    for  (int i=0;i<5;i++) {
                        Thread.sleep(1000);
                        String ele = "ele"+(++num);
                        strings.put(ele);
                        LOGGER.info("ThreadName ->{} put ele->{}",Thread.currentThread().getName(),ele);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        
        Thread producer = new Thread(producerRun,"producer");
        producer.start();
        Thread producer2 = new Thread(producerRun,"producer2");
        producer2.start();
        
        Runnable consumerRun = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i=0;i<5;i++) {
                        Thread.sleep(3000);
                        String take = strings.take();
                        LOGGER.info("ThreadName ->{} take ele->{}",Thread.currentThread().getName(),take);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread consumer = new Thread(consumerRun,"consumer");
        Thread consumer1 = new Thread(consumerRun,"consumer1");
        consumer.start();
        consumer1.start();
    }
}

結果:

11:46:47 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele1
11:46:48 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele2
11:46:49 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele2
11:46:49 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele1
11:46:49 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele3
11:46:50 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele4
11:46:51 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele5
11:46:52 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele3
11:46:52 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele4
11:46:52 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele6
11:46:53 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele7
11:46:55 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele8
11:46:55 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele6
11:46:55 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele5
11:46:56 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele9
11:46:58 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele7
11:46:58 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele8
11:46:58 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele10
11:47:01 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele10
11:47:01 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele9

 

2.3  PriorityBlockingQueue簡單使用

  PriorityBlockingQueue 是一個按優先級排列的阻塞隊列,相似於TreeSet,看到tree,能夠按順序進行排列,就要想到兩個接口。Comparable(集合中元素實現這個接口,元素自身具有可比性),Comparator(比較器,傳入容器構造方法中,容器具有可比性)。

  其內部只有一個Lock,因此生產消費者不能同時做業,並且默認的容量是11,其構造方法也能夠傳入一個比較器,以下源碼:

   /**
     * Default array capacity.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }

 

測試按年齡逆序排列:

package cn.qlq.thread.thirteen;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class Demo4 {
    
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Person> persons = new PriorityBlockingQueue<Person>(3);
        persons.put(new Person(20,"張三"));
        persons.put(new Person(22,"李四"));
        persons.put(new Person(21,"王五"));
        persons.put(new Person(18,"八卦"));
        System.out.println(persons.take());
        System.out.println(persons.take());
        System.out.println(persons.take());
        System.out.println(persons.take());
    }
}


class Person implements Comparable<Person>{
    private int age;
    private String name;

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person [age=" + age + ", name=" + name + "]";
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Person(int age, String name) {
        super();
        this.age = age;
        this.name = name;
    }

    @Override
    public int compareTo(Person o) {//返回-1表示排在他前面,返回1表示排在他後面
        if(o.getAge() > this.getAge()  ){
            return 1;
        }else if(o.getAge() < this.getAge()){
            return -1;
        }
        return 0;
    }
}

結果:

Person [age=22, name=李四]
Person [age=21, name=王五]
Person [age=20, name=張三]
Person [age=18, name=八卦]

 

2.4 SynchronousQueue簡單使用

   前面已經介紹了,SynchronousQueue實際上它不是一個真正的隊列,由於它不會維護隊列中元素的存儲空間,與其餘隊列不一樣的是,它維護一組線程,這些線程在等待把元素加入或移除隊列。適用於生產者少消費者多的狀況。

例如:

ArrayBlockingQueue有一個數組存儲隊列元素:

    /** The queued items */
    final Object[] items;

 

LinedBlockingQueue有一個內部Node類存儲元素:

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;
Node<E> next; Node(E x) { item = x; } }

 

PriorityBlockingQueue有一個數組用於存儲元素

private transient Object[] queue;

 

  能夠這麼理解,SynchronousQueue是生產者直接把數據給消費者(消費者直接從生產者這裏拿數據)。換句話說,每個插入操做必須等待一個線程對應的移除操做。SynchronousQueue又有兩種模式:

一、公平模式

  採用公平鎖,並配合一個FIFO隊列(Queue)來管理多餘的生產者和消費者

二、非公平模式

  採用非公平鎖,並配合一個LIFO棧(Stack)來管理多餘的生產者和消費者,這也是SynchronousQueue默認的模式

以下源碼:

    public SynchronousQueue() {
        this(false);
    }
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }

 

transferer 是一個內部類用於在生產者和消費者之間傳遞數據
    abstract static class Transferer {
        /**
         * Performs a put or take.
         **/
        abstract Object transfer(Object e, boolean timed, long nanos);
    }

 

例如:直接put元素會阻塞

package cn.qlq.thread.thirteen;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Demo5 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo5.class);
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> persons = new SynchronousQueue<String>();
        persons.put("1");
        LOGGER.info("放入元素 1");
        LOGGER.info("獲取元素 "+persons.take());
    }
}

結果:(線程會一直處於阻塞狀態,因爲沒有消費者線程消費元素因此一直處於阻塞,因此不會執行LOGGER.info()的代碼)

 

解決辦法:生產元素以前,先開啓消費者線程:(也就是必須確保生產的元素有消費者在take(),不然會阻塞)

package cn.qlq.thread.thirteen;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Demo5 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo5.class);
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> strings = new SynchronousQueue<String>();
        
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                        String take = strings.take();
                        LOGGER.info("ThreadName ->{} take ele->{}",Thread.currentThread().getName(),take);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"consumer");
        consumer.start();
        
        strings.put("1");
        LOGGER.info("放入元素 1");
    }
}

結果:(正常打印信息,而且進程也結束)

 

2.5  還有一個延遲隊列DelayQueue---此隊列能夠實現有序與延遲的效果

  DelayQueue是一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。(獲取元素的時候獲取的是頭部元素,並且頭部元素只有在延遲期小於0才能夠取出來)

  爲了具備調用行爲,存放到DelayDeque的元素必須繼承Delayed接口。Delayed接口使對象成爲延遲對象,它使存放在DelayQueue類中的對象具備了激活日期。該接口繼承Comparable接口,以下:

public interface Delayed extends Comparable<Delayed> {

    long getDelay(TimeUnit unit);
}

 

  CompareTo(Delayed o):Delayed接口繼承了Comparable接口,所以有了這個方法。
  getDelay(TimeUnit unit):這個方法返回到激活日期的剩餘時間,時間單位由單位參數指定。(返回值爲負數的時候才能夠take()出來)

  此類也是隻有一把鎖,並且內部維護一個PriorityQueue用於存放有序隊列(實現有序),查看源碼:

    private transient final ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    private Thread leader = null;
    public DelayQueue() {}

    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    else if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

 

例如:測試隊列中放入5s之後的元素才能夠取出來:

package cn.qlq.thread.thirteen;

import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Demo6 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo6.class);

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<DelayObj> delayObjs = new DelayQueue<DelayObj>();
        DelayObj delayObj = new DelayObj("1");
        delayObjs.put(delayObj);
        LOGGER.info("放入元素->{}", delayObj);
        Thread.sleep(1 * 1000);

        DelayObj delayObj2 = new DelayObj("3");
        delayObjs.put(delayObj2);
        LOGGER.info("放入元素->{}", delayObj2);

        LOGGER.info("{}", delayObjs.take());
        LOGGER.info("{}", delayObjs.take());

    }
}

class DelayObj implements Delayed {
    private Date createTime;
    private String name;

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public DelayObj(String name) {
        this.createTime = new Date();
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public int compareTo(Delayed o) { // 返回負數表示在前面,返回正數表示在後面
        if (this.getDelay(TimeUnit.NANOSECONDS) > o.getDelay(TimeUnit.NANOSECONDS)) {// NANOSECONDS是十億分之秒
            return -1;
        } else if (this.getDelay(TimeUnit.NANOSECONDS) < o.getDelay(TimeUnit.NANOSECONDS)) {
            return 1;
        }
        return 0;
    }

    @Override
    public String toString() {
        return "DelayObj [createTime=" + createTime + ", name=" + name + "]";
    }

    @Override
    public long getDelay(TimeUnit unit) {
        Date now = new Date();
        long diff = createTime.getTime() + 5 * 1000 - now.getTime();
        System.out.println(diff);
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }
}

結果: (能夠看到先獲取的是最後建立的元素,並且只有在延遲期爲0才能夠獲取到---實現了有序加延遲)

23:10:44 [cn.qlq.thread.thirteen.Demo6]-[INFO] 放入元素->DelayObj [createTime=Wed Dec 26 23:10:44 CST 2018, name=1]4997397623:10:45 [cn.qlq.thread.thirteen.Demo6]-[INFO] 放入元素->DelayObj [createTime=Wed Dec 26 23:10:45 CST 2018, name=3]4991-323:10:50 [cn.qlq.thread.thirteen.Demo6]-[INFO] DelayObj [createTime=Wed Dec 26 23:10:45 CST 2018, name=3]-102323:10:50 [cn.qlq.thread.thirteen.Demo6]-[INFO] DelayObj [createTime=Wed Dec 26 23:10:44 CST 2018, name=1]

相關文章
相關標籤/搜索