《Java7併發編程實戰手冊》學習筆記(六)——Fork/Join框架

此篇博客爲我的學習筆記,若有錯誤歡迎你們指正

本次內容

  1. 建立Fork/Join線程池
  2. 合併任務的結果
  3. 異步運行任務
  4. 在任務中拋出異常
  5. 取消任務

Java爲咱們提供了ExecutorService接口的另外一種實現——Fork/Join框架(分解/合併框架),這個框架能幫助咱們更簡單的用分治技術解決問題。使用Fork/Join框架,在執行一個任務時,咱們首先判斷這個任務的規模是否大於咱們制定的標準,若是大於就將這個任務分解(fork)爲規模更小的任務去執行;最後再將執行完成小任務層層合併(join)爲大任務並返回,原理圖以下:java

Fork/Join框架與咱們以前使用的執行器框架的主要區別在於前者實現了 工做竊取算法。當咱們使用 join()方法使一個主任務等待它所建立的子任務完成時,執行任務的線程(工做者線程)並不會由於等待其餘任務的完成而進入休眠狀態,而是隨機的去其餘線程所維護的雙端隊列末尾取出一個任務來執行,這就極大的提高了工做效率。固然,爲了達到上述目標,在使用Fork/Join框架時有如下限制:

  • 任務只能使用fork()join()等一些專門爲Fork/Join框架準備的方法進行同步。若是使用了其餘同步機制,工做者線程會真正的進入阻塞狀態而且不會竊取其餘線程的任務來執行
  • 任務不能執行I/O操做
  • 任務不能夠拋出非運行時異常

Fork/Join框架的核心是由如下兩個類組成的:算法

  • ForkJoinPool:這個類也實現了ExecutorExecutorService接口,和咱們以前使用過的ThreadPoolExecutor類有些相似,主要區別在於這個類實現了工做竊取算法。獲取ForkJoinPool對象的方法主要有如下幾種,咱們能夠根據不一樣的需求進行選擇:數組

    1. 首先是ForkJoinPool類的構造方法:app

      • ForkJoinPool():無參構造方法,調用此方法得到的ForkJoinPool對象將執行默認的配置。其並行級別爲當前JVM能夠調用的CPU內核數量
      • ForkJoinPool(int parallelism):經過這個構造方法能夠指定線程池的並行級別,可是咱們傳入的參數應該是大於0且小於等於JVM能夠調用的CPU內核數量的
      • ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) 此方法參數較多,下面會分別記錄:
        1. parallelism:並行級別。Fork/Join框架將根據這個參數來設定框架內並行執行的線程數。注意,並非框架中最大的線程數量
        2. factory:線程工廠。咱們能夠編寫本身的Fork/Join縣城工廠類,和之構造的線程工廠不一樣。構造Fork/Join線程工廠類須要咱們實現ForkJoinWorkerThreadFactory接口而不是ThreadFactory接口
        3. handler:異常捕獲處理器。當執行中的任務向上拋出異常時,就會被處理器捕獲
        4. asyncMode:工做模式。Fork/Join框架中的每個工做線程都維護着一個雙端隊列用於裝載任務,參數爲true則表示隊列中的任務先進先出(FIFO),爲false則表示後進先出(LIFO)
    2. ForkJoinPool類的靜態方法commonPool()一樣能夠得到ForkJoinPool對象。值得注意的是,調用此方法得到的是Java預約義的線程池,這能夠減小資源的消耗由於咱們再也不須要每提交一個任務就建立一個新的線程池了,也就是說每次咱們調用此方法所得到的對象引用實際上都指向同一個線程池,能夠發現執行下面的代碼打印出的值將爲true框架

      ForkJoinPool forkJoinPool1 = ForkJoinPool.commonPool();
          ForkJoinPool forkJoinPool2 = ForkJoinPool.commonPool();
          System.out.println(forkJoinPool1 == forkJoinPool2);
      複製代碼

      另外,在調用經此方法得到的ForkJoinPool對象的shutdown()方法時,線程池並不會關閉dom

    3. 使用Executors類的靜態方法newWorkStealingPool()或者此方法的另外一種實現newWorkStealingPool(int parallelism)異步

  • ForkJoinTask:此類實現了Future接口,是在ForkJoinPool中執行的任務的基類。爲了使用Fork/Join框架執行任務,一般狀況下咱們須要實現如下兩個ForkJoinTask子類的其中一個async

    • RecursiveAciton:用於任務沒有返回結果的場景
    • RecursiveTask:用於任務有返回結果的場景

    在繼承上面兩個類後,咱們最好在本身的類中加上這樣一個屬性:
    private static final long serialVersionUID = 1L;
    這是由於RecursiveActionRecursiveTask類均繼承了ForkJoinTask類,而ForkJoinTask類又實現了Serializable接口。若是咱們不顯示的聲明這個屬性,那麼Java會根據當前類的屬性、方法給出一個默認值。當咱們修改了類的屬性或方法後,這個值會發生變化。這樣一來,咱們在將修改以前進行過序列化的類進行反序列化時就會出現錯誤。因此咱們最好顯示的聲明這一屬性。ide

