提交一個線程到線程池:java
1.判斷核心線程是否都在執行任務,若是有空閒的或者沒有被建立的,則建立去執行新的工做線程執行任務,若是核心線程都在執行,則下一步;緩存
2.線程池判斷工做隊列是否已滿,沒有滿,則提交任務到工做等待隊列,若是工做等待隊列已滿,則下一步;bash
3.判斷線程池裏的全部工做線程是否都在工做,若是沒有,則建立一個新的工做線程執行任務,若是已滿,則根據飽和策略處理任務。服務器
飽和執行策略:併發
1.AbortPolicy 拒絕新任務,直接拋出異常異步
2.DiscardPolicy 直接丟棄任務spa
3.DiscardOldestPolicy 丟棄隊列中最老的任務 先將阻塞隊列中的頭元素出隊拋棄,再嘗試提交任務。線程
4.CallerRunsPolicy 將任務丟給調用線程執行rest
接口繼承關係code
Java API對ExecutorService接口的實現有兩個,因此這兩個便是Java線程池具體實現類:
1. ThreadPoolExecutor 2. ScheduledThreadPoolExecutor
ExecutorService還繼承了Executor
接口(注意區分Executor接口和Executors工廠類),這個接口只有一個execute()
方法,最後咱們看一下整個繼承樹
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
參數詳情:
corePoolSize 核心線程池大小,在剛開始建立線程池時,核心線程建立完不會當即啓動,有任務提交時纔會啓動達到核心線程數大小,若是一開始就要建立能夠調用prestartAllCoreThreads。
maximumPoolSize 容許的最大線程數量,當核心線程滿和阻塞隊列滿時纔會判斷最大線程數量,決定是否建立普通線程。
keepAliveTime 線程數大於核心線程數時,多餘空閒線程存活時間
unit keepAliveTime的時間單位
workQueue 線程數量超過核心線程數時用於保存任務,包含三種類型的BlockingQueue:有界隊列,無界隊列,同步移交。
可選擇的BlockingQueue:
1.LinkedBlockingQueue 無界隊列,遵循先進先出,使用該隊列作爲阻塞隊列時要尤爲小心,當任務耗時較長時可能會致使大量新任務在隊列中堆積最終致使OOM。閱讀代碼發現,Executors.newFixedThreadPool 採用就是 LinkedBlockingQueue,當QPS很高,發送數據很大,大量的任務被添加到這個無界LinkedBlockingQueue 中,致使cpu和內存飆升服務器掛掉。
2.ArrayBlockingQueue 有界隊列 遵循先進先出,
3.PriorityBlockingQueue 優先級隊列,任務的優先級由Comparator決定。
4.SychronousQueue 將任務直接移交給工做線程,不是一個真正的隊列,是一種線程移交機制。必須有另外一個線程正在等待接收這個元素。只有在使用無界線程池或者有飽和策略時才建議使用該隊列。
threadPoolFactory 建立新線程時使用的工廠類
handler 阻塞隊列已滿,且線程數達到最大線程後所採起的飽和策略
Java給咱們提供了一個Executors工廠類
,它能夠幫助咱們很方便的建立各類類型ExecutorService線程池,Executors一共能夠建立下面這四類線程池:
newCachedThreadPool:建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
初始化時核心線程數爲0,若是線程池長度超過處理須要,可當即回收空閒線程,無可回收則新建。
要將一個元素放入SynchronousQueue中,必須有另外一個線程正在等待接收這個元素。所以即使SynchronousQueue一開始爲空且大小爲1,第一個任務也沒法放入其中,由於沒有線程在等待從SynchronousQueue中取走元素。所以第一個任務到達時便會建立一個新線程執行該任務。
newFixedThreadPool:建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
固定大小線程數,使用無限大的LinkedBlockingQueue存聽任務。
newScheduledThreadPool:建立一個定長線程池,支持定時及週期性任務執行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
DelayedWorkQueue是一個無界阻塞隊列,是ScheduledThreadPoolExecutor的靜態內部類
newSingleThreadExecutor:建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
首先new了一個線程數目爲 1 的ScheduledThreadPoolExecutor,再把該對象傳入DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的實現代碼:
DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); e = executor; } #父類: DelegatedExecutorService(ExecutorService executor) { e = executor; }
其實就是使用裝飾模式加強了ScheduledExecutorService(1)的功能,不只確保只有一個線程順序執行任務,也保證線程意外終止後會從新建立一個線程繼續執行任務。
Executors只是一個工廠類,它全部的方法返回的都是ThreadPoolExecutor
、ScheduledThreadPoolExecutor
這兩個類的實例。
ExecutorService executorService = Executors.newFixedThreadPool(10); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
ExecutorService有以下幾個執行方法:
- execute(Runnable) - submit(Runnable) - submit(Callable) - invokeAny(...) - invokeAll(...)
這個方法接收一個Runnable實例,而且異步的執行,請看下面的實例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
這個方法有個問題,就是沒有辦法獲知task的執行結果。若是咱們想得到task的執行結果,咱們能夠傳入一個Callable的實例。
submit(Runnable)
和execute(Runnable)
區別是前者能夠返回一個Future對象,經過返回的Future對象,咱們能夠檢查提交的任務是否執行完畢,請看下面執行的例子:
Future future = executorService.submit(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); future.get(); //returns null if the task has finished correctly.
若是任務執行完成,future.get()
方法會返回一個null。注意,future.get()方法會產生阻塞。
submit(Callable)
和submit(Runnable)
相似,也會返回一個Future對象,可是除此以外,submit(Callable)接收的是一個Callable的實現,Callable接口中的call()
方法有一個返回值,能夠返回任務的執行結果,而Runnable接口中的run()
方法是void
的,沒有返回值。請看下面實例:
Future future = executorService.submit(new Callable(){ public Object call() throws Exception { System.out.println("Asynchronous Callable"); return "Callable Result"; } }); System.out.println("future.get() = " + future.get());
若是任務執行完成,future.get()方法會返回Callable任務的執行結果。注意,future.get()方法會產生阻塞。
invokeAny(...)
方法接收的是一個Callable的集合,執行這個方法不會返回Future,可是會返回全部Callable任務中其中一個任務的執行結果。這個方法也沒法保證返回的是哪一個任務的執行結果,反正是其中的某一個。請看下面實例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); String result = executorService.invokeAny(callables); System.out.println("result = " + result); executorService.shutdown();
invokeAll(...)
與 invokeAny(...)
相似也是接收一個Callable集合,可是前者執行以後會返回一個Future的List,其中對應着每一個Callable任務執行後的Future對象。狀況下面這個實例
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); List<Future<String>> futures = executorService.invokeAll(callables); for(Future<String> future : futures){ System.out.println("future.get = " + future.get()); } executorService.shutdown();
當咱們使用完成ExecutorService以後應該關閉它,不然它裏面的線程會一直處於運行狀態。
舉個例子,若是的應用程序是經過main()方法啓動的,在這個main()退出以後,若是應用程序中的ExecutorService沒有關閉,這個應用將一直運行。之因此會出現這種狀況,是由於ExecutorService中運行的線程會阻止JVM關閉。
若是要關閉ExecutorService中執行的線程,咱們能夠調用ExecutorService.shutdown()
方法。在調用shutdown()方法以後,ExecutorService不會當即關閉,可是它再也不接收新的任務,直到當前全部線程執行完成纔會關閉,全部在shutdown()執行以前提交的任務都會被執行。
若是咱們想當即關閉ExecutorService,咱們能夠調用ExecutorService.shutdownNow()
方法。這個動做將跳過全部正在執行的任務和被提交尚未執行的任務。可是它並不對正在執行的任務作任何保證,有可能它們都會中止,也有可能執行完成。
正確結束線程應該在線程內先使用break結束任務。