基於多核CPU的發展,使得多線程開發日趨流行。然而線程的建立和銷燬,都涉及到系統調用,比較消耗系統資源,因此就引入了線程池技術,避免頻繁的線程建立和銷燬。html
在Java用有一個Executors工具類,能夠爲咱們建立一個線程池,其本質就是new了一個ThreadPoolExecutor對象。java
建議使用較爲方便的 Executors 工廠方法來建立線程池。緩存
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
線程池的執行流程圖 多線程
任務被提交到線程池,會先判斷當前線程數量是否小於corePoolSize,若是小於則建立線程來執行提交的任務,不然將任務放入workQueue隊列,若是workQueue滿了,則判斷當前線程數量是否小於maximumPoolSize,若是小於則建立線程執行任務,不然就會調用handler,以表示線程池拒絕接收任務。框架
public class SchedulePoolDemo { public static void main(String[] args){ ScheduledExecutorService service = Executors.newScheduledThreadPool(10); //若是前面的任務沒有完成, 調度也不會啓動 service.scheduleAtFixedRate(()->{ try { Thread.sleep(2000);// 每兩秒打印一次. System.out.println(System.currentTimeMillis()/1000); } catch (InterruptedException e) { e.printStackTrace(); } }, 0, 2, TimeUnit.SECONDS); } }
使用Executors來建立要注意潛在宕機風險.其返回的線程池對象的弊端以下:ide
綜上所述, 在可能有大量請求的線程池場景中, 更推薦自定義ThreadPoolExecutor來建立線程池, 具體構造函數配置以下:函數
通常根據任務類型進行區分, 假設CPU爲N核工具
主要存放等待執行的線程, ThreadPoolExecutor中支持自定義該隊列來實現不一樣的排隊隊列.oop
線程池提供了一些回調方法, 具體使用以下所示.源碼分析
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("準備執行任務: " + r.toString()); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("結束任務: " + r.toString()); } @Override protected void terminated() { System.out.println("線程池退出"); } };
能夠在回調接口中, 對線程池的狀態進行監控, 例如任務執行的最長時間, 平均時間, 最短期等等, 還有一些其餘的屬性以下:
線程池滿負荷運轉後, 由於時間空間的問題, 可能須要拒絕掉部分任務的執行.
jdk提供了RejectedExecutionHandler接口, 並內置了幾種線程拒絕策略
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("reject task: " + r.toString()); } });
線程工廠用於建立池裏的線程. 例如在工廠中都給線程setDaemon(true), 這樣程序退出的時候, 線程自動退出.或者統一指定線程優先級, 設置名稱等等.
class NamedThreadFactory implements ThreadFactory { private static final AtomicInteger threadIndex = new AtomicInteger(0); private final String baseName; private final boolean daemon; public NamedThreadFactory(String baseName) { this(baseName, true); } public NamedThreadFactory(String baseName, boolean daemon) { this.baseName = baseName; this.daemon = daemon; } public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement()); thread.setDaemon(this.daemon); return thread; } }
跟直接new Thread不同, 局部變量的線程池, 須要手動關閉, 否則會致使線程泄漏問題. 默認提供兩種方式關閉線程池. - shutdown: 等全部任務, 包括阻塞隊列中的執行完, 纔會終止, 可是不會接受新任務. - shutdownNow: 當即終止線程池, 打斷正在執行的任務, 清空隊列.
ctl是ThreadPoolExecutor的一個重要屬性,它記錄着ThreadPoolExecutor的線程數量和線程狀態。
//Integer有32位,其中前三位用於記錄線程狀態,後29位用於記錄線程的數量. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //表示用於記錄線程數量的位數 private static final int COUNT_BITS = Integer.SIZE - 3; //將1左移COUNT_BITS位再減1,表示能表示的最大線程數 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//用ctl前三位分別表示線程池的狀態 //(前三位爲111)接受新任務而且處理已經進入隊列的任務 private static final int RUNNING = -1 << COUNT_BITS; //(前三位爲000)不接受新任務,可是處理已經進入隊列的任務 private static final int SHUTDOWN = 0 << COUNT_BITS; //(前三位001)不接受新任務,不處理已經進入隊列的任務,而且中斷正在執行的任務 private static final int STOP = 1 << COUNT_BITS; //(前三位010)全部任務執行完成,workerCount爲0。線程轉到了狀態TIDYING會執行terminated()鉤子方法 private static final int TIDYING = 2 << COUNT_BITS; //(前三位011)任務已經執行完成 private static final int TERMINATED = 3 << COUNT_BITS;
//狀態值就是隻關心前三位的值,因此把後29位清0 private static int runStateOf(int c) { return c & ~CAPACITY; } //線程數量就是隻關心後29位的值,因此把前3位清0 private static int workerCountOf(int c) { return c & CAPACITY; } //兩個數相或 private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //判斷當前活躍線程數是否小於corePoolSize if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true))//調用addWorker建立線程執行任務 return; c = ctl.get(); } //若是不小於corePoolSize,則將任務添加到workQueue隊列。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get();//再次獲取ctl的狀態 //若是不在運行狀態了,那麼就從隊列中移除任務 if (! isRunning(recheck) && remove(command)) reject(command); //若是在運行階段,可是Worker數量爲0,調用addWorker方法 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //嘗試建立非核心線程若是建立失敗就會調用reject拒絕接受任務。 else if (!addWorker(command, false)) reject(command); }
//調用handler的rejectedExecution(command,this)方法。handler是RejectedExecutionHandler接口,默認實現是AbortPolicy final void reject(Runnable command) { handler.rejectedExecution(command, this); }
addWorker方法用於建立線程,而且經過core參數表示該線程是不是核心線程,若是返回true則表示建立成功,不然失敗。addWorker的代碼以下所示:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);//獲得線程池的運行狀態 // rs>=SHUTDOWN爲false,即線程池處於RUNNING狀態. // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()這個條件爲true,也就意味着三個條件同時知足,即線程池狀態爲SHUTDOWN且firstTask爲null且隊列不爲空,這種狀況爲處理隊列中剩餘任務。上面提到過當處於SHUTDOWN狀態時,不接受新任務,可是會處理完隊列裏面的任務。若是firstTask不爲null,那麼就屬於添加新任務;若是firstTask爲null,而且隊列爲空,那麼就不須要再處理了。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || //若是建立的是非核心線程(core=false)時,則須要判斷當前線程數wc>=maximumPoolSize,若是返回false,建立非核心線程失敗。 //若是建立的是核心線程(core=true)時,則須要判斷當前線程數wc>=corePoolSize,若是返回false,建立核心線程失敗。 wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))//worker+1執行成功,那麼跳出外循環 break retry; c = ctl.get(); if (runStateOf(c) != rs)//再次判斷當前狀態,若是新獲取的狀態和當前狀態不一致,則再次進入外循環 continue retry; // else CAS failed due to workerCount change; retry inner loop } } /* 一旦跳出外循環,表示能夠建立建立線程,這裏具體是Worker對象,Worker實現了Runnable接口而且繼承AbstractQueueSynchronizer,內部維持一個Runnable的隊列。try塊中主要就是建立Worker對象,而後將其保存到workers中,workers是一個HashSet,表示工做線程的集合。而後若是添加成功,則開啓Worker所在的線程。若是開啓線程失敗,則調用addWorkerFailed方法,addWokerFailed用於回滾worker線程的建立。 */ boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //以firstTask做爲Worker的第一個任務建立Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//對整個線程池加鎖 try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//啓動啓動這個線程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
private void addWorkerFailed(Worker w) { //對整個線程成績加鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //移除Worker對象 if (w != null) workers.remove(w); //減少worker數量 decrementWorkerCount(); //檢查termination狀態 tryTerminate(); } finally { mainLock.unlock(); } }
addWorkerFailed首先從workers集合中移除線程,而後將wokerCount減1,最後檢查終結。
tryTerminate()方法用於檢查是否有必要將線程池狀態轉移到TERMINATED。
final void tryTerminate() { for (;;) { int c = ctl.get(); /* 狀態判斷,若是有符合如下條件之一。則跳出循環 (1)線程池處於RUNNING狀態 (2)線程池狀態處於TIDYING狀態 (3)線程池狀態處於SHUTDOWN狀態而且隊列不爲空 若是不知足上述的狀況,那麼目前狀態屬於SHUTDOWN切隊列爲空,或者狀態屬於STOP,那麼調用interruptIdleWorkers方法中止一個Worker線程,而後退出。 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } /* 若是沒有退出循環的話,那麼就首先將狀態設置成TIDYING,而後調用terminated方法,最後設置狀態爲TERMINATED。terminated方法是個空實現,用於當線程池終結時處理一些事情。 */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
Worker繼承自AbstractQueuedSynchronizer並實現Runnbale接口。AbstractQueuedSynchronizer提供了一個實現阻塞鎖和其餘同步工具,好比信號量、事件等依賴於等待隊列的框架。Worker的構造方法中會使用threadFactory構造線程變量並持有run方法調用了runWorker方法,將線程委託給主循環線程。
Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask;//設置該線程的 this.thread = getThreadFactory().newThread(this);//建立一個線程 } //當我咱們啓動一個線程時就會觸發Worker中的此方法 public void run() { runWorker(this); }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask;//首次任務是建立Worker時添加的任務 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //線程調用runWoker,會while循環調用getTask方法從workerQueue裏讀取任務,而後執行任務。只要getTask方法不返回null,此線程就不會退出。 while (task != null || (task = getTask()) != null) { w.lock();//對Worker加鎖 //若是線程池中止了,那麼中斷線程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);//空實現,任務運行以前的操做 Throwable thrown = null; try { task.run();//執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//空實現,任務運行以後的操做 } } finally { task = null;//任務執行完畢後,將task設置爲null w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //必要時檢查隊列是否爲空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //判斷是否容許超時,wc>corePoolSize則是判斷當前線程數是否大於corePoolSize。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //若是當前線程數大於corePoolSize, //則會調用workQueue的poll方法獲取任務,超時時間是keepAliveTime。 //若是超過keepAliveTime時長,poll返回了null, //上邊提到的while循序就會退出,線程也就執行完了。 //若是當前線程數小於corePoolSize, //則會調用workQueue的take方法阻塞當前線程,不會退出 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
參考地址: