通常在生產環境中,咱們都不會直接new一個Thread,而後再去start(),由於這麼作會不斷頻繁的建立線程,銷燬線程,過大的線程會耗盡CPU和內存資源,大量的垃圾回收,也會給GC帶來壓力,延長GC停頓時間.java
一、固定大小線程池算法
public class ThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyTask task = new MyTask(); ExecutorService es = Executors.newFixedThreadPool(5); for (int i = 0;i < 10;i++) { es.submit(task); } es.shutdown(); } }
運行結果:併發
1539134496389:Thread ID:11
1539134496389:Thread ID:12
1539134496389:Thread ID:13
1539134496389:Thread ID:14
1539134496389:Thread ID:15
1539134497390:Thread ID:14
1539134497390:Thread ID:12
1539134497390:Thread ID:15
1539134497390:Thread ID:13
1539134497390:Thread ID:11框架
結果解讀:運行結果並非一次刷出來的,而是刷出了5個,中間會停頓1秒,再刷出5個,說明,並行處理是5個線程執行一次,而後再並行處理5個。ide
將Executors.newFixedThreadPool改爲Executors.newCachedThreadPool()性能
public class ThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyTask task = new MyTask(); ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0;i < 10;i++) { es.submit(task); } es.shutdown(); } }
結果相同,可是是同時並行處理的,中間沒有停頓,說明newCachedThreadPool()是根據須要來分配線程數的。this
二、計劃任務線程
newScheduledThreadPool()有兩個方法來調用線程對象,scheduleAtFixedRate()跟scheduleWithFixedDelay().他們之間的差異就是scheduleAtFixedRate()總共只佔用調度時間,而scheduleWithFixedDelay()佔用的是線程執行時間加調度時間.但若是scheduleAtFixedRate()的線程執行時間大於調度時間,也不會出現重複調度(即一個線程尚未執行完,另一個線程會啓動),而是一個線程執行完,另外一個線程立刻啓動.3d
public class ScheduledExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleAtFixedRate(new Runnable() { public void run() { try { long start = System.currentTimeMillis(); Thread.sleep(2000); System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName()); }catch (InterruptedException e) { e.printStackTrace(); } } },0,2, TimeUnit.SECONDS); } }
運行結果(部分截取)對象
2001:pool-1-thread-1
2000:pool-1-thread-1
2000:pool-1-thread-2
2000:pool-1-thread-1
2001:pool-1-thread-3
2000:pool-1-thread-2
結果解讀:儘管有時間調度,他們依然是不一樣的線程來運行的,每顯示一條中間停頓2秒(線程運行時間也是2秒)
public class ScheduledExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleWithFixedDelay(new Runnable() { public void run() { try { long start = System.currentTimeMillis(); Thread.sleep(2000); System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName()); }catch (InterruptedException e) { e.printStackTrace(); } } },0,2, TimeUnit.SECONDS); } }
運行結果與以前相同,可是每顯示一條的時間間隔爲4秒(線程運行時間依然爲2秒),其中2秒爲調度時間,2秒爲運行時間.
三、核心線程池的內部實現。
其實不管是Executors工廠的哪一種實現,都是調用了同一個類ThreadPoolExecutor,使用了不一樣的構造參數罷了.不一樣的構造參數能夠產生不一樣種類的線程池,所以咱們也能夠自定義線程池.
JDK實現
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
拒絕策略
當線程池任務數量超過系統實際承載能力時,能夠啓用拒絕策略。
直接中斷策略
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); // new RejectedExecutionHandler() { // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // System.out.println(r.toString() + " is discard"); // } // }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
ThreadPoolExecutor()的最後一個參數爲中斷策略,上面的new ThreadPoolExecutor.AbortPolicy()爲直接中斷!
參數說明:
第一個參數corePoolSize:指定了線程池中的線程數量.
第二個參數maximumPoolSize:指定了線程池中的最大線程數量.
第三個參數KeepAliveTime:當線程池線程數量超過了corePoolSize時,多餘的空閒線程的存活時間.即超過corePoolSize的空閒線程,在多長時間內會被銷燬.
第四個參數unit:keepAliveTime的單位.
第五個參數workQueue:任務隊列,被提交但還沒有被執行的任務.
1,直接提交的隊列:SynchronousQueue,無容量,每個插入操做都要等待一個刪除操做,提交的任務不會被真實保存,老是將新任務提交給線程執行.若是沒有空閒進程,則嘗試建立新的進程.若是進程數量達到最大,則執行拒絕策略.
2,有界的任務隊列:ArrayBlockingQueue,必須帶一個容量參數,表示該隊列的最大容量.當線程池的實際線程數小於corePoolSize,會優先建立新的線程,若大於corePoolSize,則會將新任務加入到等待隊列.若等待隊列滿的時候,沒法加入,則在總線程數不大於maximumPoolSize的前提下,建立新的進程執行任務.若大於maximumPoolSize,執行拒絕策略.
3,無界的任務隊列:LinkedBlockingQueue,除非系統資源耗盡,不存在任務入隊失敗的狀況.當線程池的實際線程數小於corePoolSize,會優先建立新的線程,若大於corePoolSize,則會將新任務加入到等待隊列,若任務的建立和處理的速度差別很大,無界隊列會保持快速增加,直到耗盡系統內存.
4,優先任務隊列:PriorityBlockingQueue,能夠控制任務執行的前後順序.是一個特殊的無界隊列.不管是ArrayBlockingQueue仍是LinkedBlockingQueue都是按照先進先出算法處理任務的,而PriorityBlockingQueue則能夠根據任務自身的優先級順序前後執行,老是確保高優先級的任務先執行.
第六個參數threadFactory:線程工廠,用於建立線程,通常用默認的便可.
第七個參數handler:拒絕策略,當任務太多,來不及處理,如何拒絕任務.
運行結果
1539153799420:Thread ID:11
1539153799430:Thread ID:12
1539153799440:Thread ID:13
1539153799450:Thread ID:14
1539153799460:Thread ID:15
1539153799520:Thread ID:11
1539153799530:Thread ID:12
1539153799540:Thread ID:13
1539153799550:Thread ID:14
1539153799560:Thread ID:15
1539153799621:Thread ID:11
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45ee12a7 rejected from java.util.concurrent.ThreadPoolExecutor@330bedb4[Running, pool size = 5, active threads = 5, queued tasks = 9, completed tasks = 6]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.guanjian.RejectThreadPoolDemo.main(RejectThreadPoolDemo.java:31)
結果解讀:因爲併發線程數量太大,Integer.MAX_VALUE,咱們線程池的最大線程數只有5個,而無界任務隊列LinkedBlockingQueue<Runnable>只有10個,沒法知足快速的線程數量增加,拒絕策略發揮做用,拋出異常,阻止系統正常工做.
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); // new RejectedExecutionHandler() { // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // System.out.println(r.toString() + " is discard"); // } // }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
new ThreadPoolExecutor.CallerRunsPolicy()只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務,但性能極有可能會急劇降低.
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); // new RejectedExecutionHandler() { // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // System.out.println(r.toString() + " is discard"); // } // }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
ThreadPoolExecutor.DiscardOldestPolicy()該策略將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務.
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy()); // new RejectedExecutionHandler() { // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // System.out.println(r.toString() + " is discard"); // } // }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
ThreadPoolExecutor.DiscardPolicy()丟棄沒法處理的任務,不予任何處理.
自定義拒絕策略
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is discard"); } }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
運行結果:
1539159379178:Thread ID:11
1539159379187:Thread ID:12
1539159379197:Thread ID:13
1539159379207:Thread ID:14
1539159379217:Thread ID:15
1539159379279:Thread ID:11
1539159379288:Thread ID:12
1539159379301:Thread ID:13
1539159379308:Thread ID:14
1539159379318:Thread ID:15
1539159379379:Thread ID:11
1539159379388:Thread ID:12
1539159379401:Thread ID:13
java.util.concurrent.FutureTask@45ee12a7 is discard
1539159379408:Thread ID:14
1539159379418:Thread ID:15
java.util.concurrent.FutureTask@330bedb4 is discard
java.util.concurrent.FutureTask@2503dbd3 is discard
java.util.concurrent.FutureTask@4b67cf4d is discard
java.util.concurrent.FutureTask@7ea987ac is discard
這裏只是比ThreadPoolExecutor.DiscardPolicy()多了打印出丟棄的任務.
自定義線程建立
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); System.out.println("create " + t); return t; } }); for (int i = 0; i < 5; i++) { es.submit(task); } Thread.sleep(2000); } }
就是能夠本身定義線程,如守護線程等等
運行結果:
create Thread[Thread-0,5,main]
create Thread[Thread-1,5,main]
create Thread[Thread-2,5,main]
create Thread[Thread-3,5,main]
create Thread[Thread-4,5,main]
1539159694414:Thread ID:11
1539159694414:Thread ID:12
1539159694414:Thread ID:13
1539159694414:Thread ID:14
1539159694414:Thread ID:15
擴展線程池
線程池能夠擴展出線程執行前,執行後,終止的後續處理
public class ExtThreadPool { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } public void run() { System.out.println("正在執行" + ":Thread ID" + Thread.currentThread().getId() + ",Task Name=" + name); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("準備執行:" + ((MyTask)r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("執行完成:" + ((MyTask)r).name); } @Override protected void terminated() { System.out.println("線程池退出!"); } }; for (int i = 0;i < 5;i++) { MyTask task = new MyTask("TASK-GEYM-" + i); es.execute(task); Thread.sleep(10); } es.shutdown(); } }
運行結果:
準備執行:TASK-GEYM-0
正在執行:Thread ID11,Task Name=TASK-GEYM-0
準備執行:TASK-GEYM-1
正在執行:Thread ID12,Task Name=TASK-GEYM-1
準備執行:TASK-GEYM-2
正在執行:Thread ID13,Task Name=TASK-GEYM-2
準備執行:TASK-GEYM-3
正在執行:Thread ID14,Task Name=TASK-GEYM-3
準備執行:TASK-GEYM-4
正在執行:Thread ID15,Task Name=TASK-GEYM-4
執行完成:TASK-GEYM-0
執行完成:TASK-GEYM-1
執行完成:TASK-GEYM-2
執行完成:TASK-GEYM-3
執行完成:TASK-GEYM-4
線程池退出!
在線程池中尋找堆棧
有時候線程執行時會出現Bug,拋出異常,若是使用submit()來提交線程時,不會打印異常信息,而使用execute()來執行線程時能夠打印異常信息.
public class DivTask implements Runnable { int a,b; public DivTask(int a,int b) { this.a = a; this.b = b; } public void run() { double re = a / b; System.out.println(re); } public static void main(String[] args) { // ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS, // new SynchronousQueue<Runnable>()); ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); for (int i = 0;i < 5;i++) { pools.submit(new DivTask(100,i)); } } }
這段代碼中,5個併發線程會有一個線程有除0錯誤
運行結果:
100.0
50.0
33.0
25.0
結果沒有任何提示,異常拋出.
public class DivTask implements Runnable { int a,b; public DivTask(int a,int b) { this.a = a; this.b = b; } public void run() { double re = a / b; System.out.println(re); } public static void main(String[] args) { // ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS, // new SynchronousQueue<Runnable>()); ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); for (int i = 0;i < 5;i++) { pools.execute(new DivTask(100,i)); } } }
運行結果:
100.0
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
50.0
33.0
at com.guanjian.DivTask.run(DivTask.java:18)
25.0
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
有異常拋出
重寫跟蹤線程池,自定義跟蹤
public class TraceThreadPoolExecutor extends ThreadPoolExecutor { public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable task) { super.execute(wrap(task,clientTrace(),Thread.currentThread().getName())); } @Override public Future<?> submit(Runnable task) { return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName())); } private Exception clientTrace() { return new Exception("Client stack trace"); } private Runnable wrap(final Runnable task,final Exception clientStack,String clientThreadName) { return new Runnable() { public void run() { try { task.run(); } catch (Exception e) { clientStack.printStackTrace(); try { throw e; } catch (Exception e1) { e1.printStackTrace(); } } } }; } }
public class DivTask implements Runnable { int a,b; public DivTask(int a,int b) { this.a = a; this.b = b; } public void run() { double re = a / b; System.out.println(re); } public static void main(String[] args) { ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>()); // ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS, // new SynchronousQueue<Runnable>()); for (int i = 0;i < 5;i++) { pools.execute(new DivTask(100,i)); } } }
運行結果:
100.0
java.lang.Exception: Client stack trace
50.0
at com.guanjian.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:27)
33.0
25.0
at com.guanjian.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:18)
at com.guanjian.DivTask.main(DivTask.java:28)
java.lang.ArithmeticException: / by zero
at com.guanjian.DivTask.run(DivTask.java:18)
at com.guanjian.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
這樣咱們就能夠知道是在哪裏出了錯.
四、分而治之,Fork/Join框架
將一個大任務拆分紅各類較小規模的任務,進行並行處理,也許按照約定條件拆分的任務仍是大於約定條件就繼續拆分.有兩種線程類型,一種是有返回值的RecursiveTask<T>,一種是沒有返回值的RecursiveAction,他們都繼承於ForkJoinTask<>,一個帶泛型<T>,一個是<Void>.
/** * Created by Administrator on 2018/10/11. * 能夠理解成一個Runnable(線程類) */ public class CountTask extends RecursiveTask<Long> { private static final int THRESHOLD = 10000; private long start; private long end; public CountTask(long start, long end) { this.start = start; this.end = end; } /** * 能夠理解成run()方法 * @return */ @Override protected Long compute() { long sum = 0; boolean canCompute = (end - start) < THRESHOLD; //最終計算,全部的最終拆分都是在這裏計算 if (canCompute) { for (long i = start;i <= end;i++) { sum += i; } }else { //並行計算的規模,拆分紅100個並行計算 long step = (start + end) /100; //建立子任務線程集合 List<CountTask> subTasks = new ArrayList<CountTask>(); //每一個並行子任務的開始值 long pos = start; //並行執行100個分叉線程 for (int i = 0;i < 100;i++) { //每一個並行子任務的結束值 long lastOne = pos + step; if (lastOne > end) { lastOne = end; } //創建一個子任務的線程 CountTask subTask = new CountTask(pos,lastOne); //建立下一個並行子任務的開始值 pos += step + 1; //將當前子任務線程添加到線程集合 subTasks.add(subTask); //執行該線程,實際上是一個遞歸,判斷lastOne-pos是否小於THRESHOLD,小於則真正執行,不然繼續分叉100個子線程 subTask.fork(); } for (CountTask t:subTasks) { //阻斷每一次分叉前的上一級線程進行等待,並將最終並行的結果進行層層累加 sum += t.join(); } } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0,200000L); ForkJoinTask<Long> result = forkJoinPool.submit(task); try { long res = result.get(); System.out.println("sum: " + res); }catch (InterruptedException e) { e.printStackTrace(); }catch (ExecutionException e) { e.printStackTrace(); } } }
運行結果:
sum: 20000100000