併發——執行器

簡介

構建一個新的現成是有必定代價的,由於涉及和操做系統的交互。若是程序中建立了大量的生命期很短的現成,應該使用線程池(Thread Pool)。一個線程池中包含許多準備運行的空閒線程。將Runnable對象交給線程池,就會有一個線程調用run方法。java

當run方法退出時,線程不會死亡,而在池中準備爲下一個請求提供服務。web

執行器有不少靜態共產方法用來建立線程池。例如newCachedThreadPool、newFixedThreadPool、newSingleThraedExecutor、newScheledTheadPool、newSingleThreadScheduleExecutor.數組

一、線程池

ExecutorService接口。接下來的一個例子是用途查看某個目錄下的全部文件是否包含指定的關鍵字。並統計知足這些條件的文件的數量。每遇到一個文件夾就提交一個任務到線程池。緩存

package ch14;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class MatchCounter implements Callable<Integer> {

    private File fileDirectory;

    private String keyWord;

    // Thread pool
    private ExecutorService pool;

    private int count;

    public MatchCounter(File fileDirectory, String keyWord, ExecutorService pool) {
        this.fileDirectory = fileDirectory;
        this.keyWord = keyWord;
        this.pool = pool;
    }

    @Override
    public Integer call() throws Exception {
        count = 0;

        try{
            File[] files = fileDirectory.listFiles();
            List<Future<Integer>> results = new ArrayList<>();
            for (File file : files) {
                // 若是是文件夾,繼續調用
                if(file.isDirectory()){
                    MatchCounter matchCounter = new MatchCounter(file, keyWord, pool);
                    // 提交任務
                    Future<Integer> result = pool.submit(matchCounter);
                    results.add(result);
                }else{
                    // search this file content is has keyword. then count ++.
                    if(search(file)){
                        count ++;
                    }
                }
            }

            // 統計結果
            for (Future<Integer> result : results) {
                try{
                    count += result.get();
                }catch (Exception e){
                    System.out.println(e.getMessage());
                }
            }


        }catch (Exception e){

        }
        // 返回計算結果
        return count;
    }

    // 搜索文件中是否包含關鍵字
    public boolean search(File file){
        try(Scanner in = new Scanner(file,"UTF-8")){
            boolean founded = false;
            while(!founded && in.hasNextLine()){
                String line = in.nextLine();
                if(line.contains(keyWord)){
                    founded = true;
                }
            }
            return founded;

        }catch (Exception e){
            System.out.println(e.getMessage());
            return false;
        }
    }
}

測試類服務器

package ch14;
import java.io.File;
import java.util.Scanner;
import java.util.concurrent.*;

public class TheadPoolTest {
    public static void main(String[] args) {
        try (Scanner scanner = new Scanner(System.in)) {
            System.out.println("輸入查找目錄:");
            String dir = scanner.nextLine();

            System.out.println("輸入查找關鍵字:");
            String keyword = scanner.nextLine();

            // 申請線程池
            ExecutorService pool = Executors.newCachedThreadPool();

            // 建立對象
            MatchCounter matchCounter = new MatchCounter(new File(dir), keyword, pool);

            // 獲取結果
            Future<Integer> result = pool.submit(matchCounter);

            try {
                System.out.println(result.get() + " files matching the keywords: " + keyword);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

            // 關閉線程池
            pool.shutdown();

            // 獲取程序運行過程當中線程池的最大線程數
            int largestPoolSize = ((ThreadPoolExecutor) pool).getLargestPoolSize();

            System.out.println("Largest pool size = " + largestPoolSize);
            /**
             * 輸入查找目錄:
             * C:\Users\Administrator\Desktop\test
             * 輸入查找關鍵字:
             * xx
             *
             * 3 files matching the keywords: xx
             * Largest pool size = 2
             */

        }
    }
}
  • ExecutorService new CachedThreadPool 返回一個帶緩存的線程池,該線程池在必要的時候建立線程,在線程空閒60秒以後終止線程;框架

  • ExecutorService newFixedThreadPool(int threads) 返回一個線程池,該池子中的線程數由參數指定;dom

  • ExecutorService newSginleThreadExecutor() 返回一個執行器,他在一個單個的線程中一次執行各個任務。ide

  • Future<T> submit(Callable<T> task)測試

  • Future<T> submit(Runnable task, T result)this

  • Future<T> submit(Runnable task)

提交指定的任務去執行,注意第一個方法能夠指定返回結果,第二個方法能夠執行完成任務時獲得result。

  • void shutdown() 關閉服務,會先完成已經提交的任務而再也不接受新的任務

  • int getLargestPoolSize() 返回線程池在該執行器生命週期中的最大尺寸。換句話說,就是得到該線程池建立一個歷史線程數的最大值。

二、預約執行

ScheduledExecutorService接口具備爲預約執行或重複執行任務而設計的方法。他是一種容許使用線程池機制的java.util.Timer的泛化。

Executors類的newSingleThreadExecutor

三、控制任務組

四、Fork Join框架

有些應用使用了大量的線程,可是不少都是空閒的。例如,一個web服務器可能會爲每隔鏈接分別使用一個線程。另一些應用可能對每一個處理器內核分別使用一個線程,來完成計算密集型任務,如圖像處理或者視頻處理。JAVA SE 7中新引入了fork-join框架,專門用來支持後一類應用。假設有一個處理任務,他能夠很天然的分解爲子任務,以下所示

if(問題規格 < 設定的閾值){
    直接處理任務
}else{
    分解該任務
    遞歸的解決每個子任務
    組合結果
}

work stealing(工做覓取).

以下的實例用於在大小爲10000的數組中找出大於0.5的數字的個數。

package ch14;

import java.util.concurrent.RecursiveTask;
import java.util.function.DoublePredicate;

public class ForkCounter extends RecursiveTask<Integer> {
    public static final int THRESHOLD = 1000;
    private double[] values;
    private int from;
    private int to;
    private DoublePredicate filter;

    public ForkCounter(double[] values, int from, int to, DoublePredicate filter) {
        this.values = values;
        this.from = from;
        this.to = to;
        this.filter = filter;
    }

    @Override
    protected Integer compute() {
        if(to - from < THRESHOLD){
            int count = 0;
            for (int i = from; i < to; i++) {
                if(filter.test(values[i])){
                    count ++;
                }
            }
            return count;
        }else{
            // 分解任務爲2段
            int mid = (from + to) / 2;
            ForkCounter first = new ForkCounter(values,from,mid,filter);
            ForkCounter second = new ForkCounter(values,mid,to,filter);

            // 執行兩個任務,返回全部任務的結果(Any則是其中一個就行)
            invokeAll(first,second);
            // 返回結果
            return first.join() + second.join();
        }
    }
}

測試程序

package ch14;

import java.util.concurrent.ForkJoinPool;

public class ForkCounterTest {
    public static void main(String[] args) {
        final int SIZE = 10000;
        double[] numbers = new double[SIZE];

        for (int i = 0; i < SIZE; i++) {
            numbers[i] = Math.random();
        }

        // 找出大於0.5的數
        ForkCounter counter = new ForkCounter(numbers,0,numbers.length,x -> x>0.5);

        ForkJoinPool forkJoinPool = new ForkJoinPool();

        forkJoinPool.invoke(counter);

        System.out.println(counter.join());
        // 5026
    }
}

五、可完成Future

thenApply

thenCompose

handle

theAccept

whenComplete

thenRun

相關文章
相關標籤/搜索