從使用到原理學習Java線程池

線程池的技術背景

在面向對象編程中,建立和銷燬對象是很費時間的,由於建立一個對象要獲取內存資源或者其它更多資源。在Java中更是如此,虛擬機將試圖跟蹤每個對象,以便可以在對象銷燬後進行垃圾回收。css

因此提升服務程序效率的一個手段就是儘量減小建立和銷燬對象的次數,特別是一些很耗資源的對象建立和銷燬。如何利用已有對象來服務就是一個須要解決的關鍵問題,其實這就是一些」池化資源」技術產生的緣由。html

例如Android中常見到的不少通用組件通常都離不開」池」的概念,如各類圖片加載庫,網絡請求庫,即便Android的消息傳遞機制中的Meaasge當使用Meaasge.obtain()就是使用的Meaasge池中的對象,所以這個概念很重要。本文將介紹的線程池技術一樣符合這一思想。java

線程池的優勢:編程

  • 重用線程池中的線程,減小因對象建立,銷燬所帶來的性能開銷;bash

  • 能有效的控制線程的最大併發數,提升系統資源利用率,同時避免過多的資源競爭,避免堵塞;markdown

  • 可以多線程進行簡單的管理,使線程的使用簡單、高效。網絡

線程池框架Executor

java中的線程池是經過Executor框架實現的,Executor 框架包括類:Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable和Future、FutureTask的使用等。多線程

Executor: 全部線程池的接口,只有一個方法。併發

public interface Executor {        
  void execute(Runnable command);      
}

ExecutorService: 增長Executor的行爲,是Executor實現類的最直接接口。框架

Executors: 提供了一系列工廠方法用於創先線程池,返回的線程池都實現了ExecutorService 接口。

ThreadPoolExecutor:線程池的具體實現類,通常用的各類線程池都是基於這個類實現的。
構造方法以下:

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);

}
  • corePoolSize:線程池的核心線程數,線程池中運行的線程數也永遠不會超過 corePoolSize 個,默認狀況下能夠一直存活。能夠經過設置allowCoreThreadTimeOut爲True,此時 核心線程數就是0,此時keepAliveTime控制全部線程的超時時間。

  • maximumPoolSize:線程池容許的最大線程數;

  • keepAliveTime: 指的是空閒線程結束的超時時間;

  • unit :是一個枚舉,表示 keepAliveTime 的單位;

  • workQueue:表示存聽任務的BlockingQueue<Runnable隊列。

  • BlockingQueue:阻塞隊列(BlockingQueue)是java.util.concurrent下的主要用來控制線程同步的工具。若是BlockQueue是空的,從BlockingQueue取東西的操做將會被阻斷進入等待狀態,直到BlockingQueue進了東西纔會被喚醒。一樣,若是BlockingQueue是滿的,任何試圖往裏存東西的操做也會被阻斷進入等待狀態,直到BlockingQueue裏有空間纔會被喚醒繼續操做。
    阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。具體的實現類有LinkedBlockingQueue,ArrayBlockingQueued等。通常其內部的都是經過Lock和Condition(顯示鎖(Lock)及Condition的學習與使用)來實現阻塞和喚醒。

線程池的工做過程以下:

  1. 線程池剛建立時,裏面沒有一個線程。任務隊列是做爲參數傳進來的。不過,就算隊列裏面有任務,線程池也不會立刻執行它們。

  2. 當調用 execute() 方法添加一個任務時,線程池會作以下判斷:

  • 若是正在運行的線程數量小於 corePoolSize,那麼立刻建立線程運行這個任務;

  • 若是正在運行的線程數量大於或等於 corePoolSize,那麼將這個任務放入隊列;

  • 若是這時候隊列滿了,並且正在運行的線程數量小於 maximumPoolSize,那麼仍是要建立非核心線程馬上運行這個任務;

  • 若是隊列滿了,並且正在運行的線程數量大於或等於 maximumPoolSize,那麼線程池會拋出異常RejectExecutionException。

