java併發之DelayQueue實際運用示例

在學習Java 多線程併發開發過程當中,瞭解到DelayQueue類的主要做用:是一個無界的BlockingQueue,用於放置實現了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。注意:不能將null元素放置到這種隊列中。html

Delayed,一種混合風格的接口,用來標記那些應該在給定延遲時間以後執行的對象。此接口的實現必須定義一個 compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。java

簡單的延時隊列要有三部分:第一實現了Delayed接口的消息體第二消費消息的消費者第三存放消息的延時隊列,那下面就來看看延時隊列demo。緩存

1、消息體

package com.delqueue;  
  
import java.util.concurrent.Delayed;  
import java.util.concurrent.TimeUnit;  
  
/** 
 * 消息體定義 實現Delayed接口就是實現兩個方法即compareTo 和 getDelay最重要的就是getDelay方法,這個方法用來判斷是否到期…… 
 *  
 * @author whd 
 * @date 2017年9月24日 下午8:57:14 
 */  
public class Message implements Delayed {  
    private int id;  
    private String body; // 消息內容  
    private long excuteTime;// 延遲時長,這個是必須的屬性由於要按照這個判斷延時時長。  
  
    public int getId() {  
        return id;  
    }  
  
    public String getBody() {  
        return body;  
    }  
  
    public long getExcuteTime() {  
        return excuteTime;  
    }  
  
    public Message(int id, String body, long delayTime) {  
        this.id = id;  
        this.body = body;  
        this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();  
    }  
  
    // 自定義實現比較方法返回 1 0 -1三個參數  
    @Override  
    public int compareTo(Delayed delayed) {  
        Message msg = (Message) delayed;  
        return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1  
                : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);  
    }  
  
    // 延遲任務是否到時就是按照這個方法判斷若是返回的是負數則說明到期不然還沒到期  
    @Override  
    public long getDelay(TimeUnit unit) {  
        return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);  
    }  
}  

2、消息消費者

package com.delqueue;  
  
import java.util.concurrent.DelayQueue;  
  
public class Consumer implements Runnable {  
    // 延時隊列 ,消費者從其中獲取消息進行消費  
    private DelayQueue<Message> queue;  
  
    public Consumer(DelayQueue<Message> queue) {  
        this.queue = queue;  
    }  
  