1.建立Fork/Join線程池

使用Fork/Join框架,咱們最好參考JavaAPI手冊爲咱們推薦的代碼結構學習

if (problem size > default size) {
    tasks = divide(task);
    execute(tasks);
} else {
    resolve problem using another algorthm;
}
複製代碼

下面是在此小節中須要瞭解的方法:

  • ForkJoinPool類:
    1. execute(ForkJoinTask<?> task):無返回值。調用此方法向線程池提交一個任務,注意這個方法是異步的,調用後線程不會等待而是直接向下執行。execute(Runnable task)是另外一種實現,提交一個Runnable類型的任務給線程池,在這種狀況下線程池不會使用工做竊取算法
    2. invoke(ForkJoinTask<T> task):此方法最好和execute(ForkJoinTask<?> task)方法對比來看。區別在與這個方法是同步的,調用後會直到任務執行結束後才返回。返回值即爲任務返回的結果
    3. 由於ForkJoinPool類實現了ExecutorService接口,因此也實現了invokeAll()invokeAny()方法。這些方法以前都已經使用過,參數爲Callable類型的任務列表。可是當咱們向ForkJoinPool發送Runnable或Callable類型的任務時,線程池並不會使用工做竊取算法,所以咱們不推薦這樣作
  • ForkJoinTask類:
    1. adapt():傳入一個Runnable或Callable對象,返回一個ForkJoinTask對象
    2. invokeAll():傳入ForkJoinTask對象列表或數個ForkJoinTask對象。這個方法是同步的,當主任務在等待子任務時,執行主任務的工做線程會開始執行另外一個等待執行的任務。值得注意的是,因傳入參數不一樣這個方法的返回值也有所區別。直接傳入ForkJoinTask對象的話此方法沒有返回值;傳入ForkJoinTask對象列表的話返回值也爲傳入的ForkJoinTask對象列表,而且通過調試咱們能夠發現傳入和返回的兩個列表對象的引用實際是指向同一個對象

範例實現

在這個範例中,咱們將對全部商品使用分治技術進行漲價操做。因爲任務不須要有返回值,咱們的任務類繼承了RecursiveAciton
商品類:

package day06.code_1;

public class Product {

    //商品名稱
    private String name;

    //商品價格
    private double price;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
複製代碼

商品列表生成類:

package day06.code_1;

import java.util.ArrayList;
import java.util.List;

public class ProductListGenerator {

    //根據傳入的大小建立一個產品集合
    public List<Product> generate(int size) {
        //建立一個集合
        ArrayList<Product> products = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            //建立產品
            Product product = new Product();
            //設置名字
            product.setName("Product " + i);
            //統一設置初始價格爲10,方便檢查程序的正確性
            product.setPrice(10);
            //裝入集合
            products.add(product);
        }
        //返回集合
        return products;
    }

}
複製代碼

任務類:

package day06.code_1;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.RecursiveAction;

public class Task extends RecursiveAction {

    //必備參數
    private static final long serialVersionUID = 1L;

    //產品集合
    private List<Product> products;

    //起始位置
    private int first;

    //終止位置
    private int last;

    //價格增百分比
    private double increment;

    public Task(List<Product> products, int first, int last, double increment) {
        this.products = products;
        this.first = first;
        this.last = last;
        this.increment = increment;
    }

