java.util.concurrent.Executor : 負責線程的使用與調度的根接口
其餘的建立完線程池後,使用 threadPool.execute(new Runnable())方式執行任務。數組
public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 表示延遲3秒執行 scheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS); } // 表示延遲1秒後每3秒執行一次 scheduledThreadPool.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }, 1, 3, TimeUnit.SECONDS); }
查看Executors源碼咱們知道,Executors 類提供了使用了 ThreadPoolExecutor 的簡單的 ExecutorService 實現,也就是上面所說的四種Executors線程池,可是 ThreadPoolExecutor 提供的功能遠不止於此。
不過在java doc中,並不提倡咱們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立線程池
咱們能夠在建立 ThreadPoolExecutor 實例時指定活動線程的數量,咱們也能夠限制線程池的大小而且建立咱們本身的 RejectedExecutionHandler 實現來處理不能適應工做隊列的工做。
Executors 源碼:併發
public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } }
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
四、還有一大波get的方法, 能夠獲取與線程池相關屬性的方法。線程
volatile int runState; // 前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性 static final int RUNNING = 0; // 當建立線程池後,初始時,線程池處於RUNNING狀態 static final int SHUTDOWN = 1; 若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢 static final int STOP = 2; // 若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務; static final int TERMINATED = 3; // 當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態。
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務 private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態鎖,對線程池狀態(好比線程池大小、runState等)的改變都要使用這個鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工做集 private volatile long keepAliveTime; //線程存貨時間 private volatile boolean allowCoreThreadTimeOut; //是否容許爲核心線程設置存活時間 private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列) private volatile int maximumPoolSize; //線程池最大能容忍的線程數, 當線程數大於corePoolSize時,建立新的先線程,可是建立新的線程數 + corePoolSize不能大於maximumPoolSize private volatile int poolSize; //線程池中當前的線程數 private volatile RejectedExecutionHandler handler; //任務拒絕策略 private volatile ThreadFactory threadFactory; //線程工廠,用來建立線程 private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數 private long completedTaskCount; //用來記錄已經執行完畢的任務個數
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
addWorker() 添加任務, 建立Worker, Worker 繼承 AbstractQueuedSynchronizer 實現 Runnable
w = new Worker(firstTask); final Thread t = w.thread; // 從worker取得線程 if (workerAdded) { t.start(); // worker添加成功,執行任務 workerStarted = true; }
// 初始化一個核心線程; public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } // 初始化全部核心線程 public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
前面已經講過, 當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is rejected"); } }
hreadPoolExecutor 提供了一些方法,咱們可使用這些方法來查詢 executor 的當前狀態,線程池大小,活動線程數量以及任務數量。所以我是用來一個監控線程在特定的時間間隔內打印 executor 信息。
public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run=true; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds=delay; } public void shutdown(){; } @Override public void run() { while(run){ System.out.println( String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(), this.executor.getCompletedTaskCount(), this.executor.getTaskCount(), this.executor.isShutdown(), this.executor.isTerminated())); try { Thread.sleep(seconds*1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
這裏是使用 ThreadPoolExecutor 的線程池實現例子。
public class WorkerPool { public static void main(String args[]) throws InterruptedException{ //RejectedExecutionHandler implementation RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); //Get the ThreadFactory implementation to use ThreadFactory threadFactory = Executors.defaultThreadFactory(); //creating the ThreadPoolExecutor ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler); //start the monitoring thread MyMonitorThread monitor = new MyMonitorThread(executorPool, 3); Thread monitorThread = new Thread(monitor); monitorThread.start(); //submit work to the thread pool for(int i=0; i<10; i++){ executorPool.execute(new WorkerThread("cmd"+i)); } Thread.sleep(30000); //shut down the pool executorPool.shutdown(); //shut down the monitor thread Thread.sleep(5000); monitor.shutdown(); } }
意在初始化 ThreadPoolExecutor 時,咱們保持初始池大小爲 2,最大池大小爲 4 而工做隊列大小爲 2。所以若是已經有四個正在執行的任務而此時分配來更多任務的話,工做隊列將僅僅保留他們(新任務)中的兩個,其餘的將會被 RejectedExecutionHandlerImpl 處理。
遵循兩原則: 一、若是是CPU密集型任務,就須要儘可能壓榨CPU,參考值能夠設爲 NCPU+1 二、若是是IO密集型任務,參考值能夠設置爲2*NCPU 固然,這只是一個參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。