Java多線程(2):使用線程池 ThreadPoolExecutor

首先,咱們爲何須要線程池
讓咱們先來了解下什麼是 對象池 技術。某些對象(好比線程,數據庫鏈接等),它們建立的代價是很是大的 —— 相比於通常對象,它們建立消耗的時間和內存都很大(並且這些對象銷燬的代價比通常對象也大)。因此,若是咱們維護一個 ,每次使用完這些對象以後,並不銷燬它,而是將其放入池中,下次須要使用時就直接從池中取出,即可以免這些對象的重複建立;同時,咱們能夠固定 池的大小,好比設置池的大小爲 N —— 即池中只保留 N 個這類對象 —— 當池中的 N 個對象都在使用中的時候,爲超出數量的請求設置一種策略,好比 排隊等候 或者 直接拒絕請求 等,從而避免頻繁的建立此類對象。
線程池 即對象池的一種(池中的對象爲線程 Thread),相似的還有 數據庫鏈接池(池中對象爲數據庫鏈接 Connection)。合理利用線程池可以帶來三個好處(參考文末的 References[1]):html

  1. 下降資源消耗,經過重複利用已建立的線程,下降線程建立和銷燬時形成的時間和內存上的消耗;
  2. 提高響應速度,當任務到達時,直接使用線程池中的線程來運行任務,使得任務能夠不須要等到線程建立就能當即執行;
  3. 提升線程的可管理性,線程是開銷很大的對象,若是無限制的建立線程,不只會快速消耗系統資源,還會下降系統的穩定性;而使用線程池能夠對線程進行統一的分配和調控。

本文只介紹 Java 中線程池的基本使用,不會過多的涉及到線程池的原理。若是有興趣的讀者須要深刻理解線程池的實現原理,能夠參考文末的 Referencesjava

JDK 中線程池的基礎架構以下:
JDK 中線程池的基礎架構數據庫

執行器 Executor 是頂級接口,只包含了一個 execute 方法,用來執行一個 Runnable 任務:
Executorsegmentfault

執行器服務 ExecutorService 接口繼承了 Executor 接口,ExecutorService 是全部線程池的基礎接口,它定義了 JDK 中線程池應該實現的基本方法:
ExecutorService緩存

線程池執行器 ThreadPoolExecutor 是基礎線程池的核心實現,而且能夠經過定製 ThreadPoolExecutor 的構造參數或者繼承 ThreadPoolExecutor,實現本身的線程池;多線程

ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor,是能執行週期性任務或定時任務的線程池;架構

ForkJoinPool 是 JDK1.7 時添加的類,做爲對 Fork/Join 型線程池的實現。ide

本文只介紹 ThreadPoolExecutor 線程池的使用,ScheduledThreadPoolExecutorForkJoinPool 會在以後的文章中介紹。工具


查看 ThreadPoolExecutor 的源碼可知,在 ThreadPoolExecutor 的內部,將每一個池中的線程包裝爲了一個 Worker性能

Worker 類圖
Worker

而後在 ThreadPoolExecutor 中定義了一個 HashSet<Worker>,做爲 「池」
workers


設置一個合適的線程池(即自定義 ThreadPoolExecutor)是比較麻煩的,所以 JDK 經過 Executors 這個工廠類爲咱們提供了一些預先定義好的線程池:

一、固定大小的線程池
構造固定大小的線程池的工廠方法

建立一個包含 nThreads 個工做線程的線程池,這 nThreads 個線程共享一個無界隊列(即不限制大小的隊列);當新任務提交到線程池時,若是當前沒有空閒線程,那麼任務將放入隊列中進行等待,直到有空閒的線程來從隊列中取出該任務並運行。

(經過 Runtime.getRuntime().availableProcessors() 能夠得到當前機器可用的處理器個數,對於計算密集型的任務,固定大小的線程池的 nThreads 設置爲這個值時,通常能得到最大的 CPU 使用率)

二、單線程線程池
構造單線程線程池的工廠方法

