本文主要講解了Java
裏面線程池的接口以及實現類,以及它們的基本使用方法,內容包括:java
Executor
/Executors
ExecutorService
ThreadPoolExecutor
ScheduledThreadPoolExecutor
Executor
+ExecutorService
Executor
是一個接口,裏面只是定義了一個簡單的任務提交方法:shell
//Executor package java.util.concurrent; public interface Executor { void execute(Runnable var1); }
而ExecutorService
也是一個接口,繼承了Executor
,而且提供了更多用於任務提交和管理的一些方法,好比中止任務的執行等:安全
//ExecutorService package java.util.concurrent; import java.util.Collection; import java.util.List; public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException; <T> Future<T> submit(Callable<T> var1); <T> Future<T> submit(Runnable var1, T var2); Future<?> submit(Runnable var1); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException; }
下面將詳細講述ExecutorService
的兩個重要實現:bash
ThreadPoolExecutor
ScheduledThreadPoolExecutor
ThreadPoolExecutor
這就是一般所說的線程池類,一般來講,一個線程池有以下特徵:併發
先來看一個簡單的例子:框架
public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); //執行沒有返回值的任務 executor.execute(()-> System.out.println(" Execute the runnable task.")); //執行帶返回值的任務,用到了Future泛型類 Future<String> future = executor.submit(()->" Execute the callable task and this is the result."); //經過get()獲取任務結果,get()會在任務未完成時一直阻塞 System.out.println(future.get()); //手動關閉線程池 executor.shutdown(); } }
從這個簡單的例子能夠看到,線程池能夠執行帶返回值以及不帶返回值的任務,帶返回值的話須要使用get()
方法阻塞獲取。另外,運行完畢後須要手動關閉線程池,不然JVM
不會退出,由於線程池中有指定數量的活躍線程數量,而JVM
正常退出的條件是JVM
進程中不存在任何運行着的非守護進程。異步
構造方法的源碼以下:ide
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
雖然提供了四個構造方法,但本質上調用的是最後一個構造方法,該構造方法帶有7個參數,分別是:高併發
corePoolSize
:核心線程數量,即便當線程池中的核心線程不工做,核心線程的數量也不會減小。該參數的最小值爲0,且小於等於maximumPoolSize
maximumPoolSize
:用於設置線程池中容許的線程數量的最大值keepAliveTime
:當線程池中的線程數量超過核心線程數而且處於空閒時,線程池將會回收一部分線程讓出系統資源,該參數可用於設置超過corePoolSize
數量的線程在多長時間後被回收,與後一個表示時間單位的參數unit
配合使用unit
:用於設定keepAliveTime
的時間單位workQueure
:用於存放已提交至線程池但未被執行的任務threadFactory
:用於建立線程的工廠,開發者能夠自定義ThreadFactory
來建立線程handler
:拒絕策略,當任務超過阻塞隊列的邊界時,線程池會拒絕新增的任務,主要用於設置拒絕策略線程池被成功建立後,內部的運行線程並不會被當即建立,ThreadPoolExecutor
會採用一種Lazy
的方式去建立而且運行。首次調用執行任務方法時纔會建立線程,好比:this
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); assert executor.getActiveCount() == 0; assert executor.getMaximumPoolSize() == 4; assert executor.getCorePoolSize() == 2; executor.execute(()-> System.out.println(" Execute the runnable task.")); assert executor.getActiveCount() == 1; assert executor.getMaximumPoolSize() == 4; assert executor.getCorePoolSize() == 2;
(運行的時候請加上-ea
參數)
下面看一下任務的具體執行流程:
RejectedEcecutionHandler
keepAliveTime
指定時間,會回收線程,直到保留corePoolSize
個核心線程爲止(不過核心線程也能夠設置被超時回收,默認不開啓核心線程超時)線程工廠ThreadFactory
是一個接口:
package java.util.concurrent; public interface ThreadFactory { Thread newThread(Runnable var1); }
使用線程工廠能夠在建立線程時加入自定義配置,好比指定名字、優先級、是否爲守護線程等,好比下面是線程工廠的一個簡單實現:
public class TestThreadFactory implements ThreadFactory { private final static String PREFIX = "Test thread["; private final static String SUFFIX = "]"; private final static AtomicInteger THREAD_NUM = new AtomicInteger(); @Override public Thread newThread(Runnable runnable) { ThreadGroup group = new ThreadGroup("My pool"); Thread thread = new Thread(group,runnable,PREFIX+THREAD_NUM.getAndIncrement()+SUFFIX); thread.setPriority(5); return thread; } }
默認狀況下,ThreadPoolExecutor
提供了四種拒絕策略:
DiscardPolicy
:丟棄策略,直接丟棄任務AbortPolicy
:終止策略,拋出RejectedExecutionException
DiscardOldestPolicy
:丟棄隊列中最老任務的策略(嚴格意義來講須要根據任務隊列去選擇,由於不是全部的隊列都是FIFO
的)CallerRunsPolicy
:調用者線程執行策略,任務會在當前線程中阻塞執行固然,若是不能知足須要,能夠實現RejectedExecutionHandler
接口去自定義策略:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable var1, ThreadPoolExecutor var2); }
若是不須要線程池,那麼須要手動對線程池關閉。線程池提供了以下三種方式:
shutdown()
shutdownNow()
shutdown()+shutdownNow()
shutdown()
提供了一種有序關閉的方式去關閉線程池,調用該方法後,會等待當前執行的任務所有執行完成而後關閉,同時新提交任務將會被拒絕。注意該方法是非阻塞,當即返回的。若是須要查看關閉狀態,可使用:
isShutdown()
:返回是否調用了shutdown()
的結果isTerminating()
:返回是否正在結束中isTerminated()
:返回是否已經結束shutdownNow()
方法首先將線程池狀態修改成shutdown
狀態,而後將未被執行的任務掛起,接着將嘗試中斷運行中的線程,最後返回未執行的任務:
public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); IntStream.range(0,10).forEach(i-> executor.execute(()-> { try{ TimeUnit.SECONDS.sleep(5); }catch (Exception e){ e.printStackTrace(); } })); List<Runnable> runnables = executor.shutdownNow(); System.out.println(runnables.size()); }
輸出:
8 BUILD SUCCESSFUL in 326ms 2 actionable tasks: 2 executed java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:339) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at com.company.Main.lambda$main$0(Main.java:29) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:339) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at com.company.Main.lambda$main$0(Main.java:29) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 3:14:36 AM: Task execution finished 'Main.main()'.
爲了確保安全關閉線程池,通常會使用組合方式關閉,確保正在運行的任務被正常執行的同時又能提升線程池被關閉的成功率,例子以下:
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); IntStream.range(0,10).forEach(i-> executor.execute(()-> { try{ TimeUnit.SECONDS.sleep(5); }catch (Exception e){ e.printStackTrace(); } })); //首先調用shutdown()嘗試關閉 executor.shutdown(); try{ //若是等待一段時間後還沒關閉 if(!executor.awaitTermination(10,TimeUnit.SECONDS)){ //強制關閉 executor.shutdownNow(); //若是強制關閉失敗,好比運行的線程異常耗時且不能被中斷 if(!executor.awaitTermination(10,TimeUnit.SECONDS)){ //其餘處理,這裏只是輸出中斷失敗的信息 System.out.println("Terminate failed."); } } }catch (InterruptedException e){ //若是當前線程被中斷,而且捕獲了異常,執行當即關閉方法 executor.shutdownNow(); //從新拋出中斷信號 Thread.currentThread().interrupt(); }
ScheduledThreadPoolExecutor
ScheduledExecutorService
繼承了ExecutorService
,而且提供了任務被定時執行的特性,可使用ScheduledThreadPoolExecutor
去實現某些特殊的任務執行。固然實現固定任務的方法或者框架有不少,有原生的shell
實現,老式的Timer/TimerTask
實現,或者專門的框架Quartz
實現,這裏要說的是JDK
內部的實現ScheduledThreadPoolExecutor
。
ScheduledThreadPoolExecutor
繼承了ThreadPoolExecutor
,除了具有ThreadPoolExecutor
的全部方法外,還定義了4個與schedule
有關的方法:
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
:一個one-shot
(只執行一次)的方法, 任務(callable
)會在單位時間(delay
)後被執行,而且當即返回ScheduledFuture
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
:也是一個one-shot
方法,任務會在單位時間後被執行,與第一個方法不一樣的是返回的ScheduledFuture
不包含任何執行結果,可是能夠經過返回的ScheduledFuture
判斷任務是否執行結束ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
:任務會根據固定的速率在initialDelay
後不斷被執行ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
:任務將以固定延遲單位時間的方式執行任務關於後二者的區別以下:
public static void main(String[] args) throws Exception { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); Runnable runnable = ()->{ long startTimestamp = System.currentTimeMillis(); System.out.println("current timestamp: "+startTimestamp); try{ TimeUnit.MILLISECONDS.sleep(current().nextInt(100)); }catch (Exception e){ e.printStackTrace(); } System.out.println("elapsed time: "+(System.currentTimeMillis() - startTimestamp)); }; executor.scheduleAtFixedRate(runnable,10,1000,TimeUnit.MILLISECONDS); // executor.scheduleWithFixedDelay(runnable,10,1000,TimeUnit.MILLISECONDS); }
輸出:
current timestamp: 1619351675438 elapsed time: 97 current timestamp: 1619351676438 elapsed time: 85 current timestamp: 1619351677438 elapsed time: 1 current timestamp: 1619351678438 elapsed time: 1 current timestamp: 1619351679438 elapsed time: 68 current timestamp: 1619351680438 elapsed time: 99
能夠看到任務始終以一種固定的速率運行,每次運行的開始時間始終相隔1000ms
。
而使用FixedDelay
的輸出以下:
current timestamp: 1619351754890 elapsed time: 53 current timestamp: 1619351755944 elapsed time: 30 current timestamp: 1619351756974 elapsed time: 13 current timestamp: 1619351757987 elapsed time: 80 current timestamp: 1619351759068 elapsed time: 94 current timestamp: 1619351760162 elapsed time: 29
每次開始的時間爲上一次執行完成後的時間再加上時間間隔(1000ms
)。
Executors
中的線程池Executors
類提供了六種建立線程池的靜態方法:
FixedThreadPool
SingleThreadExecutor
CachedThreadPool
ScheduledThreadPool
SingleThreadScheduledExecutor
WorkStealingPool
下面分別來看一下。
FixedThreadPool
源碼以下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }
FixedThreadPool
底層調用的是ThreadPoolExecutor
,默認建立的核心線程數與最大線程數相等,任務隊列爲無邊界的LinkedBlockingQueue
。
SingleThreadExecutor
相關源碼以下:
public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory)); } private static class FinalizableDelegatedExecutorService extends Executors.DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
能夠看到SingleThreadPool
其實是內部類FinalizableDelegatedExecutorService
的包裝,核心線程與最大線程數均爲1,任務隊列爲無邊界的LinkedBlockingQueue
。發生GC
的時候,會調用shutdown()
方法。
CachedThreadPool
源碼以下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); }
CachedThreadPool
會根據須要建立新線程,一般用於執行量大的,耗時較短的異步任務。未被使用且空閒時間超過60s
的線程會被回收。
ScheduledThreadPool
源碼以下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
建立指定核心數ScheduledThreadPoolExecutor
。
SingleThreadScheduledExecutor
源碼以下:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory)); } private static class DelegatedScheduledExecutorService extends Executors.DelegatedExecutorService implements ScheduledExecutorService { private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); this.e = executor; } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return this.e.schedule(command, delay, unit); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { return this.e.schedule(callable, delay, unit); } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return this.e.scheduleAtFixedRate(command, initialDelay, period, unit); } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return this.e.scheduleWithFixedDelay(command, initialDelay, delay, unit); } }
其實就是SingelThreadPool
+ScheduledThreadPool
。
WorkStealingPool
源碼以下:
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true); } public static ExecutorService newWorkStealingPool() { return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true); }
WorkStealingPool
是JDK8
引入的線程池,返回的是ForkJoinPool
。在WorkStealingPool
中,若是每一個線程處理的任務執行比較耗時,那麼它負責的任務會被其餘線程「竊取」,進而提升併發處理的效率。