1.線程池的使用java
線程池通常配合隊列一塊兒工做,是線程池限制併發處理任務的數量。而後設置隊列的大小,當任務超過隊列大小時,經過必定的拒絕策略來處理,這樣能夠保護系統免受大流量而致使崩潰--只是部分拒絕服務,仍是有一部分是能夠正常服務的。程序員
線程池通常有核心線程池大小和線程池最大大小配置,當線程池中的線程空閒一段時間時將會回收,而核心線程池中的線程不會被回收。算法
多少個線程合適呢?建議根據實際業務狀況來壓測決定,或者根據利特法則來算出一個合理的線程池大小。Java提供了ExecutorService的幾種實現:緩存
a.ThreadPoolExecutor:標準線程池。併發
b.newCachedThreadPool建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。this
c.newFixedThreadPool 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。url
d.newScheduledThreadPool 建立一個定長線程池,支持定時及週期性任務執行。spa
e.newSingleThreadExecutor 建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。線程
f.ForkJoinPool:相似於ThreadPoolExecutor,可是使用work-stealing模式,其會爲線程池中的每一個線程建立一個隊列,從而用work-stealing(任務竊取)算法使得線程能夠從其餘線程隊列裏竊取任務來執行。即若是本身的任務處理完成了,則能夠去忙碌的工做線程那裏竊取任務執行。code
2.線程池簡單分析
2.1 、建立單線程的線程池:newSingleThreadExecutor
ExecutorService executorService= Executors.newSingleThreadExecutor();
等價於
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
2.二、建立固定數量的線程池
ExecutorService executorService1= Executors.newFixedThreadPool(10);
等價於
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
注意:單線程的線程池與固定數量的線程池使用隊列的策略是同樣的,若是固定數量的線程池爲1,則至關於單線程的線程池。
dubbo源碼的使用:
// 文件緩存定時寫入 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //執行代碼 if (syncSaveFile) { doSaveProperties(version); } else { registryCacheExecutor.execute(new SaveProperties(version)); }
2.三、建立可緩存的線程池,初始大小爲0,線程池最大大小爲Integer.MAX_VALUE。其使用SynchronousQueue隊列,一個沒有數據緩衝的阻塞隊列。對其執行put操做後必須等待take操做消費該數據,反之亦然。該線程池不限制最大大小,若是線程池有空閒則複用,不然會建立一個新線程。若是線程池中的線程空閒60秒,則將被回收。該線程默認最大大小爲Integer.MAX_VALUE,請肯定必要後再使用該線程池。
ExecutorService executorService2= Executors.newCachedThreadPool();
等價於
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
dubbo源碼:在集羣的時候使用到了,由於並不知道,傳遞過來的集羣參數是多少。
private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true)); @SuppressWarnings("rawtypes") public Result invoke(final Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); Map<String, Future<Result>> results = new HashMap<String, Future<Result>>(); for( final Invoker<T> invoker : invokers ) { Future<Result> future = executor.submit( new Callable<Result>() { public Result call() throws Exception { return invoker.invoke(new RpcInvocation(invocation, invoker)); } } ); results.put( invoker.getUrl().getServiceKey(), future ); }
2.四、支持延遲執行的線程池,其使用DelayedWorkQueue實現任務延遲。
ExecutorService executorService3= Executors.newScheduledThreadPool(10);
等價於
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
使用的案例:
Dubbo源碼
//1.檢測並鏈接註冊中心,使用的是newScheduledThreadPool //定義一個全局的線程池: // 定時任務執行器 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); // 失敗重試定時器,定時檢查是否有請求失敗,若有,無限次重試 private final ScheduledFuture<?> retryFuture; public FailbackRegistry(URL url) { super(url); int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 檢測並鏈接註冊中心 try { retry(); } catch (Throwable t) { // 防護性容錯 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }
2.五、work-stealing線程池,默認爲並行行數爲Runtime.getRuntime().availableProcessors()
ExecutorService executorService4= Executors.newWorkStealingPool(2);
等價於
return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
3.線程池終止
線程池不在使用記得中止線程,能夠調用shutdown以確保不接受新任務,並等待線程池中任務處理完成後再退出,或調用shutdownNow清除未執行任務,並用Thread.interrupt中止正在執行的任務。而後調用awaitTermination方法等待終止操做執行完成。
static ExecutorService executorService3= Executors.newScheduledThreadPool(10); public static void main(String[] args) { executorService3.shutdown(); try { executorService3.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
總結:在使用線程池時務必須設置池大小、隊列大小並設置相應的拒絕策略(RejectedExcutionHandler)。線程池執行狀況下沒法捕獲堆棧上下文,所以任務要記錄相關參數,以方便定位提交任務的源頭及定位引發問題的源頭。
4.ThreadPoolExecutor六個核心參數
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
4.一、corePoolSize
核心池的大小。在建立了線程池以後,默認狀況下,線程池中沒有任何線程,而是等待有任務到來才建立線程去執行任務。默認狀況下,在建立了線程池以後,線程池鐘的線程數爲0,當有任務到來後就會建立一個線程去執行任務。
4.二、maximumPoolSize
池中容許的最大線程數,這個參數表示了線程池中最多能建立的線程數量,當任務數量比corePoolSize大時,任務添加到workQueue,當workQueue滿了,將繼續建立線程以處理任務,maximumPoolSize表示的就是wordQueue滿了,線程池中最多能夠建立的線程數量。
4.三、keepAliveTime
只有當線程池中的線程數大於corePoolSize時,這個參數纔會起做用。當線程數大於corePoolSize時,終止前多餘的空閒線程等待新任務的最長時間。
4.四、unit
keepAliveTime時間單位。
4.五、workQueue
存儲還沒來得及執行的任務。
4.六、threadFactory
執行程序建立新線程時使用的工廠。
4.七、handler
因爲超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序。
總結:上面的內容,其餘應該都相對比較好理解,只有corePoolSize和maximumPoolSize須要多思考。這裏要特別再舉例以四條規則解釋一下這兩個參數:
一、池中線程數小於corePoolSize,新任務都不排隊而是直接添加新線程
二、池中線程數大於等於corePoolSize,workQueue未滿,首選將新任務加入workQueue而不是添加新線程
三、池中線程數大於等於corePoolSize,workQueue已滿,可是線程數小於maximumPoolSize,添加新的線程來處理被添加的任務
四、池中線程數大於大於corePoolSize,workQueue已滿,而且線程數大於等於maximumPoolSize,新任務被拒絕,使用handler處理被拒絕的任務
ThreadPoolExecutor的使用很簡單,前面的代碼也寫過例子了。經過execute(Runnable command)方法來發起一個任務的執行,經過shutDown()方法來對已經提交的任務作一個有效的關閉。儘管線程池很好,但咱們要注意JDK API的一段話:
強烈建議程序員使用較爲方便的Executors工廠方法Executors.newCachedThreadPool()(無界線程池,能夠進行線程自動回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和Executors.newSingleThreadExecutor()(單個後臺線程),它們均爲大多數使用場景預約義了設置。
因此,跳開對ThreadPoolExecutor的關注(仍是那句話,有問題查詢JDK API),重點關注一下JDK推薦的Executors。
4.八、四種拒絕策略
所謂拒絕策略以前也提到過了,任務太多,超過maximumPoolSize了怎麼把?固然是接不下了,接不下那只有拒絕了。拒絕的時候能夠指定拒絕策略,也就是一段處理程序。
決絕策略的父接口是RejectedExecutionHandler,JDK自己在ThreadPoolExecutor裏給用戶提供了四種拒絕策略,看一下:
一、AbortPolicy
直接拋出一個RejectedExecutionException,這也是JDK默認的拒絕策略。
二、CallerRunsPolicy
嘗試直接運行被拒絕的任務,若是線程池已經被關閉了,任務就被丟棄了。
三、DiscardOldestPolicy
移除最晚的那個沒有被處理的任務,而後執行被拒絕的任務。一樣,若是線程池已經被關閉了,任務就被丟棄了。
四、DiscardPolicy
不能執行的任務將被刪除。