建立一個只包含一個工做線程的線程池,它的功能能夠簡單的理解爲 即 newFixedThreadPool 方法傳入參數爲 1 的狀況。可是與 newFixedThreadPool(1) 不一樣的是,若是線程池中這個惟一的線程意外終止,線程池會建立一個新線程繼續執行以後的任務。

三、可緩存線程的線程池
構造可緩存線程的線程池的工廠方法

建立一個可緩存線程的線程池。當新任務提交到線程池時,若是當前線程池中有空閒線程可用,則使用空閒線程來運行任務,不然新建一個線程來運行該任務,並將該線程添加到線程池中;並且該線程池會終止並移除那些超過 60 秒未被使用的空閒線程。因此這個線程池表現得就像緩存,緩存的資源爲線程,緩存的超時時間爲 60 秒。根據 JDK 的文檔,當任務的運行時間都較短的時候,該線程池有利於提升性能。

咱們看到每一個構造線程池的工廠方法都有一個帶 ThreadFactory 的重載形式。ThreadFactory 即線程池用來新建線程的工廠,每次線程池須要新建一個線程時,調用的就是這個 ThreadFactorynewThread 方法:
ThreadFactory

(若是不提供自定義的 ThreadFactory,那麼使用的就是 DefaultThreadFactory —— Executors 內定義的內部類)
好比咱們要爲線程池中的每一個線程提供一個特定的名字,那麼咱們就能夠自定義 ThreadFactory 並重寫其 newThread 方法:

public class SimpleThreadFactory implements ThreadFactory {

    private AtomicInteger id = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("Test_Thread-" + id.getAndIncrement());
        return thread;
    }

}

經過 JDK 的源碼咱們能夠知道,以上三種線程池的實現都是基於 ThreadPoolExecutor

可緩存線程池的實現

單線程池的實現

固定大小線程池的實現


下面咱們來看一下線程池的基礎接口 ExecutorService 中每一個方法的含義。
首先是從 Executor 接口繼承到的 execute 方法:
execute 方法

使用該方法即將一個 Runnable 任務交給線程池去執行。


submit 方法:
submit 方法

submit 方法會提交一個任務去給線程池執行,該任務能夠是帶返回結果的 Callable<V> 任務,也能夠是一開始就指定結果的 Runnable 任務,或者不帶結果的 Runnable 任務(此時即一開始指定結果爲 null)。submit 方法會返回一個與所提交任務相關聯的 Future<V>。經過 上一篇文章 咱們能夠知道,Future<V>get 方法能夠等待任務執行完畢並返回結果。因此經過 Future<V>,咱們能夠與已經提交到線程池的任務進行交互。submit 提交任務及任務運行過程大體以下:

  1. 向線程池提交一個 Runnable 或者 Callable<V> 任務;
  2. 將 任務 做爲參數使用 newTaskFor 方法構造出 FutureTask<V>

(由 上一篇文章 可知,FutureTask<V> 實現了 RunnableFuture<V> 兩個接口,從而 FutureTask<V> 能夠做爲 Runnable 交給 WorkerThread)去運行,也能夠做爲一個 Future<V> 與任務交互)

![newTaskFor 方法][19]
  1. 線程池使用 execute 方法將 FutureTask<V> 交給當前的 Worker 去運行,並將 FutureTask<V>Future<V> 返回;

    submit 方法

  2. 而後 Worker 執行任務(即運行 run 方法),在任務完成後,爲 Future<V>FutureTask<V>) 設置結果 —— 設置結果以前,調用 Future<V>get 方法會讓調用線程處於阻塞狀態;
    FutureTask 實現的 run 方法
  3. 經過 Future<V>get 方法,得到任務的結果。

invokeAll 方法:
invokeAll 方法

invokeAll 方法能夠一次執行多個任務,但它並不一樣等於屢次調用 submit 方法。submit 方法是非阻塞的,每次調用 submit 方法提交任務到線程池以後,會當即返回與任務相關聯的 Future<V>,而後當前線程繼續向後執行;

