Java 線程池

系統啓動一個線程的成本是比較高,使用線程池能夠很好地提升性能,尤爲是當程序中須要建立大量生存期很短暫的線程時java

線程池在系統啓動時即建立大量空閒線程,將一個Runnable、Callable對象—–>傳給線程池—–>線程池啓動裏面的一個線程來執行它們的run()或者call()方法———->當線程執行體執行完成後,線程並不會死亡,而是再次返回線程池成爲空閒狀態,等待下一個Runnable、Callable對象的run()或者call()方法數組

Java8改進的線程池

Executors工廠類

Java5開始,Java內建支持線程池。Executors工廠類來產生線程池,該工廠類包含以下幾個靜態工廠方法來建立線程池:緩存

  • ExecutorService newCachedThreadPool():建立一個具備緩存功能的線程池,系統根據須要建立線程,這些線程將會被緩存在線程池中dom

  • ExecutorService newFixedThreadPool(int nThreads):建立一個可重用的、具備nThread個固定的線程的線程池ide

  • ExecutorService newSingleThreadExecutor():建立包含一個只有單線程的線程池,至關於調用newFixedThreadPool(1)工具

  • ScheduledExecutorService newScheduledThreadPool(int corePoolSize):建立具備指定線程數量的線程池,能夠在指定延遲後執行線程任務。corePoolSize指池中所保存的線程數,即便線程是空閒的也被保存在線程池內性能

  • ScheduledExecutorService newSingleThreadScheduledExecutor():建立只有一個線程的線程池,能夠指定延遲後執行線程任務this

  • ExecutorService newWorkStealingPool(int parallelism):建立持有足夠的線程的線程池來支持給定的並行級別,該方法還會使用多個隊列來減小競爭spa

  • ExecutorService newWorkStealingPool():該方法能夠看作是前一個方法的簡本,並行級別不須要用戶手工指定,是根據計算機CPU個數自動生成的,若是當前機器有6個CPU,則調用該方法時並行級別被設爲6線程

前三個方法返回一個ExecutorService對象,表明一個線程池,能夠執行Runnable對象和Callable對象所表明的線程;中間兩個方法返回一個ScheduledExecutorService對象,它是ExecutorService的子類,能夠在指定延遲後執行線程任務;最後兩個方法生成的work stealing池,至關於後臺線程池,若是全部的前臺線程都死亡了,work stealig池中的線程也會自動死亡

ExecutorService

ExecutorService表明儘快執行線程的線程池(只要線程中有空閒線程就當即執行線程任務),程序只要將一個Runnable對象或Callable對象(表明線程任務)提交給該線程池,該線程池就會盡快執行該任務

ExecutorService裏提供了以下3個方法:

  • Future<?> submit(Runnable task):將一個Runnable對象提交給指定的線程池,線程池將在有空閒線程時執行Runnable對象表明的任務。其中Future對象表明Runnable任務的返回值——可是run()方法沒有返回值,因此Future對象將在run()方法執行結束後返回null。但能夠調用Future的isDone()、isCancelled()方法來得到Runnable對象的執行狀態

  • <T> Future<T> submit(Runnable task, T result):將一個Runnable對象提交給指定的線程池,線程池將在有空閒線程時執行Runnable對象表明的任務。其中result顯式指定線程執行結束後的返回值,因此Future對象將在run()方法執行後返回result

  • <T>Future<T> submit(Callable<T> task):將一個Callable對象提交給指定的線程池,線程池將在有空閒線程時執行Callable對象表明的任務。其中Future表明Callable對象裏call()方法的返回值

ScheduledExecutorService

ScheduledExecutorService表明可在指定延遲後或週期性地執行線程任務的線程池,這提供了以下4個方法:

  • ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):指定Callable任務將在delay延遲後執行

  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):指定Command任務將在delay延遲後執行

  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):指定command任務將在delay延遲後執行,並且以設定頻率重複執行。即initialDelay後開始執行,依次在initialDelay + period、initialDelay + 2*period...處重複執行

  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,TimeUnit unit):建立並執行一個在給定初始延遲後首次啓動的按期操做,隨後在每一次執行終止和下一次執行開始之間都存在給定的延遲。若是任務在任一次執行時遇到異常,就會取消後續執行,不然,只能經過程序來顯式取消或終止該任務

使用線程池來執行線程任務

當用完一個線程池後,應該調用該線程池的shutdown()方法,該方法將啓動線程池的關閉序列,調用shutdown()方法後的線程池再也不接收新任務,但會將之前全部已提交的任務執行完成。當線程池中的全部任務都執行完成後,池中的全部線程都會死亡;另外也能夠調用線程池的shutdownNow()方法來關閉線程池,該方法試圖中止全部正在執行的活動任務,暫停處理正在等待的任務,並返回等待的任務列表

使用線程池執行線程任務的步驟以下:

  1. 調用Executor類的靜態工廠方法建立一個ExecutorService對象,該對象表明一個線程池

  2. 建立Runnable或Callable接口實現類的實例,做爲線程執行任務

  3. 調用ExcutorService的submit方法來提交Runnable或Callable實例

  4. 當不想提交任何任務時調用ExcutorService的shutdown()方法來關閉線程池

import java.util.concurrent.*;

