thread_fork/join併發框架2

轉自  http://blog.csdn.net/mr_zhuqiang/article/details/48300229算法

三.使用異步方式數組

  invokeAll(task1,task2); 是同步方式,將當前任務掛起直到子任務發送到 Fork/join線程池中執行完成,這種方式容許工做竊取算法,分配一個新任務給在執行休眠任務的工做者線程。
  相反當採用異步的方式(好比,fork()),任務將繼續執行,因此就沒有辦法使用工做竊取算法了。由於不存在等待的線程了。除非使用join() 或則 get() 來獲取結果,等待任務的完成。
  invokeAll()採用同步方式,工做者線程將會休眠等待子任務的完成,因此能使用竊取算法派給工做者一個新任務
  fork() 採用異步方式,只有結合join()或者get() 來等待任務的完成,進而可使用竊取算法來提升性能。
  get():若是ForkJoinTask類執行結束,或則一直等到結束,那麼get()方法的這個版本則返回由compute()方法返回的結果
  get() 方法 和 join()方法的區別:
  join()方法不能被中斷,若是中斷join()方法的線程,方法將拋出Interrupted異常
  若是任務拋出任何運行時異常,那麼get()方法將返回ExecutionException異常,可是join方法返回的是RuntimeException;dom

public class ForkJoin3Test {
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        Task3 mp3 = new Task3("C:\\360CloudUI", "mp3");
        pool.execute(mp3);

        do {
            System.out.println("***********   狀態信息巡查 ***************");
            System.out.printf("最大並行任務:%s,當前活動任務數(不許確的):%s,隊列中的任務數量:%s,竊取數量:%s\n", 
                    pool.getParallelism(),
                    pool.getActiveThreadCount(), 
                    pool.getQueuedTaskCount(), 
                    pool.getStealCount());
            TimeUnit.MILLISECONDS.sleep(10);
        } while (!mp3.isDone()); // 未完成則一直循環獲取狀態信息

        pool.shutdown();
        List<String> join = mp3.join();
        System.out.println("共找到符合的文件數量:" + join.size());
        for (String s : join) {
            System.out.println(s);
        }
    }
}

class Task3 extends RecursiveTask<List<String>> {
    private static final long serialVersionUID = 1L;
    private String path; // 文件夾路徑
    private String suffix; // 後綴

    public Task3(String path, String suffix) {
        this.path = path;
        this.suffix = suffix;
    }
    @Override
    protected List<String> compute() {
        List<String> result = new ArrayList<String>(); // 存儲結果
        List<Task3> tasks = new ArrayList<Task3>();    // 存儲任務

        File file = new File(path);
        File[] files = file.listFiles();
        for (File f : files) {     // 分發和執行任務
            if (f.isDirectory()) { // 若是是文件夾,則使用異步的方式發送一個任務去執行
                Task3 task = new Task3(f.getAbsolutePath(), suffix);
                task.fork();      // 拆分任務異步執行
                tasks.add(task);
            } else {
                String name = f.getName();
                if (name.endsWith(suffix)) {
                    result.add(name);
                }
            }
        }
        if (tasks.size() > 1) {    // 若是當前任務大於1個 則打印信息
            System.out.printf("%s,tasks size(當前路徑有) = %s個(文件夾),當前路徑是:%s\n", Thread.currentThread().getName(),
                    tasks.size(), path);
        }
        for (Task3 task : tasks) {           // 獲取當前任務的結果
            List<String> join = task.join(); // 調用join方法等待任務完成
            result.addAll(join);             // 把任務結果添加到當前任務的結果中
        }
        return result;
    }
}

四 取消任務異步

  ForkJoinTask 對象中有一個cancel()方法來取消未開始的任務。取消任務有如下兩點須要注意:
   1. ForkJoinPool類不提供任何方法來取消線程池中正在運行或則等待運行的全部任務。
   2. 取消任務時,不能取消已經被執行的任務。ide

public class ForkJoin4Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int[] arrs = new ArrayGenerator().generateArray(50);
// TaskManger taskManger
= new TaskManger(); ForkJoinPool pool = new ForkJoinPool(); SearchNumberTask task = new SearchNumberTask(arrs, 0, arrs.length, 50, taskManger); pool.execute(task); pool.shutdown(); pool.awaitTermination(1, TimeUnit.MILLISECONDS); // System.out.println("main:結束:" + task.get()); } } // 數組生成 class ArrayGenerator { public int[] generateArray(int size) { int[] array = new int[size]; Random random = new Random(); for (int i = 0; i < size; i++) { array[i] = random.nextInt(10); } return array; } } // 任務管理類 class TaskManger { private List<ForkJoinTask<Integer>> tasks = new ArrayList<ForkJoinTask<Integer>>(); public void addTask(ForkJoinTask<Integer> task) { tasks.add(task); } public void cancelTasks(ForkJoinTask<Integer> cancelTask) { for (ForkJoinTask<Integer> task : tasks) { if (task != cancelTask) { task.cancel(true); ((SearchNumberTask) task).writeCanceMesg(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } class SearchNumberTask extends RecursiveTask<Integer> { private int[] numbers; private int start, end; private int number; private TaskManger taskManger; private final static int NOT_FOUND = -1; public SearchNumberTask(int[] numbers, int start, int end, int number, TaskManger taskManger) { this.numbers = numbers; this.start = start; this.end = end; this.number = number; this.taskManger = taskManger; } @Override protected Integer compute() { int ret; if (end - start > 10) { // 拆分任務 ret = launchTasks(); } else { // 執行查找 System.out.println("Task:開始:" + start + ":" + end); ret = lookForNumber(); System.out.println("Task:結束--------:" + start + ":" + end); } return ret; } /** * 查找數字 * * @return */ private int lookForNumber() { for (int i = start; i < end; i++) { if (numbers[i] == number) { System.out.printf("Task:目標number:%s已被找到,索引位置:%s\n", number, i); taskManger.cancelTasks(this); return i; } } return NOT_FOUND; } /** * 拆分任務 * * @return */ private int launchTasks() { int mid = (start + end) / 2; SearchNumberTask task1 = new SearchNumberTask(numbers, start, mid, number, taskManger); SearchNumberTask task2 = new SearchNumberTask(numbers, mid, end, number, taskManger); taskManger.addTask(task1); taskManger.addTask(task2); task1.fork(); // 異步執行 task2.fork(); int result = task1.join(); if (result != -1) { return result; } return task2.join(); } /** 取消任務 信息 **/ public void writeCanceMesg() { System.out.printf("Task:取消了,start=%s,end=%s\n", start, end); } }

