java多線程之fork/join框架詳解

    這個框架的目的主要是更好地利用底層平臺上的多核CPU和多處理器來進行處理,解決問題時一般使用分治算法或map/reduce算法來進行.這個框架的名稱來源於使用時的兩個基本操做fork和join,能夠類比於map/reduce中的map和reduce操做.fork操做的做用是把一個大的問題劃分紅若干個較小的問題.這個劃分過程通常是遞歸進行的,直到獲得能夠直接進行計算的粒度合適的子問題.在劃分時,須要恰當地選取子問題的大小.太大的子問題不利於經過並行方式來提升性能,而過小的子問題則會帶來較大的額外開銷.每一個子問題在計算完成以後,能夠獲得關於整個問題的部分解.join操做的做用是把這些部分解收集並組織起來,獲得最終的完整解.調用join操做的過程也多是遞歸進行的,於fork操做相對應.java


    相對於通常的線程池實現,fork/join框架的優點體如今對其中包含的任務的處理方式上.在通常的線程池中,若是一個線程正在執行的任務因爲某些緣由沒法繼續運行,那麼該線程會處於等待狀態.而在fork/join框架實現中,若是某個子問題因爲等待另一個子問題的完成而沒法繼續運行.那麼處理該子問題的線程會主動尋找其餘還沒有運行的子問題來執行.這種方式減小了線程的等待時間,提升了性能.爲了fork/join框架可以高效運行,在每一個子問題的實現中應該避免使用synchronized關鍵字或其餘方式來進行同步,也不該該使用阻塞式I/O操做或過多地訪問共享變量.在理想狀況下,每一個子問題的實現中都應該只進行CPU相關的計算,而且只使用每一個問題的內部對象.惟一的同步應該只發生在子問題和建立它的父問題之間.算法


     一個fork/join框架執行的任務由ForkJoinTask類表示.ForkJoinTask類實現了Future接口,能夠按照Future接口的方式來使用.在ForkJoinTask類中最重要的兩個方法是fork和join,其中fork方法用來以異步方式啓動任務的執行,而join方法則等待任務完成並返回執行結果.在建立本身的任務時,最好不要直接繼承自ForkJoinTask類,而要繼承自ForkJoinTask類的子類RecursiveTask或RecursiveAction類.二者的區別在於RecursiveTask類表示的任務能夠返回結果,而RecursiveAction類不行.框架


     在fork/join框架中,任務的執行由ForkJoinPool類的對象來完成.ForkJoinPool類實現了ExecutorService接口,除了執行ForkJoinTask類的對新外,還可使用通常的Callable和Runnable接口來表示的任務,在ForkJoinPool類的對新中執行的任務大體能夠分紅兩類:一類是經過execute,invoke或submit方法直接提交的任務;另一類是ForkJoinTask類的對新在執行過程當中產生的子任務,並經過fork方法來運行.通常的作法是表示整個問題的ForkJoinTask類的對象用第一類形式提交,而在執行過程當中產生的子任務並不須要進行處理,ForkJoinPool類的對新會負責子任務的執行.異步

import java.io.File;ide

import java.util.ArrayList;性能

import java.util.List;spa

import java.util.concurrent.ForkJoinPool;線程

import java.util.concurrent.ForkJoinTask;對象

import java.util.concurrent.RecursiveTask;繼承

  

/**

 * <pre>

 * 輕量級任務執行框架fork/join詳解,使用該框架計算文件目錄大小

 * </pre>

 * User: ketqi

 * Date: 2013-01-27 17:40

 */

public class ForkJoinDemo {

    private static class CalculateTask extends RecursiveTask<Long> {

        final File file;

        public CalculateTask(final File theFile) {

            file = theFile;

        }

        @Override

        public Long compute() {

            long size = 0;

            if (file.isFile()) {

                size = file.length();

            } else {

                final File[] children = file.listFiles();

                if (children != null) {

                    List<ForkJoinTask<Long>> taskList = new ArrayList<ForkJoinTask<Long>>();

                    for (final File child : children) {

                        if (child.isFile()) {

                            size += child.length();

                        } else {

                            taskList.add(new CalculateTask(child));

                        }

                    }

                    //invokeAll方法循環調用task.fork()執行子任務.

                    for (final ForkJoinTask<Long> task : invokeAll(taskList)) {

                        //task.join()方法等待任務完成並返回執行的結果

                        size += task.join();

                    }

                }

            }

            return size;

        }

    }

    public static void main(String[] args) {

        long start = System.nanoTime();

        long total = new ForkJoinPool().invoke(new CalculateTask(new File("E:\\workspace\\java7")));

        long end = System.nanoTime();

        System.out.println("Total Size: " + total + "B");

        System.out.println("Time taken: " + (end - start) / 1.0e9);

    }

}

相關文章
相關標籤/搜索