    @Override
    protected void compute() {
        //若是任務數量小於10
        if (last - first < 10) {
            //執行漲價操做
            updatePrices();
        } else {
            //若是任務數量大於10則將任務均分
            int middle = (first + last) / 2;
            //打印分割任務提示語
            System.out.printf("Task: Pending tasks:%s\n",
                    getQueuedTaskCount());
            //根據新分配的範圍建立兩個任務
            Task t1 = new Task(products, first, middle + 1, increment);
            Task t2 = new Task(products, middle + 1, last, increment);
            //執行
            invokeAll(t1, t2);
        }
    }

    private void updatePrices() {
        //遍歷集合爲每個商品作漲價操做
        for (int i = first; i < last; i++) {
            Product product = products.get(i);
            product.setPrice(product.getPrice() * (1 + increment));
        }
    }
}
複製代碼

main方法:

package day06.code_1;

import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class Main {

    public static void main(String[] args) {
        //建立產品生成對象
        ProductListGenerator generator = new ProductListGenerator();
        //經過產品生成器獲得大小爲10000的產品集合
        List<Product> products = generator.generate(10000);
        //建立一個任務
        Task task = new Task(products, 0, 10000, 0.20);
        //建立線程池
        ForkJoinPool pool = new ForkJoinPool();
        //調用線程池的方法執行任務
        pool.execute(task);
        do {
            //打印線程池中當前正在執行任務的線程數量
            System.out.printf("Main: Thread Count: %d\n",
                    pool.getActiveThreadCount());
            //打印線程池中竊取的工做數量
            System.out.printf("Main: Thread Steal: %d\n",
                    pool.getStealCount());
            //打印線程池的並行級別
            System.out.printf("Main: Parallelism: %d\n",
                    pool.getParallelism());
            //休眠5秒
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //等待任務結束
        } while (!task.isDone());
        //關閉線程池
        pool.shutdown();
        //判斷任務是否拋出了異常
        if (task.isCompletedNormally()) {
            //打印任務無異常完成的提示信息
            System.out.printf("Main: The process has completed normally\n");
        }
        //檢查商品是否已正確漲價
        for (int i = 0; i < products.size(); i++) {
            Product product = products.get(i);
            if (product.getPrice() != 12) {
                System.out.printf("Product %s: %f\n",
                        product.getName(), product.getPrice());
            }
        }
        //打印程序結束提示語
        System.out.println("Main: End of the program\n");
    }

}
複製代碼

2.合併任務的結果

使用Fork/Join框架執行帶有返回值的任務時必須繼承RecursiveTask類並使用JavaAPI文檔推薦的結構:

if (problem size > default size) {
    tasks = Divide(task);
    execute(tasks);
    groupResults();
    return results;
} else {
    resolve problem;
    return result;
}
複製代碼

如下幾個ForkJoinTask類中的方法咱們須要瞭解:

  1. fork():無參數、返回值。此方法用於向線程池異步的發送一個任務,發送完成後將會馬上返回並向下執行
  2. get():一直等待直到得到任務返回的結果。另外一種實現爲get(long timeout, TimeUnit unit),若是等待時間超時後任務還未返回結果,則方法直接返回null。get方法能夠被中斷。若是任務拋出運行時異常,get方法會返回ExecutionException異常
  3. join():一直等待直到得到任務返回的結果。此方法和get()方法有些相似,區別在於join()方法不能被中斷。若是中斷調用了該方法的線程,join()方法將拋出InterruptedException異常。另外,任務拋出運行時異常時,join()方法會返回RuntimeWxception異常
    以上三個方法中,第一個與第二或三個方法組合常常用來實現異步運行任務這一需求

範例實現

在這個範例中,咱們將統計一個指定詞彙在文檔中出現的次數。咱們會不斷切割任務直到每一個任務僅搜索100個之內的詞彙
DocumentMock(文檔生成類):

package day06.code_2;


import java.util.Random;

public class DocumentMock {

    //從如下詞彙中選擇詞語組成文檔
    private String words[] = {
            "the", "hello", "goodbye", "packt", "java",
            "thread", "pool", "random", "class", "main"
    };