public class ThreadPoolTest
{
    public static void main(String[] args)
        throws Exception
    {
        // 建立足夠的線程來支持4個CPU並行的線程池
        // 建立一個具備固定線程數(6)的線程池
        ExecutorService pool = Executors.newFixedThreadPool(6);
        // 使用Lambda表達式建立Runnable對象
        Runnable target = () -> {
            for (int i = 0; i < 100 ; i++ )
            {
                System.out.println(Thread.currentThread().getName() + "的i值爲:" + i);
            }
        };
        // 向線程池中提交兩個線程
        pool.submit(target);
        pool.submit(target);
        // 關閉線程池
        pool.shutdown();
    }
}

Java8加強的ForkJoinPool

Java7提供了ForkJoinPool支持將一個任務拆分紅多個「小任務」並行計算,再把多個「小任務」的結果合併成總的計算結果。ForkJoinPool是ExecutorService的實現類,是一種特殊的線程池

ForkJoinService提供了以下兩個經常使用的構造器:

  • ForkJoinPool(int parallelism):建立一個包含parallelism個並行線程的ForkJoinPool

  • ForkJoinPool():以Runtime.availableProcessors()方法的返回值做爲parallelism參數來建立ForkJoinPool

Jav增長通用池功能,由以下兩個靜態方法提供通用池功能:

  • ForkJoinPoll commonPool():該方法返回一個通用池,通用池的運行狀態不受shutDown()或shutdownNow()方法的影響。但若是程序直接調用System.exit(0);來終止虛擬機,通用池以及通用池中正在執行的任務都會被自動終止

  • int getCommonPollParallelism():該方法返回通用池的並行級別

建立ForkJoinPool實例可調用ForkJoinPool的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法來執行指定任務了。其中ForkJoinTask表明一個能夠並行、合併的任務

ForkJoinTask是一個抽象類,它有兩個抽象子類:RecursiveAction和RecursiveTask。其中RecursiveTask表明有返回值的任務,RecursiveAction表明沒有返回值的任務

線程池工具類的類圖:
clipboard.png

使用RecursiveAction執行沒有返回值的「大任務」

簡單打印0~500的數值:

import java.util.concurrent.*;

// 繼承RecursiveAction來實現"可分解"的任務
class PrintTask extends RecursiveAction
{
    // 每一個「小任務」只最多隻打印50個數
    private static final int THRESHOLD = 50;
    private int start;
    private int end;
    // 打印從start到end的任務
    public PrintTask(int start, int end)
    {
        this.start = start;
        this.end = end;
    }
    @Override
    protected void compute()
    {
        // 當end與start之間的差小於THRESHOLD時,開始打印
        if(end - start < THRESHOLD)
        {
            for (int i = start ; i < end ; i++ )
            {
                System.out.println(Thread.currentThread().getName() + "的i值:" + i);
            }
        }
        else
        {
            // 若是當end與start之間的差大於THRESHOLD時,即要打印的數超過50個
            // 將大任務分解成兩個小任務。
            int middle = (start + end) / 2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            // 並行執行兩個「小任務」
            left.fork();
            right.fork();
        }
    }
}
public class ForkJoinPoolTest
{
    public static void main(String[] args)
        throws Exception
    {
        ForkJoinPool pool = new ForkJoinPool();
        // 提交可分解的PrintTask任務
        pool.submit(new PrintTask(0 , 500));
        pool.awaitTermination(2, TimeUnit.SECONDS);
        // 關閉線程池
        pool.shutdown();
    }
}

程序實現了對指定打印任務的分解,分解後的任務分別調用fork()方法開始並行執行。ForkJoinPool啓動了4個線程來執行打印任務
clipboard.png

使用RecursiveTask<T>執行有返回值的「大任務」

對一個長度爲100的數值的元素值進行累加:

import java.util.concurrent.*;
import java.util.*;

// 繼承RecursiveTask來實現"可分解"的任務
class CalTask extends RecursiveTask<Integer>
{
    // 每一個「小任務」只最多隻累加20個數
    private static final int THRESHOLD = 20;
    private int arr[];
    private int start;
    private int end;
    // 累加從start到end的數組元素
    public CalTask(int[] arr, int start, int end)
    {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute()
    {
        int sum = 0;
        // 當end與start之間的差小於THRESHOLD時,開始進行實際累加
        if(end - start < THRESHOLD)
        {
            for (int i = start ; i < end ; i++ )
            {
                sum += arr[i];
            }
            return sum;
        }
        else
        {
            // 若是當end與start之間的差大於THRESHOLD時,即要累加的數超過20個時
            // 將大任務分解成兩個小任務。
            int middle = (start + end) / 2;
            CalTask left = new CalTask(arr, start, middle);
            CalTask right = new CalTask(arr, middle, end);
            // 並行執行兩個「小任務」
            left.fork();
            right.fork();
            // 把兩個「小任務」累加的結果合併起來
            return left.join() + right.join();    // ①
        }
    }
}
public class Sum
{
    public static void main(String[] args)
        throws Exception
    {
        int[] arr = new int[100];
        Random rand = new Random();
        int total = 0;
        // 初始化100個數字元素
        for (int i = 0, len = arr.length; i < len ; i++ )
        {
            int tmp = rand.nextInt(20);
            // 對數組元素賦值,並將數組元素的值添加到sum總和中。
            total += (arr[i] = tmp);
        }
        System.out.println(total);
        // 建立一個通用池
        ForkJoinPool pool = ForkJoinPool.commonPool();
        // 提交可分解的CalTask任務
        Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
        System.out.println(future.get());
        // 關閉線程池
        pool.shutdown();
    }
}
相關文章
相關標籤/搜索