能夠經過parallel或者parallelStream方法實現數據並行化操做java
經過手動使用線程模擬擲骰子事件數組
import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.*; /** * 計算2次篩子投擲的和的機率 */ public class ManualDiceRools { private static final int N = 100000000; private final double fraction; private final Map<Integer, Double> results; private final int numberOfThreads; private final ExecutorService executor; private final int workPerThread; public static void main(String[] args) { ManualDiceRools rools = new ManualDiceRools(); rools.simulateDiceRoles(); } public ManualDiceRools() { // 出現一次的機率 this.fraction = 1.0 / N; this.results = new ConcurrentHashMap<>(); // 獲取可用處理器的Java虛擬機的數量 this.numberOfThreads = Runtime.getRuntime().availableProcessors(); this.executor = Executors.newFixedThreadPool(numberOfThreads); // 每一個線程處理的數據量 this.workPerThread = N / numberOfThreads; } public void simulateDiceRoles() { List<Future<?>> futures = submitJobs(); awaitCompletion(futures); printResults(); } /** * 打印結果 */ private void printResults() { results.entrySet().forEach(System.out::println); } /** * 執行線程 * * @return */ private List<Future<?>> submitJobs() { List<Future<?>> futures = new ArrayList<>(); for (int i=0;i<numberOfThreads;i++) { futures.add(executor.submit(makeJob())); } return futures; } private Runnable makeJob() { return () -> { // 使用安全隨機生成器 ThreadLocalRandom random = ThreadLocalRandom.current(); for (int i=0;i<workPerThread;i++) { int entry = twoDiceThrows(random); accumulateResult(entry); } }; } /** * 計算生成的機率,即fraction + 以前的機率值 * * @param entry */ private void accumulateResult(int entry) { results.compute(entry, (key, previous) -> previous == null ? fraction : previous + fraction); } /** * 生成2次篩子隨機的和 * * @param random * @return */ private int twoDiceThrows(ThreadLocalRandom random) { int firstThrow = random.nextInt(1, 7); int secondThrow = random.nextInt(1, 7); return firstThrow + secondThrow; } /** * get每一個Future的值,獲取每一個線程計算的結果 * * @param futures */ private void awaitCompletion(List<Future<?>> futures) { try { futures.forEach(future -> { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); } finally { executor.shutdown(); } } }
使用並行化安全
ThreadLocalRandom random = ThreadLocalRandom.current(); double fraction = 1.0 / N; Map<Integer, Double> collect = IntStream.range(0, N) .parallel() .mapToObj(n -> random.nextInt(1, 7) + random.nextInt(1, 7))// 計算隨機產生的和 .collect(Collectors.groupingBy(side -> side, Collectors.summingDouble(n -> fraction)));// 相同的值進行分組,並計算機率值,機率步長爲fraction collect.entrySet().forEach(System.out::println);
以前調用reduce方法,初始值能夠爲任意值,爲了讓其在並行化時能正常工做,初始值必須爲組合函數的恆等值(好比組合函數(acc, element) -> acc + element時,初始值必須爲0,由於任何數字加0值不變)數據結構
reduce操做的另外一個限制是組合操做必須符合結合律。意味着只要序列的值不變,組合操做的順序不重要,好比:(4 + 2) +1 = 4 + (2 + 1) =7框架
要避免的是持有鎖,流框架會在須要時,本身處理同步操做dom
還有個叫sequential的方法,串行的意思,在對流求值時,不能同時處於兩種模式,要麼是並行要麼是串行,若是同時用了這個2個方法,最後調用的那個方法起效ide
影響並行流性能的主要因素有5個:函數
在底層,並行流仍是沿用了fork/join框架,fork遞歸式的分解問題,而後每段並行執行,最終由join合併返回結果性能
parallelSetAll:使用Lambda表達式更新數據元素this
// 使用for循環初始化數組 public static double[] imperativeInitilize(int size) { double[] values = new double[size]; for (int i=0;i<values.length;i++) { values[i] = i; } return values; } // 使用並行化數組操做初始化數組,它改變了傳入的數組,但沒有建立一個新的數組 public static double[] parallelInitialize(int size) { double[] values = new double[size]; Arrays.parallelSetAll(values, i -> i); return values; }
parallelPrefix:會更新一個數組,將每個元素替換爲當前元素和前驅元素的和,這裏的「和」不必定是加法,能夠說任意一個BinaryOperator
public static double[] simpleMovingAverage(double[] values, int n) { double[] sums = Arrays.copyOf(values, values.length); // 和前一個值進行相加 Arrays.parallelPrefix(sums, Double::sum); return sums; }
parallelSort:並行化對數組元素排序