使用 CompletableFuture 異步組裝數據

使用 CompletableFuture 異步組裝數據

一種快捷、優雅的異步組裝數據方式html

實際項目中常常遇到這種狀況: 從多個表中查找到數據而後拼裝成一個VO返回給前端。
這個過程有可能會很是耗時。由於最終每一條返回的VO數據是由多個表中的數據拼裝而成,若是項目仍是微服務須要從其餘服務獲取數據,那將會更加耗時,更加麻煩。簡單的幾十條、幾百條數據單個線程跑起來可能沒有什麼壓力,可是當數量達到成千上萬,幾十萬,幾百萬,組裝的邏輯也變得很是複雜時,這個操做就很是耗時。前端

最近我在項目中就遇到這個的狀況。項目中咱們須要作一個相關流程數據的下載功能。
最第一版本使用單線程,由於業務的複雜性,5000多條數據徹底下載下來須要30min。覺得是從數據庫分揀數據比較耗時,查詢日誌後發現數據庫查詢並無耗時多久,反而是組裝數據佔用了大多數時間。java

所以機智的我就想起以前同組小夥伴分享的Java8一個新的類CompletableFuture。數據庫

CompletableFuture 簡介

CompletableFuture 是Java 8 新增長的Api,該類實現,Future和CompletionStage兩個接口,提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,提供了函數式編程的能力,能夠經過回調的方式處理計算結果,而且提供了轉換和組合CompletableFuture的方法。編程

具體你們能夠查看Java Api 文檔,或者閱讀網上一些博客。api

CompletableFuture 異步組裝數據

代碼示例以下緩存

/**
     * 功能描述: 拼裝數據
     * @author lkb
     * @date 2019/12/25
     * @param
     * @return java.util.List<com.laidian.erp.crm.vo.DeviceProcessListExportVO>
     */
    private List<DeviceProcessListExportVO> listByFlowJobIds(List<String> flowJobIds, Map<String, ProcessInfoVo> map, Map<Integer,UserInfoDTO> userInfoDTOMap, Map<Integer,HatCity> cityMap){
        //result 列表保存組裝完成的數據
        List<DeviceProcessListExportVO> result = new LinkedList<>();
        //每次組裝100條數據
        List<List<String>> partition = Lists.partition(flowJobIds,100);
        List<CompletableFuture> futures = partition.stream().map(subList -> CompletableFuture.supplyAsync(() -> {
            //packVOs 方法就是組裝數據
            return packVOs(subList,map,userInfoDTOMap,cityMap);
        },ASYNC_IO_POOL).whenCompleteAsync((r,e)->result.addAll(r))
                        .exceptionally(e->{
                            log.error(e.getMessage(),e);
                            log.error("listByFlowJobIds error.");
                            return result;
                        })).collect(Collectors.toList());

        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        log.info("任務阻塞 ");
        Instant start = Instant.now();
        //阻塞,直到全部任務結束。
        all.join();
        log.info("任務阻塞結束 耗時 = {}",ChronoUnit.MILLIS.between(start, Instant.now()));
        return result;
    }

具體步驟以下:服務器

  1. 將原始數據按照每組100條進行拆分。(具體每組拆分多少條須要根據實際的業務狀況和服務器性能,多測試一下應該就知道了)
  2. 多線程組成數據,每一個線程組裝一組數據(上面拆分的100條原始數據)。packVOs 方法就是組裝數據。爲了高效,我建議 在組裝數據的時候多采用批量,緩存的思想,能批量儘可能批量,重複數據就儘可能緩存下來。
  3. CompletableFuture.supplyAsync() 方法說明以下。第一個參數是線程須要執行的動做,第二個參數是線程執行用的Executor,能夠填自定義的,也能夠不填寫,不填寫程序會使用默認的執行器。多線程

    public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
    返回由給定執行程序中運行的任務異步完成的新CompletableFuture,其中包含經過調用給定供應商得到的值。異步

  4. whenCompleteAsync 方法含義和名字同樣,將上一步執行的結果或者異常做爲參數傳給指定的參數。這裏咱們但願分批組裝的結果能過add進result中。
  5. exceptionally 是用來處理異常。當一個線程執行出現異常的時候應該執行怎樣的操做。
  6. all.join() 這個方法是等待全部的任務(全部的CompletableFuture)完成。組裝數據是耗時的,若是咱們不等待全部組裝任務完成,直接返回result,相信result中不會有數據,或者數據是不完整的。咱們期待的結果是全部的數據都正常組裝完成,添加進result。

使用了CompletableFuture方式實現多線程分批組裝,而且在組裝時採用 「批量+緩存」 的思想,原來5000條數據30min縮短爲3min。固然還有優化的空間,可是能達到這個效果已經讓我很是滿意了。

下次遇到相似的狀況,我會優先考慮CompletableFuture分批組裝的方式,快捷、優雅。大家有好的方法呢?

相關文章
相關標籤/搜索