java自1.5版本以後,提供線程池,供開發人員快捷方便的建立本身的多線程任務。下面簡單的線程池的方法及說明。java
一、Executor 緩存
線程池的頂級接口。定義了方法execute(Runnable),該方法接收一個Runnable實例,用來執行一個任務,該任務便是一個實現Runnable接口的類。多線程
此服務方法無返回值,緣由是由於實現Runnable接口的類的run方法是無返回(void)的。框架
經常使用方法 : void execute(execute)jvm
做用 : 啓動並執行線程任務ide
二、ExecutorService工具
繼承自Executor接口,提供了更多的方法調用,例如優雅關閉方法shutdown,有返回值的submit。測試
2.一、ExecutorService生命週期this
運行 - Running 、關閉 - shuttingdown、終止 - terminatedspa
Running : 線程池正在執行中,活動狀態。建立後即進入此狀態
shuttingdown : 優雅關閉,線程池正在關閉中。再也不接收新的線程任務,已有的任務(正在處理的 + 隊列中阻塞的),處理完畢後,關閉線程池。
調用shutdown()方法,即進入此狀態
terminated : 線程池已關閉。
2.二、submit方法
有返回值,Future類型。重載了方法,submit(Runnable)不須要提供返回值。submit(Callable)、submit(Runnable,T)能夠提供線程執行後的結果返回值。
2.三、Future
線程執行完畢結果。獲取線程執行結果是經過get()方法獲取。get()無參,阻塞等待線程執行結束。
get(long timeout, TimeUnit unit)有參,阻塞等待固定時長,超時未獲取,則拋出異常。
2.四、Callable
相似Runnable的一個線程接口。其中的對應run的方法是call方法。此接口提供了線程執行完畢返回值。
package com.cn.cfang.executor; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Test { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newFixedThreadPool(1); Data data = new Data(); // Future<Data> future = executorService.submit(new Task(data), data); //runnable Future<Data> future = executorService.submit(new Task1(data)); //callable System.out.println(future.get().getName()); executorService.shutdown(); } } class Data { String name; public String getName() { return name; } public void setName(String name) { this.name = name; } } class Task implements Runnable{ Data data; public Task(Data data) { this.data = data; } @Override public void run() { data.setName("hello world"); } } class Task1 implements Callable<Data>{ Data data; public Task1(Data data) { this.data = data; } @Override public Data call() throws Exception { data.setName("hello world"); return data; } }
三、Executors工具類
提供了不少的工廠方法用於建立線程池,返回的線程池都實現了ExecutorService接口。
線程池屬於進程級的重量級資源,默認的生命週期同JVM一致,當開啓線程池後,直到jvm關閉,是線程池的默認的生命週期。
若是手動調用shutdown方法,可優雅關閉線程池,在當前全部任務執行結束後,關閉線程池。
四、幾種經常使用的線程池
4.一、FixedThreadPool
容量固定的線程池。使用LinkedBlockingQueue做爲任務隊列,當任務數量大於線程池容量的時候,未執行的任務進入任務等待隊列LinkedBlockingQueue中,
當線程有空閒的時候,自動從隊列中取出任務執行。
使用場景: 大多數狀況下,推薦使用的線程池。由於os系統和硬件是有線程上限限制的,不可能去無限的提供線程池操做。
4.二、CachedThreadPool
緩存線程池。容量 0-Integer.MAX_VALUE,自動根據任務數擴容:若是線程池中的線程數不知足任務執行需求,則建立新的線程並添加到池中。
生命週期默認60s,當線程空閒時長到60s的時候,自動終止銷燬釋放線程,移除線程池。
使用場景 : 可用於測試最高負載量,用於對FixedThreadPool容量的參考。
注意,放入CachedThreadPool的線程沒必要擔憂其結束,超過TIMEOUT(默認60s)不活動,其會自動被終止。
4.三、ScheduledThreadPool
定時及週期性的任務執行的線程池,多數狀況下可用來替代Timer類。
public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(3); System.out.println(service); // 定時完成任務。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit) // runnable - 要執行的任務。
// start_limit - 第一次執行任務的時間間隔
// limit - 屢次任務執行的時間間隔
// timeunit - 時間單位 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } }, 0, 300, TimeUnit.MILLISECONDS); }
4.四、SingleThreadExceutor 單一容量線程池。
4.五、自定義線程池
自定義線程池,可使用ThreadPoolExecutor類來進行建立管理。線程池中,除了ForkJoinPool外,其餘經常使用的線程池底層,都是使用ThreadPoolExecutor實現的。
參數說明:
corePoolSize:核心線程數,也是最少線程數。在建立線程池時,默認狀況下,是不會建立線程池的,也即此時的線程池中線程數爲0,直到有任務來臨時,纔會去建立線程。固然,手動調用prestartCoreThread()或者prestartAllCoreThreads()方法,能夠初始化建立線程池中的線程。默認狀況下,當有任務來臨時,就會建立新的線程去處理執行,即便此時線程池中有空閒的線程。當線程數達到corePoolSize時,線程數不增長,此時任務會放入等待隊列BlockingQueue中。
workQueue:阻塞隊列,用來存儲等待執行的任務資源。
maximumPoolSize:最大線程數。當阻塞隊列滿了,開始擴充線程池中的線程數。直到達到此最大值的時候。
handler:當線程池中的線程數等於maximumPoolSize的時候,此時再來任務的話,交由此拒絕策略執行。
keepAliveTime:表示的線程在空閒多長時間後會被終止。默認是在線程數大於corePoolSize才生效,也能夠手動設置allowCoreThreadTimeOut()方法讓線程數在不大於 corePoolSize也生效。
public ThreadPoolExecutor( int corePoolSize, //核心容量,建立線程池的時候,默認有多少的線程數。也是最少線程數 int maximumPoolSize, //最大線程數 long keepAliveTime, //線程生命週期,0爲永久。當線程空閒多長時間,自動回收。 TimeUnit unit, //生命週期時間單位。 BlockingQueue<Runnable> workQueue, //任務阻塞隊列。
RejectedExecutionHandler handler
) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
簡單例子:
package com.cn.cfang.executor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Test2 { public static void main(String[] args){ //建立等待隊列 BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20); //建立線程池,池中保存的線程數爲3,容許的最大線程數爲5 ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue); //建立七個任務 Runnable t1 = new MyThread(); Runnable t2 = new MyThread(); Runnable t3 = new MyThread(); Runnable t4 = new MyThread(); Runnable t5 = new MyThread(); Runnable t6 = new MyThread(); Runnable t7 = new MyThread(); //每一個任務會在一個線程上執行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); pool.execute(t6); pool.execute(t7); //關閉線程池 pool.shutdown(); } } class MyThread implements Runnable{ @Override public void run(){ System.out.println(Thread.currentThread().getName() + "正在執行。。。"); try{ Thread.sleep(100); }catch(InterruptedException e){ e.printStackTrace(); } } }
五、forkjoin框架
拆分合並,將一個大的任務,拆分紅若干子任務,並最終彙總子任務的執行結果,獲得大任務的執行結果。並行執行,採用工做竊取機制,更加有效的利用cpu資源。
5.一、主要類
ForkJoinPool : 用於執行Task。任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊列的頭部。
當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘未完成工做線程的隊列的尾部獲取一個任務。
ForkJoinTask:ForkJoin任務,提供在任務中執行fork()和join()操做的機制(二叉分邏輯),一般不直接繼承ForkJoinTask類,
而是繼承抽象子類RecursiveTask(有返回結果) 或者 RecursiveAction (無返回結果)。
ForkJoinWorkerThread:ForkJoinPool 內部的worker thread,用來具體執行ForkJoinTask。內部有 ForkJoinPool.WorkQueue,來保存要執行的 ForkJoinTask。
ForkJoinPool.WorkQueue:保存要執行的ForkJoinTask。
5.二、工做竊取機制
一、大任務分割成N個子任務,爲避免線程競爭,因而分開幾個隊列去保存這些子任務,併爲每一個隊列提供一個工做線程去處理其中的任務。工做線程與任務隊列一一對應。
二、若是A線程執行完本身隊列中的全部任務,若是此時其餘隊列中還有未執行的任務,則A線程會去竊取一個其餘隊列的任務來執行。可是,此時兩個線程同時訪問,
可能會產生競爭問題,因此,任務隊列設計成了雙向隊列。A線程竊取的時候,從另外一端開始執行,儘量的去避免線程競爭問題。
三、工做竊取機制,充分的利用線程資源,並儘量的去避免線程間的競爭問題。可是,只能是儘量避免,並不能規避。例如,雙向隊列只有一個任務。
5.三、簡單使用
例:求和 0 - 10000000000L。
package com.cn.cfang.executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; /** * 步驟: * 一、創建任務類Task,繼承RecursiveTask或者RecursiveAction。須要返回值則選用RecursiveTask,無需返回值選用RecursiveAction * 二、任務類Task,知足必定的閾值,則對子任務進行計算,不知足,則二叉分後,遞歸調用自身 * 三、調用中,新建ForkJoinPool對象,新建任務類對象Task,將任務類對象Task放入ForkJoinPool中執行。 * 若是須要返回值,則能夠invoke或者Future-submit。 * @author cfang * 2018年5月15日 上午10:51:03 */ public class Test3 { public static void main(String[] args) throws Exception{ ForkJoinPool pool = new ForkJoinPool(); ForkJoinWorkTask task = new ForkJoinWorkTask(0l, 10000000l); // Long result = pool.invoke(task); // System.out.println(result); Future<Long> future = pool.submit(task); System.out.println(future.get()); } } class ForkJoinWorkTask extends RecursiveTask<Long>{ private static final long serialVersionUID = 1L; private Long start; //起始 private Long end; //終止 private static final Long THRESHOLD = 10000L; //子任務分割閾值 public ForkJoinWorkTask(Long start, Long end){ this.start = start; this.end = end; } @Override protected Long compute() { Long sum = 0l; if(end - start <= THRESHOLD){ //足夠小的子任務,進行計算求和 for(Long i = start; i < end; i++){ sum += i; } }else{ //任務拆分不知足,繼續拆分(二叉分邏輯) Long middle = (start + end) / 2; ForkJoinWorkTask rightTask = new ForkJoinWorkTask(start, middle); rightTask.fork(); ForkJoinWorkTask leftTask = new ForkJoinWorkTask(middle + 1, end); leftTask.fork(); sum = rightTask.join() + leftTask.join(); } return sum; } }