當一個線程完成任務時,它會從隊列中取下一個任務來執行。當一個線程無事可作,超過必定的時間(keepAliveTime)時,線程池會判斷,若是當前運行的線程數大於 corePoolSize,那麼這個線程就被停掉。因此線程池的全部任務完成後,它最終會收縮到 corePoolSize 的大小。

線程池的建立和使用

生成線程池採用了工具類Executors的靜態方法,如下是幾種常見的線程池。

SingleThreadExecutor:單個後臺線程 (其緩衝隊列是×××的)

public static ExecutorService newSingleThreadExecutor() {        
    return new FinalizableDelegatedExecutorService (        new ThreadPoolExecutor(1, 1,                                    
        0L, TimeUnit.MILLISECONDS,                                    
        new LinkedBlockingQueue<Runnable>()));   
}

建立一個單線程的線程池。這個線程池只有一個核心線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。

FixedThreadPool:只有核心線程的線程池,大小固定 (其緩衝隊列是×××的) 。

public static ExecutorService newFixedThreadPool(int nThreads) {         
        return new ThreadPoolExecutor(nThreads, nThreads,                                       
            0L, TimeUnit.MILLISECONDS,                                         
            new LinkedBlockingQueue<Runnable>());     
}

建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。

CachedThreadPool:×××線程池,能夠進行自動線程回收。

public static ExecutorService newCachedThreadPool() {         
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,                                           
           60L, TimeUnit.SECONDS,                                       
           new SynchronousQueue<Runnable>());     
}

若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。SynchronousQueue是一個是緩衝區爲1的阻塞隊列。

ScheduledThreadPool:核心線程池固定,大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。

public static ExecutorService newScheduledThreadPool(int corePoolSize) {         
    return new ScheduledThreadPool(corePoolSize, 
              Integer.MAX_VALUE,                                                  
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,                                                    
              new DelayedWorkQueue());    
}

建立一個週期性執行任務的線程池。若是閒置,非核心線程池會在DEFAULT_KEEPALIVEMILLIS時間內回收。

線程池最經常使用的提交任務的方法有兩種:

execute:

ExecutorService.execute(Runnable runable);

submit:

FutureTask task = ExecutorService.submit(Runnable runnable);

FutureTask<T> task = ExecutorService.submit(Runnable runnable,T Result);

FutureTask<T> task = ExecutorService.submit(Callable<T> callable);

submit(Callable callable)的實現,submit(Runnable runnable)同理。

public <T> Future<T> submit(Callable<T> task) {    if (task == null) throw new NullPointerException();
    FutureTask<T> ftask = newTaskFor(task);
    execute(ftask);    return ftask;
}

能夠看出submit開啓的是有返回結果的任務,會返回一個FutureTask對象,這樣就能經過get()方法獲得結果。submit最終調用的也是execute(Runnable runable),submit只是將Callable對象或Runnable封裝成一個FutureTask對象,由於FutureTask是個Runnable,因此能夠在execute中執行。關於Callable對象和Runnable怎麼封裝成FutureTask對象,見Callable和Future、FutureTask的使用

線程池實現的原理

若是隻講線程池的使用,那這篇博客沒有什麼大的價值,充其量也就是熟悉Executor相關API的過程。線程池的實現過程沒有用到Synchronized關鍵字,用的都是Volatile,Lock和同步(阻塞)隊列,Atomic相關類,FutureTask等等,由於後者的性能更優。理解的過程能夠很好的學習源碼中併發控制的思想。

在開篇提到過線程池的優勢是可總結爲如下三點:

  1. 線程複用

  2. 控制最大併發數

  3. 管理線程

1.線程複用過程

理解線程複用原理首先應瞭解線程生命週期。

在線程的生命週期中,它要通過新建(New)、就緒(Runnable)、運行(Running)、阻塞(Blocked)和死亡(Dead)5種狀態。

