Timer和ScheduledThreadPoolExecutor的定時任務

1 目錄

2 調度概述

  • 1 說到調度,有最簡單的Timer、ScheduledThreadPoolExecutor,又有Spring Task、quartz。數據庫

  • 2 說到分佈式調度,有基於數據庫實現的quartz集羣方案、噹噹網開源的基於ZooKeeper的elastic-jobapache

  • 3 說到大數據方向的調度,如apache的oozie、阿里的zeus。他們更可能是定製大數據方向的job,如MapReduce job,spark job,hive job等,還有解決了上面都沒觸碰的job依賴管理。數組

  • 4 說到集羣資源的調度,如Yarn、Mesos。他們則是更高度的抽象,他們把調度拆分紅資源調度任務調度,他們只負責資源方面的調度。微信

    資源調度:僅僅負責對集羣全部機器的CPU 、內存、網絡等資源進行統一規劃和管理,有任務到來時,經過合理的分配(達到資源充分利用的目的)挑選出對應資源交付給任務來執行。網絡

    任務調度:就要提到任務模型了,如MapReduce是一種任務模型,每一種任務模型有有對應的ApplicationMaster來負責任務調度,如MapReduce的就是MRAppMaster。MRAppMaster負責任務的分片,任務的failover,任務之間的邏輯、向Yarn申請資源來執行Map任務或者Reduce任務等等。目前Yarn已經集成了對MapReduce、Spark等任務模型的處理,若是你還想在Yarn上運行本身的任務模型,就須要實現一個本身的ApplicationMaster來負責任務調度。多線程

3 Timer

Timer是單線程的,比較簡單,一個線程TimerThread 加一個任務隊列TaskQueue,每個任務都是TimerTask類型的。分佈式

提供了以下方式來調度任務的執行,這個就再也不說了,本身看下文檔大數據

Timer的調度方法

3.1 TimerThread執行過程

見下圖 TimerThread的執行過程spa

  • 1 一旦隊列是空的,就進行等待,直到隊列中有數據
  • 2 一旦Timer被取消,就會設置newTasksMayBeScheduled=false,而且清空隊列。正常狀況下走到這裏隊列是有數據的,沒有數據則說明Timer被取消了,因此這裏就退出while循環了,TimerThread執行結束了
  • 3 若是queue有任務,則取出最近要執行的任務,查看該任務是否取消了,一旦取消了,就從queue中移除,繼續下一次循環
  • 4 若是該任務沒有取消,則判斷是否到達該任務的執行時間了,若是到達即taskFired=true,若是該任務不是週期任務,則直接從queue中刪除該任務
  • 5 若是該任務時週期性任務,計算出下次執行時間,再將該任務放到queue中(實際上是重新調整該任務在隊列中的位置)
  • 6 若是該任務還沒到觸發時間,則等待一段時間
  • 7 若是該任務觸發了,就直接調用任務的run方法

存在的比較嚴重的問題:這裏能夠看到,這裏並無對任務的run方法可能拋出的異常進行捕獲,就會致使,一旦某個TimerTask任務拋出異常,就會致使TimerThread結束,Timer不可用。因此在使用Timer的時候,本身實現的TimerTask要對可能的異常進行捕獲和處理。.net

3.2 TaskQueue對TimerTask的存儲

從上面看到,是須要可以從TaskQueue中獲取最近要執行的一個任務。若是對全部任務的執行時間進行排序存儲也能夠實現,可是該場景就沒有必要,只須要知道一個最小的執行時間,因此使用二叉堆來進行存儲,又分最大堆和最小堆,這裏使用最小堆,這裏再也不詳細說明。

最小堆:就是全部的父節點的值都比左右孩子節點的值小。因此最小的值必定是在堆頂。

二叉堆通常使用數組來實現,TaskQueue實現以下:

private TimerTask[] queue = new TimerTask[128];
private int size = 0;

size用於標記該數組中任務的個數

添加一個任務的過程:

void add(TimerTask task) {
    // Grow backing store if necessary
    if (size + 1 == queue.length)
        queue = Arrays.copyOf(queue, 2*queue.length);

    queue[++size] = task;
    fixUp(size);
}

這裏會先判斷是否須要擴容,而後會將任務放到數組的末尾,而後調用fixUp(size)方法來調整該任務在數組中的位置,以達到二叉堆的特性。

private void fixUp(int k) {
    while (k > 1) {
        int j = k >> 1;
        if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
            break;
        TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
        k = j;
    }
}

這裏也很簡單,就是不斷的找出該任務的父節點,判斷該任務節點的下一次執行時間和父節點的下一次執行時間的大小,若是小於父節點的話,則與父節點交換位置,繼續往上查找父節點再重複上述比較。

4 ScheduledThreadPoolExecutor

4.1 繼承ThreadPoolExecutor

Timer是單線程的,而ScheduledThreadPoolExecutor是多線程的。ScheduledThreadPoolExecutor繼承了線程池ThreadPoolExecutor,總體的執行流程不變,仍是有那幾個核心東西

  • int corePoolSize
  • int maximumPoolSize
  • long keepAliveTime
  • BlockingQueue<Runnable> workQueue
  • ThreadFactory threadFactory
  • RejectedExecutionHandler handler

ThreadPoolExecutor這一部分能夠參考我以前的文章線程池系列分析

從ThreadPoolExecutor的原理中能夠看到並無對job的定時調度功能,它裏面的Worker並不會去延遲執行一個任務,由於它是通用的,而Timer中的TimerThread是專用的,能夠將延遲邏輯放到TimerThread中,而讓TaskQueue更輕量的專作簡單的二叉堆存儲操做,ScheduledThreadPoolExecutor如何來基於ThreadPoolExecutor來實現定時任務呢?

那就只能將TimerThread中作的延遲邏輯放到queue中來作,ScheduledThreadPoolExecutor爲此實現了一個DelayedWorkQueue。在取任務的時候會作延遲邏輯。

4.2 實現定時功能

咱們知道ThreadPoolExecutor會調用BlockingQueue<Runnable> workQueue的offer方法添加任務,會調用task、poll方法來獲取任務,來看看DelayedWorkQueue是如何實現這2個操做的

對任務的添加:也是採用二叉堆形式來存放這些任務的,和上述Timer的添加任務方法相似,存儲結構以下

private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
private int size = 0;

對於任務的獲取:普通BlockingQueue<Runnable> workQueue中有數據了,poll方法就能夠取到數據,而DelayedWorkQueue則不是,只有當有數據,而且達到任務執行時間了,才能poll出數據,從而實現對定時任務的調度。下面來詳細看下這個poll過程

DelayedWorkQueue的poll過程

  • 1 因爲在多線程環境下采用數據來存儲,須要使用鎖來控制
  • 2 而後判斷數組中第一個是否爲null,若是爲null,而且沒有等待時間已經用完,就直接返回了,本次poll就沒有拿到數據
  • 3 若是數組中第一個是null,則等待一段時間,知道時間超時或者被喚醒(在添加任務的時候會進行喚醒操做)
  • 4 若是數組中第一個任務不爲null,而且到了觸發時間,則直接從數組中取出該任務,而後進行必定的調整,是數組中的數據仍然知足二叉堆的性質,而後將取出的任務返回,用於去執行
  • 5 若是數組中的任務沒有到觸發時間,則等待到它的觸發時間

5 未完待續

下一篇就來講說quartz的簡單模式,Spring Task對定時任務的封裝,以及quartz基於數據庫的集羣模式。

歡迎關注微信公衆號:乒乓狂魔

微信公衆號

相關文章
相關標籤/搜索