有這樣一個業務場景,不少業務須要發郵件,若是失敗了要重試,每隔5分鐘重試一次,最多12次,若是是12次都失敗,就記入數據庫。java
粗一想,很簡單嘛,可是仔細想想,好像不是那麼容易,在想想,嗯,也不是那麼難。仍是要親自試一下,寫一下代碼才知道有哪些坑。數據庫
其實,問題的關鍵在於時間間隔的處理,是使用定時器,仍是隊列把時間封裝一下。dom
注意,很容易把這2中狀況混在一塊兒了。ide
下面就來把這兩種狀況都實現一下。函數
不少對線程池比較熟悉的朋友可能首先想到的是ScheduledThreadPoolExecutor,可是ScheduledThreadPoolExecutor有一個問題就是沒有辦法取消。測試
好比發送到第5次成功了,就須要取消週期任務,避免重複發送,ScheduledThreadPoolExecutor是很差實現的。this
因此咱們直接使用Timer和TimerTask。線程
先來一個TimerTask3d
import java.util.TimerTask; public class MailSendTimerTask extends TimerTask { private int exeNum; private String name; public MailSendTimerTask(String name) { this.name = name; } @Override public void run() { if(exeNum < 12){ try{ if(MailUtil.sendMail(name)){ this.cancel(); } }finally { exeNum++; } }else { MailUtil.logError(name); this.cancel(); } } }
若是發送成功或者到了12次都失敗了,咱們就取消任務。調試
MailUtil僞裝發了郵件和失敗後記錄到數據庫了
import java.util.Random; public class MailUtil { private static final Random random = new Random(); private static final boolean [] deafult_boolean = {false,false,false,false,false,false,false,false,false,false,false,false,true}; public static boolean sendMail(String name){ System.out.println("send mail:" + name); return deafult_boolean[random.nextInt(deafult_boolean.length)]; } public static void logError(String name){ System.out.println("log error:" + name); } }
來一個測試類:
import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Start { /** * 5分鐘 */ // public static final int PERIOD = 1000 * 60 * 5; /** * 2秒 */ public static final int PERIOD = 1000 * 2; private static final Timer timer = new Timer(); private static final ScheduledThreadPoolExecutor scheduledExe = new ScheduledThreadPoolExecutor(2); public static void main(String[] args) { for(int i = 0 ;i<20;i++){ addTask(new MailSendTimerTask(String.valueOf(i))); // exeTask(new MailSendTimerTask(String.valueOf(i))); } } public static void addTask(TimerTask task){ timer.schedule(task,0, PERIOD); } public static void exeTask(TimerTask task){ scheduledExe.scheduleAtFixedRate(task,0,PERIOD, TimeUnit.MILLISECONDS); } }
咱們能夠看到使用ScheduledThreadPoolExecutor是沒有辦法取消任務的。
Timer有一些問題,首先Timer是單線程的,另外Timer執行任務拋出異常後,後面的任務就都不會執行了。
因此咱們來看一下隊列方式的實現。
爲了方便咱們使用DelayQueue阻塞隊列。
注意DelayQueue隊列的元素須要實現Delayed。
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
Delayed主要須要獲取的是剩餘的延遲時間和比較元素的優先級(繼承了Comparable)
當getDelay<=0的時候才能從DelayQueue中獲取到元素。
先來一個實現看一下:
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Email implements Delayed{ /** * 毫秒 */ public static final int PERIOD = 1000 * 1; private String title; private Integer retryTimes; private long lastSendTime; public Email(String title, Integer retryTimes, long lastSendTime) { this.title = title; this.retryTimes = retryTimes; this.lastSendTime = lastSendTime; } @Override public long getDelay(TimeUnit unit) { long delay = lastSendTime + PERIOD - System.currentTimeMillis(); // System.out.println(TimeUnit.SECONDS.convert(delay,TimeUnit.MILLISECONDS)); return unit.convert(delay,TimeUnit.MILLISECONDS); } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public Integer getRetryTimes() { return retryTimes; } public void setRetryTimes(Integer retryTimes) { this.retryTimes = retryTimes; } public long getLastSendTime() { return lastSendTime; } public void setLastSendTime(long lastSendTime) { this.lastSendTime = lastSendTime; } @Override public int compareTo(Delayed o) { if(!(o instanceof Email)){ return -1; } Email that = (Email)o; long diff = this.lastSendTime - that.lastSendTime; if(diff == 0) return 0; else if(diff > 0) return 1; else return -1; } @Override public String toString() { return "Email{" + "title='" + title + '\'' + ", retryTimes=" + retryTimes + ", lastSendTime=" + lastSendTime + '}'; } }
注意:getDelay必須小於等於0纔可以從隊列裏面獲取到,因此getDelay返回值應該是動態變化的,通常是和當前時間相關的。
getDelay的返回值必須的時間單位必須是納秒。能夠看一下DelayQueue的take中的實現
available.awaitNanos(delay);
這裏簡單說一下這個思路,咱們知道DelayQueue中的隊列可能有元素可是沒有到延遲時間是取不到的。
若是沒有元素,咱們能夠在加入元素的時候通知,可是有元素,時間沒到怎麼處理呢?
讓while循環一直取?顯然這是很差的,消耗cpu,DelayQueue解決方式是,使用延遲時間優先級隊列,這樣取出來的就是延遲時間最短的,而後再等待一個延遲時間。這樣就能夠在等待的時候讓出cpu,避免無用的消耗。
compareTo也大(返回值>0),表示延遲時間也長,優先級也低。
咱們有了Delayed就可使用DelayQueue來存聽任務了,而後就能夠開線程池來執行任務了。
import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MailHandler { private static final Random random = new Random(); private static final DelayQueue<Email> failQueue = new DelayQueue<Email>(); private static final ExecutorService service = Executors.newFixedThreadPool(2); private static final MailHandler instance = new MailHandler(); private static final boolean [] deafult_boolean = {false,false,false,false,false,false,false,false,false,false,false,false,true}; private MailHandler(){ service.submit(new MailSender()); } public static MailHandler getInstance(){ return instance; } public void failHandle(List<Email> mails) { failQueue.addAll(mails); } private static class MailSender implements Runnable{ @Override public void run() { while (true){ Email to = null; try { to = failQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } if(to == null) { continue; } System.out.println(to); if(!deafult_boolean[random.nextInt(deafult_boolean.length)]){ Integer retryTimes = to.getRetryTimes(); if(retryTimes < 3) { to.setRetryTimes(retryTimes + 1); to.setLastSendTime(System.currentTimeMillis()); failQueue.offer(to); }else{ System.out.println(to.getTitle() + "----Failure!"); } }else{ System.out.println(to.getTitle() + "----Success!"); } } } } public static void main(String[] args) throws InterruptedException { MailHandler instance = MailHandler.getInstance(); LinkedList<Email> list = new LinkedList<>(); Random random = new Random(); long lastSendTime = System.currentTimeMillis(); for(int i =0;i<20;i++){ list.add(new Email(String.valueOf(i),0, lastSendTime + random.nextInt(5) * 1000)); } instance.failHandle(list); } }
能看出上面的代碼有那些問題嗎?
其實這就是一個生成消費者模式,咱們開了一個2個線程從線程池,可是咱們只提交了一個任務。 因此咱們能夠稍微修改一下構造函數:
private MailHandler(){ service.execute(new MailSender()); service.execute(new MailSender()); }
這裏使用Executors.newFixedThreadPool是在適合不過了,由於從開始咱們就能決定使用多少個線程來處理任務,和不肯定任務數明顯不一樣。
你能夠嘗試重構一下,也許就能發現這其中的一些微妙的區別。親自修改一下代碼試一下,調試一下,是最容易發現問題的方式。
咱們也沒有使用submit的方式了,而是使用的execute方式,若是不關心結果的話最好使用execute,若是出錯了至少能獲得一點堆棧信息,若是使用submit就什麼也沒有了,除非自定義ThreadPoolExecutor處理了堆棧信息。
使用:
System.out.println(Thread.currentThread().getName()+ ":" + to);
代替:
System.out.println(to);
能夠看一下是由哪個線程處理的任務。