    public String[][] generateDocument(int numLines, int numWords, String word) {
        //記錄指定詞彙出現的次數,便於後期判斷程序對錯
        int counter = 0;
        //建立二維數組
        String[][] document = new String[numLines][numWords];
        //隨機數生成器
        Random random = new Random();
        //填充數組
        for (int i = 0; i < numLines; i++) {
            for (int j = 0; j < numWords; j++) {
                //隨機選取詞彙並填充
                int index = random.nextInt(words.length);
                document[i][j] = words[index];
                //若是是指定詞彙,計數器加一
                if (document[i][j] == word) {
                    counter++;
                }

            }
        }
        //打印指定詞彙出現的次數
        System.out.printf("DocumentMock: The word appears " +
                "%d times in the document\n", counter);
        //返回文檔
        return document;
    }

}
複製代碼

DocumentTask(文檔任務類):

package day06.code_2;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

public class DocumentTask extends RecursiveTask<Integer> {

    //必備參數
    private static final long serialVersionUID = 1L;

    //文檔
    private String[][] document;

    //起始、結束位置
    private int start, end;

    //待查找的詞彙
    private String word;

    public DocumentTask(String[][] document, int start, int end, String word) {
        this.document = document;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    @Override
    protected Integer compute() {
        //初始化計數器
        int result = 0;
        //若是行數小於10
        if (end - start < 10) {
            //處理每一行的數據
            result = processLines(document, start, end, word);
        } else {
            //行數大於10則進行任務分割
            int mid = (start + end) / 2;
            DocumentTask task1 = new DocumentTask(document, start, mid, word);
            DocumentTask task2 = new DocumentTask(document, mid, end, word);
            //提交任務(同步)
            invokeAll(task1, task2);
            try {
                //處理子任務返回的結果
                result = groupResults(task1.get(), task2.get());
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        //返回結果
        return result;
    }

    //將子任務結果相加後返回
    private int groupResults(Integer number1, Integer number2) {
        return number1 + number2;
    }

    private int processLines(String[][] document, int start, int end, String word) {
        //建立裝載行任務的集合
        ArrayList<LineTask> tasks = new ArrayList<>();
        //建立行任務
        for (int i = start; i < end; i++) {
            LineTask task = new LineTask(document[i], 0, document[i].length, word);
            tasks.add(task);
        }
        //執行全部任務
        invokeAll(tasks);
        //初始化計數器
        int result = 0;
        //從任務中獲取結果
        for (int i = 0; i < tasks.size(); i++) {
            LineTask task = tasks.get(i);
            try {
                result = result + task.get();
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        //返回結果
        return result;
    }

}
複製代碼

LineTask(單行任務類):

package day06.code_2;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

public class LineTask extends RecursiveTask<Integer> {

    //必備參數
    private static final long serialVersionUID = 1L;

    //行數據
    private String line[];

    //起始、結束位置
    private int start, end;

    //待查找的詞彙
    private String word;

    public LineTask(String[] line, int start, int end, String word) {
        this.line = line;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    @Override
    protected Integer compute() {
        //初始化計數器
        int result = 0;
        //若是一行的數據小於100
        if (end - start < 100) {
            //查找指定詞彙的數量
            result = count(line, start, end, word);
        } else {
            //分割任務
            int mid = (start + end) / 2;
            LineTask task1 = new LineTask(line, start, mid, word);
            LineTask task2 = new LineTask(line, mid, end, word);
            //執行
            invokeAll(task1, task2);
            //獲取子任務的結果
            try {
                result = groupResults(task1.get(), task2.get());
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    //將子任務結果相加後返回
    private Integer groupResults(Integer number1, Integer number2) {
        return number1 + number2;
    }

    private int count(String[] line, int start, int end, String word) {
        //初始化計數器
        int counter = 0;
        //查找每個元素是否爲指定的詞彙
        for (int i = start; i < end; i++) {
            if (line[i].equals(word)) {
                counter++;
            }
        }
        //休眠10毫秒
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //返回結果
        return counter;
    }
}
複製代碼

main方法:

package day06.code_2;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) {
        //建立文檔生成器
        DocumentMock mock = new DocumentMock();
        //生成文檔
        String[][] document = mock.generateDocument(100, 1000, "the");
        //建立文檔搜素任務
        DocumentTask task = new DocumentTask(document, 0, 100, "the");
        //建立線程池
        ForkJoinPool pool = new ForkJoinPool();
        //異步執行文檔搜索任務
        pool.execute(task);
        //每隔一秒打印一次線程池的狀態直到任務執行結束
        do {
            System.out.println("****************************************");
            //並行級別
            System.out.printf("Main: Parallelism: %d\n",
                    pool.getParallelism());
            //正在工做的線程
            System.out.printf("Main: Active Threads: %d\n",
                    pool.getActiveThreadCount());
            //已提交的任務數量(不包括還沒有執行的)
            System.out.printf("Main: Task Count: %d\n",
                    pool.getQueuedTaskCount());
            //竊取工做的數量
            System.out.printf("Main: Steal Count: %d\n",
                    pool.getStealCount());
            System.out.println("****************************************");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while (!task.isDone());
        //關閉線程池
        pool.shutdown();
        //打印待查找關鍵詞的數量
        try {
            System.out.printf("Main: The word appears %d in the document",
                    task.get());
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

}
複製代碼

3.異步運行任務

當咱們採用異步的方式向線程池發送任務時,方法將當即返回,代碼也將繼續向下執行,不過咱們提交的任務會繼續執行。在第二小節中咱們已經將異步運行任務的相關方法記錄了,就不在此贅述

範例實現

在這個範例中咱們將查找指定的文件夾內是否有咱們要查找的文件
FolderProcessor類(文件查找任務類):

package day06.code_3;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class FolderProcessor extends RecursiveTask<List<String>> {

    //必備參數
    private static final long serialVersionUID = 1L;

    //文件夾路徑
    private String path;

    //文件後綴名
    private String extension;

    public FolderProcessor(String path, String extension) {
        this.path = path;
        this.extension = extension;
    }

    @Override
    protected List<String> compute() {
        //建立一個集合用於裝載文件路徑
        ArrayList<String> list = new ArrayList<>();
        //建立集合用於裝載任務
        ArrayList<FolderProcessor> tasks = new ArrayList<>();
        //建立文件對象
        File file = new File(path);
        //獲得文件夾下的所有文件
        File[] content = file.listFiles();
        //判斷是否爲空
        if (content != null) {
            //遍歷集合
            for (int i = 0; i < content.length; i++) {
                //若是是文件夾就建立任務繼續查找
                if (content[i].isDirectory()) {
                    FolderProcessor task = new FolderProcessor
                            (content[i].getAbsolutePath(), extension);
                    //異步執行任務
                    task.fork();
                    //將任務保存進集合
                    tasks.add(task);
                } else {
                    //檢查文件是否符合要求,符合的話就裝入集合
                    if (checkFile(content[i].getName())) {
                        list.add(content[i].getAbsolutePath());
                    }
                }
            }
        }
        //若是文件集合容量超過50了就打印
        if (tasks.size() > 50) {
            System.out.printf("%s: %d tasks run\n",
                    file.getAbsolutePath(), tasks.size());
        }
        //整合子任務返回的結果
        addResultsFromTasks(list, tasks);
        //返回結果
        return list;
    }

    private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks) {
        //遍歷任務集合
        for (FolderProcessor item : tasks) {
            //取得全部子任務返回的結果並裝進集合中
            list.addAll(item.join());
        }
    }

    //檢查文件後綴名是否符合要求
    private boolean checkFile(String name) {
        return name.endsWith(extension);
    }
}
複製代碼

main方法:

package day06.code_3;

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) {
        //建立線程池
        ForkJoinPool pool = new ForkJoinPool();
        //建立三個任務並異步執行
        FolderProcessor system = new FolderProcessor("C:\\", "exe");
        FolderProcessor program = new FolderProcessor("D:\\", "exe");
        FolderProcessor data = new FolderProcessor("F:\\", "exe");
        pool.execute(system);
        pool.execute(program);
        pool.execute(data);
        //在任務沒有都結束以前不斷循環打印線程池的信息
        do {
            System.out.println("***************************************");
            System.out.printf("Main: Parallelism: %d\n",
                    pool.getParallelism());
            System.out.printf("Main: Active Threads: %d\n",
                    pool.getActiveThreadCount());
            System.out.printf("Main: Task Count: %d\n",
                    pool.getQueuedTaskCount());
            System.out.printf("Main: Steal Count: %d\n",
                    pool.getStealCount());
            System.out.println("***************************************");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while ((!system.isDone()) || (!program.isDone()) || (!data.isDone()));
        //關閉線程池
        pool.shutdown();
        //獲取並打印每個任務返回的結果
        List<String> result;
        result = system.join();
        System.out.printf("System: %d files found\n", result.size());
        result = program.join();
        System.out.printf("Program: %d files found\n", result.size());
        result = data.join();
        System.out.printf("Data: %d files found\n", result.size());
    }

}
複製代碼

4.在任務中拋出異常

ForkJoinTask類的compute()方法中不容許拋出非運行時異常,可是咱們仍能夠拋出運行時異常。然而,當任務拋出運行時異常時,ForkJoinPoolForkJoinTask類的行爲和咱們期待的並不相同。程序不會結束運行,異常信息也不會打印出來。只有當咱們去獲取任務的結果時,異常纔會拋出。須要注意的是,當子任務拋出異常時,它的父任務也會受到影響。如下ForkJoinTask類中的幾個方法會對咱們獲取異常信息有必定幫助:

  1. isCompletedAbnormally():若是主任務或它的子任務拋出了異常,此方法將返回true
  2. isCompletedNormally():若是主任務及它的子任務均正常完成了,此方法返回true
  3. getException():調用此方法來得到任務拋出的異常對象

範例實現

在這個範例中,咱們將對一個數組進行搜索。搜索任務中若是包含了索引3,則拋出運行時異常
Task類:

package day06.code_4;

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class Task extends RecursiveTask<Integer> {

    //數組
    private int[] array;

    //起始、終止位置
    private int start, end;

    public Task(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        //打印搜索範圍的信息
        System.out.printf("Task: Start from %d to %d\n",
                start, end);
        //若是搜索範圍小於10
        if (end - start < 10) {
            //判斷是否包含索引三
            if ((3 > start) && (3 < end)) {
                //拋出運行時異常
                throw new RuntimeException("This task throws an Exception: " +
                        "Task from " + start + " to " + end);
            }
            //休眠1秒
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            //分割任務
            int mid = (start + end) / 2;
            Task task1 = new Task(array, start, mid);
            Task task2 = new Task(array, mid, end);
            //執行
            invokeAll(task1, task2);
        }
        //打印任務結束語
        System.out.printf("Task: End from %d to %d\n", start, end);
        return 0;

    }
}
複製代碼

main方法:

package day06.code_4;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) {
        //建立數組
        int[] array = new int[100];
        //建立任務
        Task task = new Task(array, 0, 100);
        //建立線程池
        ForkJoinPool pool = new ForkJoinPool();
        //執行任務
        pool.execute(task);
        //關閉線程池
        pool.shutdown();
        //休眠,直至線程池中的任務所有完成
        try {
            pool.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //判斷任務是否存在異常
        if (task.isCompletedAbnormally()) {
            //打印異常提示語
            System.out.printf("Main: An exception has ocurred\n");
            //打印獲取到的異常對象
            System.out.printf("Main: %s\n", task.getException());
        }
        //打印任務結果
        System.out.printf("Main: Result: %d", task.join());
    }
}
複製代碼

5.取消任務

ForkJoinTask類提供了cancel(boolean mayInterruptIfRunning)方法來達到取消任務的目的。和以前咱們用到過的FutureTask類不一樣的是,ForkJoinTask類的cancel()方法只能取消未被執行的任務。JavaAPI文檔指出,在ForkJoinTask類的默認實現中,傳入的參數並無起到做用,這就致使已經開始執行和已經執行結束的任務都不能被取消。取消成功返回true,不然返回false。另外,ForkJoinPool類中並無提供任務用於取消任務的方法。

範例實現

在這個範例中,咱們將在數組中尋找一個數字,找到後就取消其餘的搜索任務。
ArrayGenerator(數組生成類):

package day06.code_5;

import java.util.Random;

public class ArrayGenerator {

    public int[] generateArray(int size) {
        //根據傳入的參數生成一個數組
        int[] array = new int[size];
        //建立隨機數生成器對象
        Random random = new Random();
        //對數組進行初始化
        for (int i = 0; i < size; i++) {
            array[i] = random.nextInt(10);
        }
        //返回數組
        return array;
    }

}
複製代碼

TaskManager(任務管理類,該類將幫助咱們取消其餘任務):

package day06.code_5;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;

public class TaskManager {

    //任務集合
    private List<ForkJoinTask<Integer>> tasks;

    public TaskManager() {
        tasks = new ArrayList<>();
    }

    //向集合中添加任務
    public void addTask(ForkJoinTask<Integer> task) {
        tasks.add(task);
    }

    public void cancelTasks(ForkJoinTask<Integer> cancelTask) {
        //取消除傳入的任務之外的其餘全部任務
        for (ForkJoinTask<Integer> task : tasks) {
            if (task != cancelTask) {
                //取消任務
                task.cancel(true);
                //打印取消信息
                ((SearchNumberTask) task).writeCancelMessage();
            }
        }
    }
}
複製代碼

SearchNumberTask(搜索數字任務類):

package day06.code_5;


import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class SearchNumberTask extends RecursiveTask<Integer> {

    //待搜索的數組
    private int[] numbers;

    //搜索範圍
    private int start, end;

    //目標數字
    private int number;

    //任務管理器
    private TaskManager manager;

    //未查詢到目標數字時返回的常量
    private static final int NOT_FOUND = -1;

    //必要參數
    private static final long serialVersionUID = 1L;

    public SearchNumberTask(int[] numbers, int start, int end, int number, TaskManager manager) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
        this.number = number;
        this.manager = manager;
    }

    @Override
    protected Integer compute() {
        //打印任務開始提示信息
        System.out.printf("Task: %d : %d\n", start, end);
        int ret;
        //若是搜索範圍大於10
        if (end - start > 10) {
            //調用切割任務的方法
            ret = launchTasks();
        } else {
            //查找目標數字
            ret = lookForNumber();
        }
        //返回結果
        return ret;
    }

    private int launchTasks() {
        //切割任務
        int mid = (start + end) / 2;
        //建立兩個新的任務在將其加入任務集合後執行
        SearchNumberTask task1 = new SearchNumberTask(numbers, start, mid, number, manager);
        SearchNumberTask task2 = new SearchNumberTask(numbers, mid, end, number, manager);
        manager.addTask(task1);
        manager.addTask(task2);
        task1.fork();
        task2.fork();
        //返回值
        int returnValue;
        //獲取任務1的結果
        returnValue = task1.join();
        //若是查詢到了就返回索引
        if (returnValue != -1) {
            return returnValue;
        }
        //不然返回任務2的結果
        return task2.join();

    }

    private int lookForNumber() {
        //遍歷搜索範圍內的數組
        for (int i = start; i < end; i++) {
            //若是是目標數字
            if (numbers[i] == number) {
                //打印查找成功提示語
                System.out.printf("Task: Number %d found in position %d\n",
                        number, i);
                //調用任務管理器的方法取消其餘任務
                manager.cancelTasks(this);
                //返回目標數字的索引
                return i;
            }
            //休眠1秒
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //沒有查詢到,返回常量
        return NOT_FOUND;
    }

    public void writeCancelMessage() {
        //打印任務取消的提示信息
        System.out.printf("Task: Cancelled task from %d to %d\n",
                start, end);
    }
}
複製代碼

main方法:

package day06.code_5;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) {
        //建立數組生成器
        ArrayGenerator generator = new ArrayGenerator();
        //獲得一個容量爲1000的數組
        int[] array = generator.generateArray(1000);
        //建立任務管理器
        TaskManager manager = new TaskManager();
        //建立線程池
        ForkJoinPool pool = new ForkJoinPool();
        //建立搜素數字任務
        SearchNumberTask task = new SearchNumberTask
                (array, 0, 1000, 5, manager);
        //將任務發送給線程池執行
        pool.execute(task);
        //關閉線程池
        pool.shutdown();
        //等待線程池將全部未取消的任務執行完畢
        try {
            pool.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印程序結束信息
        System.out.println("Main: The program has finished");
    }

}
複製代碼
相關文章
相關標籤/搜索