前言java
線程池能夠提升程序的併發性能(固然是合適的狀況下),由於對於沒有線程的狀況下,咱們每一次提交任務都新建一個線程,這種方法存在很多缺陷:安全
1. 線程的建立和銷燬的開銷很是高,線程的建立須要時間,會延遲任務的執行,會消耗大量的系統資源。併發
2. 活躍的線程會消耗系統資源,而大量的空閒線程會佔用許多內存,給垃圾回收器帶來很大的壓力,而大量線程在競爭CPU資源的時間還會產生氣體的性能開銷。框架
3. 系統在可建立的線程上存在一個限制,若是超過了這個限制,極可能拋出OOM。ide
咱們不難發現,在必定範圍下,增長線程可以提升系統的吞吐量,而當線程數超過合理值後,增長的線程反而會下降程序的執行速度。函數
Executor的引入性能
在JDK1.5中,咱們引入了一個線程池框架,Executor框架,它可以分解任務的建立和執行過程。它包括Executor、ExecutorService、Callable等接口和Executors、ThreadPoolExecutor實現類等。this
注意點atom
固然了看待事物須要辯證,是否使用了Executor框架就能很好地將複雜的任務執行解耦開來的。這邊咱們其實須要限制一下它的使用範圍。spa
合適的線程數
線程池在對處理同一類型的任務且相互獨立的時候,能達到性能上的最佳,不然任務時長不一致很容易引發擁塞或是飢餓。
那線程池的線程數以多少爲合適呢,對於計算密集型的任務而言,咱們最好設置的線程數 = CPU數+1;(+1是爲了保證當某個線程由於缺頁故障或其餘緣由而暫停時,這個+1的線程可以確保CPU的時鐘週期不會被浪費);而對於包含IO操做或者其餘阻塞操做的任務時,因爲線程不會一直執行,因此線程池的規模應該更大點。
線程池的使用
在實際使用過程當中,咱們通常藉助於ThreadPoolExecutor來完成線程池的建立。ThreadPoolExecutor具備極好的擴展性,除了系統提供的四種經常使用的線程池,如CachedThreadPoolExecutor,FixedThreadPoolExecutor,SingleThreadPoolExecutor,ScheduledThreadExecutor。咱們能夠自定義線程池的構造函數,如線程池的基本線程數、最大線程數、線程池的超時時間(時間+時間單位),線程池的任務隊列,線程池的線程工廠,線程池的飽和策略。
固然,咱們在使用系統提供的四種線程池的時候,一樣能夠在後來修改線程池的配置。
線程的任務隊列
LinkedBlockQueue:無界 CachedThreadPoolExecutor FixedThreadPoolExecutor的默認任務隊列
ArrayBlockQueue:有界
PriorityQueue:優先級隊列,有界 ScheduledThreadPoolExecutor的默認任務隊列
SynchronousQueue:同步移交,隊列容量爲0,僅當有線程準備好時纔會將任務放到隊列中。
飽和策略
飽和策略的設置
對於有界隊列而言,當有界隊列被填滿後,這時候咱們須要用到線程的飽和策略,前面提到咱們能夠在後面配置線程池的設置,而飽和策略的修改就是經過ThreadPoolExecutor的setRejectedExecutionHandler方法來進行修改的(當某個任務唄提交到一個已經被關閉的Executor中,也會用到飽和策略)
飽和策略主要有如下幾種:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。
固然,咱們可使用Semaphore(信號量)來控制任務的提交速率。
線程工廠
咱們能夠經過自定義線程工廠,來實現咱們本身的線程。具體示例以下所示:
自定義的線程工廠,記錄了線程池的名字。
import java.util.concurrent.ThreadFactory; /** * Created by DB on 2017/9/1. */ public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { this.poolName = poolName; } @Override public Thread newThread(Runnable r) { return new MyAPPThread(r,poolName); } }
自定義的線程類:實現了指定線程的名字,設置自定義UncaughtExecptionHandler。
import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; /** * Created by DB on 2017/9/1. */ public class MyAPPThread extends Thread { public static final String DEFALUT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAPPThread(Runnable r){ this(r,DEFALUT_NAME); } public MyAPPThread(Runnable r,String name){ super(r,name +"-"+created.incrementAndGet()); setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE,"Uncaught in thread"+t.getName(),e); } }); } public void run(){ boolean debug = debugLifecycle; if(debug){ log.log(Level.FINE,"Created" +getName()); } try { alive.incrementAndGet(); super.run(); }finally { alive.decrementAndGet(); if(debug){ log.log(Level.FINE,"Exiting"+getName()); } } } public static int getThreadsCreated(){ return created.get(); } public static int getThreadsAlive(){ return alive.get(); } public static boolean getDebug(){ return debugLifecycle; } public static void setDebug(Boolean b){ debugLifecycle=b; } }
擴展ThreadPoolExecutor
ThreadPoolExecutor是可擴展的,它提供了幾個在子類中能夠改寫的方法:beforeExecute、afterExecute和terminated,經過實現這些方法咱們能夠實現擴展。
在執行任務的線程中將調用beforeExecute和afterExecute方法,在這些方法中咱們能夠添加日誌、計時。監視或者統計信息收集的功能。不管任務是從run中正常返回仍是拋出一個異常而返回,afterExecute都會被調用。而beforeExecute拋出RuntimeException時,任務將不被執行,afterExecute固然也不會被調用。
在線程池完成關閉操做時,會調用terminated,也就在全部任務都已經完成而且全部工做者線程也已經關閉後。在這個方法中咱們能夠實現Executor在其生命週期中分配的各類資源,還有執行發送通知、記錄日誌或者收集finalize統計信息等操做。
具體的框架以下:
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by DB on 2017/9/1. */ public class MyThreadPoolExecutor extends ThreadPoolExecutor { public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); } @Override protected void terminated() { super.terminated(); } }