一.類圖說明緩存
Executor接口類,執行Runnable接口execute。函數
ExecutorService接口類繼承Executor接口,包含提交執行Runnable和Callable接口submit以及shutdown,invokeAll接口。this
ScheduledExecutorService接口類繼承ExecutorService接口,主要包含計劃執行接口schedule,scheduleAtFixedRate以及scheduleWithFixedDelay。spa
虛類AbstractExecutorService,實現ExecutorService接口。實現ExecutorService相關接口。線程
ThreadPoolExecutor類,繼承虛類AbstractExecutorService。實現初始化線程池數量、策略並執行線程。code
ScheduledThreadPoolExecutor類,繼承ThreadPoolExecutor類並實現ScheduledExecutorService接口。實現計劃執行相關接口。對象
執行類,定義ThreadPoolExecutor和ScheduledThreadPoolExecutor類,並使用相關concurrent類方法。繼承
二.簡述下從定義線程池,到其底層執行的簡單流程接口
ExecutorService executorService = Executors.newFixedThreadPool(3); executorService.submit(new NewTask());
簡單定義瞭如上兩句代碼,JDK自動建立3個固定大小的線程的線程池,submit實現了Runnable接口的NewTask對象之後,JDK自動啓動線程並執行NewTask對象的run方法。流程是如何的呢?隊列
1.Executors的newFixedThreadPool方法new了一個ThreadPoolExecutor對象,且new了一個LinkedBlockingQueue對象。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
2.調用ThreadPoolExecutor的構造函數,其中ThreadFactory默認使用defaultThreadFactory,defaultHandler爲AbortPolicy。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
3.初始化之後,調用AbstractExecutorService類的submit方法,執行Runnable對象。
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
4.RunnableAdapter實現了Callable接口,FutureTask類包含callable類成員對象。
FutureTask實現RunnableFuture接口,RunnableFuture繼承Runnable接口。因此newTaskFor(task, null)方法返回一個FutureTask對象。
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V> RunnableFuture<Void> ftask = newTaskFor(task, null); protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
5.關鍵的調用ThreadPoolExecutor類execute方法,管理線程池並執行Runnable任務。主要邏輯以下:
1)若是線程數量小於corePoolSize,新來一個任務,建立一個新線程。
2)若是線程數量等於corePoolSize,則將任務緩存到workQueue中。
3)若是線程數量等於corePoolSize而且workQueue隊列已滿,則繼續建立線程直到等於maximumPoolSize。
4)若是線程數量等於maximumPoolSize,且workQueue隊列已滿,則根據defaultHandler策略執行相應措施。默認是AbortPolicy,拋出一個運行時異常RejectedExecutionException。另外還有3種策略:CallerRunsPolicy->若是線程池沒有shutdown,則直接調用Runnable的run方法執行。若是線程池shutdown,則直接丟棄;DiscardPolicy->直接丟棄,沒有任何異常;DiscardOldestPolicy->丟棄最老的任務,並調用線程池的execute方法執行,若是線程池沒有shutdown。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }