什麼是延時任務面試
延時任務,顧名思義,就是延遲一段時間後才執行的任務。舉個例子,假設咱們有個發佈資訊的功能,運營須要在天天早上7點準時發佈資訊,可是早上7點你們都還沒上班,這個時候就可使用延時任務來實現資訊的延時發佈了。只要在前一天下班前指定次日要發送資訊的時間,到了次日指定的時間點資訊就能準時發出去了。若是你們有運營過公衆號,就會知道公衆號後臺也有文章定時發送的功能。總而言之,延時任務的使用仍是很普遍的。關於延時任務的實現方式,我知道的就不下於3種,後面會逐一介紹,今天就講下如何用redis實現延時任務。redis
延時任務的特色網絡
在介紹具體方案以前,咱們不妨先想一下要實現一個延時系統,有哪些內容是必須存儲下來的(這裏的存儲不必定是指持久化,也能夠是放在內存中,取決於延時任務的重要程度)。首先要存儲的就是任務的描述。假如你要處理的延時任務是延時發佈資訊,那麼你至少要存儲資訊的id吧。另外,若是你有多種任務類型,好比:延時推送消息、延時清洗數據等等,那麼你還須要存儲任務的類型。以上總總,都歸屬於任務描述。除此以外,你還必須存儲任務執行的時間點吧,通常來講就是時間戳。此外,咱們還須要根據任務的執行時間進行排序,由於延時任務隊列裏的任務可能會有不少,只有到了時間點的任務才應該被執行,因此必須支持對任務執行時間進行排序。數據結構
使用Redis實現延時任務多線程
以上就是一個延遲任務系統必須具有的要素了。回到redis,有什麼數據結構能夠既存儲任務描述,又能存儲任務執行時間,還能根據任務執行時間進行排序呢?想來想去,彷佛只有Sorted Set了。咱們能夠把任務的描述序列化成字符串,放在Sorted Set的value中,而後把任務的執行時間戳做爲score,利用Sorted Set自然的排序特性,執行時刻越早的會排在越前面。這樣一來,咱們只要開一個或多個定時線程,每隔一段時間去查一下這個Sorted Set中score小於或等於當前時間戳的元素(這能夠經過zrangebyscore命令實現),而後再執行元素對應的任務便可。固然,執行完任務後,還要將元素從Sorted Set中刪除,避免任務重複執行。若是是多個線程去輪詢這個Sorted Set,還有考慮併發問題,假如說一個任務到期了,也被多個線程拿到了,這個時候必須保證只有一個線程能執行這個任務,這能夠經過zrem命令來實現,只有刪除成功了,才能執行任務,這樣就能保證任務不被多個任務重複執行了。架構
接下來看代碼。首先看下項目結構:併發
一共4個類:Constants類定義了Redis key相關的常量。DelayTaskConsumer是延時任務的消費者,這個類負責從Redis拉取到期的任務,並封裝了任務消費的邏輯。DelayTaskProducer則是延時任務的生產者,主要用於將延時任務放到Redis中。RedisClient則是Redis客戶端的工具類。ide
最主要的類就是DelayTaskConsumer和DelayTaskProducer了。工具
咱們先來看下生產者DelayTaskProducer:性能
代碼很簡單,就是將任務描述(爲了方便,這裏只存儲資訊的id)和任務執行的時間戳放到Redis的Sorted Set中。
接下來是延時任務的消費者DelayTaskConsumer:
public class DelayTaskConsumer {
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public void start(){
scheduledExecutorService.scheduleWithFixedDelay(new DelayTaskHandler(),1,1, TimeUnit.SECONDS);
}
public static class DelayTaskHandler implements Runnable{
public void run() {
Jedis client = RedisClient.getClient();
try {
Set ids = client.zrangeByScore(Constants.DELAY_TASK_QUEUE, 0, System.currentTimeMillis(),
0, 1);
if(ids==null||ids.isEmpty()){
return;
}
for(String id:ids){
Long count = client.zrem(Constants.DELAY_TASK_QUEUE, id);
if(count!=null&&count==1){
System.out.println(MessageFormat.format("發佈資訊。id - {0} , timeStamp - {1} , " +
"threadName - {2}",id,System.currentTimeMillis(),Thread.currentThread().getName()));
}
}
}finally {
client.close();
}
}
}
}
首先看start方法。在這個方法裏面咱們利用Java的ScheduledExecutorService開了一個調度線程池,這個線程池會每隔1秒鐘調度DelayTaskHandler中的run方法。
DelayTaskHandler類就是具體的調度邏輯了。主要有2個步驟,一個是從Redis Sorted Set中拉取到期的延時任務,另外一個是執行到期的延時任務。拉取到期的延時任務是經過zrangeByScore命令實現的,處理多線程併發問題是經過zrem命令實現的。代碼不復雜,這裏就很少作解釋了。
接下來測試一下:
咱們首先生產了4個延時任務,執行時間分別是程序開始運行後的5秒、10秒、15秒、20秒,而後啓動了10個消費者去消費延時任務。運行效果以下:
能夠看到,任務確實可以在相應的時間點左右被執行,不過有少量時間偏差,這個是由於咱們拉取到期任務是經過定時任務拉取而不是實時推送的,並且拉取任務時有一部分網絡開銷,再者,咱們的任務處理邏輯是同步處理的,須要上一次的任務處理完,才能拉取下一批任務,這些因素都會形成延時任務的執行時間產生誤差。
總結
以上就是經過Redis實現延時任務的思路了。這裏提供的只是一個最簡單的版本,實際上還有不少地方能夠優化。好比,咱們能夠把任務的處理邏輯再放到單獨的線程池中去執行,這樣的話任務消費者只須要負責任務的調度就能夠了,好處就是能夠減小任務執行時間誤差。還有就是,這裏爲了方便,任務的描述存儲的只是任務的id,若是有多種不一樣類型的任務,像前面說的發送資訊任務和推送消息任務,那麼就要經過額外存儲任務的類型來進行區分,或者使用不一樣的Sorted Set來存放延時任務了。
除此以外,上面的例子每次拉取延時任務時,只拉取1個,若是說某一個時刻要處理的任務數很是多,那麼會有一部分任務延遲比較嚴重,這裏能夠優化成每次拉取不止1個到期的任務,好比說10個,而後再逐個進行處理,這樣的話能夠極大地提高調度效率,由於若是是使用上面的方法,拉取10個任務須要10次調度,每次間隔1秒,總共須要10秒才能把10個任務拉取完,若是改爲一次拉取10個,只須要1次就能完成了,效率提高仍是挺大的。
固然還能夠從另外一個角度來優化。你們看上面的代碼,當拉取到待執行任務時,就直接執行任務,任務執行完該線程也就退出了,可是這個時候,隊列裏可能還有不少待執行的任務(由於咱們拉取任務時,限制了拉取的數量),因此其實在這裏可使用循環,當拉取不到待執行任務時,才結束調度,當有任務時,執行完還有順便查詢下有沒有堆積的任務,直到沒有堆積任務了才結束線程。
最後一個須要考慮的地方是,上面的代碼並無對任務執行失敗的狀況進行處理,也就是說若是某個任務執行失敗了,那麼連重試的機會都沒有。所以,在生產環境使用時,還須要考慮任務處理失敗的狀況。有一個簡單的方法是在任務處理時捕獲異常,當在處理過程當中出現異常時,就將該任務再放回Redis Sorted中,或者由當前線程再重試處理。不過這樣作的話,任務的時效性就不能保證了,有可能原本定在早上7點執行的任務,由於失敗重試的緣由,延遲到7點10分才執行完成,這個要根據業務來進行權衡,好比能夠在任務描述中增長重試次數或者是否容許重試的字段,這樣在任務執行失敗時,就能根據不一樣的任務採起不一樣的補償措施了。
那麼使用redis實現延時任務有什麼優缺點呢?優勢就是能夠知足吞吐量。缺點則是存在任務丟失的風險(當redis實例掛了的時候)。所以,若是對性能要求比較高,同時又能容忍少數狀況下任務的丟失,那麼可使用這種方式來實現。
歡迎工做一到五年的Java工程師朋友們加入Java架構開發:744677563
本羣提供免費的學習指導 架構資料 以及免費的解答
不懂得問題均可以在本羣提出來 以後還會有職業生涯規劃以及面試指導