1、Executors的API介紹
Java類庫提供了許多靜態方法來建立一個線程池:java
a、newFixedThreadPool 建立一個固定長度的線程池,當到達線程最大數量時,線程池的規模將再也不變化。
b、newCachedThreadPool 建立一個可緩存的線程池,若是當前線程池的規模超出了處理需求,將回收空的線程;當需求增長時,會增長線程數量;線程池規模無限制。
c、 newSingleThreadPoolExecutor 建立一個單線程的Executor,確保任務對了,串行執行
d、newScheduledThreadPool 建立一個固定長度的線程池,並且以延遲或者定時的方式來執行,相似Timer;緩存
小結一下:在線程池中執行任務比爲每一個任務分配一個線程優點更多,經過重用現有的線程而不是建立新線程,能夠在處理多個請求時分攤線程建立和銷燬產生的巨大的開銷。當請求到達時,一般工做線程已經存在,提升了響應性;經過配置線程池的大小,能夠建立足夠多的線程使CPU達到忙碌狀態,還能夠防止線程太多耗盡計算機的資源。服務器
建立線程池基本方法:併發
(1)定義線程類異步
- class Handler implements Runnable{
- }
(2)創建ExecutorService線程池socket
- ExecutorService executorService = Executors.newCachedThreadPool();
或ide
- int cpuNums = Runtime.getRuntime().availableProcessors(); //獲取當前系統的CPU 數目
- ExecutorService executorService =Executors.newFixedThreadPool(cpuNums * POOL_SIZE); //ExecutorService一般根據系統資源狀況靈活定義線程池大小
(3)調用線程池操做this
循環操做,成爲daemon,把新實例放入Executor池中spa
- while(true){
- executorService.execute(new Handler(socket));
- // class Handler implements Runnable{
- 或者
- executorService.execute((createTaski));
- //private static Runnable createTask(final int taskID)
- }
execute(Runnable對象)方法其實就是對Runnable對象調用start()方法(固然還有一些其餘後臺動做,好比隊列,優先級,IDLE timeout,active激活等).net
2、幾種不一樣的ExecutorService線程池對象
1.newCachedThreadPool() |
-緩存型池子,先查看池中有沒有之前創建的線程,若是有,就reuse.若是沒有,就建一個新的線程加入池中 -緩存型池子一般用於執行一些生存期很短的異步型任務 所以在一些面向鏈接的daemon型SERVER中用得很少。 -能reuse的線程,必須是timeout IDLE內的池中線程,缺省timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。 注意,放入CachedThreadPool的線程沒必要擔憂其結束,超過TIMEOUT不活動,其會自動被終止。 |
2.newFixedThreadPool |
-newFixedThreadPool與cacheThreadPool差很少,也是能reuse就用,但不能隨時建新的線程 -其獨特之處:任意時間點,最多隻能有固定數目的活動線程存在,此時若是有新的線程要創建,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子 -和cacheThreadPool不一樣,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,確定很是長,相似依賴上層的TCP或UDP IDLE機制之類的),因此FixedThreadPool多數針對一些很穩定很固定的正規併發線程,多用於服務器 -從方法的源代碼看,cache池和fixed 池調用的是同一個底層池,只不過參數不一樣: fixed池線程數固定,而且是0秒IDLE(無IDLE) cache池線程數支持0-Integer.MAX_VALUE(顯然徹底沒考慮主機的資源承受能力),60秒IDLE |
3.ScheduledThreadPool |
-調度型線程池 -這個池子裏的線程能夠按schedule依次delay執行,或週期執行 |
4.SingleThreadExecutor |
-單例線程,任意時間池中只能有一個線程 -用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE) |
應用實例:
1.CachedThreadPool
CachedThreadPool首先會按照須要建立足夠多的線程來執行任務(Task)。隨着程序執行的過程,有的線程執行完了任務,能夠被從新循環使用時,纔再也不建立新的線程來執行任務。咱們採用《Thinking In Java》中的例子來分析。客戶端線程和線程池之間會有一個任務隊列。當程序要關閉時,你須要注意兩件事情:入隊的這些任務的狀況怎麼樣了以及正在運行的這個任務執行得如 何了。使人驚訝的是不少開發人員並沒能正確地或者有意識地去關閉線程池。正確的方法有兩種:一個是讓全部的入隊任務都執行完畢(shutdown()), 再就是捨棄這些任務(shutdownNow())——這徹底取決於你。好比說若是咱們提交了N多任務而且但願等它們都執行完後才返回的話,那麼就使用 shutdown():
- import java.util.Date;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 功能概要:緩衝線程池實例-execute運行
- *
- * @author linbingwen
- * @since 2016年5月24日
- */
- class Handle implements Runnable {
- private String name;
- public Handle(String name) {
- this.name = "thread"+name;
- }
- @Override
- public void run() {
- System.out.println( name +" Start. Time = "+new Date());
- processCommand();
- System.out.println( name +" End. Time = "+new Date());
- }
- private void processCommand() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- @Override
- public String toString(){
- return this.name;
- }
- }
驗證明例:
- public static void testCachedThreadPool() {
- System.out.println("Main: Starting at: "+ new Date());
- ExecutorService exec = Executors.newCachedThreadPool(); //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE
- for(int i = 0; i < 10; i++) {
- exec.execute(new Handle(String.valueOf(i)));
- }
- exec.shutdown(); //執行到此處並不會立刻關閉線程池,但以後不能再往線程池中加線程,不然會報錯
- System.out.println("Main: Finished all threads at"+ new Date());
- }
執行結果:

