大多數併發應用程序是圍繞執行任務(Task)進行管理的。把一個應用程序的工做(work)分離到任務中,能夠簡化程序的管理;這種分離還在不一樣事物間劃分了天然的分界線,能夠方便程序在出現錯誤時進行恢復;同時這種分離還能夠爲並行工做提供一個天然的結構,有利於提升程序的併發性。java
所謂的任務拆分就是肯定每個執行任務(工做單元)的邊界。理想狀況下獨立的工做單元有最大的吞吐量,這些工做單元不依賴於其它工做單元的狀態、結果或者其餘資源等。所以將任務儘量的拆分紅一個個獨立的工做單元有利於提升程序的併發性。api
任務的執行策略包括4W3H 部分:緩存
如何知足上面的條件安全
Executor 的execute 方法只是執行一個Runnable 的任務數據結構
ExecutorService 在Executor 的基礎上增長了一些方法,其中有兩個核心的方法:多線程
這兩個方法都是向線程池中提交任務,它們的區別在於Runnable 在執行完畢後沒有結果,Callable 執行完畢後有一個結果。這在多個線程中傳遞狀態和結果是很是有用的。另外他們的相同點在於都返回一個Future 對象。Future 對象能夠阻塞線程直到運行完畢(獲取結果,若是有的話),也能夠取消任務執行,固然也可以檢測任務是否被取消或者是否執行完畢。併發
ScheduledExecutorService 描述的功能和Timer/TimerTask 相似,解決那些須要任務重複執行
的問題。這包括延遲時間一次性執行、延遲時間週期性執行以及固定延遲時間週期性執行等。固然了繼承ExecutorService 的ScheduledExecutorService 擁有ExecutorService 的所有特性。異步
ScheduledThreadPoolExecutor 是繼承ThreadPoolExecutor 的ScheduledExecutorService 接口實現,週期性任務調度的類實現性能
CompletionService 接口,它是用於描述順序獲取執行結果的一個線程池包裝器ui
Executors 類裏面提供了一些靜態工廠,生成一些經常使用的線程池
線程是有多種執行狀態的,一樣管理線程的線程池也有多種狀態。JVM 會在全部線程(非後臺daemon 線程)所有終止後才退出,爲了節省資源和有效釋放資源關閉一個線程池就顯得很重要。有時候沒法正確的關閉線程池,將會阻止JVM 的結束。
線程池Executor 是異步的執行任務,所以任什麼時候刻不可以直接獲取提交的任務的狀態。這些任務有可能已經完成,也有可能正在執行或者還在排隊等待執行。所以關閉線程池可能出現一下幾種狀況:
線程池的四種狀態:
shutdownNow()會返回那些已經進入了隊列可是尚未執行的任務列表
awaitTermination 描述的是等待線程池關閉的時間,若是等待時間線程池尚未關閉將會拋出一個超時異常。
除了以上數據結構之外,ThreadPoolExecutor 還有一些狀態用來描述線程池的運行計數,例如線程池運行的任務數、曾經達到的最大線程數
public interface ThreadFactory {
Thread newThread(Runnable r);
}
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
同一個線程池的全部線程屬於同一個線程組,也就是建立線程池的那個線程組
另外對於線程池中的全部線程默認都轉換爲非後臺線程,這樣主線程退出時不會直接退出JVM,而是等待線程池結束。還有一點就是默認將線程池中的全部線程都調爲同一個級別,這樣在操做系統角度來看全部系統都是公平的,不會致使競爭堆積。
一個線程Worker 被構造出來之後就開始處於運行狀態。
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { task.run(); } finally { runLock.unlock(); } } public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } }
當提交一個任務時,若是須要建立一個線程(什麼時候須要在下一節中探討)時,就調用線程工廠建立一個線程,同時將線程綁定到Worker 工做隊列中。須要說明的是,Worker 隊列構造的時候帶着一個任務Runnable
一旦線程池啓動線程後(調用線程run())方法,那麼線程工做隊列Worker 就從第1個任務開始執行(這時候發現構造Worker 時傳遞一個任務的好處了),一旦第1個任務執行完畢,就從線程池的任務隊列中取出下一個任務進行執行。循環如此,直到線程池被關閉或者任務拋出了一個RuntimeException。
全部失敗的任務都將被「當前」的任務拒絕策略RejectedExecutionHandler 處理。
public void execute(Runnable command) { //1 if (command == null) throw new NullPointerException(); //2,3 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //4 if (runState == RUNNING && workQueue.offer(command)) { //5 if (runState != RUNNING || poolSize == 0) //6 ensureQueuedTaskHandled(command); } //7 else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
上述過程:
什麼時候任務當即執行
runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Object> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
在Future 接口中提供了5個方法。
初始狀況下任務狀態state=0,任務執行(innerRun)後狀態變爲運行狀態RUNNING(state=1),執行完畢後變成運行結束狀態RAN(state=2)。任務在初始狀態或者執行狀態被取消後就變爲狀態CANCELLED(state=4)
void innerRun() { if (!compareAndSetState(0, RUNNING)) return; try { runner = Thread.currentThread(); if (getState() == RUNNING) // recheck after setting thread innerSet(callable.call()); else releaseShared(0); // cancel } catch (Throwable ex) { innerSetException(ex); } }
執行一個任務有四步:設置運行狀態、設置當前線程(AQS 須要)、執行任務(Runnable#run或者Callable#call)、設置執行結果。這裏也能夠看到,一個任務只能執行一次,由於執行完畢後它的狀態不在爲初始值0,要麼爲CANCELLED,要麼爲RAN。
boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) { int s = getState(); if (ranOrCancelled(s)) return false; if (compareAndSetState(s, CANCELLED)) break; } if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); } releaseShared(0); done(); return true; }
V innerGet() throws InterruptedException, ExecutionException { acquireSharedInterruptibly(0); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } //AQS#acquireSharedInterruptibly public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); //park current Thread for result } protected int tryAcquireShared(int ignore) { return innerIsDone()? 1 : -1; } boolean innerIsDone() { return ranOrCancelled(getState()) && runner == null; }
當調用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS 的使用方式了。這裏獲取一個共享變量的狀態是任務是否結束(innerIsDone()),也就是任務是否執行完畢或者被取消。若是不知足條件,那麼在AQS 中就會doAcquireSharedInterruptibly(arg)掛起當前線程,直到知足條件
ScheduledThreadPoolExecutor 和ThreadPoolExecutor 的惟一區別在於任務是有序(按照執行時間順序)的,而且須要到達時間點(臨界點)才能執行,並非任務隊列中有任務就須要執行的。也就是說惟一不一樣的就是任務隊列BlockingQueue
java.util.concurrent.DelayQueue
因爲DelayQueue 在獲取元素時須要檢測元素是否「可用」,也就是任務是否達到「臨界點」(指定時間點),所以加入元素和移除元素會有一些額外的操做。
移除元素(出隊列)的過程是這樣的:
一樣加入元素也會有相應的條件變量操做。當前僅當隊列爲空或者要加入的元素比隊列中的頭元素還小的時候才須要喚醒「等待線程」去檢測元素。由於頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。