參考:《實戰java高併發程序設計》 3.2.3 刨根問底:核心線程池的內部是實現 java
java api 提供了幾種工廠方法建立線程池 算法
public class ExecutorServiceTest { public static void main(String[] args) { // 可根據實際狀況調整線程數量的線程池 ExecutorService executorService1 = Executors.newCachedThreadPool(); //固定大小的線程池 ExecutorService executorService2 = Executors.newFixedThreadPool(1); // 線程池大小爲1 ExecutorService executorService3 = Executors.newSingleThreadExecutor(); } }
查看底層代碼的話,發現其均使用了ThreadPoolExecutor實現。api
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
因此好好來看一下ThreadPoolExecutor的代碼吧,下面是ThreadPoolExecutor的一個構造函數。併發
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
corePoolSize :指定了線程中的核心線程數量函數
maximumPoolSize:指定了線程池中的最大線程數量高併發
keepAliveTime:當線程池中線程數量超過corePoolSize時,多餘的空閒線程的存活時間。即超過corePoolSize的空閒線程,在多長時間,會被銷燬。性能
unit: keepAliveTime 的單位。ui
workQueue:任務隊列,被提交但還沒有被執行的任務。this
threadFactory:線程工廠,用於建立線程,通常用默認的便可。atom
handler:拒絕策略,當任務太多來不及處理的時候,如何拒絕任務。
參數詳解:
workQueue 指被提交但未執行的任務隊列,它是一個BlockingQueue接口的對象,僅用於存放Runnable對象,如下介紹幾種BlockingQueue。
直接提交的隊列:SynchronousQueue 對象提供。SynchronousQueue是一個特殊的BlockingQueue。SynchronousQueue沒有容量,每個插入操做都要等待一個相應的刪除操做,反之,每個刪除操做都要等待一個插入操做。若是使用SynchronousQueue,提交的任務不會被真實的保存,而老是將新任務提交給線程執行,若是沒有空閒的線程,則嘗試建立新的線程,若是線程數量已經達到最大值,則執行拒絕策略。所以,若是使用SynchronousQueue,一般要設置很大的maximumPoolSize值,不然很容易執行拒絕策略。
有界的隊列:有界的隊列可使用ArrayBlockingQueue實現。ArrayBlockingQueue的構造函數必須帶一個容量參數,表示該隊列的最大容量。當使用有界隊列時,如有新的任務執行,若是線程數量小於corePoolSize ,則優先建立新的線程,若大於corePoolSize,則會將任務加入隊列中。若隊列已滿,沒法加入,則在總線程數不大於maximumPoolSize的前提下建立新的線程執行任務,若大於maximumPoolSize時,則執行拒絕策略。可見,有界隊列僅當在任務裝滿時,纔可能將線程數量提高到corePoolSize以上。
無界的隊列:無界的隊列能夠經過LinkedBlockingQueue 實現,與有界隊列相比,除非系統資源耗盡,不然無界隊列不存在任務入隊失敗的狀況,當有新的任務到來,系統的線程數小於corePoolSize時,線程池會生成新的線程執行任務,但當系統的線程數達到corePoolSize後,就不會繼續增長。若後續仍有新的任務加入,而又有空閒的線程資源,則任務直接進入隊列等待。若任務建立和處理的速度差別很大,誤解隊列會保持快速增加,直到耗盡系統內存。
優先任務隊列:優先隊列是帶有執行優先隊列級的隊列,它經過PriorityBlockingQueue實現,能夠控制任務執行的前後順序。它是一個無界隊列。不管是有界隊列ArrayBlockingQueue,仍是未指定大小的LinkedBlockingQueue都是按照先進先出算法處理任務的。而PriorityBlockQueue則能夠根據任務自身的優先級順序執行,在確保系統性能的同時,也能有很好的質量保證。
拒絕策略:
AbortPolicy:該策略會拋出異常,阻止系統正常工做。
CallerRunsPolicy: 只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣作不會真的丟棄任務,可是,任務提交線程的性能極有可能會急劇降低。
DiscardOledestPolicy:該策略將丟棄一個最老的請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務,
DiscardPolicy:該策略默默丟棄沒法處理的任務,不予任何處理。若是容許任務丟失,這多是最好的一種處理方案了。
ThreadPoolExecutor 線程池的核心調度代碼
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
是否小於corePoolSize,若是小於,分配線程執行。若是大於corePoolSize,則提交到隊列中,若是提交到隊列失敗,則將任務直接提交給線程池。若是當前線程數已經達到maximumPoolSize,則提交失敗,直接拒絕策略。
在這裏有一個疑問,當隊列已滿時,提交的任務直接提交給線程池執行,那麼這樣的話是不是 後面提交的任務比先提交的任務先執行了嗎?
阿里巴巴java開發手冊強制不容許使用Executors去建立線程池,就是因爲Executors建立的線程中對 線程的最大數量或者工做隊列的長度設置爲Integer.MAX_VALUE,會致使OOM。