從上面的結果能夠看出:
一、主線程的執行與線程池裏的線程分開,有可能主線程結束了,可是線程池還在運行
二、放入線程池的線程並不必定會按其放入的前後而順序執行
2.FixedThreadPool
FixedThreadPool模式會使用一個優先固定數目的線程來處理若干數目的任務。規定數目的線程處理全部任務,一旦有線程處理完了任務就會被用來處理新的任務(若是有的話)。這種模式與上面的CachedThreadPool是不一樣的,CachedThreadPool模式下處理必定數量的任務的線程數目是不肯定的。而FixedThreadPool模式下最多 的線程數目是必定的。
應用實例:
- public static void testFixThreadPool() {
- System.out.println("Main Thread: Starting at: "+ new Date());
- ExecutorService exec = Executors.newFixedThreadPool(5);
- for(int i = 0; i < 10; i++) {
- exec.execute(new Handle(String.valueOf(i)));
- }
- exec.shutdown(); //執行到此處並不會立刻關閉線程池
- System.out.println("Main Thread: Finished at:"+ new Date());
- }
運行結果:

上面建立了一個固定大小的線程池,大小爲5.也就說同一時刻最多隻有5個線程能運行。而且線程執行完成後就從線程池中移出。它也不能保證放入的線程能按順序執行。這要看在等待運行的線程的競爭狀態了。
三、newSingleThreadExecutor
其實這個就是建立只能運行一條線程的線程池。它能保證線程的前後順序執行,而且能保證一條線程執行完成後纔開啓另外一條新的線程
- public static void testSingleThreadPool() {
- System.out.println("Main Thread: Starting at: "+ new Date());
- ExecutorService exec = Executors.newSingleThreadExecutor(); //建立大小爲1的固定線程池
- for(int i = 0; i < 10; i++) {
- exec.execute(new Handle(String.valueOf(i)));
- }
- exec.shutdown(); //執行到此處並不會立刻關閉線程池
- System.out.println("Main Thread: Finished at:"+ new Date());
- }
運行結果:

