《Java 8 in Action》Chapter 7:並行數據處理與性能

在Java 7以前,並行處理數據集合很是麻煩。第一,你得明確地把包含數據的數據結構分紅若干子部分。第二,你要給每一個子部分分配一個獨立的線程。第三,你須要在恰當的時候對它們進行同步來避免不但願出現的競爭條件,等待全部線程完成,最後把這些部分結果合併起來。Java 7引入了一個叫做分支/合併的框架,讓這些操做更穩定、更不易出錯。
Stream接口讓你不用太費力氣就能對數據集執行並行操做。它容許你聲明性地將順序流變爲並行流。此外,你將看到Java是如何變戲法的,或者更實際地來講, 流是如何在幕後應用Java 7引入的分支/合併框架的。java

1. 並行流

並行流就是一個把內容分紅多個數據塊,並用不一樣的線程分別處理每一個數據塊的流。算法

public static long sequentialSum(long n) {
             return Stream.iterate(1L, i -> i + 1)
                          .limit(n)
                          .reduce(0L, Long::sum);
}
傳統寫法:
public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 1L; i <= n; i++) {
            result += i;
        }
        return result;
}複製代碼

1.1 將順序流轉換爲並行流

能夠把流轉換成並行流,從而讓前面的函數歸約過程(也就是求和)並行運行——對順序流調用parallel方法:編程

public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
                     .limit(n)
                     .parallel()
                     .reduce(0L, Long::sum);
}複製代碼

在現實中,對順序流調用parallel方法並不意味着流自己有任何實際的變化。它在內部實際上就是設了一個boolean標誌,表示你想讓調用parallel以後進行的全部操做都並行執行。相似地,你只須要對並行流調用sequential方法就能夠把它變成順序流。請注意,你可能覺得把這兩個方法結合起來,就能夠更細化地控制在遍歷流時哪些操做要並行執行,哪些要順序執行。數組

配置並行流使用的線程池數據結構

看看流的parallel方法,你可能會想,並行流用的線程是從哪來的?有多少個?怎麼自定義這個過程呢?架構

並行流內部使用了默認的ForkJoinPool,它默認的線程數量就是你的處理器數量,這個值是由Runtime.getRuntime().available- Processors()獲得的。框架

可是你能夠經過系統屬性 java.util.concurrent.ForkJoinPool.common.parallelism來改變線程池大小,以下所示:ide

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");函數

這是一個全局設置,所以它將影響代碼中全部的並行流。反過來講,目前還沒法專爲某個並行流指定這個值。通常而言,讓ForkJoinPool的大小等於處理器數量是個不錯的默認值,性能

除非你有很好的理由,不然咱們強烈建議你不要修改它。

1.2 測量流性能

並行編程可能很複雜,有時候甚至有點違反直覺。若是用得不對(好比採用了一 個不易並行化的操做,如iterate),它甚至可能讓程序的總體性能更差,因此在調用那個看似神奇的parallel操做時,瞭解背後到底發生了什麼是頗有必要的。並行化並非沒有代價的。並行化過程自己須要對流作遞歸劃分,把每一個子流的概括操做分配到不一樣的線程,而後把這些操做的結果合併成一個值。但在多個內核之間移動數據的代價也可能比你想的要大,因此很重要的一點是要保證在內核中並行執行工做的時間比在內核之間傳輸數據的時間長。總而言之,不少狀況下不可能或不方便並行化。然而,在使用 並行Stream加速代碼以前,你必須確保用得對;若是結果錯了,算得快就毫無心義了。

1.3 正確使用並行流

錯用並行流而產生錯誤的首要緣由,就是使用的算法改變了某些共享狀態。下面是另外一種實現對前n個天然數求和的方法,但這會改變一個共享累加器:

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add)
    return accumulator.total;
}
public class Accumulator {
    public long total = 0;
    public void add(long value) { total += value; }
}複製代碼

這段代碼自己上就是順序的,由於每次訪問total都會出現數據競爭。接下來將這段代碼改成並行:

public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;}
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) +" msecs" );
Result: 5959989000692
Result: 7425264100768
Result: 6827235020033
Result: 7192970417739
Result: 6714157975331
Result: 7715125932481
SideEffect parallel sum done in: 49 msecs複製代碼

這回方法的性能可有可無了,惟一要緊的是每次執行都會返回不一樣的結果,都離正確值50000005000000差很遠。這是因爲多個線程在同時訪問累加器,執行total += value,而這一句􏱵然看似簡單,卻不是一個原子操做。問題的根源在於,forEach中調用的方法有反作用,它會改變多個線程共享的對象的可變狀態。要是你想用並行Stream又不想引起相似的意外,就必須避免這種狀況。如今你知道了,共享可變狀態會影響並行流以及並行計算。

1.4 高效使用並行流

  • 若是有疑問,測量。把順序流轉成並行流垂手可得,但卻不必定是好事。咱們在本節中已經指出,並行流並不老是比順序流快。此外,並行流有時候會和你的直覺不一致,因此在考慮選擇順序流仍是並行流時,第一個也是最重要的建議就是用適當的基準來檢查其性能。
  • 留意裝箱。自動裝箱和拆箱操做會大大下降性能。Java 8中有原始類型流(IntStream、 LongStream、DoubleStream)來避免這種操做,但凡是有可能都應該用這些流。
  • 有些操做自己在並行流上的性能就比順序流差。特別是limit和findFirst等依賴於元素順序的操做,它們在並行流上執行的代價很是大。例如,findAny會比findFirst性能好,由於它不必定要按順序來執行。你老是能夠調用unordered方法來把有序流變成無序流。那麼,若是你須要流中的n個元素而不是專門要前n個的話,對無序並行流調用 limit可能會比單個有序流(好比數據源是一個List)更高效。
  • 還要考慮流的操做流水線的總計算成本。設N是要處理的元素的總數,Q是一個元素經過 流水線的大體處理成本,則N*Q就是這個對成本的一個粗略的定性估計。Q值較高就意味着使用並行流時性能好的可能性比較大。
  • 對於較小的數據量,選擇並行流幾乎歷來都不是一個好的決定。並行處理少數幾個元素的好處還抵不上並行化形成的額外開銷。
  • 要考慮流背後的數據結構是否易於分解。例如,ArrayList的拆分效􏶲比LinkedList 高得多,由於前者用不着遍歷就能夠平均拆分,然後者則必須遍歷。另外,用range工廠方法建立的原始類型流也能夠快速分解。最後,你將在7.3節中學到,你能夠本身實現Spliterator來徹底掌握分解過程。
  • 流自身的特色,以及流水線中的中間操做修改流的方式,均可能會改變分解過程的性能。例如,一個SIZED流能夠分紅大小相等的兩部分,這樣每一個部分均可以比較高效地並行處理,但篩選操做可能丟棄的元素個數卻沒法預測,致使流自己的大小未知。
  • 還要考慮終􏲧操做中合併步驟的代價是大是小(例如Collector中的combiner方法)。 若是這一步代價很大,那麼組合每一個子流產生的部分結果所付出的代價就可能會超出經過並行流獲得的性能提高。

並行流背後使用的基礎架構是Java 7中引入的分支/合併框架。

2. 分支/合併框架

分支/合併框架的目的是以遞歸方式將能夠並行的任務拆分紅更小的任務,而後將每一個子任務的結果合併起來生成總體結果。它是ExecutorService接口的一個實現,它把子任務分配給線程池(稱爲ForkJoinPool)中的工做線程。

2.1 使用RecursiveTask

要把任務提交到這個池,必須建立RecursiveTask 的一個子類,其中R是並行化任務(以 及全部子任務)產生的結果類型,或者若是任務不返回結果,則是RecursiveAction類型(固然它可能會更新其餘非局部機構)。要定義RecursiveTask,只需實現它惟一的抽象方法 compute:

protected abstract R compute();複製代碼

這個方法同時定義了將任務拆分紅子任務的邏輯,以及沒法再拆分或不方便再拆分時,生成單個子任務結果的邏輯。下圖表示了遞歸任務的拆分過程:

讓咱們試着用這個框架爲一個數字範圍(這裏用一個 long[]數組表示)求和。如前所述,你須要先爲RecursiveTask類作一個實現,就是下面代碼清單中的ForkJoinSumCalculator。

public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    private final long[] numbers;
    private final int start;
    private final int end;

    public static final long THRESHOLD = 10_000;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    public ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
        leftTask.fork();

        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}複製代碼

這裏用了一個LongStream來生成包含前n個天然數的數組,而後建立一個ForkJoinTask (RecursiveTask的父類),並把數組傳遞給代碼清單7-2所示ForkJoinSumCalculator的公共構造函數。最後,你建立了一個新的ForkJoinPool,並把任務傳給它的調用方法 。在ForkJoinPool中執行時,最後一個方法返回的值就是ForkJoinSumCalculator類定義的任務結果。請注意在實際應用時,使用多個ForkJoinPool是沒有什麼意義的。正是出於這個緣由,通常來講把它實例化一次,而後把實例保存在靜態字段中,使之成爲單例,這樣就能夠在軟件中任何部分方便地重用了。這裏建立時用了其默認的無參數構造函數,這意味着想讓線程池使用JVM可以使用的全部處理器。更確切地說,該構造函數將使用Runtime.availableProcessors的返回值來決定線程􏶈使用的線程數。請注意availableProcessors方法雖然看起來是處理器, 但它實際上返回的是可用內核的數量,包括超線程生成的虛擬內核。當把ForkJoinSumCalculator任務傳給ForkJoinPool時,這個任務就由􏶈中的一個線程 執行,這個線程會調用任務的compute方法。該方法會檢查任務是否小到足以順序執行,若是不夠小則會把要求和的數組分紅兩半,分給兩個新的ForkJoinSumCalculator,而它們也由ForkJoinPool安排執行。所以,這一過程能夠遞歸重複,把原任務分爲更小的任務,直到知足不方便或不可能再進一步拆分的條件(本例中是求和的項目數小於等於10000)。這時會順序計算每一個任務的結果,而後由分支過程建立的(隱含的)任務二叉樹遍歷回到它的根。接下來會合並每一個子任務的部分結果,從而獲得總任務的結果。這一過程以下圖所示。

2.2 使用分支/合併框架的最佳作法

  • 對一個任務調用join方法會阻塞調用方,直到該任務作出結果。所以,有必要在兩個子任務的計算都開始以後再調用它。不然,你獲得的版本會比原始的順序算法更慢更復雜,由於每一個子任務都必須等待另外一個子任務完成才能啓動。
  • 不該該在RecursiveTask內部使用ForkJoinPool的invoke方法。相反,你應該始終直接調用compute或fork方法,只有順序代碼才應該用invoke來啓動並行計算。
  • 對子任務調用fork方法能夠把它排進ForkJoinPool。同時對左邊和右邊的子任務調用它彷佛很天然,但這樣作的效􏶲要比直接對其中一個調用compute低。這樣作你能夠爲其中一個子任務重用同一線程,從而避免在線程池中多分配一個任務形成的開銷。
  • 調試使用分支/合併框架的並行計算可能有點棘手。特別是你日常都在你喜歡的IDE裏面看棧跟蹤(stack trace)來找問題,但放在分支-合併並計算上就不行了,由於調用compute的線程並非概念上的調用方,後者是調用fork的那個。
  • 和並行流同樣,你不該理所固然地認爲在多核處理器上使用分支/合併框架就比順序計算快。咱們已經說過,一個任務能夠分解成多個獨立的子任務,才能讓性能在並行化時有所提高。全部這些子任務的運行時間都應該比分出新任務所花的時間長;一個慣用方法是把輸入/輸出放在一個子任務裏,計算放在另外一個裏,這樣計算就能夠和輸入/輸出同時進行。此外,在比較同一算法的順序和並行版本的性能時還有別的因素要考慮。就像任何其餘Java代碼同樣,分支/合併框架須要「預熱」或者說要執行幾遍纔會被JIT編譯器優化。這就是爲何在測量性能以前跑幾遍程序很重要,咱們的測試框架就是這麼作的。同時還要知道,編譯器內置的優化可能會爲順序版本帶來一些優􏲵(例如執行死碼分析——刪去從未被使用的計算)。

2.3 工做竊取

實際中,每一個子任務所花的時間可能天差地別,要麼是由於劃分策略效率低,要麼是有不可預知的緣由,好比磁盤訪問慢,或是須要和外部任務協調執行。分支/合併框架工程用一種稱爲工做竊取(work stealing)的技術來解決這個問題。在實際應用中,這意味着這些任務差很少被平均分配到ForkJoinPool中的全部線程上。每一個線程都爲分配給它的任務保存一個雙向鏈式隊列,每完成一個任務,就會從隊列頭上取出下一個任務開始執行。基於前面所述的緣由,某個線程可能早早完成了分配給它的全部任務,也就是它的隊列已經空了,而其餘的線程還很忙。這時,這個線程並無閒下來,而是隨機選了一個別的線程,從隊列的尾巴上「偷走」一個任務。這個過程一直繼續下去,直到全部的任務都執行完畢,全部的隊列都清空。這就是爲何要劃成許多小任務而不是少數幾個大任務,這有助於更好地在工做線程之間平衡負載。通常來講,這種工做竊取算法用於在池中的工做線程之間從新分配和平衡任務。

3. Spliterator

Spliterator是Java 8中加入的另外一個新接口;這個名字表明「可分迭代器」(splitable iterator)。和Iterator同樣,Spliterator也用於遍歷數據源中的元素,但它是爲了並行執行而設計的。

public interface Spliterator<T> {
        boolean tryAdvance(Consumer<? super T> action);
        Spliterator<T> trySplit();
        long estimateSize();
        int characteristics();
}複製代碼

與往常同樣,T是Spliterator遍歷的元素的類型。tryAdvance方法的行爲相似於普通的 Iterator,由於它會按順序一個一個使用Spliterator中的元素,而且若是還有其餘元素要遍歷就返回true。但trySplit是專爲Spliterator接口設計的,由於它能夠把一些元素劃出去分給第二個Spliterator(由該方法返回),讓它們兩個並行處理。Spliterator還可經過 estimateSize方法估計還剩下多少元素要遍歷,由於即便不那麼確切,能快速算出來是一個值也有助於讓拆分均勻一點。

3.1 拆分過程

將Stream拆分紅多個部分的算法是一個遞􏰒過程,如圖7-6所示。第一步是對第一個 Spliterator調用trySplit,生成第二個Spliterator。第二步對這兩個Spliterator調用 trysplit,這樣總共就有了四個Spliterator。這個框架不斷對Spliterator調用trySplit直到它返回null,代表它處理的數據結構不能再分割,如第三步所示。最後,這個遞歸拆分過程到第四步就終止了,這時全部的Spliterator在調用trySplit時都返回了null。

Spliterator的特性
    Spliterator接口聲明的最後一個抽象方法是characteristics,它將返回一個int,代 表Spliterator自己特性集的編碼。
    使用Spliterator的客戶能夠用這些特性來更好地控制和優化它的使用。
    表7-2總結了這些特性。(不幸的是,雖然它們在概念上與收集器的特性有重疊,編碼卻不同。)
![](https://user-gold-cdn.xitu.io/2019/8/28/16cd56026a7081e8?w=1260&h=456&f=png&s=67011)複製代碼

3.2 實現自定義Spliterator

4. 小結

在本章中,你瞭解瞭如下內容。

  • 內部迭代讓你能夠並行處理一個流,而無需在代碼中顯式使用和􏷡調不一樣的線程。
  • 雖然並行處理一個流很容易,卻不能保證程序在全部狀況下都運行得更快。並行軟件的行爲和性能有時是違反直覺的,所以必定要測量,確保你並無把程序拖得更慢。
  • 像並行流那樣對一個數據集並行執行操做能夠提高性能,特別是要處理的元素數量龐大,或處理單個元素特別耗時的時候。
  • 從性能角度來看,使用正確的數據結構,如儘量利用原始流而不是通常化的流,幾乎老是比嘗試並行化某些操做更爲重要。
  • 分支/合併框架讓你得以用遞歸方式將能夠並行的任務拆分紅更小的任務,在不一樣的線程上執行,而後將各個子任務的結果合併起來生成總體結果。
  • Spliterator定義了並行流如何拆分它要遍歷的數據。

資源獲取

  • 公衆號回覆 : Java8 便可獲取《Java 8 in Action》中英文版!

Tips

  • 歡迎收藏和轉發,感謝你的支持!(๑•̀ㅂ•́)و✧
  • 歡迎關注個人公衆號:莊裏程序猿,讀書筆記教程資源第一時間得到!

相關文章
相關標籤/搜索