五 運行異常性能

Java有兩種類型的異常:
  非運行時異常(Checked Exception):必須在方法上經過throws 子句拋出,或則經過try…catch語句進行撲捉處理。
  運行時異常(Unchecked Exception):不是強制的須要捕捉處理和throws拋出。
在ForkJoinTask類的compute方法中不能拋出非運行時異常,由於該方法沒有throws的聲明,根據Java從新方法的規則,因此不能拋出。並且在該compute中拋出的運行時異常,給我最明顯直觀的結果是,只要不調用get()獲取結果,控制檯是不會打印異常信息的。也就是說,異常被吞噬了。可是咱們能夠經過該類的其餘方法來獲取該異常。
  task.isCompletedNormally() : 任務完成時沒有出錯
  task.isCompletedAbnormally() : 來檢查任務是否已經拋出異常或已經被取消了,要注意此方法。因爲提交任務以後,檢測該任務是否有異常,不是阻塞的。因此須要等待任務的完成。才能正確的獲取到是否有異常
  task.getException() : 得到任務中拋出的異常
該類中拋出的異常,只要一拋出異常,子任務都不會再繼續執行。(反正就是說只要拋出了異常,任務結果確定是不正確的了)
  completeExceptionally(Throwable ex) : 該方法 能夠在語義上拋出一個異常,包括非運行時異常。要在獲取結果前 經過task.isCompletedAbnormally()來配合操做。測試

 

public class ForkJoin5Test {
    public static void main(String[] args) throws InterruptedException {
        int[] arrs = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        ForkJoinPool pool = new ForkJoinPool(1);
        Task5 task = new Task5(arrs, 0, arrs.length);
        pool.execute(task);

        pool.shutdown(); // 關閉執行器,並配合超時。來等待任務運行完成,發現一個特性:該執行器如來裏面沒有可活動的任務。執行器會自動關閉。並且調用get會阻塞任務直到返回結果
        pool.awaitTermination(1, TimeUnit.DAYS);

        // task.isCompletedNormally() 任務完成時沒有出錯
        if (task.isCompletedAbnormally()) { // 來檢查任務是否已經拋出異常或已經被取消了,要注意此方法。因爲提交任務以後,檢測該任務是否有異常,不是阻塞的。因此須要上面的等待任務的完成。才能正確的獲取到是否有異常
            System.out.println("檢測到任務中有拋出的異常:" + task.getException().getMessage());
        } else {
            System.out.println(task.join());
        }
    }
}

class Task5 extends RecursiveTask<Integer> {
    private int[] arrs; // 要處理的數據
    private int start; // 開始索引
    private int end; // 結束索引

    public Task5(int[] arrs, int start, int end) {
        this.arrs = arrs;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        if (end - start < 2) {
            for (int i = start; i < end; i++) {
                result += arrs[i];
            }
            System.out.printf("%s,結果:%s\n", Thread.currentThread().getName(), result);
            return result;
        } else {
            int mid = (start + end) / 2;
            // System.out.println(mid);
            if (mid == 2) {
                throw new RuntimeException("故意拋出的測試異常"); // 爲了測試拋出異常,能夠
                                                            // 關閉測異常。運行查看結果
                // Exception e = new Exception("故意拋出的非運行時異常");
                // completeExceptionally(e); //也可使用 該方法,設置一個異常,由於 源碼
                // setExceptionalCompletion
                // 是設置的異常,就至關於該異常並無被拋出。在語義上經過task.isCompletedAbnormally()來拋出了非運行時異常
                // return null; // 若是不返回,程序將繼續執行後面的代碼,並不能達到真正拋出異常的效果
            }
            // 拆分紅2個子任務繼續檢測和執行
            Task5 task1 = new Task5(arrs, start, mid);
            Task5 task2 = new Task5(arrs, mid, end);
            invokeAll(task1, task2); // 使用同步的方式 執行
            try {
                result = task1.get() + task2.get(); // 把子任務返回的結果相加
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }
}
相關文章
相關標籤/搜索