    @Override  
    public void run() {  
        while (true) {  
            try {  
                Message take = queue.take();  
                System.out.println("消費消息id:" + take.getId() + " 消息體:" + take.getBody());  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}  

3、延時隊列

package com.delqueue;  
  
import java.util.concurrent.DelayQueue;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
  
public class DelayQueueTest {  
     public static void main(String[] args) {    
            // 建立延時隊列    
            DelayQueue<Message> queue = new DelayQueue<Message>();    
            // 添加延時消息,m1 延時3s    
            Message m1 = new Message(1, "world", 3000);    
            // 添加延時消息,m2 延時10s    
            Message m2 = new Message(2, "hello", 10000);    
            //將延時消息放到延時隊列中  
            queue.offer(m2);    
            queue.offer(m1);    
            // 啓動消費線程 消費添加到延時隊列中的消息,前提是任務到了延期時間   
            ExecutorService exec = Executors.newFixedThreadPool(1);  
            exec.execute(new Consumer(queue));  
            exec.shutdown();  
        }    
}  

將消息體放入延遲隊列中,在啓動消費者線程去消費延遲隊列中的消息,若是延遲隊列中的消息到了延遲時間則能夠從中取出消息不然沒法取出消息也就沒法消費。mybatis

這就是延遲隊列demo,下面咱們來講說在真實環境下的使用。多線程

在網上也看到兩個示例,但這兩個示例我的在實際運行時均沒有達到知足業務場景的效果,於是對其進行了修改,供你們參考討論。併發

業務場景一:多考生考試

該場景來自於http://ideasforjava.iteye.com/blog/657384,模擬一個考試的日子,考試時間爲120分鐘,30分鐘後纔可交卷,當時間到了,或學生都交完捲了考試結束。dom

這個場景中幾個點須要注意:ide

  1. 考試時間爲120分鐘,30分鐘後纔可交卷,初始化考生完成試卷時間最小應爲30分鐘
  2. 對於可以在120分鐘內交卷的考生,如何實現這些考生交卷
  3. 對於120分鐘內沒有完成考試的考生,在120分鐘考試時間到後須要讓他們強制交卷
  4. 在全部的考生都交完卷後,須要將控制線程關閉

實現思想:用DelayQueue存儲考生(Student類),每個考生都有本身的名字和完成試卷的時間,Teacher線程對DelayQueue進行監控,收取完成試卷小於120分鐘的學生的試卷。當考試時間120分鐘到時,先關閉Teacher線程,而後強制DelayQueue中還存在的考生交卷。每個考生交卷都會進行一次countDownLatch.countDown(),當countDownLatch.await()再也不阻塞說明全部考生都交完捲了,然後結束考試。函數

複製代碼
package com.my.base.concurrent.delayQueue; import java.util.Iterator; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** *this project is created for my partactice. *In the project I will write the mybatis by myself * *2014-1-10 下午9:43:48 *@author 孫振超 mychaoyue2011@163.com */ public class Exam { /** * *2014-1-10 下午9:43:48 by 孫振超 * *@param args *void * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // TODO Auto-generated method stub int studentNumber = 20; CountDownLatch countDownLatch = new CountDownLatch(studentNumber+1); DelayQueue< Student> students = new DelayQueue<Student>(); Random random = new Random(); for (int i = 0; i < studentNumber; i++) { students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch)); } Thread teacherThread =new Thread(new Teacher(students)); students.put(new EndExam(students, 120,countDownLatch,teacherThread)); teacherThread.start(); countDownLatch.await(); System.out.println(" 考試時間到,所有交卷!"); } } class Student implements Runnable,Delayed{ private String name; private long workTime; private long submitTime; private boolean isForce = false; private CountDownLatch countDownLatch; public Student(){} public Student(String name,long workTime,CountDownLatch countDownLatch){ this.name = name; this.workTime = workTime; this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime(); this.countDownLatch = countDownLatch; } @Override public int compareTo(Delayed o) { // TODO Auto-generated method stub if(o == null || ! (o instanceof Student)) return 1; if(o == this) return 0; Student s = (Student)o; if (this.workTime > s.workTime) { return 1; }else if (this.workTime == s.workTime) { return 0; }else { return -1; } } @Override public long getDelay(TimeUnit unit) { // TODO Auto-generated method stub return unit.convert(submitTime - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public void run() { // TODO Auto-generated method stub if (isForce) { System.out.println(name + " 交卷, 但願用時" + workTime + "分鐘"+" ,實際用時 120分鐘" ); }else { System.out.println(name + " 交卷, 但願用時" + workTime + "分鐘"+" ,實際用時 "+workTime +" 分鐘"); } countDownLatch.countDown(); } public boolean isForce() { return isForce; } public void setForce(boolean isForce) { this.isForce = isForce; } } class EndExam extends Student{ private DelayQueue<Student> students; private CountDownLatch countDownLatch; private Thread teacherThread; public EndExam(DelayQueue<Student> students, long workTime, CountDownLatch countDownLatch,Thread teacherThread) { super("強制收卷", workTime,countDownLatch); this.students = students; this.countDownLatch = countDownLatch; this.teacherThread = teacherThread; } @Override public void run() { // TODO Auto-generated method stub  teacherThread.interrupt(); Student tmpStudent; for (Iterator<Student> iterator2 = students.iterator(); iterator2.hasNext();) { tmpStudent = iterator2.next(); tmpStudent.setForce(true); tmpStudent.run(); } countDownLatch.countDown(); } } class Teacher implements Runnable{ private DelayQueue<Student> students; public Teacher(DelayQueue<Student> students){ this.students = students; } @Override public void run() { // TODO Auto-generated method stub try { System.out.println(" test start"); while(!Thread.interrupted()){ students.take().run(); } } catch (Exception e) { // TODO: handle exception  e.printStackTrace(); } } }
複製代碼

 

業務場景二:具備過時時間的緩存

該場景來自於http://www.cnblogs.com/jobs/archive/2007/04/27/730255.html,向緩存添加內容時,給每個key設定過時時間,系統自動將超過過時時間的key清除。學習

這個場景中幾個點須要注意:

  1. 當向緩存中添加key-value對時,若是這個key在緩存中存在而且尚未過時,須要用這個key對應的新過時時間
  2. 爲了可以讓DelayQueue將其已保存的key刪除,須要重寫實現Delayed接口添加到DelayQueue的DelayedItem的hashCode函數和equals函數
  3. 當緩存關閉,監控程序也應關閉,於是監控線程應當用守護線程

具體實現以下:

複製代碼
package com.my.base.concurrent.delayQueue; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** *Cache.java * * Created on 2014-1-11 上午11:30:36 by sunzhenchao mychaoyue2011@163.com */ public class Cache<K, V> { public ConcurrentHashMap<K, V> map = new ConcurrentHashMap<K, V>(); public DelayQueue<DelayedItem<K>> queue = new DelayQueue<DelayedItem<K>>(); public void put(K k,V v,long liveTime){ V v2 = map.put(k, v); DelayedItem<K> tmpItem = new DelayedItem<K>(k, liveTime); if (v2 != null) { queue.remove(tmpItem); } queue.put(tmpItem); } public Cache(){ Thread t = new Thread(){ @Override public void run(){ dameonCheckOverdueKey(); } }; t.setDaemon(true); t.start(); } public void dameonCheckOverdueKey(){ while (true) { DelayedItem<K> delayedItem = queue.poll(); if (delayedItem != null) { map.remove(delayedItem.getT()); System.out.println(System.nanoTime()+" remove "+delayedItem.getT() +" from cache"); } try { Thread.sleep(300); } catch (Exception e) { // TODO: handle exception  } } } /** * TODO * @param args * 2014-1-11 上午11:30:36 * @author:孫振超 * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { Random random = new Random(); int cacheNumber = 10; int liveTime = 0; Cache<String, Integer> cache = new Cache<String, Integer>(); for (int i = 0; i < cacheNumber; i++) { liveTime = random.nextInt(3000); System.out.println(i+" "+liveTime); cache.put(i+"", i, random.nextInt(liveTime)); if (random.nextInt(cacheNumber) > 7) { liveTime = random.nextInt(3000); System.out.println(i+" "+liveTime); cache.put(i+"", i, random.nextInt(liveTime)); } } Thread.sleep(3000); System.out.println(); } } class DelayedItem<T> implements Delayed{ private T t; private long liveTime ; private long removeTime; public DelayedItem(T t,long liveTime){ this.setT(t); this.liveTime = liveTime; this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime(); } @Override public int compareTo(Delayed o) { if (o == null) return 1; if (o == this) return 0; if (o instanceof DelayedItem){ DelayedItem<T> tmpDelayedItem = (DelayedItem<T>)o; if (liveTime > tmpDelayedItem.liveTime ) { return 1; }else if (liveTime == tmpDelayedItem.liveTime) { return 0; }else { return -1; } } long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return diff > 0 ? 1:diff == 0? 0:-1; } @Override public long getDelay(TimeUnit unit) { return unit.convert(removeTime - System.nanoTime(), unit); } public T getT() { return t; } public void setT(T t) { this.t = t; } @Override public int hashCode(){ return t.hashCode(); } @Override public boolean equals(Object object){ if (object instanceof DelayedItem) { return object.hashCode() == hashCode() ?true:false; } return false; } }
相關文章
相關標籤/搜索