Timer 與 DelayQueue

簡介

有這樣一個業務場景,不少業務須要發郵件,若是失敗了要重試,每隔5分鐘重試一次,最多12次,若是是12次都失敗,就記入數據庫。java

粗一想,很簡單嘛,可是仔細想想,好像不是那麼容易,在想想,嗯,也不是那麼難。仍是要親自試一下,寫一下代碼才知道有哪些坑。數據庫

分析

其實,問題的關鍵在於時間間隔的處理,是使用定時器,仍是隊列把時間封裝一下。dom

注意,很容易把這2中狀況混在一塊兒了。ide

下面就來把這兩種狀況都實現一下。函數

Timer實現

不少對線程池比較熟悉的朋友可能首先想到的是ScheduledThreadPoolExecutor,可是ScheduledThreadPoolExecutor有一個問題就是沒有辦法取消。測試

好比發送到第5次成功了,就須要取消週期任務,避免重複發送,ScheduledThreadPoolExecutor是很差實現的。this

因此咱們直接使用Timer和TimerTask。線程

先來一個TimerTask3d

import java.util.TimerTask;

public class MailSendTimerTask extends TimerTask {

    private int exeNum;

    private String name;

    public MailSendTimerTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        if(exeNum < 12){
            try{
                if(MailUtil.sendMail(name)){
                    this.cancel();
                }
            }finally {
                exeNum++;
            }
        }else {
            MailUtil.logError(name);
            this.cancel();
        }
    }
}

若是發送成功或者到了12次都失敗了,咱們就取消任務。調試

MailUtil僞裝發了郵件和失敗後記錄到數據庫了

import java.util.Random;

public class MailUtil {

    private static final Random random = new Random();

    private static final boolean [] deafult_boolean = {false,false,false,false,false,false,false,false,false,false,false,false,true};

    public static boolean sendMail(String name){
        System.out.println("send mail:" + name);
        return deafult_boolean[random.nextInt(deafult_boolean.length)];
    }

    public static void logError(String name){
        System.out.println("log error:" + name);
    }
}

來一個測試類:

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Start {

    /**
     * 5分鐘
     */
//    public static final int PERIOD = 1000 * 60 * 5;
    /**
     * 2秒
      */
    public static final int PERIOD = 1000 * 2;

    private static final Timer timer = new Timer();

    private static final ScheduledThreadPoolExecutor scheduledExe = new ScheduledThreadPoolExecutor(2);

    public static void main(String[] args) {

        for(int i = 0 ;i<20;i++){
            addTask(new MailSendTimerTask(String.valueOf(i)));
//            exeTask(new MailSendTimerTask(String.valueOf(i)));
        }
    }

    public static void addTask(TimerTask task){
        timer.schedule(task,0, PERIOD);
    }

    public static void exeTask(TimerTask task){
        scheduledExe.scheduleAtFixedRate(task,0,PERIOD, TimeUnit.MILLISECONDS);
    }
}

咱們能夠看到使用ScheduledThreadPoolExecutor是沒有辦法取消任務的。

Timer有一些問題,首先Timer是單線程的,另外Timer執行任務拋出異常後,後面的任務就都不會執行了。

因此咱們來看一下隊列方式的實現。

DelayQueue實現

爲了方便咱們使用DelayQueue阻塞隊列。

注意DelayQueue隊列的元素須要實現Delayed。

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

Delayed主要須要獲取的是剩餘的延遲時間和比較元素的優先級(繼承了Comparable)

當getDelay<=0的時候才能從DelayQueue中獲取到元素。

