java.util.concurrent.Executor : 負責線程的使用與調度的根接口
|–ExecutorService:Executor的子接口,線程池的主要接口
|–ThreadPoolExecutor:ExecutorService的實現類
|–ScheduledExecutorService:ExecutorService的子接口,負責線程的調度
|–ScheduledThreadPoolExecutor:繼承了ThreadPoolExecutor實現了ScheduledExecutorServicejava
其餘的建立完線程池後,使用 threadPool.execute(new Runnable())方式執行任務。數組
public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 表示延遲3秒執行 scheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS); } // 表示延遲1秒後每3秒執行一次 scheduledThreadPool.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }, 1, 3, TimeUnit.SECONDS); }
查看Executors源碼咱們知道,Executors 類提供了使用了 ThreadPoolExecutor 的簡單的 ExecutorService 實現,也就是上面所說的四種Executors線程池,可是 ThreadPoolExecutor 提供的功能遠不止於此。
不過在java doc中,並不提倡咱們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立線程池
咱們能夠在建立 ThreadPoolExecutor 實例時指定活動線程的數量,咱們也能夠限制線程池的大小而且建立咱們本身的 RejectedExecutionHandler 實現來處理不能適應工做隊列的工做。
下面咱們就先了解一下ThreadPoolExecutor,而後在看個示例代碼。緩存
Executors 源碼:併發
public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } }
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,所以若是要透徹地瞭解Java中的線程池,必須先了解這個類。下面咱們來看一下ThreadPoolExecutor類的具體實現源碼。ide
在ThreadPoolExecutor類中提供了四個構造方法:ui
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
上面的代碼能夠得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,經過觀察每一個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工做。this
構造器中各個參數的含義:atom
一、Executor是一個頂層接口,在它裏面只聲明瞭一個方法execute(Runnable),返回值爲void,參數爲Runnable類型,從字面意思能夠理解,就是用來執行傳進去的任務的;
二、而後ExecutorService接口繼承了Executor接口,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;
三、抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的全部方法;
四、而後ThreadPoolExecutor繼承了類AbstractExecutorService。spa
一、execute()方法其實是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。
二、submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()方法不一樣,它可以返回任務執行的結果,去看submit()方法的實現,會發現它實際上仍是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。
三、shutdown()和shutdownNow()是用來關閉線程池的。
四、還有一大波get的方法, 能夠獲取與線程池相關屬性的方法。線程
volatile int runState; // 前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性 static final int RUNNING = 0; // 當建立線程池後,初始時,線程池處於RUNNING狀態 static final int SHUTDOWN = 1; 若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢 static final int STOP = 2; // 若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務; static final int TERMINATED = 3; // 當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態。
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務 private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態鎖,對線程池狀態(好比線程池大小、runState等)的改變都要使用這個鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工做集 private volatile long keepAliveTime; //線程存貨時間 private volatile boolean allowCoreThreadTimeOut; //是否容許爲核心線程設置存活時間 private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列) private volatile int maximumPoolSize; //線程池最大能容忍的線程數, 當線程數大於corePoolSize時,建立新的先線程,可是建立新的線程數 + corePoolSize不能大於maximumPoolSize private volatile int poolSize; //線程池中當前的線程數 private volatile RejectedExecutionHandler handler; //任務拒絕策略 private volatile ThreadFactory threadFactory; //線程工廠,用來建立線程 private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數 private long completedTaskCount; //用來記錄已經執行完畢的任務個數
ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
從代碼註釋,咱們知道:
addWorker() 添加任務, 建立Worker, Worker 繼承 AbstractQueuedSynchronizer 實現 Runnable
addWorker()幾個關鍵步驟:
w = new Worker(firstTask); final Thread t = w.thread; // 從worker取得線程 if (workerAdded) { t.start(); // worker添加成功,執行任務 workerStarted = true; }
默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。
在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:
// 初始化一個核心線程; public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } // 初始化全部核心線程 public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
workQueue的類型爲BlockingQueue,一般能夠取下面三種類型:
1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;
2)LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE;
3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
前面已經講過, 當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務
ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:設置核心池大小
setMaximumPoolSize:設置線程池最大能建立的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能當即建立新的線程來執行任務。
咱們能夠在建立 ThreadPoolExecutor 實例時指定活動線程的數量,咱們也能夠限制線程池的大小而且建立咱們本身的 RejectedExecutionHandler 實現來處理不能適應工做隊列的工做。
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is rejected"); } }
hreadPoolExecutor 提供了一些方法,咱們可使用這些方法來查詢 executor 的當前狀態,線程池大小,活動線程數量以及任務數量。所以我是用來一個監控線程在特定的時間間隔內打印 executor 信息。
MyMonitorThread.java
public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run=true; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds=delay; } public void shutdown(){ this.run=false; } @Override public void run() { while(run){ System.out.println( String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(), this.executor.getCompletedTaskCount(), this.executor.getTaskCount(), this.executor.isShutdown(), this.executor.isTerminated())); try { Thread.sleep(seconds*1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
這裏是使用 ThreadPoolExecutor 的線程池實現例子。
WorkerPool.java
public class WorkerPool { public static void main(String args[]) throws InterruptedException{ //RejectedExecutionHandler implementation RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); //Get the ThreadFactory implementation to use ThreadFactory threadFactory = Executors.defaultThreadFactory(); //creating the ThreadPoolExecutor ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler); //start the monitoring thread MyMonitorThread monitor = new MyMonitorThread(executorPool, 3); Thread monitorThread = new Thread(monitor); monitorThread.start(); //submit work to the thread pool for(int i=0; i<10; i++){ executorPool.execute(new WorkerThread("cmd"+i)); } Thread.sleep(30000); //shut down the pool executorPool.shutdown(); //shut down the monitor thread Thread.sleep(5000); monitor.shutdown(); } }
意在初始化 ThreadPoolExecutor 時,咱們保持初始池大小爲 2,最大池大小爲 4 而工做隊列大小爲 2。所以若是已經有四個正在執行的任務而此時分配來更多任務的話,工做隊列將僅僅保留他們(新任務)中的兩個,其餘的將會被 RejectedExecutionHandlerImpl 處理。
遵循兩原則: 一、若是是CPU密集型任務,就須要儘可能壓榨CPU,參考值能夠設爲 NCPU+1 二、若是是IO密集型任務,參考值能夠設置爲2*NCPU 固然,這只是一個參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。