多線程編程CompletableFuture與parallelStream

1、簡介

日常在頁面中咱們會使用異步調用$.ajax()函數,若是是多個的話他會並行執行相互不影響,實際上Completable我理解也是和它相似,是java 8裏面新出的異步實現類,CompletableFuture類實現了Future接口,CompletableFuture與Stream的設計都遵循了相似的設計模式:使用Lambda表達式以及流水線的思想,從這個角度能夠說CompletableFuture與Future的關係相似於Stream與Collection的關係。java

2、代碼

直接上代碼,運行以後能夠看出CompletableFuture是調用的時候就開始執行,當後續代碼調到get的取值方法時,若是內部已經返回結果則直接拿到,若是尚未返回將阻塞線程等待結果,能夠設置超時時間避免長時間等待。ajax

如下是模擬並行調用多個方法的場景,好比查詢頁可能會有多個條件選擇,這些條件須要後臺數據相互之間有沒有聯繫的場景,就不須要串行執行,異步執行能夠節省大量時間編程

import org.joda.time.LocalDateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 多任務單次異步執行
     */
    @Test
    public void testManyFunAsync() {
        long start = System.nanoTime();//程序開始時間
        try {
            int id = 1;//模擬一個參數,如學校Id
            printlnConsole("調用異步任務...");
            //使用異步方式調用方法【調用時就會開始執行方法】
            CompletableFuture futureClassCount = CompletableFuture.supplyAsync(() -> getClassCount(id));
            CompletableFuture futureStudentCount = CompletableFuture.supplyAsync(() -> getStudentCount(id));

            //do something 作了一些其餘的事情超過了異步任務執行的時間
            printlnConsole("作一些其餘的事情...");
            Thread.sleep(3000);
            printlnConsole("其餘事情完成");

            //下面獲取異步任務的結果,就會當即拿到返回值
            printlnConsole("獲取異步任務結果...");
            Object classCount = futureClassCount.get();
            //Object classCount = futureClassCount.get(2, TimeUnit.SECONDS);//能夠設置超時時間,超過這個時間時將再也不等待,返回異常
            Object studentCount = futureStudentCount.get();
            //Object studentCount = futureStudentCount.get(2, TimeUnit.SECONDS);
            printlnConsole("異步任務結果獲取完成");

            printlnConsole("ClassCount:" + classCount);
            printlnConsole("StudentCount:" + studentCount);

        } catch (Exception e) {
            e.printStackTrace();
        }
        long end = System.nanoTime();//程序結束時間
        long time = (end - start) / 1000000;//總耗時
        System.out.println("運行時間:" + time);
    }

    public int getClassCount(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getClassCount(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 20;
    }

    public int getStudentCount(int id) {
        try {
            Thread.sleep(1000);
            printlnConsole("getStudentCount(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 100;
    }
}

anyOf()爲任意一個子任務線程執行完畢後返回
allOf()爲等待全部子任務線程所有執行完畢後返回
getNow()表示我須要當即拿到結果,若是當前的線程並未執行完成,則使用我傳入的值進行任務調用,參數爲沒法獲取結果時使用我傳入的值
get()獲取子線程運算的結果,會拋出檢查到的異常
join()獲取子線程運算的結果,不會拋出異常segmentfault

package com.ysl;

import org.joda.time.LocalDateTime;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 並行執行等待所有結果或等待任意結果
     */
    @Test
    public void testAllOfAnyOf() {
        long start = System.nanoTime();
        try {
            printlnConsole("調用異步任務...");
            List<Integer> ids = Arrays.asList(1, 3, 5);//準備的請求參數
            //建立異步方法數組
            CompletableFuture[] futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getClassName(id))).toArray(size -> new CompletableFuture[size]);
            //指定該異步方法數組的子任務線程等待類型
            CompletableFuture.anyOf(futures).join();//anyOf()爲任意一個子任務線程執行完畢後返回
            //CompletableFuture.allOf(futures).join();//allOf()爲等待全部子任務線程所有執行完畢後返回

            printlnConsole("作一些其餘的事情...");
            Thread.sleep(2000);
            printlnConsole("其餘事情完成");

            printlnConsole("獲取異步任務結果:");
            for (CompletableFuture f : futures) {
                //Object obj = f.getNow(1);//getNow()表示我須要當即拿到結果,若是當前的線程並未執行完成,則使用我傳入的值進行任務調用,參數爲沒法獲取結果時使用我傳入的值
                Object obj = f.get();//get()獲取子線程運算的結果,會拋出檢查到的異常
                //Object obj = f.join();//join()獲取子線程運算的結果,不會拋出異常
                printlnConsole(String.valueOf(obj));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("運行時間:" + time);
    }

    public String getClassName(int id) {
        try {
            Thread.sleep(id * 1000);
            printlnConsole("getClassName(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

下面是並行流的演示parallelStream也是java 8新特性設計模式

 ids.stream()轉化爲流.map()映射每一個元素對應的結果.collect(Collectors.toList)把結果概括爲List;還有.filter()過濾元素.sorted()對元素進行排序.limit()獲取指定數量元素;也能夠toArray(size -> new Class[size])轉化爲數組數組

如下是模擬根據Id查詢學生名稱的場景,接收到的是一個集合又都是調用同一個方法獲取,就可使用並行流同時異步請求等待返回結果安全

import org.joda.time.LocalDateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 單任務屢次並行流執行
     */
    @Test
    public void testParallelStream() {
        long start = System.nanoTime();
        try {
            printlnConsole("調用異步任務...");
            List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);//準備的請求參數
            //串行執行會等待每個方法執行完畢後在繼續執行下一個
            //List<String> names = ids.stream().map(id -> getStudentName(id)).collect(Collectors.toList());
            //並行執行會同時調用多個方法待所有執行完畢後一塊兒返回(parallelStream是非線程安全的,配合collect達到線程安全,後續驗證一下)
            List<String> names = ids.parallelStream().map(id -> getStudentName(id)).collect(Collectors.toList());
            //不管stream()或者parallelStream()調用時均會阻斷線程執行
            printlnConsole("作一些其餘的事情...");
            Thread.sleep(3000);
            printlnConsole("其餘事情完成");

            printlnConsole("獲取異步任務結果:");
            names.forEach(item -> printlnConsole(item));
        } catch (Exception e) {
            e.printStackTrace();
        }
        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("運行時間:" + time);
    }

    public String getStudentName(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getStudentName(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

 上面能看到並行流雖然是並行執行但等待結果是阻塞線程的,因此能夠利用異步CompletableFuture配合串行流來實現網絡

如下是採用串行流配合異步實現的併發處理併發

import org.joda.time.LocalDateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 單任務屢次異步執行
     */
    @Test
    public void testOneFunAsync() {
        long start = System.nanoTime();
        try {
            printlnConsole("調用異步任務...");
            List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);//準備的請求參數
            //ids.stream()轉化爲流.map()映射每一個元素對應的結果.collect()把結果概括爲List;還有.filter()過濾元素.sorted()對元素進行排序.limit()獲取指定數量元素;
            List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id))).collect(Collectors.toList());

            //不用並行流parallelStream()調用時就不會阻斷線程執行
            printlnConsole("作一些其餘的事情...");
            Thread.sleep(3000);
            printlnConsole("其餘事情完成");

            printlnConsole("獲取異步任務結果:");
            futures.forEach(f -> {
                try {
                    Object obj = f.get();
                    printlnConsole(String.valueOf(obj));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }

        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("運行時間:" + time);
    }

    public String getStudentName(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getStudentName(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

 當個人並行任務數量超過了我機器的核心數就會產生等待,我電腦是8核使用並行流執行數量就能夠開8個子線程,當多餘這個數量時剩下的就須要等待前面線程執行完再執行異步

當須要並行執行的任務數量大於核心數的時候,產生的等待是咱們不想看到的,這時CompletableFuture就更加適用,它能夠手動這隻線程池大小,避免並行任務過多時的等待

咱們將代碼作些修正

如下是源碼,這樣就能夠提升對多任務並行處理的支持了

import org.joda.time.LocalDateTime;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 手動配置線程執行器的線程池大小
     */
    private final Executor myExecutor = Executors.newFixedThreadPool(20, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            //使用守護線程保證不會阻止程序的關停
            t.setDaemon(true);
            return t;
        }
    });
    /**
     * 單任務屢次異步執行
     */
    @Test
    public void testOneFunAsync() {
        long start = System.nanoTime();
        try {
            printlnConsole("調用異步任務...");
            List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);//準備的請求參數
            //ids.stream()轉化爲流.map()映射每一個元素對應的結果.collect()把結果概括爲List;還有.filter()過濾元素.sorted()對元素進行排序.limit()獲取指定數量元素;
            List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id), myExecutor)).collect(Collectors.toList());

            //不用並行流parallelStream()調用時就不會阻斷線程執行
            printlnConsole("作一些其餘的事情...");
            Thread.sleep(3000);
            printlnConsole("其餘事情完成");

            printlnConsole("獲取異步任務結果:");
            futures.forEach(f -> {
                try {
                    Object obj = f.get();
                    printlnConsole(String.valueOf(obj));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("運行時間:" + time);
    }

    public String getStudentName(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getStudentName(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

  

java 8的新特性也只作到了會用,不少深刻的還不瞭解,還望指導謝謝,下面備份一下別人的總結我以爲挺有用的:

選擇正確的線程池大小
《Java併發編程實戰》中給出以下公式:

Number = NCpu * Ucpu * ( 1 + W/C)
Number : 線程數量
NCpu : 處理器核數
UCpu : 指望cpu利用率
W/C : 等待時間與計算時間比
咱們這裏:99%d的時間是等待商店響應 W/C = 99 ,cpu利用率指望 100% ,NCpu = 9,推斷出 number = 800。可是爲了不過多的線程搞死計算機,咱們選擇商店數與計算值中較小的一個。

並行流與CompletableFuture
目前,咱們對集合進行計算有兩種方式:1.並行流 2.CompletableFuture;

一、而CompletableFuture更加的靈活,咱們能夠配置線程池的大小確保總體的計算不會由於等待IO而發生阻塞。

書上給出的建議以下:若是是計算密集型的操做而且沒有IO推薦stream接口,由於實現簡單效率也高,若是全部的線程都是計算密集型的也就沒有必要建立比核數更多的線程。

二、反之,若是任務涉及到IO,網絡等操做:CompletableFuture靈活性更好,由於大部分線程處於等待狀態,須要讓他們更加忙碌,而且再邏輯中加入異常處理能夠更有效的監控是什麼緣由觸發了等待。

 參考地址:http://www.javashuo.com/article/p-siujpsba-nc.html

相關文章
相關標籤/搜索