先來一個實現看一下:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Email implements Delayed{

    /**
     * 毫秒
     */
    public static final int PERIOD = 1000 * 1;

    private String title;

    private Integer retryTimes;

    private long lastSendTime;

    public Email(String title, Integer retryTimes, long lastSendTime) {
        this.title = title;
        this.retryTimes = retryTimes;
        this.lastSendTime = lastSendTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long delay = lastSendTime + PERIOD - System.currentTimeMillis();
//        System.out.println(TimeUnit.SECONDS.convert(delay,TimeUnit.MILLISECONDS));
        return unit.convert(delay,TimeUnit.MILLISECONDS);
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public long getLastSendTime() {
        return lastSendTime;
    }

    public void setLastSendTime(long lastSendTime) {
        this.lastSendTime = lastSendTime;
    }

    @Override
    public int compareTo(Delayed o) {
        if(!(o instanceof Email)){
            return -1;
        }
        Email that = (Email)o;
        long diff = this.lastSendTime - that.lastSendTime;
        if(diff == 0)
            return 0;
        else if(diff > 0)
            return 1;
        else
            return -1;
    }

    @Override
    public String toString() {
        return "Email{" +
                "title='" + title + '\'' +
                ", retryTimes=" + retryTimes +
                ", lastSendTime=" + lastSendTime +
                '}';
    }
}

注意:getDelay必須小於等於0纔可以從隊列裏面獲取到,因此getDelay返回值應該是動態變化的,通常是和當前時間相關的。

getDelay的返回值必須的時間單位必須是納秒。能夠看一下DelayQueue的take中的實現

available.awaitNanos(delay);

這裏簡單說一下這個思路,咱們知道DelayQueue中的隊列可能有元素可是沒有到延遲時間是取不到的。

若是沒有元素,咱們能夠在加入元素的時候通知,可是有元素,時間沒到怎麼處理呢?

讓while循環一直取?顯然這是很差的,消耗cpu,DelayQueue解決方式是,使用延遲時間優先級隊列,這樣取出來的就是延遲時間最短的,而後再等待一個延遲時間。這樣就能夠在等待的時候讓出cpu,避免無用的消耗。

compareTo也大(返回值>0),表示延遲時間也長,優先級也低。

咱們有了Delayed就可使用DelayQueue來存聽任務了,而後就能夠開線程池來執行任務了。

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class MailHandler {

    private static final Random random = new Random();

    private static final DelayQueue<Email> failQueue = new DelayQueue<Email>();

    private static final ExecutorService service = Executors.newFixedThreadPool(2);

    private static final MailHandler instance = new MailHandler();

    private static final boolean [] deafult_boolean = {false,false,false,false,false,false,false,false,false,false,false,false,true};

    private MailHandler(){
        service.submit(new MailSender());
    }

    public static MailHandler getInstance(){
        return instance;
    }

    public void failHandle(List<Email> mails) {
        failQueue.addAll(mails);
    }

    private static class MailSender implements Runnable{
        @Override
        public void run() {
            while (true){
                Email to = null;
                try {
                    to = failQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(to == null) {
                    continue;
                }
                System.out.println(to);
                if(!deafult_boolean[random.nextInt(deafult_boolean.length)]){
                    Integer retryTimes = to.getRetryTimes();
                    if(retryTimes < 3) {
                        to.setRetryTimes(retryTimes + 1);
                        to.setLastSendTime(System.currentTimeMillis());
                        failQueue.offer(to);
                    }else{
                        System.out.println(to.getTitle() + "----Failure!");
                    }
                }else{
                    System.out.println(to.getTitle() + "----Success!");
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MailHandler instance = MailHandler.getInstance();
        LinkedList<Email> list = new LinkedList<>();
        Random random = new Random();
        long lastSendTime = System.currentTimeMillis();
        for(int i =0;i<20;i++){
            list.add(new Email(String.valueOf(i),0, lastSendTime + random.nextInt(5) * 1000));
        }
        instance.failHandle(list);
    }
}

能看出上面的代碼有那些問題嗎?

其實這就是一個生成消費者模式,咱們開了一個2個線程從線程池,可是咱們只提交了一個任務。 因此咱們能夠稍微修改一下構造函數:

private MailHandler(){
    service.execute(new MailSender());
    service.execute(new MailSender());
}

這裏使用Executors.newFixedThreadPool是在適合不過了,由於從開始咱們就能決定使用多少個線程來處理任務,和不肯定任務數明顯不一樣。

你能夠嘗試重構一下,也許就能發現這其中的一些微妙的區別。親自修改一下代碼試一下,調試一下,是最容易發現問題的方式。

咱們也沒有使用submit的方式了,而是使用的execute方式,若是不關心結果的話最好使用execute,若是出錯了至少能獲得一點堆棧信息,若是使用submit就什麼也沒有了,除非自定義ThreadPoolExecutor處理了堆棧信息。

使用:

System.out.println(Thread.currentThread().getName()+ ":" + to);

代替:

System.out.println(to);

能夠看一下是由哪個線程處理的任務。

執行結果

相關文章
相關標籤/搜索