Thread經過new來新建一個線程,這個過程是是初始化一些線程信息,如線程名,id,線程所屬group等,能夠認爲只是個普通的對象。調用Thread的start()後Java虛擬機會爲其建立方法調用棧和程序計數器,同時將hasBeenStarted爲true,以後調用start方法就會有異常。

處於這個狀態中的線程並無開始運行,只是表示該線程能夠運行了。至於該線程什麼時候開始運行,取決於JVM裏線程調度器的調度。當線程獲取cpu後,run()方法會被調用。不要本身去調用Thread的run()方法。以後根據CPU的調度在就緒——運行——阻塞間切換,直到run()方法結束或其餘方式中止線程,進入dead狀態。

因此實現線程複用的原理應該就是要保持線程處於存活狀態(就緒,運行或阻塞)。接下來來看下ThreadPoolExecutor是怎麼實現線程複用的。

在ThreadPoolExecutor主要Worker類來控制線程的複用。看下Worker類簡化後的代碼,這樣方便理解:

private final class Worker implements Runnable {	final Thread thread;

	Runnable firstTask;

	Worker(Runnable firstTask) {		this.firstTask = firstTask;		this.thread = getThreadFactory().newThread(this);
	}	public void run() {
		runWorker(this);
	}	final void runWorker(Worker w) {
		Runnable task = w.firstTask;
		w.firstTask = null;		while (task != null || (task = getTask()) != null){
		task.run();
	}
}

Worker是一個Runnable,同時擁有一個thread,這個thread就是要開啓的線程,在新建Worker對象時同時新建一個Thread對象,同時將Worker本身做爲參數傳入TThread,這樣當Thread的start()方法調用時,運行的其實是Worker的run()方法,接着到runWorker()中,有個while循環,一直從getTask()裏獲得Runnable對象,順序執行。getTask()又是怎麼獲得Runnable對象的呢?

依舊是簡化後的代碼:

private Runnable getTask() {    if(一些特殊狀況) {        return null;
    }

    Runnable r = workQueue.take();    return r;
}

這個workQueue就是初始化ThreadPoolExecutor時存聽任務的BlockingQueue隊列,這個隊列裏的存放的都是將要執行的Runnable任務。由於BlockingQueue是個阻塞隊列,BlockingQueue.take()獲得若是是空,則進入等待狀態直到BlockingQueue有新的對象被加入時喚醒阻塞的線程。因此通常狀況Thread的run()方法就不會結束,而是不斷執行從workQueue裏的Runnable任務,這就達到了線程複用的原理了。

2.控制最大併發數

那Runnable是何時放入workQueue?Worker又是何時建立,Worker裏的Thread的又是何時調用start()開啓新線程來執行Worker的run()方法的呢?有上面的分析看出Worker裏的runWorker()執行任務時是一個接一個,串行進行的,那併發是怎麼體現的呢?

很容易想到是在execute(Runnable runnable)時會作上面的一些任務。看下execute裏是怎麼作的。

execute:

簡化後的代碼

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();     int c = ctl.get();    // 當前線程數 < corePoolSize
    if (workerCountOf(c) < corePoolSize) {        // 直接啓動新的線程。
        if (addWorker(command, true))            return;
        c = ctl.get();
    }    // 活動線程數 >= corePoolSize
    // runState爲RUNNING && 隊列未滿
    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        // 再次檢驗是否爲RUNNING狀態
        // 非RUNNING狀態 則從workQueue中移除任務並拒絕
        if (!isRunning(recheck) && remove(command))
            reject(command);// 採用線程池指定的策略拒絕任務
        // 兩種狀況:
        // 1.非RUNNING狀態拒絕新的任務
        // 2.隊列滿了啓動新的線程失敗(workCount > maximumPoolSize)
    } else if (!addWorker(command, false))        reject(command);
}

addWorker:

簡化後的代碼

private boolean addWorker(Runnable firstTask, boolean core) {    int wc = workerCountOf(c);    if (wc >= (core ? corePoolSize : maximumPoolSize)) {        return false;
    }

    w = new Worker(firstTask);    final Thread t = w.thread;
    t.start();
}