invokeAll 方法是阻塞的,只有當提交的多個任務都執行完畢以後,invokeAll 方法纔會返回,執行結果會以List<Future<V>> 返回,該 List<Future<V>> 中的每一個 Future<V> 是和提交任務時的 Collection<Callable<V>> 中的任務 Callable<V> 一 一對應的。帶 timeout 參數的 invokeAll 就是設置一個超時時間,若是超過這個時間 invokeAll 中提交的全部任務還有沒所有執行完,那麼沒有執行完的任務會被取消(中斷),以後一樣以一個 List<Future<V>> 返回執行的結果。


invokeAny 方法:
invokeAny 方法

invokeAny 方法也是阻塞的,與 invokeAll 方法的不一樣之處在於,當所提交的一組任務中的任何一個任務完成以後,invokeAny 方法便會返回(返回的結果即是那個已經完成的任務的返回值),而其餘任務會被取消(中斷)。

舉一個 invokeAny 使用的例子:電腦有 C、D、E、F 四個盤,咱們須要找一個文件,可是咱們不知道這個文件位於哪一個盤中,咱們即可以使用 invokeAny,並提交四個任務(對應於四個線程)分別查找 C、D、E、F 四個盤,若是哪一個線程找到了這個文件,那麼此時 invokeAny 便中止阻塞並返回結果,同時取消其餘任務。


shutdown 方法:
shutdown 方法

shutdown 方法的做用是向線程池發送關閉的指令。一旦在線程池上調用 shutdown 方法以後,線程池便不能再接受新的任務;若是此時還向線程池提交任務,那麼將會拋出 RejectedExecutionException 異常。以後線程池不會馬上關閉,直到以前已經提交到線程池中的全部任務(包括正在運行的任務和在隊列中等待的任務)都已經處理完成,纔會關閉。


shutdownNow 方法:
shutdownNow 方法

shutdown 不一樣,shutdownNow 會當即關閉線程池 —— 當前在線程池中運行的任務會所有被取消,而後返回線程池中全部正在等待的任務。


(值得注意的是,咱們 必須顯式的關閉線程池,不然線程池不會本身關閉)


awaitTermination 方法:
awaitTermination 方法

awaitTermination 能夠用來判斷線程池是否已經關閉。調用 awaitTermination 以後,在 timeout 時間內,若是線程池沒有關閉,則阻塞當前線程,不然返回 true;當超過 timeout 的時間後,若線程池已經關閉則返回 true,不然返回 false。該方法通常這樣使用:

  1. 任務所有提交完畢以後,咱們調用 shutdown 方法向線程池發送關閉的指令;
  2. 而後咱們經過 awaitTermination 來檢測到線程池是否已經關閉,能夠得知線程池中全部的任務是否已經執行完畢;
  3. 線程池執行完已經提交的全部任務,並將本身關閉;
  4. 調用 awaitTermination 方法的線程中止阻塞,並返回 true

isShutdown() 方法,若是線程池已經調用 shutdown 或者 shutdownNow,則返回 true,不然返回 false


isTerminated() 方法,若是線程池已經調用 shutdown 而且線程池中全部的任務已經執行完畢,或者線程池調用了 shutdownNow,則返回 true,不然返回 false


經過以上介紹,咱們已經瞭解了 ExecutorService 中全部方法的功能,如今讓咱們來實踐 ExecutorService 的功能。

咱們繼續使用 上一篇文章 的兩個例子中的任務,首先是任務類型爲 Runnable 的狀況:

import java.util.*;
import java.util.concurrent.*;

public class RunnableTest {

    public static void main(String[] args) throws Exception {
        System.out.println("使用線程池運行 Runnable 任務:");
        
        ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定爲 5 的線程池

        List<AccumRunnable> tasks = new ArrayList<>(10);

        for (int i = 0; i < 10; i++) {
            AccumRunnable task = new AccumRunnable(i * 10 + 1, (i + 1) * 10);
            tasks.add(task);
            
            threadPool.execute(task); // 讓線程池執行任務 task
        }
        threadPool.shutdown(); // 向線程池發送關閉的指令,等到已經提交的任務都執行完畢以後,線程池會關閉

        threadPool.awaitTermination(1, TimeUnit.HOURS); // 等待線程池關閉,等待的最大時間爲 1 小時

        int total = 0;
        for (AccumRunnable task : tasks) {
            total += task.getResult(); // 調用在 AccumRunnable 定義的 getResult 方法得到返回的結果
        }

        System.out.println("Total: " + total);
    }

