JAVA中的Fork/Join框架

看了下Java Tutorials中的fork/join章節,整理下。html

什麼是fork/join框架

  fork/join框架是ExecutorService接口的一個實現,能夠幫助開發人員充分利用多核處理器的優點,編寫出並行執行的程序,提升應用程序的性能;設計的目的是爲了處理那些能夠被遞歸拆分的任務。java

  fork/join框架與其它ExecutorService的實現類類似,會給線程池中的線程分發任務,不一樣之處在於它使用了工做竊取算法,所謂工做竊取,指的是對那些處理完自身任務的線程,會從其它線程竊取任務執行。算法

  fork/join框架的核心是ForkJoinPool類,該類繼承了AbstractExecutorService類。ForkJoinPool實現了工做竊取算法而且可以執行 ForkJoinTask任務。
express

基本使用方法

  在使用fork/join框架以前,咱們須要先對任務進行分割,任務分割代碼應該跟下面的僞代碼相似:api

if (任務足夠小){
  直接執行該任務;
}
else{ 將任務一分爲二; 執行這兩個任務並等待結果;
}

  首先,咱們會在ForkJoinTask的子類中封裝以上代碼,不過通常咱們會使用更加具體的ForkJoinTask類型,如 RecursiveTask(能夠返回一個結果)或RecursiveAction數組

  當寫好ForkJoinTask的子類後,建立該對象,該對象表明了全部須要完成的任務;而後將這個任務對象傳給ForkJoinPool實例的invoke()去執行便可。oracle

例子-圖像模糊

  爲了更加直觀的理解fork/join框架是如何工做的,能夠看一下下面這個例子。假定咱們有一個圖像模糊的任務須要完成,原始圖像數據能夠用一個整型數組表示,每個整型元素包含了一個像素點的顏色值(RBG,存放在整型元素的不一樣位中)。目標圖像一樣是由一個整型數組構成,每一個整型元素包含RBG顏色信息;框架

  執行模糊操做須要遍歷原始圖像整型數組的每一個元素,並對其周圍的像素點作均值操做(RGB均值),而後將結果存放到目標數組中。因爲圖像是一個大數組,這個處理操做會花費必定的時間。可是有了fork/join框架,咱們能夠充分利用多核處理器進行並行計算。以下是一個可能的代碼實現(圖像作水平方向的模糊操做):ide

Tips:該例子僅僅是闡述fork/join框架的使用,並不推薦使用該方法作圖像模糊,圖像邊緣處理也沒作判斷性能

public class ForkBlur extends RecursiveAction {
    private static final long serialVersionUID = -8032915917030559798L;
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
    private int mBlurWidth = 15; // Processing window size, should be odd.
 
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }
 
    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
                bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
            }
 
            // Re-assemble destination pixel.
            int dpixel = (0xff000000)
                    | (((int) rt) << 16)
                    | (((int) gt) << 8)
                    | (((int) bt) << 0);
            mDestination[index] = dpixel;
        }
    }
...

  如今,咱們開始編寫compute()的實現方法,該方法分紅兩部分:直接執行模糊操做和任務的劃分;一個數組長度閾值sThreshold能夠幫助咱們決定任務是直接執行仍是進行劃分;

    @Override
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }
 
        int split = mLength / 2;
 
        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                new ForkBlur(mSource, mStart + split, mLength - split, 
                mDestination));
    }

接下來按以下步驟便可完成圖像模糊任務啦:

一、建立圖像模糊任務

ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

二、建立ForkJoinPool

ForkJoinPool pool = new ForkJoinPool();

三、執行圖像模糊任務

pool.invoke(fb);

完整代碼以下:

/*
* Copyright (c) 2010, 2013, Oracle and/or its affiliates. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
*   - Redistributions of source code must retain the above copyright
*     notice, this list of conditions and the following disclaimer.
*
*   - Redistributions in binary form must reproduce the above copyright
*     notice, this list of conditions and the following disclaimer in the
*     documentation and/or other materials provided with the distribution.
*
*   - Neither the name of Oracle or the names of its
*     contributors may be used to endorse or promote products derived
*     from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
 
import java.awt.image.BufferedImage;
import java.io.File;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import javax.imageio.ImageIO;
 
/**
 * ForkBlur implements a simple horizontal image blur. It averages pixels in the
 * source array and writes them to a destination array. The sThreshold value
 * determines whether the blurring will be performed directly or split into two
 * tasks.
 *
 * This is not the recommended way to blur images; it is only intended to
 * illustrate the use of the Fork/Join framework.
 */
public class ForkBlur extends RecursiveAction {
    private static final long serialVersionUID = -8032915917030559798L;
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
    private int mBlurWidth = 15; // Processing window size, should be odd.
 
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }
 
    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
                bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
            }
 
            // Re-assemble destination pixel.
            int dpixel = (0xff000000)
                    | (((int) rt) << 16)
                    | (((int) gt) << 8)
                    | (((int) bt) << 0);
            mDestination[index] = dpixel;
        }
    }
    protected static int sThreshold = 10000;
 
    @Override
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }
 
        int split = mLength / 2;
 
        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                new ForkBlur(mSource, mStart + split, mLength - split, 
                mDestination));
    }
 
    // Plumbing follows.
    public static void main(String[] args) throws Exception {
        String srcName = "C:\\test6.jpg";
        File srcFile = new File(srcName);
        BufferedImage image = ImageIO.read(srcFile);
         
        System.out.println("Source image: " + srcName);
         
        BufferedImage blurredImage = blur(image);
         
        String dstName = "C:\\test6_out.jpg";
        File dstFile = new File(dstName);
        ImageIO.write(blurredImage, "jpg", dstFile);
         
        System.out.println("Output image: " + dstName);
         
    }
 
    public static BufferedImage blur(BufferedImage srcImage) {
        int w = srcImage.getWidth();
        int h = srcImage.getHeight();
 
        int[] src = srcImage.getRGB(0, 0, w, h, null, 0, w);
        int[] dst = new int[src.length];
 
        System.out.println("Array size is " + src.length);
        System.out.println("Threshold is " + sThreshold);
 
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");
 
        ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
 
        ForkJoinPool pool = new ForkJoinPool();
 
        long startTime = System.currentTimeMillis();
        pool.invoke(fb);
        long endTime = System.currentTimeMillis();
 
        System.out.println("Image blur took " + (endTime - startTime) + 
                " milliseconds.");
 
        BufferedImage dstImage =
                new BufferedImage(w, h, BufferedImage.TYPE_INT_ARGB);
        dstImage.setRGB(0, 0, w, h, dst, 0, w);
 
        return dstImage;
    }
}
View Code

測試了一下,執行效果以下:

Source image: C:\test6.jpg
Array size is 120000
Threshold is 10000
4 processors are available
Image blur took 10 milliseconds.
Output image: C:\test6_out.jpg

 

 JDK中使用fork/join的例子

  除了咱們上面提到的使用fork/join框架並行執行圖像模糊任務以外,在JAVA SE中,也已經利用fork/join框架實現了一些很是有用的特性。其中一個實現是在JAVA SE8 中java.util.Arrays 類的parallelSort()方法。這些方法和sort()方法相似,可是能夠經過fork/join框架並行執行。對於大數組排序,在多核處理器系統中,使用並行排序方法比順序排序更加高效。固然,關於這些排序方法是如何利用fork/join框架不在本篇文章討論範圍,更多信息能夠查看JAVA API文檔。
  另外一個fork/join框架的實現是在JAVA SE8中的java.util.streams包內,與Lambda表達式相關,更多信息,能夠查看https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html連接。

 

參考連接:https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

相關文章
相關標籤/搜索