根據代碼再來看上面提到的線程池工做過程當中的添加任務的狀況:

* 若是正在運行的線程數量小於 corePoolSize,那麼立刻建立線程運行這個任務;   
* 若是正在運行的線程數量大於或等於 corePoolSize,那麼將這個任務放入隊列;* 若是這時候隊列滿了,並且正在運行的線程數量小於 maximumPoolSize,那麼仍是要建立非核心線程馬上運行這個任務;* 若是隊列滿了,並且正在運行的線程數量大於或等於 maximumPoolSize,那麼線程池會拋出異常RejectExecutionException。

這就是Android的AsyncTask在並行執行是在超出最大任務數是拋出RejectExecutionException的緣由所在,詳見基於最新版本的AsyncTask源碼解讀及AsyncTask的黑暗面

經過addWorker若是成功建立新的線程成功,則經過start()開啓新線程,同時將firstTask做爲這個Worker裏的run()中執行的第一個任務。

雖然每一個Worker的任務是串行處理,但若是建立了多個Worker,由於共用一個workQueue,因此就會並行處理了。

因此根據corePoolSize和maximumPoolSize來控制最大併發數。大體過程可用下圖表示。

上面的講解和圖來能夠很好的理解的這個過程。

若是是作Android開發的,而且對Handler原理比較熟悉,你可能會以爲這個圖挺熟悉,其中的一些過程和Handler,Looper,Meaasge使用中,很類似。Handler.send(Message)至關於execute(Runnuble),Looper中維護的Meaasge隊列至關於BlockingQueue,只不過須要本身經過同步來維護這個隊列,Looper中的loop()函數循環從Meaasge隊列取Meaasge和Worker中的runWork()不斷從BlockingQueue取Runnable是一樣的道理。

3.管理線程

經過線程池能夠很好的管理線程的複用,控制併發數,以及銷燬等過程,線程的複用和控制併發上面已經講了,而線程的管理過程已經穿插在其中了,也很好理解。

在ThreadPoolExecutor有個ctl的AtomicInteger變量。經過這一個變量保存了兩個內容:

  • 全部線程的數量

  • 每一個線程所處的狀態

其中低29位存線程數,高3位存runState,經過位運算來獲得不一樣的值。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//獲得線程的狀態private static int runStateOf(int c) {    return c & ~CAPACITY;
}//獲得Worker的的數量private static int workerCountOf(int c) {    return c & CAPACITY;
}// 判斷線程是否在運行private static boolean isRunning(int c) {    return c < SHUTDOWN;
}

這裏主要經過shutdown和shutdownNow()來分析線程池的關閉過程。首先線程池有五種狀態來控制任務添加與執行。主要介紹如下三種:

  • RUNNING狀態:線程池正常運行,能夠接受新的任務並處理隊列中的任務;

  • SHUTDOWN狀態:再也不接受新的任務,可是會執行隊列中的任務;

  • STOP狀態:再也不接受新任務,不處理隊列中的任務

shutdown這個方法會將runState置爲SHUTDOWN,會終止全部空閒的線程,而仍在工做的線程不受影響,因此隊列中的任務人會被執行。shutdownNow方法將runState置爲STOP。和shutdown方法的區別,這個方法會終止全部的線程,因此隊列中的任務也不會被執行了。

總結

經過對ThreadPoolExecutor源碼的分析,從整體上了解了線程池的建立,任務的添加,執行等過程,熟悉這些過程,使用線程池就會更輕鬆了。

而從中學到的一些對併發控制,以及生產者——消費者模型任務處理的使用,對之後理解或解決其餘相關問題會有很大的幫助。好比Android中的Handler機制,而Looper中的Messager隊列用一個BlookQueue來處理一樣是能夠的,這寫就是讀源碼的收穫吧。

相關文章
相關標籤/搜索