    static final class AccumRunnable implements Runnable {

        private final int begin;
        private final int end;

        private int result;

        public AccumRunnable(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }

        @Override
        public void run() {
            result = 0;
            try {
                for (int i = begin; i <= end; i++) {
                    result += i;
                    Thread.sleep(100);
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace(System.err);
            }
            System.out.printf("(%s) - 運行結束,結果爲 %d\n",
                    Thread.currentThread().getName(), result);
        }

        public int getResult() {
            return result;
        }
    }
}

運行結果:
線程池的運行 Runnable 任務的結果

能夠看到 NetBeans 給出的運行時間爲 2 秒 —— 由於每一個任務須要 1 秒的時間,而線程池中的線程個數固定爲 5 個,因此線程池最多同時處理 5 個任務,於是 10 個任務總共須要 2 秒的運行時間。


任務類型爲 Callable

import java.util.*;
import java.util.concurrent.*;

public class CallableTest {

    public static void main(String[] args) throws Exception {
        System.out.println("使用線程池運行 Callable 任務:");
        
        ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定爲 5 的線程池
        
        List<Future<Integer>> futures = new ArrayList<>(10);
        
        for (int i = 0; i < 10; i++) {
            AccumCallable task = new AccumCallable(i * 10 + 1, (i + 1) * 10);
            Future<Integer> future = threadPool.submit(task); // 提交任務
            futures.add(future);
        }
        threadPool.shutdown(); // 向線程池發送關閉的指令,等到已經提交的任務都執行完畢以後,線程池會關閉

        int total = 0;
        for (Future<Integer> future : futures) {
            total += future.get(); // 阻塞,直到任務結束,返回任務的結果
        }

        System.out.println("Total: " + total);
    }

    static final class AccumCallable implements Callable<Integer> {

        private final int begin;
        private final int end;

        public AccumCallable(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }

        @Override
        public Integer call() throws Exception {
            int result = 0;
            for (int i = begin; i <= end; i++) {
                result += i;
                Thread.sleep(100);
            }
            System.out.printf("(%s) - 運行結束,結果爲 %d\n",
                    Thread.currentThread().getName(), result);

            return result;
        }

    }

}

運行結果:
使用線程池運行 Callable 任務的結果


改寫上面的代碼,使用 invokeAll 來直接執行一組任務:

public static void main(String[] args) throws Exception {
    System.out.println("使用線程池 invokeAll 運行一組 Callable 任務:");

    ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定爲 5 的線程池

    List<AccumCallable> tasks = new ArrayList<>(10); // tasks 爲一組任務
    for (int i = 0; i < 10; i++) {
        tasks.add(new AccumCallable(i * 10 + 1, (i + 1) * 10)); 
    }

    List<Future<Integer>> futures = threadPool.invokeAll(tasks); // 阻塞,直到全部任務都運行完畢

    int total = 0;
    for (Future<Integer> future : futures) {
        total += future.get(); // 返回任務的結果
    }

    System.out.println("Total: " + total);

    threadPool.shutdown(); // 向線程池發送關閉的指令
}

運行結果:
使用線程池 invokeAll 運行一組 Callable 任務的結果


線程池是很強大並且很方便的工具,它提供了對線程進行統一的分配和調控的各類功能。自 JDK1.5 時 JDK 添加了線程池的功能以後,通常狀況下更推薦使用線程池來編寫多線程程序,而不是直接使用 Thread

invokeAny 也是很實用的方法,請有興趣的讀者本身實踐)


References:

  1. http://www.infoq.com/cn/artic...
  2. http://www.cnblogs.com/absfre...
相關文章
相關標籤/搜索