數據並行化

並行化流操做

能夠經過parallel或者parallelStream方法實現數據並行化操做java

經過手動使用線程模擬擲骰子事件數組

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 計算2次篩子投擲的和的機率
 */
public class ManualDiceRools {

    private static final int N = 100000000;

    private final double fraction;
    private final Map<Integer, Double> results;
    private final int numberOfThreads;
    private final ExecutorService executor;
    private final int workPerThread;

    public static void main(String[] args) {
        ManualDiceRools rools = new ManualDiceRools();
        rools.simulateDiceRoles();
    }

    public ManualDiceRools() {
        // 出現一次的機率
        this.fraction = 1.0 / N;
        this.results = new ConcurrentHashMap<>();
        // 獲取可用處理器的Java虛擬機的數量
        this.numberOfThreads = Runtime.getRuntime().availableProcessors();
        this.executor = Executors.newFixedThreadPool(numberOfThreads);
        // 每一個線程處理的數據量
        this.workPerThread = N / numberOfThreads;
    }

    public void simulateDiceRoles() {
        List<Future<?>> futures = submitJobs();
        awaitCompletion(futures);
        printResults();
    }

    /**
     * 打印結果
     */
    private void printResults() {
        results.entrySet().forEach(System.out::println);
    }

    /**
     * 執行線程
     *
     * @return
     */
    private List<Future<?>> submitJobs() {
        List<Future<?>> futures = new ArrayList<>();
        for (int i=0;i<numberOfThreads;i++) {
            futures.add(executor.submit(makeJob()));
        }
        return futures;
    }

    private Runnable makeJob() {
        return () -> {
            // 使用安全隨機生成器
            ThreadLocalRandom random = ThreadLocalRandom.current();
            for (int i=0;i<workPerThread;i++) {
                int entry = twoDiceThrows(random);
                accumulateResult(entry);
            }
        };
    }

    /**
     * 計算生成的機率,即fraction + 以前的機率值
     *
     * @param entry
     */
    private void accumulateResult(int entry) {
        results.compute(entry, (key, previous) -> previous == null ? fraction : previous + fraction);
    }

    /**
     * 生成2次篩子隨機的和
     *
     * @param random
     * @return
     */
    private int twoDiceThrows(ThreadLocalRandom random) {
        int firstThrow = random.nextInt(1, 7);
        int secondThrow = random.nextInt(1, 7);
        return firstThrow + secondThrow;
    }

    /**
     * get每一個Future的值,獲取每一個線程計算的結果
     *
     * @param futures
     */
    private void awaitCompletion(List<Future<?>> futures) {
        try {
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        } finally {
            executor.shutdown();
        }
    }
}

使用並行化安全

ThreadLocalRandom random = ThreadLocalRandom.current();
double fraction = 1.0 / N;
Map<Integer, Double> collect = IntStream.range(0, N)
        .parallel()
        .mapToObj(n -> random.nextInt(1, 7) + random.nextInt(1, 7))// 計算隨機產生的和
        .collect(Collectors.groupingBy(side -> side,
                Collectors.summingDouble(n -> fraction)));// 相同的值進行分組,並計算機率值,機率步長爲fraction
collect.entrySet().forEach(System.out::println);

限制

以前調用reduce方法,初始值能夠爲任意值,爲了讓其在並行化時能正常工做,初始值必須爲組合函數的恆等值(好比組合函數(acc, element) -> acc + element時,初始值必須爲0,由於任何數字加0值不變)數據結構

reduce操做的另外一個限制是組合操做必須符合結合律。意味着只要序列的值不變,組合操做的順序不重要,好比:(4 + 2) +1 = 4 + (2 + 1) =7框架

要避免的是持有鎖,流框架會在須要時,本身處理同步操做dom

還有個叫sequential的方法,串行的意思,在對流求值時,不能同時處於兩種模式,要麼是並行要麼是串行,若是同時用了這個2個方法,最後調用的那個方法起效ide

性能

影響並行流性能的主要因素有5個:函數

  1. 數據大小
    只有數據足夠大、每一個數據處理管道話費的時間足夠多時,並行化纔有意義
  2. 源數據結構
    性能好:ArrayList、數組或IntStream.range,這些數據支持隨機讀取
    性能通常:HashSet、TreeSet,這些數據結構不易公平地被分解
    性能差:LinkedList、Streams.iterate或BufferedReader.lines,難於分解,可能要花O(N)的時間複雜度來分解問題
  3. 裝箱
    處理基本類型比處理裝箱類型要快
  4. 核的數量
    擁有的核數越多,得到潛在性能提高的幅度越大,在實踐中,核的數量不單指你的機器有多少核,而是指運行時能使用多少核
  5. 單元處理開銷
    花在流中每一個元素身上的時間越長,並行操做帶來的性能提高越明顯

在底層,並行流仍是沿用了fork/join框架,fork遞歸式的分解問題,而後每段並行執行,最終由join合併返回結果性能

並行化數組操做

parallelSetAll:使用Lambda表達式更新數據元素this

// 使用for循環初始化數組
public static double[] imperativeInitilize(int size) {
    double[] values = new double[size];
    for (int i=0;i<values.length;i++) {
        values[i] = i;
    }
    return values;
}

// 使用並行化數組操做初始化數組,它改變了傳入的數組,但沒有建立一個新的數組
public static double[] parallelInitialize(int size) {
    double[] values = new double[size];
    Arrays.parallelSetAll(values, i -> i);
    return values;
}

parallelPrefix:會更新一個數組,將每個元素替換爲當前元素和前驅元素的和,這裏的「和」不必定是加法,能夠說任意一個BinaryOperator

public static double[] simpleMovingAverage(double[] values, int n) {
    double[] sums = Arrays.copyOf(values, values.length);
    // 和前一個值進行相加
    Arrays.parallelPrefix(sums, Double::sum);
    return sums;
}

parallelSort:並行化對數組元素排序

相關文章
相關標籤/搜索