轉:https://yq.aliyun.com/articles/633782?utm_content=m_1000015330java
在Java中,使用線程來異步執行任務。Java線程的建立與銷燬須要必定的開銷,若是咱們 爲每個任務建立一個新線程來執行,這些線程的建立與銷燬將消耗大量的計算資源。同時,爲每個任務建立一個新線程來執行,這種策略可能會使處於高負荷狀態的應用最終崩潰。程序員
Java的線程既是工做單元,也是執行機制。從JDK5開始,把工做單元與執行機制分離開來。工做單元包括Runnable和Callable,而執行機制由Executor框架提供。編程
Java線程(java.lang.Thread)被一對一映射爲本地操做系統線程。Java線程啓動時會建立一個本地操做系統線程;當該Java線程終止時,這個操做系統線程 也會被回收。操做系統會調度全部線程並將它們分配給可用的CPU。 在上層,Java多線程程序一般把應用分解爲若干個任務,而後使用用戶級的調度器 (Executor框架)將這些任務映射爲固定數量的線程;在底層,操做系統內核將這些線程映射到 硬件處理器上。應用程序經過Executor框架控制上層的調度;而下層的調度由操做系統 內核控制,下層的調度不受應用程序的控制。小程序
(1)任務。包括被執行任務須要實現的接口:Runnable接口或Callable接口。服務器
(2)任務的執行。包括任務執行機制的核心接口Executor,以及繼承自Executor的 ExecutorService接口。Executor框架有兩個關鍵類實現了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。多線程
(3)異步計算的結果。包括接口Future和實現Future接口的FutureTask類。併發
(1)Executor是一個接口,它是Executor框架的基礎,它將任務的提交與任務的執行分離開來。框架
(2)ThreadPoolExecutor是線程池的核心實現類,用來執行被提交的任務。異步
(3)ScheduledThreadPoolExecutor是一個實現類,能夠在給定的延遲後運行命令,或者按期執 行命令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強大。函數
(4)Future接口和實現Future接口的FutureTask類,表明異步計算的結果。
(5)Runnable接口和Callable接口的實現類
(1)首先要建立實現Runnable或者Callable接口的任務對象。工具類Executors能夠把一 個Runnable對象封裝爲一個Callable對象(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。
(2)而後能夠把Runnable對象直接交給ExecutorService執行(ExecutorService.execute(Runnable command));或者也能夠把Runnable對象或Callable對象提交給ExecutorService執行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callabletask))。
(3)若是執行ExecutorService.submit(…),ExecutorService將返回一個實現Future接口的對象 (到目前爲止的JDK中,返回的是FutureTask對象)。因爲FutureTask實現了Runnable,程序員也能夠建立FutureTask,而後直接交給ExecutorService執行。
(4)最後,主線程能夠執行FutureTask.get()方法來等待任務執行完成。主線程也能夠執行 FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行。
Executor框架的主要成員:ThreadPoolExecutor、ScheduledThreadPoolExecutor、 Future接口、Runnable接口、Callable接口和Executors。
(1)ThreadPoolExecutor:ThreadPoolExecutor可使用工廠類Executors來建立。Executors能夠建立3種類型的 ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。或者也能夠本身建立。詳解SingleThreadExecutor、FixedThreadPool和CachedThreadPool。
(2)ScheduledThreadPoolExecutor:一般使用工廠類Executors來建立。Executors能夠建立2種類 型的ScheduledThreadPoolExecutor,包括
(3)Future接口:Future接口和實現Future接口的FutureTask類用來表示異步計算的結果。當咱們把Runnable 接口或Callable接口的實現類提交(submit)給ThreadPoolExecutor或 ScheduledThreadPoolExecutor時,ThreadPoolExecutor或ScheduledThreadPoolExecutor會向咱們 返回一個FutureTask對象。可是,在API中返回的是一個 FutureTask對象,Java僅僅保證返回的是一個實現了Future接口的對象。在未來的JDK實現中,返回的可能不必定是FutureTask。
(4)Runnable接口和Callable接口:Runnable接口和Callable接口的實現類,均可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執行。它們之間的區別是Runnable不會返回結果,而Callable能夠返回結 果。 除了能夠本身建立實現Callable接口的對象外,還可使用工廠類Executors的方法來把一個 Runnable包裝成一個Callable。
//把一個Runnable包裝成一個Callable public static Callable<Object> callable(Runnable task) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<Object>(task, null); } //一個Runnable和一個待返回的結果包裝成一個Callable public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
關於ThreadPoolExecutor詳細參考https://my.oschina.net/u/3352298/blog/1811255
FixedThreadPool被稱爲可重用固定線程數的線程池。下面是FixedThreadPool的源代碼實現。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
FixedThreadPool的corePoolSize和maximumPoolSize都被設置爲建立FixedThreadPool時指定的參數nThreads。 當線程池中的線程數大於corePoolSize時,keepAliveTime爲多餘的空閒線程等待新任務的最長時間,超過這個時間後多餘的線程將被終止。這裏把keepAliveTime設置爲0L,意味着多餘的空閒線程會被當即終止。
execute方法執行流程爲:
1)若是當前運行的線程數少於corePoolSize,則建立新線程來執行任務。
2)當前運行的線程數等於或大於corePoolSize時,將任務加入 LinkedBlockingQueue。
3)線程執行完1中的任務後,會在循環中反覆從LinkedBlockingQueue獲取任務來執行。 FixedThreadPool使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列(隊列的容量爲 Integer.MAX_VALUE)。
若是使用無界隊列會帶來以下影響:
1)當線程池中的線程數達到corePoolSize後,新任務將在無界隊列中等待,所以線程池中的線程數不會超過corePoolSize。
2)因爲1,使用無界隊列時maximumPoolSize將是一個無效參數。
3)因爲1和2,使用無界隊列時keepAliveTime將是一個無效參數。
4)因爲使用無界隊列,運行中的FixedThreadPool(未執行方法shutdown()或 shutdownNow())不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution方法)。
SingleThreadExecutor是使用單個worker線程的Executor。下面是SingleThreadExecutor的源代碼實現。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
SingleThreadExecutor的corePoolSize和maximumPoolSize被設置爲1。其餘參數與 FixedThreadPool相同。SingleThreadExecutor使用無界隊列LinkedBlockingQueue做爲線程池的工 做隊列(隊列的容量爲Integer.MAX_VALUE)。SingleThreadExecutor使用無界隊列做爲工做隊列 對線程池帶來的影響與FixedThreadPool相同。
CachedThreadPool是一個會根據須要建立新線程的線程池。下面是建立CachedThreadPool的源代碼。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
CachedThreadPool的corePoolSize被設置爲0,即corePool爲空;maximumPoolSize被設置爲 Integer.MAX_VALUE,即maximumPool是無界的。這裏把keepAliveTime設置爲60L,意味着 CachedThreadPool中的空閒線程等待新任務的最長時間爲60秒,空閒線程超過60秒後將會被 終止。
CachedThreadPool使用沒有容量的SynchronousQueue做爲線程池的工做隊列,但
CachedThreadPool的maximumPool是無界的。這意味着,若是主線程提交任務的速度高於
maximumPool中線程處理任務的速度時,CachedThreadPool會不斷建立新線程。極端狀況下,
CachedThreadPool會由於建立過多線程而耗盡CPU和內存資源。
執行流程:
前面提到過,SynchronousQueue是一個沒有容量的阻塞隊列。每一個插入操做必須等待另外一 個線程的對應移除操做,反之亦然。CachedThreadPool使用SynchronousQueue,把主線程提交的 任務傳遞給空閒線程執行。
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲以後運 行任務,或者按期執行任務。ScheduledThreadPoolExecutor的功能與Timer相似,但 ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個後臺線程,而 ScheduledThreadPoolExecutor能夠在構造函數中指定多個對應的後臺線程數。
ScheduledThreadPoolExecutor的執行主要分爲兩大部分。 首先調用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實現了 RunnableScheduledFutur接口的ScheduledFutureTask。接着線程池中的線程從DelayQueue中獲取ScheduledFutureTask,而後執行任務。
ScheduledThreadPoolExecutor爲了實現週期性的執行任務,對ThreadPoolExecutor作了一些修改。
1.ScheduledThreadPoolExecutor會把待調度的任務(ScheduledFutureTask) 放到一個DelayQueue中。而ScheduledFutureTask主要包含3個成員變量,long型成員變量time,表示這個任務將要被執行的具體時間。long型成員變量sequenceNumber,表示這個任務被添加到ScheduledThreadPoolExecutor中 的序號。long型成員變量period,表示任務執行的間隔週期。
2.DelayQueue封裝了一個PriorityQueue,這個PriorityQueue會對隊列中的ScheduledFutureTask進行排序。排序時,time小的排在前面(時間早的任務將被先執行)。若是兩個 ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面(也就 是說,若是兩個任務的執行時間相同,那麼先提交的任務將被先執行)。
3.ScheduledThreadPoolExecutor中的線程執行週期任務的過程:
(1)線程1從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務 是指ScheduledFutureTask的time大於等於當前時間。
(2)線程1執行這個ScheduledFutureTask。
(3)線程1修改ScheduledFutureTask的time變量爲下次將要被執行的時間。
(4)線程1把這個修改time以後的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
4.DelayQueue.take()的執行步驟:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
(1)獲取lock鎖
(2)獲取週期任務。這一步包括
(3)釋放Lock。
ScheduledThreadPoolExecutor在一個循環中執行步驟2,直到線程從PriorityQueue獲取到一 個元素以後,纔會退出無限循環(結束步驟2)。
5.DelayQueue.add()的執行步驟:
//add方法中調用offer方法 public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
添加任務分爲3大步驟:
(1)獲取Lock。
(2)添加任務。
3)釋放Lock。
Future接口和實現Future接口的FutureTask類,表明異步計算的結果。FutureTask除了實現Future接口外,還實現了Runnable接口。所以,FutureTask能夠交給 Executor執行,也能夠由調用線程直接執行(FutureTask.run())。根據FutureTask.run()方法被執行 的時機,FutureTask處於下面3種狀態。
//jdk1.8 源碼細分爲如下7中狀態 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
當FutureTask處於未啓動或已啓動狀態時,執行FutureTask.get()方法將致使調用線程阻塞;
當FutureTask處於已完成狀態時,執行FutureTask.get()方法將致使調用線程當即返回結果或拋出異常。
當FutureTask處於未啓動狀態時,執行FutureTask.cancel()方法將致使此任務永遠不會被執 行;
當FutureTask處於已啓動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務線程 的方式來試圖中止任務;
當FutureTask處於已啓動狀態時,執行FutureTask.cancel(false)方法將 不會對正在執行此任務的線程產生影響(讓正在執行的任務運行完成);
當FutureTask處於已完 成狀態時,執行FutureTask.cancel(…)方法將返回false。
能夠把FutureTask交給Executor執行;也能夠經過ExecutorService.submit(…)方法返回一個 FutureTask,而後執行FutureTask.get()方法或FutureTask.cancel(…)方法。除此之外,還能夠單獨使用FutureTask,注意利用其run方法和get方法的特性,只有一個FutureTask對象run方法執行完畢後get方法才能返回,不然將會阻塞,利用這一點能夠實現與等待/通知機制相同的做用。
FutureTask的實現基於抽象類AbstractQueuedSynchronizer(如下簡稱爲AQS)。java.util.concurrent中的不少可阻塞類(好比ReentrantLock)都是基於AQS來實現的。AQS是一個同步框架,它提供通用機制來原子性管理同步狀態、阻塞和喚醒線程,以及維護被阻塞線程的隊列。JDK 6中AQS 被普遍使用,基於AQS實現的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。
每個基於AQS實現的同步器都會包含兩種類型的操做:
- 至少一個acquire操做。這個操做阻塞調用線程,除非/直到AQS的狀態容許這個線程繼續 執行。FutureTask的acquire操做爲get()/get(long timeout,TimeUnit unit)方法調用。
- 至少一個release操做。這個操做改變AQS的狀態,改變後的狀態可容許一個或多個阻塞 線程被解除阻塞。FutureTask的release操做包括run()方法和cancel(…)方法。
注意:《Java併發編程的藝術》這裏所說並不能用於全部jdk版本,我在查看jdk1.8源碼發現 FutureTask的實現並無基於抽象類AbstractQueuedSynchronizer,而是基於LockSupport中的park()與unpark()方法實現的,而且經過一個內置的鏈表類WaitNode來存儲管理全部被get()方法阻塞的線程。所以,我在這裏便僅對jdk1.8的實現進行一些分析。而基於抽象類AbstractQueuedSynchronizer可自行參考Semaphore類中的源碼,在此再也不進行贅述。
1.首先是保存管理全部被get方法阻塞的線程,經過鏈表結構來管理。
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } /** 阻塞線程鏈表,或者說是一個棧,遵循先進後出 */ private volatile WaitNode waiters;
2.接下來咱們查看get方法的源碼,查看其如何實現阻塞
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
get方法實現的執行步驟爲:
(1)首先判斷當前FutureTask是否處於運行中或未運行狀態時,是則調用awaitDone方法進行阻塞進行下一步。不然返回任務計算結果。
(2)awaitDone方法中首先判斷當前線程是否被中斷,是則拋出異常返回,awaitDone方法中止阻塞。不然進行下一步。
(3)判斷FutureTask任務是否處於完成狀態,若是處於完成狀態且在上一次循環中給與阻塞線程節點q賦予對象時,則將q引用置爲null,且返回當前狀態值s,awaitDone方法中止阻塞。不然進行下一步。
(4)是否FutureTask任務處於運行狀態,是則繼續阻塞。不然進行下一步。
(5)若是阻塞線程節點對象爲null,則對引用q賦予阻塞線程節點對象。不然進行下一步
(6)是否將當前被阻塞線程加入到鏈表中,若是沒有則加入鏈表並返回給queued爲true,若是已經加入則執行下一步。
(7)調用park(Object)方法阻塞當前線程,直到unpark(Thread t)方法來解除阻塞。或使用parkNanos(Object blocker, long nanos)來進行超時阻塞,超過規定時間則返回state或調用unpark(Thread t)方法解除阻塞。
(8)重複2-7步,直到該方法return 解除阻塞
3.run方法與cancle方法則要簡單許多,run方法執行完畢後或cancle方法中斷當前線程後,都會調用finishCompletion()方法。finishCompletion()方法會遍歷阻塞線程鏈表,刪除全部節點而且解除全部節點中的線程的阻塞狀態而後返回。