JDK5之後將工做單元和執行機制分離開來,工做單元包括Runnable和Callable;執行機制由Executor框架提供,管理線程的生命週期,將任務的提交和如何執行進行解耦。Executors是一個快速獲得線程池的工具類,相關的類圖以下所示:java
Executor接口數組
Executor接口只有一個execute方法,用來替代一般建立或啓動線程的方法。框架
public interface Executor { void execute(Runnable command); }
ExecutorService接口異步
ExecutorService接口繼承自Executor接口,加入了關閉方法、submit方法和對Callable、Future的支持。ide
ScheduledExecutorService接口工具
ScheduledExecutorService擴展ExecutorService接口並加入了對定時任務的支持。oop
ThreadPoolExecutor繼承自AbstractExecutorService,也是實現了ExecutorService接口。ui
4.1 內部狀態this
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 private static final int COUNT_BITS = Integer.SIZE - 3; 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 5 // runState is stored in the high-order bits 6 private static final int RUNNING = -1 << COUNT_BITS; 7 private static final int SHUTDOWN = 0 << COUNT_BITS; 8 private static final int STOP = 1 << COUNT_BITS; 9 private static final int TIDYING = 2 << COUNT_BITS; 10 private static final int TERMINATED = 3 << COUNT_BITS; 11 12 // Packing and unpacking ctl 13 private static int runStateOf(int c) { return c & ~CAPACITY; } 14 private static int workerCountOf(int c) { return c & CAPACITY; } 15 private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl是對線程池的運行狀態(高3位)和線程池中有效線程的數量(低29位)進行控制的一個字段。線程池有五種狀態,分別是:atom
4.2 構造方法
構造方法有4個,這裏只列出其中最基礎的一個。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
構造方法中參數的含義以下:
4.3 execute方法
ThreadPoolExecutor.execute(task)實現了Executor.execute(task),用來提交任務,不能獲取返回值,代碼以下:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Proceed in 3 steps: 6 * 7 * 1. If fewer than corePoolSize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. The call to addWorker atomically checks runState and 10 * workerCount, and so prevents false alarms that would add 11 * threads when it shouldn't, by returning false. 12 * 13 * 2. If a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. So we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. If we cannot queue task, then we try to add a new 21 * thread. If it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 int c = ctl.get(); 25 /* 26 * workerCountOf方法取出低29位的值,表示當前活動的線程數; 27 * 若是當前活動線程數小於corePoolSize,則新建一個線程放入線程池中; 28 * 並把任務添加到該線程中。 29 */ 30 31 if (workerCountOf(c) < corePoolSize) { 32 /* 33 * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷; 34 * 若是爲true,根據corePoolSize來判斷; 35 * 若是爲false,則根據maximumPoolSize來判斷 36 */ 37 if (addWorker(command, true)) 38 return; 39 /* 40 * 若是添加失敗,則從新獲取ctl值 41 */ 42 c = ctl.get(); 43 } 44 /* 45 * 線程池處於RUNNING狀態,把提交的任務成功放入阻塞隊列中 46 */ 47 if (isRunning(c) && workQueue.offer(command)) { 48 // 從新獲取ctl值 49 int recheck = ctl.get(); 50 // 再次判斷線程池的運行狀態,若是不是運行狀態,因爲以前已經把command添加到workQueue中了, 51 // 這時須要移除該command 52 // 執行事後經過handler使用拒絕策略對該任務進行處理,整個方法返回 53 if (! isRunning(recheck) && remove(command)) 54 reject(command); 55 /* 56 * 獲取線程池中的有效線程數,若是數量是0,則執行addWorker方法 57 * 這裏傳入的參數表示: 58 * 1. 第一個參數爲null,表示在線程池中建立一個線程,但不去啓動; 59 * 2. 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize,添加線程時根據maximumPoolSize來判斷; 60 * 若是判斷workerCount大於0,則直接返回,在workQueue中新增的command會在未來的某個時刻被執行。 61 */ 62 else if (workerCountOf(recheck) == 0) 63 addWorker(null, false); 64 } 65 /* 66 * 若是執行到這裏,有兩種狀況: 67 * 1. 線程池已經不是RUNNING狀態; 68 * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。 69 * 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize; 70 * 若是失敗則拒絕該任務 71 */ 72 else if (!addWorker(command, false)) 73 reject(command); 74 }
若是線程池狀態一直是RUNNING,則執行過程以下:
4.4 addWorker方法
從executor的方法實現能夠看出,addWorker主要負責建立新的線程並執行任務。線程池建立新線程執行任務時,須要獲取全局鎖:
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 // 獲取運行狀態 6 int rs = runStateOf(c); 7 /* 8 * 這個if判斷 9 * 若是rs >= SHUTDOWN,則表示此時再也不接收新任務; 10 * 接着判斷如下3個條件,只要有1個不知足,則返回false: 11 * 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務 12 * 2. firsTask爲空 13 * 3. 阻塞隊列不爲空 14 * 15 * 首先考慮rs == SHUTDOWN的狀況 16 * 這種狀況下不會接受新提交的任務,因此在firstTask不爲空的時候會返回false; 17 * 而後,若是firstTask爲空,而且workQueue也爲空,則返回false, 18 * 由於隊列中已經沒有任務了,不須要再添加線程了 19 */ 20 // Check if queue empty only if necessary. 21 if (rs >= SHUTDOWN && 22 ! (rs == SHUTDOWN && 23 firstTask == null && 24 ! workQueue.isEmpty())) 25 return false; 26 27 for (;;) { 28 // 獲取線程數 29 int wc = workerCountOf(c); 30 // 若是wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false; 31 // 這裏的core是addWorker方法的第二個參數,若是爲true表示根據corePoolSize來比較, 32 // 若是爲false則根據maximumPoolSize來比較。 33 if (wc >= CAPACITY || 34 wc >= (core ? corePoolSize : maximumPoolSize)) 35 return false; 36 // 嘗試增長workerCount,若是成功,則跳出第一個for循環 37 if (compareAndIncrementWorkerCount(c)) 38 break retry; 39 // 若是增長workerCount失敗,則從新獲取ctl的值 40 c = ctl.get(); // Re-read ctl 41 // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行 42 if (runStateOf(c) != rs) 43 continue retry; 44 // else CAS failed due to workerCount change; retry inner loop 45 } 46 } 47 48 boolean workerStarted = false; 49 boolean workerAdded = false; 50 Worker w = null; 51 try { 52 // 根據firstTask來建立Worker對象 53 w = new Worker(firstTask); 54 // 每個Worker對象都會建立一個線程 55 final Thread t = w.thread; 56 if (t != null) { 57 final ReentrantLock mainLock = this.mainLock; 58 mainLock.lock(); 59 try { 60 // Recheck while holding lock. 61 // Back out on ThreadFactory failure or if 62 // shut down before lock acquired. 63 int rs = runStateOf(ctl.get()); 64 // rs < SHUTDOWN表示是RUNNING狀態; 65 // 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。 66 // 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務 67 if (rs < SHUTDOWN || 68 (rs == SHUTDOWN && firstTask == null)) { 69 if (t.isAlive()) // precheck that t is startable 70 throw new IllegalThreadStateException(); 71 // workers是一個HashSet 72 workers.add(w); 73 int s = workers.size(); 74 // largestPoolSize記錄着線程池中出現過的最大線程數量 75 if (s > largestPoolSize) 76 largestPoolSize = s; 77 workerAdded = true; 78 } 79 } finally { 80 mainLock.unlock(); 81 } 82 if (workerAdded) { 83 // 啓動線程,執行任務(Worker.thread(firstTask).start()); 84 //啓動時會調用Worker類中的run方法,Worker自己實現了Runnable接口,因此一個Worker類型的對象也是一個線程。 85 t.start(); 86 workerStarted = true; 87 } 88 } 89 } finally { 90 if (! workerStarted) 91 addWorkerFailed(w); 92 } 93 return workerStarted; 94 }
4.5 Worker類
線程池中的每個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象。Worker類設計以下:
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable 4 { 5 /** 6 * This class will never be serialized, but we provide a 7 * serialVersionUID to suppress a javac warning. 8 */ 9 private static final long serialVersionUID = 6138294804551838833L; 10 11 /** Thread this worker is running in. Null if factory fails. */ 12 final Thread thread; 13 /** Initial task to run. Possibly null. */ 14 Runnable firstTask; 15 /** Per-thread task counter */ 16 volatile long completedTasks; 17 18 /** 19 * Creates with given first task and thread from ThreadFactory. 20 * @param firstTask the first task (null if none) 21 */ 22 Worker(Runnable firstTask) { 23 setState(-1); // inhibit interrupts until runWorker 24 this.firstTask = firstTask; 25 this.thread = getThreadFactory().newThread(this); 26 } 27 28 /** Delegates main run loop to outer runWorker */ 29 public void run() { 30 runWorker(this); 31 } 32 33 // Lock methods 34 // 35 // The value 0 represents the unlocked state. 36 // The value 1 represents the locked state. 37 38 protected boolean isHeldExclusively() { 39 return getState() != 0; 40 } 41 42 protected boolean tryAcquire(int unused) { 43 if (compareAndSetState(0, 1)) { 44 setExclusiveOwnerThread(Thread.currentThread()); 45 return true; 46 } 47 return false; 48 } 49 50 protected boolean tryRelease(int unused) { 51 setExclusiveOwnerThread(null); 52 setState(0); 53 return true; 54 } 55 56 public void lock() { acquire(1); } 57 public boolean tryLock() { return tryAcquire(1); } 58 public void unlock() { release(1); } 59 public boolean isLocked() { return isHeldExclusively(); } 60 61 void interruptIfStarted() { 62 Thread t; 63 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 64 try { 65 t.interrupt(); 66 } catch (SecurityException ignore) { 67 } 68 } 69 } 70 }
4.6 runWorker方法
Worker類中的run方法調用了runWorker方法來執行任務,執行過程以下:
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 // 獲取第一個任務 4 Runnable task = w.firstTask; 5 w.firstTask = null; 6 // 容許中斷 7 w.unlock(); // allow interrupts 8 boolean completedAbruptly = true; 9 try { 10 // 若是task爲空,則經過getTask來獲取任務 11 while (task != null || (task = getTask()) != null) { 12 w.lock(); 13 // If pool is stopping, ensure thread is interrupted; 14 // if not, ensure thread is not interrupted. This 15 // requires a recheck in second case to deal with 16 // shutdownNow race while clearing interrupt 17 if ((runStateAtLeast(ctl.get(), STOP) || 18 (Thread.interrupted() && 19 runStateAtLeast(ctl.get(), STOP))) && 20 !wt.isInterrupted()) 21 wt.interrupt(); 22 try { 23 beforeExecute(wt, task); 24 Throwable thrown = null; 25 try { 26 task.run(); 27 } catch (RuntimeException x) { 28 thrown = x; throw x; 29 } catch (Error x) { 30 thrown = x; throw x; 31 } catch (Throwable x) { 32 thrown = x; throw new Error(x); 33 } finally { 34 afterExecute(task, thrown); 35 } 36 } finally { 37 task = null; 38 w.completedTasks++; 39 w.unlock(); 40 } 41 } 42 completedAbruptly = false; 43 } finally { 44 processWorkerExit(w, completedAbruptly); 45 } 46 }
4.7 getTask方法
getTask方法用來從阻塞隊列中取等待的任務
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 // Check if queue empty only if necessary. 9 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 10 decrementWorkerCount(); 11 return null; 12 } 13 14 int wc = workerCountOf(c); 15 16 // Are workers subject to culling? 17 // timed變量用於判斷是否須要進行超時控制。 18 // allowCoreThreadTimeOut默認是false,也就是核心線程不容許進行超時; 19 // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量; 20 // 對於超過核心線程數量的這些線程,須要進行超時控制 21 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 22 23 if ((wc > maximumPoolSize || (timed && timedOut)) 24 && (wc > 1 || workQueue.isEmpty())) { 25 if (compareAndDecrementWorkerCount(c)) 26 return null; 27 continue; 28 } 29 30 try { 31 /* 32 * 根據timed來判斷,若是爲true,則經過阻塞隊列的poll方法進行超時控制,若是在keepAliveTime時間內沒有獲取到任務,則返回null; 33 * 不然經過take方法,若是這時隊列爲空,則take方法會阻塞直到隊列不爲空。 34 * 35 */ 36 Runnable r = timed ? 37 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 38 workQueue.take(); 39 if (r != null) 40 return r; 41 timedOut = true; 42 } catch (InterruptedException retry) { 43 timedOut = false; 44 } 45 } 46 }
1 public class Test{ 2 3 public static void main(String[] args) { 4 5 ExecutorService es = Executors.newCachedThreadPool(); 6 Future<String> future = es.submit(new Callable<String>() { 7 @Override 8 public String call() throws Exception { 9 try { 10 TimeUnit.SECONDS.sleep(2); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 return "future result"; 15 } 16 }); 17 try { 18 String result = future.get(); 19 System.out.println(result); 20 } catch (Exception e) { 21 e.printStackTrace(); 22 } 23 } 24 }
在實際業務場景中,Future和Callable基本是成對出現的,Callable負責產生結果,Future負責獲取結果。
5.1 submit方法
AbstractExecutorService.submit()實現了ExecutorService.submit(),能夠得到執行完的返回值。而ThreadPoolExecutor是AbstractExecutorService的子類,因此submit方法也是ThreadPoolExecutor的方法。
1 public Future<?> submit(Runnable task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<Void> ftask = newTaskFor(task, null); 4 execute(ftask); 5 return ftask; 6 } 7 public <T> Future<T> submit(Runnable task, T result) { 8 if (task == null) throw new NullPointerException(); 9 RunnableFuture<T> ftask = newTaskFor(task, result); 10 execute(ftask); 11 return ftask; 12 } 13 public <T> Future<T> submit(Callable<T> task) { 14 if (task == null) throw new NullPointerException(); 15 RunnableFuture<T> ftask = newTaskFor(task); 16 execute(ftask); 17 return ftask; 18 }
經過submit方法提交的Callable或者Runnable任務會被封裝成了一個FutureTask對象。經過Executor.execute方法提交FutureTask到線程池中等待被執行,最終執行的是FutureTask的run方法。
5.2 FutureTask對象
類圖
內部狀態
/** *... * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
內部狀態的修改經過sun.misc.Unsafe修改。
get方法
1 public V get() throws InterruptedException, ExecutionException { 2 int s = state; 3 if (s <= COMPLETING) 4 s = awaitDone(false, 0L); 5 return report(s); 6 }
內部經過awaitDone方法對主線程進行阻塞,具體實現以下:
1 /** 2 * Awaits completion or aborts on interrupt or timeout. 3 * 4 * @param timed true if use timed waits 5 * @param nanos time to wait, if timed 6 * @return state upon completion 7 */ 8 private int awaitDone(boolean timed, long nanos) 9 throws InterruptedException { 10 final long deadline = timed ? System.nanoTime() + nanos : 0L; 11 WaitNode q = null; 12 boolean queued = false; 13 for (;;) { 14 if (Thread.interrupted()) { 15 removeWaiter(q); 16 throw new InterruptedException(); 17 } 18 19 int s = state; 20 if (s > COMPLETING) { 21 if (q != null) 22 q.thread = null; 23 return s; 24 } 25 else if (s == COMPLETING) // cannot time out yet 26 Thread.yield(); 27 else if (q == null) 28 q = new WaitNode(); 29 else if (!queued) 30 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 31 q.next = waiters, q); 32 else if (timed) { 33 nanos = deadline - System.nanoTime(); 34 if (nanos <= 0L) { 35 removeWaiter(q); 36 return state; 37 } 38 LockSupport.parkNanos(this, nanos); 39 } 40 else 41 LockSupport.park(this); 42 } 43 }
run方法
1 public void run() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, 4 null, Thread.currentThread())) 5 return; 6 try { 7 Callable<V> c = callable; 8 if (c != null && state == NEW) { 9 V result; 10 boolean ran; 11 try { 12 result = c.call(); 13 ran = true; 14 } catch (Throwable ex) { 15 result = null; 16 ran = false; 17 setException(ex); 18 } 19 if (ran) 20 set(result); 21 } 22 } finally { 23 // runner must be non-null until state is settled to 24 // prevent concurrent calls to run() 25 runner = null; 26 // state must be re-read after nulling runner to prevent 27 // leaked interrupts 28 int s = state; 29 if (s >= INTERRUPTING) 30 handlePossibleCancellationInterrupt(s); 31 } 32 }
FutureTask.run方法是在線程池中被執行的,而非主線程
Exectors工廠類提供了線程池的初始化接口,主要有以下幾種:
newFixedThreadPool
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
建立一個固定大小、任務隊列容量無界(Integer.MAX_VALUE)的線程池,其中corePoolSize =maximumPoolSize=nThreads,阻塞隊列爲LinkedBlockingQuene。
注意點:
corePoolSize
,這致使了maximumPoolSize
和keepAliveTime
將會是個無用參數 ;newSingleThreadExecutor
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
只有一個線程來執行無界任務隊列的單一線程池。若是該線程異常結束,會從新建立一個新的線程繼續執行任務,惟一的線程能夠保證所提交任務的順序執行。因爲使用了無界隊列, 因此SingleThreadPool永遠不會拒絕,即飽和策略失效。與newFixedThreadPool(1)的區別在於單一線程池的大小不能再改變。
newCachedThreadPool
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
建立一個大小無界的緩衝線程池。任務隊列是一個同步隊列。緩衝線程池適用於執行耗時較小的異步任務。池的核心線程數=0 最大線程數=Integer.MAX_VLUE。與前兩種稍微不一樣的是:
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
能定時執行任務的線程,池的核心線程數由參數指定。和前面3個線程池基於ThreadPoolExecutor類實現不一樣的是,它基於ScheduledThreadPoolExecutor實現。
可使用ThreadPoolExecutor如下方法: