自定義 ForkJoinPool 提高並行流 ParallelStream 執行速度

簡介

    在 java8 中 添加了流Stream,可讓你以一種聲明的方式處理數據。使用起來很是簡單優雅。ParallelStream 則是一個並行執行的流,採用 ForkJoinPool 並行執行任務,提升執行速度。
     
    下面咱們看看2個簡單的示例:java

示例1 (list)

Arrays.asList(1,2,3,4,5,6)
    .parallelStream()
    .forEach((value) -> {
        String name = Thread.currentThread().getName();
        System.out.println("示例1 Thread:" + name + " value:" + value);
    });

示例2 (array)

Stream.of(1,2,3,4,5,6)
    .parallel()
    .forEach((value) -> {
        String name = Thread.currentThread().getName();
        System.out.println("示例2 Thread:" + name + " value:" + value);
    });

問題引出

    筆者最近在作一些爬蟲相關的業務,其核心工具已開源 mica-http:https://gitee.com/596392912/mica/tree/master/mica-http ,通過2個版本的迭代已經發展成了一個強大非帳號爬蟲利器,趕忙來試試吧。git

image.png

image.png

    咱們採集了大量的代理 ip 用來供爬蟲使用,其中有個定時任務每 5 分鐘去檢測代理是否失效,代理 ip 檢測比較費時,咱們給每一個檢測的請求
設定了 2s 的超時,這樣單線程的話 1000 個 ip 就得消耗半個多小時,固然筆者在校驗的時候採用的 parallel Stream 簡化開發。數據庫

    而後發現效果並不明顯,代理 ip 數量上來以後 5 分鐘徹底檢測不完,致使任務堆積。明明用了併發流爲何沒有明顯的提升執行速度呢?segmentfault

    下面咱們來看看剛剛的「示例」打印出的信息:多線程

示例1 Thread:main value:4
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:1
示例1 Thread:main value:6
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例1 Thread:main value:3
示例1 Thread:ForkJoinPool.commonPool-worker-1 value:2
示例2 Thread:main value:4
示例2 Thread:ForkJoinPool.commonPool-worker-3 value:3
示例2 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例2 Thread:ForkJoinPool.commonPool-worker-4 value:1
示例2 Thread:ForkJoinPool.commonPool-worker-5 value:2
示例2 Thread:ForkJoinPool.commonPool-worker-1 value:6

咱們能夠看到 Parallel Stream,默認採用的是一個 ForkJoinPool.commonPool 的線程池,這樣咱們就算使用了 Parallel Stream,
整個 jvm 共用一個 common pool 線程池,一不當心就職務堆積了,在校驗代理 ip 的時候咱們還有采集代理等其餘的任務中也大量使用了併發流,
這樣也就印證了爲何會任務堆積了。架構

解決問題

    使用自定義 ForkJoinPool 執行速度。示例代碼以下:併發

// 示例:自定義線程池
ForkJoinPool forkJoinPool = new ForkJoinPool(8);

// 這裏是從數據庫裏查出來的一批代理 ip
List<ProxyList> records = new ArrayList<>();

// 找出失效的代理 ip
List<String> needDeleteList = forkJoinPool.submit(() -> records.parallelStream()
    .map(ProxyList::getIpPort)
    .filter(IProxyListTask::isFailed)
    .collect(Collectors.toList())
).join();

// 刪除失效的代理

    整個代碼依然比較優雅,在使用自定義的 ForkJoin 線程池以後,執行速度有了明顯的提高。之前 5 分鐘執行不完的任務如今 2 分鐘以內就能所有執行完畢。jvm

結論

    java8 的併發流在大批量數據處理時可簡化多線程的使用,在遇到耗時業務或者重度使用併發流不妨根據業務狀況採用自定義線程池來提示處理速度。微服務

開源推薦

關注咱們

如夢技術-公衆號.jpg

掃描上面二維碼,更多精彩內容天天推薦!工具

相關文章
相關標籤/搜索