其實它也等價於如下:
- ExecutorService exec = Executors.newFixedThreadPool(1);
四、newScheduledThreadPool
這是一個計劃線程池類,它能設置線程執行的前後間隔及執行時間等,功能比上面的三個強大了一些。
如下實例:
- public static void testScheduledThreadPool() {
- System.out.println("Main Thread: Starting at: "+ new Date());
- ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10); //建立大小爲10的線程池
- for(int i = 0; i < 10; i++) {
- exec.schedule(new Handle(String.valueOf(i)), 10, TimeUnit.SECONDS);//延遲10秒執行
- }
- exec.shutdown(); //執行到此處並不會立刻關閉線程池
- while(!exec.isTerminated()){
- //wait for all tasks to finish
- }
- System.out.println("Main Thread: Finished at:"+ new Date());
- }
實現每一個放入的線程延遲10秒執行。
結果:

ScheduledThreadPoolExecutor的定時方法主要有如下四種:

下面將主要來具體講講scheduleAtFixedRate和scheduleWithFixedDelay
scheduleAtFixedRate 按指定頻率週期執行某個任務
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
command:執行線程
initialDelay:初始化延時
period:兩次開始執行最小間隔時間
unit:計時單位
scheduleWithFixedDelay 週期定時執行某個任務/按指定頻率間隔執行某個任務(注意)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
command:執行線程
initialDelay:初始化延時
period:前一次執行結束到下一次執行開始的間隔時間(間隔執行延遲時間)
unit:計時單位
使用實例:
- class MyHandle implements Runnable {
-
- @Override
- public void run() {
- System.out.println(System.currentTimeMillis());
- try {
- Thread.sleep(1 * 1000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }
1.按指定頻率週期執行某個任務
下面實現每隔2秒執行一次,注意,若是上次的線程尚未執行完成,那麼會阻塞下一個線程的執行。即便線程池設置得足夠大。
- /**
- * 初始化延遲0ms開始執行,每隔2000ms從新執行一次任務
- * @author linbingwen
- * @since 2016年6月6日
- */
- public static void executeFixedRate() {
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
- executor.scheduleAtFixedRate(
- new MyHandle(),
- 0,
- 2000,
- TimeUnit.MILLISECONDS);
- }
間隔指的是連續兩次任務開始執行的間隔。對於scheduleAtFixedRate方法,當執行任務的時間大於咱們指定的間隔時間時,它並不會在指定間隔時開闢一個新的線程併發執行這個任務。而是等待該線程執行完畢。
二、按指定頻率間隔執行某個任務
- /**
- * 以固定延遲時間進行執行
- * 本次任務執行完成後,須要延遲設定的延遲時間,纔會執行新的任務
- */
- public static void executeFixedDelay() {
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
- executor.scheduleWithFixedDelay(
- new MyHandle(),
- 0,
- 2000,
- TimeUnit.MILLISECONDS);
- }
間隔指的是連續上次執行完成和下次開始執行之間的間隔。
3.週期定時執行某個任務
週期性的執行一個任務,可使用下面方法設定天天在固定時間執行一次任務。
- /**
- * 天天晚上9點執行一次
- * 天天定時安排任務進行執行
- */
- public static void executeEightAtNightPerDay() {
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- long oneDay = 24 * 60 * 60 * 1000;
- long initDelay = getTimeMillis("21:00:00") - System.currentTimeMillis();
- initDelay = initDelay > 0 ? initDelay : oneDay + initDelay;
-
- executor.scheduleAtFixedRate(
- new MyHandle(),
- initDelay,
- oneDay,
- TimeUnit.MILLISECONDS);
- }
-
- /**
- * 獲取指定時間對應的毫秒數
- * @param time "HH:mm:ss"
- * @return
- */
- private static long getTimeMillis(String time) {
- try {
- DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
- DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
- Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
- return curDate.getTime();
- } catch (ParseException e) {
- e.printStackTrace();
- }
- return 0;
- }
3、線程池一些經常使用方法
一、submit()
將線程放入線程池中,除了使用execute,也可使用submit,它們兩個的區別是一個使用有返回值,一個沒有返回值。submit的方法很適應於生產者-消費者模式,經過和Future結合一塊兒使用,能夠起到若是線程沒有返回結果,就阻塞當前線程等待線程 池結果返回。
它主要有三種方法:
通常用第一種比較多

以下實例。注意,submit中的線程要實現接口Callable
- package com.func.axc.executors;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
-
- /**
- * 功能概要:緩衝線程池實例-submit運行
- *
- * @author linbingwen
- * @since 2016年5月25日
- */
- class TaskWithResult implements Callable<String> {
- private int id;
-
- public TaskWithResult(int id) {
- this.id = id;
- }
-
- /**
- * 任務的具體過程,一旦任務傳給ExecutorService的submit方法,則該方法自動在一個線程上執行。
- *
- * @return
- * @throws Exception
- */
- public String call() throws Exception {
- System.out.println("call()方法被自動調用,幹活!!! " + Thread.currentThread().getName());
- //一個模擬耗時的操做
- for (int i = 999999; i > 0; i--) ;
- return"call()方法被自動調用,任務的結果是:" + id + " " + Thread.currentThread().getName();
- }
- }
-
- public class ThreadPool2 {
- public static void main(String[] args) {
- ExecutorService executorService = Executors.newCachedThreadPool();
- List<Future<String>> resultList = new ArrayList<Future<String>>();
-
- //建立10個任務並執行
- for (int i = 0; i < 10; i++) {
- //使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中
- Future<String> future = executorService.submit(new TaskWithResult(i));
- //將任務執行結果存儲到List中
- resultList.add(future);
- }
- //啓動一次順序關閉,執行之前提交的任務,但不接受新任務。若是已經關閉,則調用沒有其餘做用。
- executorService.shutdown();
-
- //遍歷任務的結果
- for (Future<String> fs : resultList) {
- try {
- System.out.println(fs.get()); //打印各個線程(任務)執行的結果
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- } finally {
-
- }
- }
- }
- }
結果以下:

從上面能夠看到,輸出結果的依次的。說明每次get都 阻塞了的。
看了下它的源碼,其實它最終仍是調用 了execute方法
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
二、execute()
表示往線程池添加線程,有可能會當即運行,也有可能不會。沒法預知線程什麼時候開始,什麼時候線束。
主要源碼以下:
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
- if (runState == RUNNING && workQueue.offer(command)) {
- if (runState != RUNNING || poolSize == 0)
- ensureQueuedTaskHandled(command);
- }
- else if (!addIfUnderMaximumPoolSize(command))
- reject(command); // is shutdown or saturated
- }
- }
三、shutdown()
一般放在execute後面。若是調用 了這個方法,一方面,代表當前線程池已再也不接收新添加的線程,新添加的線程會被拒絕執行。另外一方面,代表當全部線程執行完畢時,回收線程池的資源。注意,它不會立刻關閉線程池!
四、shutdownNow()
無論當前有沒有線程在執行,立刻關閉線程池!這個方法要當心使用,要不可能會引發系統數據異常!
總結:
ThreadPoolExecutor中,包含了一個任務緩存隊列和若干個執行線程,任務緩存隊列是一個大小固定的緩衝區隊列,用來緩存待執行的任務,執行線程用來處理待執行的任務。每一個待執行的任務,都必須實現Runnable接口,執行線程調用其run()方法,完成相應任務。 ThreadPoolExecutor對象初始化時,不建立任何執行線程,當有新任務進來時,纔會建立執行線程。 構造ThreadPoolExecutor對象時,須要配置該對象的核心線程池大小和最大線程池大小: 當目前執行線程的總數小於核心線程大小時,全部新加入的任務,都在新線程中處理 當目前執行線程的總數大於或等於核心線程時,全部新加入的任務,都放入任務緩存隊列中 當目前執行線程的總數大於或等於核心線程,而且緩存隊列已滿,同時此時線程總數小於線程池的最大大小,那麼建立新線程,加入線程池中,協助處理新的任務。 當全部線程都在執行,線程池大小已經達到上限,而且緩存隊列已滿時,就rejectHandler拒絕新的任務