總結:線程池的特色是,在線程的數量=corePoolSize後,僅任務隊列滿了以後,纔會從任務隊列中取出一個任務,而後構造一個新的線程,循環往復直到線程數量達到maximumPoolSize執行拒絕策略。java
若是觀察jdk提供的各類線程池的源碼實現能夠發現,除了jdk8新增的線程池newWorkStealingPool之外,都是基於對ThreadPoolExecutor的封裝實現,因此首先講解ThreadPoolExecutor的具體功能。算法
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:指定線程池中線程數量編程
maximumPoolSize:最大線程數量併發
keepAliveTime:線程數量超過corePoolSize時,多於的空閒線程的存活時間(超過這段時間,該空閒線程會被銷燬)。性能
unit:keepAliveTime的時間單位優化
workQueue:任務隊列,提交可是未被執行的任務ui
threadFactory:建立線程的線程工廠,默認便可this
ExecutorService es = new ThreadPoolExecutor(3, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("discard"); } });
CallerRunsPolicy:該策略直接在調用者線程中運行當前被丟棄的任務。顯然這樣作不會真的丟棄任務,可是任務提交線程的性能極有可能會急劇降低。
atom
DiscardOldestPolicy:將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。
spa
DiscardPolicy:默默丟棄沒法處理的任務,不予任何處理。若是容許任務丟失,這多是最好的一種解決方案。在線程池不空閒的時候,提交的任務都將丟棄,當有空閒的線程時提交的任務會執行。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * 直接在調用者線程中運行當前被丟棄的任務,要注意這裏是調用Runnable的run()方法,而不是start()方法啓動線程,run()以普通方法的形式在主線程中執行任務,會阻塞
* 後面es.submit(new MyTask(i))方法的執行 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * 將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
總結:AbortPolicy策略下,咱們要catch異常,這樣咱們能夠捕獲到哪些任務被丟棄了。若是採用其餘的策略,丟棄的任務沒法定位的,只能經過下列程序中es.submit(new MyTask(i));任務以前打印該任務,運行任務的run()邏輯是,在打印任務信息,兩處日誌比對來定位哪些任務被丟棄了。
public class MyTask implements Runnable { private int number; public MyTask(int number) { super(); this.number = number; } public void run() { System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, // new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, // new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy()); // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, // new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy()); ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy()); for(int i=0;i<10000;i++) { try { System.out.println(i); es.submit(new MyTask(i)); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); System.out.println("------------------------"+i); } } }
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. *若是少於corePoolSize線程正在運行,首先嚐試用給定的命令啓動一個新的線程任務。 自動調用addWorker檢查runState和workerCount, * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. *若是任務能夠成功排隊,那麼咱們仍然須要 仔細檢查咱們是否應該添加一個線程 (由於現有的自從上次檢查後死亡)或者那個 自進入該方法以來,該池關閉。 因此咱們 從新檢查狀態,若是有必要的話回滾隊列 中止,或者若是沒有的話就開始一個新的線程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command);//隊列滿了,執行拒絕策略 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } final void reject(Runnable command) { handler.rejectedExecution(command, this);//這裏就是調用咱們傳入的拒絕策略對象的方法 } /** * Dispatch an uncaught exception to the handler. This method is * intended to be called only by the JVM. */ private void dispatchUncaughtException(Throwable e) { getUncaughtExceptionHandler().uncaughtException(this, e); }
任務隊列爲LinkedBlockingQueue中(長度無限),線程數量和最大線程數量相同。功能參考前面的任務隊列總結。
ExecutorService es=Executors.newFixedThreadPool(5);//參數同時指定線程池中線程數量爲5,最大線程數量爲5public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ExecutorService es=Executors.newSingleThreadExecutor();//線程池中線程數量和最大線程數量均爲1.public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
ExecutorService es=Executors.newCachedThreadPool(); //指定線程池中線程數量爲0,最大線程數量爲Integer.MAX_VALUE,任務隊列爲SynchronousQueuepublic static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
任務隊列爲new DelayedWorkQueue(),返回的對象在ExecutorService接口上擴展了在指定時間執行某認爲的功能,在某個固定的延時以後執行或週期性執行某個任務。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
至關於newScheduledThreadPool(int corePoolSize)中corePoolSize設置爲1。
ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();
延遲線程池
class MyScheduledTask implements Runnable { private String tname; public MyScheduledTask(String tname) { this.tname=tname; } public void run() { System.out.println(tname+"任務時延2秒執行!!!"); } } public class intsmaze { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool =Executors.newScheduledThreadPool(2); MyScheduledTask mt1=new MyScheduledTask("MT1"); scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS); } }
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }//建立指定數量的線程池來執行給定的並行級別,還會使用多個隊列減小競爭 public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }//前一個方法的簡化,若是當前機器有4個CPU,則目標的並行級別被設置爲4。
Runtime.getRuntime().availableProcessors()
取得能夠CPU數量。