本平臺的文章更新會有延遲,你們能夠關注微信公衆號-顧林海,包括年末前會更新kotlin由淺入深系列教程,目前計劃在微信公衆號進行首發,若是你們想獲取最新教程,請關注微信公衆號,謝謝!java
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,而ThreadPoolExecutor是線程池的核心實現類,用來執行被提交的任務,ScheduledThreadPoolExecutor是一個實現定時任務的類,能夠在給定的延遲後運行命令,或者按期執行命令。web
ScheduledThreadPoolExecutor定義了四個構造函數,這四個構造函數以下:算法
/** * @param corePoolSize 核心線程池的大小 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
/** * @param corePoolSize 核心線程池的大小 * @param threadFactory 用於設置建立線程的工廠 */
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
/** * @param corePoolSize 核心線程池的大小 * @param handler 飽和策略 */
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), handler);
}
/** * @param corePoolSize 核心線程池的大小 * @param threadFactory 用於設置建立線程的工廠 * @param handler 飽和策略 */
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
複製代碼
經過源碼能夠發現,ScheduledThreadPoolExecutor的構造器都是調用父類的構造器也就是ThreadPoolExecutor的構造器,以此來建立一個線程池。數組
ThreadPoolExecutor的構造器以下:緩存
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
建立一個線程池時須要輸入幾個參數,以下:安全
corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其它空閒的基本線程可以執行新任務也會建立線程,等到須要執行的任務數大於線程池基本大小時就再也不建立,會把到達的任務放到緩存隊列當中。若是調用了線程池的prestartAllCoreThreads()方法,線程池會提早建立並啓動全部基本線程,或調用線程池的prestartCoreThread()方法,線程池會提早建立一個線程。微信
maximumPoolSize(線程池最大數量):線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是,若是使用了無界的任務隊列這個參數就沒什麼效果。框架
KeepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存貨的時間。若是任務不少,而且每一個任務執行的時間比較短,能夠調大時間,提供線程的利用率。函數
unit(線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS)、千分之一毫秒和納秒(NANOSECONDS、千分之一微妙)。this
workQueue(任務隊列):用於保持等待執行的任務的阻塞隊列,能夠選擇如下幾個阻塞隊列。 (1)ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。 (2)LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量一般要高於ArrayBlockingQueue。 (3)SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQueue。 (4)PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。
ThreadFactory:用於設置建立線程的工廠,能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。
RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常。在JDK1.5中Java線程池框架提供了4種策略(也可經過實現RejectedExecutionHandler接口自定義策略)。 (1)AbortPolicy:直接拋出異常。 (2)CallerRunsPolicy:只用調用者所在線程來運行任務。 (3)DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務。 (4)DiscardPolicy:處理,丟棄掉。
在ScheduledThreadPoolExecutor構造器中使用了工做隊列java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,DelayedWorkQueue是一個無界的BlockingQueue, 用於放置實現了Delayed接口的對象,其中的對象只能在其到期才能從隊列中取走。
因爲ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以它也實現了ThreadPoolExecutor的方法,以下:
public void execute(Runnable command) {
...
}
public Future<?> submit(Runnable task) {
...
}
public <T> Future<T> submit(Runnable task, T result) {
...
}
public <T> Future<T> submit(Callable<T> task) {
...
}
複製代碼
同時它也有本身的定時執行任務的方法:
/** * 延遲delay時間後開始執行task,沒法獲取task的執行結果。 */
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
...
}
/** * 延遲delay時間後開始執行callable,它接收的是一個Callable實例, * 此方法會返回一個ScheduleFuture對象,經過ScheduleFuture咱們 * 能夠取消一個未執行的task,也能夠得到這個task的執行結果。 */
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
...
}
/** * 延遲initialDelay時間後開始執行command,而且按照period時間週期性 * 重複調用,當任務執行時間大於間隔時間時,以後的任務都會延遲,此時與 * Timer中的schedule方法相似。 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
...
}
/** *延遲initialDelay時間後開始執行command,而且按照period時間週期性重複 *調用,這裏的間隔時間delay是等上一個任務徹底執行完畢纔開始計算。 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
...
}
複製代碼
ScheduledThreadPoolExecutor把待調度的任務放到一個DelayedWorkQueue ,而且DelayedWorkQueue 是一個無界隊列,ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中沒有什麼意義。整個ScheduledThreadPoolExecutor的運行能夠分爲兩大部分。
一、當調用ScheduledThreadPoolExecutor的上面4個方法時,會向ScheduledThreadPoolExecutor的DelayedWorkQueue 添加一個實現了RunnableScheduleFuture接口的ScheduledFutureTask,以下ScheduledThreadPoolExecutor其中的一個schedule方法。
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);//向ScheduledThreadPoolExecutor的DelayedWorkQueue添加一個實現了RunnableScheduleFuture接口的ScheduledFutureTask
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
複製代碼
二、線程池中的線程從DelayedWorkQueue 中獲取ScheduledFutureTask,而後執行。
ScheduledFutureTask是ScheduledThreadPoolExecutor的內部類並繼承自FutureTask,包含3個成員變量。
//ong型成員變量sequenceNumber,表示這個任務被添加到
//ScheduledThreadPoolExecutor中的序號。
private final long sequenceNumber;
//long型成員變量time,表示這個任務將要被執行的具體時間。
private volatile long time;
//long型成員變量period,表示任務執行的間隔週期。
private final long period;
ScheduledFutureTask內部實現了compareTo()方法,用於對task的排序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
複製代碼
排序時,time小的排在前面,若是兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面。
DelayedWorkQueue 內部使用了二叉堆算法,DelayedWorkQueue 中的元素第一個元素永遠是 延遲時間最小的那個元素。當執行 schedule 方法是。若是不是重複的任務,那任務從 DelayedWorkQueue 取出以後執行完了就結束了。若是是重複的任務,那在執行結束前會重置執行時間並將本身從新加入到 DelayedWorkQueue 中
總結來講,ScheduledThreadPoolExecutor是一個實現ScheduledExecutorService的能夠調度任務的執行框架;DelayedWorkQueue是一個數組實現的阻塞隊列,根據任務所提供的時間參數來調整位置,實際上就是個小根堆(優先隊列);ScheduledFutureTask包含任務單元,存有時間、週期、外部任務、堆下標等調度過程當中必須用到的參數,被工做線程執行。ScheduledThreadPoolExecutor與Timer都是用做定時任務,它們直接的差別是Timer使用的是絕對時間,系統時間的改變會對Timer產生必定的影響;而ScheduledThreadPoolExecutor使用的是相對時間,不會致使這個問題。Timer使用的是單線程來處理任務,長時間運行的任務會致使其餘任務的延遲處理;而ScheduledThreadPoolExecutor能夠自定義線程數量。而且Timer沒有對運行時異常進行處理,一旦某個任務觸發運行時異常,會致使整個Timer崩潰;而ScheduledThreadPoolExecutor對運行時異常作了捕獲(經過afterExecute()回調方法中進行處理),因此更安全。