Java併發編程學習系列五:函數式接口、Stream流等

四大函數接口

什麼是函數式接口?

有且只有一個抽象方法的接口被稱爲函數式接口,函數式接口適用於函數式編程的場景,Lambda 就是 Java 中函數式編程的體現,可使用Lambda表達式建立一個函數式接口的對象,必定要確保接口中有且只有一個抽象方法,這樣Lambda才能順利的進行推導。 php

函數式接口裏除了抽象方法以外,還容許包含默認方法和靜態方法。css

@FunctionalInterface註解

與@Override 註解的做用相似,Java 8中專門爲函數式接口引入了一個新的註解:@FunctionalInterface 。該註解用於編譯級錯誤檢查,加上該註解,當你寫的接口不符合函數式接口定義的時候,編譯器會報錯。 。可是這個註解不是必須的,只要符合函數式接口的定義,那麼這個接口就是函數式接口。java

java.util.function 包下定義了內置核心四大函數式接口,可使用 lambda 表達式。web

關於這四個接口的介紹以下圖所示:算法

Function

函數型接口,有一個輸入,有一個輸出。編程

    public static void main(String[] args) {
//        Function function = new Function<String, Integer>() {
//            @Override
//            public Integer apply(String s) {
//                return s.length();
//            }
//        };

        //使用lambda表達式
        Function<String, Integer> function = s -> {
            return s.length();
        };

        System.out.println(function.apply("xxx"));
    }
複製代碼

Predicate

判定型接口,有一個輸入參數,返回只有布爾值。api

    public static void main(String[] args) {
        //判斷字符串是否爲空,空返回true
//        Predicate predicate = new Predicate<String>() {
//            @Override
//            public boolean test(String s) {
//                return s.isEmpty();
//            }
//        };

        Predicate<String> predicate = str ->{return str.isEmpty();};

        System.out.println(predicate.test("ff"));
    }
複製代碼

Consumer

消費型接口,有一個輸入參數,沒有返回值。數組

    public static void main(String[] args) {
//        Consumer<String> consumer = new Consumer<String>() {
//            @Override
//            public void accept(String s) {
//                System.out.println(s);
//            }
//        };

        Consumer<String> consumer = Str ->{System.out.println(Str);};

        consumer.accept("fjdskf");
    }
複製代碼

Supplier

供給型接口,沒有輸入參數,只有返回參數。多線程

    public static void main(String[] args) {
//        Supplier<String> supplier = new Supplier<String>() {
//            @Override
//            public String get() {
//                return "hresh";
//            }
//        };

        Supplier<String> supplier = () -> {
            return "hresh";
        };

        System.out.println(supplier.get());
    }
複製代碼

Stream流式計算

官網文檔定義以下:併發

關於流的方法能夠去官網看詳細介紹。

流(Stream)究竟是什麼呢?

是數據渠道,用於操做數據源(集合、數組等)所生成的元素序列。

集合存儲數據,流講的是計算!

特色:

  • Stream 本身不會存儲元素。
  • Stream 不會改變源對象,相反,他們會返回一個持有結果的新Stream。
  • Stream 操做是延遲執行的。這意味着他們會等到須要結果的時候才執行。

案例測試

一、新建一個實體類 User

@Data
@AllArgsConstructor
public class User {
    private int id;
    private String name;
    private int age;
}
複製代碼

二、流式計算

/**
 * 題目:請按照給出數據,找出同時知足如下條件的用戶
 * 也即如下條件:
 * 一、所有知足偶數ID
 * 二、年齡大於24
 * 三、用戶名轉爲大寫
 * 四、用戶名字母倒排序
 * 五、只輸出一個用戶名字 limit
 */

public class Test {

    public static void main(String[] args) {
        User u1 = new User(1,"a",22);
        User u2 = new User(2,"b",23);
        User u3 = new User(3,"c",24);
        User u4 = new User(4,"d",25);
        User u5 = new User(6,"e",26);

        List<User> list = Arrays.asList(u1,u2,u3,u4,u5);

        list.stream().filter(u->{return u.getAge()>23;})
                .filter(u->{return u.getId() %2 ==0;})
                .map(u->{return u.getName().toUpperCase();})
                .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
                .limit(1)
                .forEach(System.out::println);

        List<Integer> list2 = null;
        list2 = list.stream().map(u -> {return u.getAge()+2;}).collect(Collectors.toList());
        list2.forEach(System.out::println);
    }
}
複製代碼

使用流式計算,代碼看起來更加簡潔,效率相應也會有所提高。

分支合併

什麼是ForkJoin

從 JDK1.7開始,Java 提供 Fork/Join 框架用於並行執行任務。ForkJoin 的框架的基本思想是分而治之。什麼是分而治之?分而治之就是將一個複雜的計算,按照設定的閾值進行分解成多個計算,而後將各個計算結果進行彙總。相應的 ForkJoin 將複雜的計算當作一個任務。而分解的多個計算則是當作一個子任務。

主要有兩步:

  1. 任務切分;
  2. 結果合併

它的模型大體是這樣的:線程池中的每一個線程都有本身的工做隊列(PS:這一點和 ThreadPoolExecutor 不一樣,ThreadPoolExecutor 是全部線程共用一個工做隊列,全部線程都從這個工做隊列中取任務),當本身隊列中的任務都完成之後,會從其它線程的工做隊列中偷一個任務執行,這樣能夠充分利用資源。

工做竊取

另外,forkjoin 有一個工做竊取的概念。簡單理解,就是一個工做線程下會維護一個包含多個子任務的雙端隊列。而對於每一個工做線程來講,會從頭部到尾部依次執行任務。這時,總會有一些線程執行的速度較快,很快就把全部任務執行完了。空閒下來的線程不會閒置下來,而是隨機選擇一個其餘的線程從隊列的尾巴上「偷走」一個任務。這個過程會一直繼續下去,知道全部的任務都執行完畢。

工做竊取(work-stealing)算法是指某個線程從其餘隊列裏竊取任務來執行。工做竊取的運行流程圖以下:

工做竊取算法的優勢是充分利用線程進行並行計算,並減小了線程間的競爭,其缺點是在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且消耗了更多的系統資源,好比建立多個線程和多個雙端隊列。

核心類

ForkJoinPool

在官方文檔中有以下定義:

ForkJoinPool 執行任務的線程池,繼承了 AbstractExecutorService 類,該線程池是經過DefaultForkJoinWorkerThreadFactory 或者 InnoCuousForkJoinWorkerThreadFactory 線程工廠產生的工做線程 。

ForkJoinPool 主要經過 executeinvokesubmit 這三個方法來處理任務 ForkJoinTask 。查看方法詳細介紹可知:execute 方法異步執行給定任務,無返回值;invoke 方法執行給定的任務,在完成後返回其結果,結果類型與 ForkJoinTask 中的 V 類型一致;submit 方法執行任務 ForkJoinTask 並返回一個結果任務 ForkJoinTask

查看上述三個方法,實質上都執行的是 externalPush 方法,在該方法中有個任務隊列 WorkQueue,它是 ForkJoinPool 的內部類, WorkQueue 中有執行任務的線程(ForkJoinWorkerThread owner),還有這個線程須要處理的任務(ForkJoinTask<?>[] array),新提交的任務就是加到 array 中。

ForkJoinWorkerThread

執行任務的工做線程,即 ForkJoinPool 線程池裏面的線程,每一個線程都維護者一個雙端隊列 WorkQueue,用於存放內部任務。

ForkJoinTask

ForkJoinTask 表明運行在 ForkJoinPool 中的任務。主要方法:

  • fork() 在當前線程運行的線程池中安排一個異步執行。簡單的理解就是再建立一個子任務。
  • join() 當任務完成的時候返回計算結果。
  • invoke() 開始執行任務,若是必要,等待計算完成。

子類: Recursive:遞歸

  • RecursiveAction 一個遞歸無結果的 ForkJoinTask(沒有返回值)
  • RecursiveTask 一個遞歸有結果的 ForkJoinTask(有返回值)

代碼測試

RecursiveTask 實現類

public class ForkJoinDemo extends RecursiveTask<Long{
    private Long start; //起始值
    private Long end;   //結束值

    public static final Long temp = 10000L;//臨界值

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        Long length = end - start;
        //判斷是否拆分完畢
        if(length <= temp){
            Long sum = 0L;
            //若是拆分完畢就相加
            for (Long i = start; i <= end; i++) {
                sum+=i;
            }
            return sum;
        }else{
            Long middle = (start+end)/2;
            ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
            task1.fork();//拆分,並壓入線程隊列
            ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
            task2.fork();
            //合併結果
            return task1.join()+task2.join();
        }
    }
}
複製代碼

測試代碼

public class ForkJoinTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Long start = 0L;
        Long end = 1000000000L;//10億

        work1(start,end);   //5687
//        work2(start,end);   //4360
//        work3(start,end);   //195
    }

    //普通線程計算
    public static void work1(Long start,Long end){
        long startTime = System.currentTimeMillis();

        Long sum=0L;
        for (Long i = start;  i<= end; i++) {
            sum+=i;
        }
        long endTime = System.currentTimeMillis();
        System.out.println("result="+sum+",time="+(endTime-startTime));

    }

    //ForkJoin實現
    public static void work2(Long start,Long end) throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();

        Long sum=0L;
        ForkJoinPool pool = new ForkJoinPool();//實現ForkJoin 就必須有ForkJoinPool的支持
        ForkJoinTask task = new ForkJoinDemo(start,end);
//        ForkJoinTask result = pool.submit(task);
//        sum = (Long) task.get();
        sum = (Long) pool.invoke(task);

        long endTime = System.currentTimeMillis();
        System.out.println("result="+sum+",time="+(endTime-startTime));

    }

    //並行流進行大數值運算
    public static void work3(Long start,Long end) {
        long startTime = System.currentTimeMillis();

        Long sum=0L;
        sum = LongStream.rangeClosed(start,end).parallel().reduce(0,Long::sum);

        long endTime = System.currentTimeMillis();
        System.out.println("result="+sum+",time="+(endTime-startTime));
    }
}
複製代碼

異步回調

前言

咱們前面講併發編程一直都着重於多線程同步調用,除了同步線程,還存在異步線程。在此以前咱們來回顧一下同步和異步的定義。

同步:就是當任務A依賴於任務B的執行時,必須等待任務B執行完畢以後任務A才繼續執行,此過程任務A被阻塞。任務要麼都成功,要麼都失敗!想想咱們打電話的情景便可! 異步:任務A調用任務B,任務A不須要等到任務B執行完畢,任務B只是返回一個虛擬的結果給任務A,使得任務A可以繼續作其餘事情,等到任務B執行完成以後再通知任務A(回調)或者是任務A主動去請求任務B要結果。

Future 模式的核心思想是可以讓主線程將原來須要同步等待的這段時間用來作其餘的事情。(由於能夠異步得到執行結果,因此不用一直同步等待去得到執行結果)

上圖簡單描述了普通模式和使用Future的區別,普通模式下,客戶端訪問服務端,等待結果返回很是耗時,此時客戶端只能等待沒法去作其餘任務。而 Future 模式下,客戶端向服務端發送完請求以後,先獲得一個虛擬結果,真實的結果在將來某個時刻完成以後返回給客戶端,而客戶端在此期間能夠去作其餘任務。

Future的優勢:比更底層的 Thread 更易用。要使用 Future,一般只須要將耗時的操做封裝在一個 Callable 對象中,再將它提交給 ExecutorService

ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定義任務:
Callable<String> task = new Task();
// 提交任務並得到Future:
Future<String> future = executor.submit(task);
// 從Future獲取異步執行返回的結果:
String result = future.get(); // 可能阻塞
複製代碼

當咱們提交一個Callable任務後,咱們會同時得到一個Future對象,而後,咱們在主線程某個時刻調用Future對象的get()方法,就能夠得到異步執行的結果。在調用get()時,若是異步任務已經完成,咱們就直接得到結果。若是異步任務尚未完成,那麼get()會阻塞,直到任務完成後才返回結果。

一個Future接口表示一個將來可能會返回的結果,它定義的方法有:

  • get():獲取結果(可能會等待)
  • get(long timeout, TimeUnit unit):獲取結果,但只等待指定的時間;
  • cancel(boolean mayInterruptIfRunning):取消當前任務;
  • isDone():判斷任務是否已完成。

使用Future得到異步執行結果時,要麼調用阻塞方法get(),要麼輪詢看isDone()是否爲true,這兩種方法都不是很好,由於主線程也會被迫等待。

從Java 8開始引入了CompletableFuture,它針對Future作了改進,能夠傳入回調對象,當異步任務完成或者發生異常時,自動調用回調對象的回調方法。

代碼測試

CompletableFuture能夠指定異步處理流程:

  • runAsync()返回無結果的CompletableFuture

  • supplyAsync()返回無結果的CompletableFuture

  • whenComplete()處理正常和異常結果;

  • thenAccept()處理正常結果;

  • exceptional()處理異常結果;

  • thenApplyAsync()用於串行化另外一個CompletableFuture

  • anyOf()allOf()用於並行化多個CompletableFuture

CompletableFuture.runAsync()

返回一個CompletableFuture,它須要一個實現了Runnable接口的對象 ,無返回值(此處說的無返回值指的是 CompletableFuture)。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //沒有返回值的異步回調
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        });

        System.out.println("主線程優先執行");
        completableFuture.get();
    }
複製代碼

執行結果爲:

主線程優先執行
ForkJoinPool.commonPool-worker-1
複製代碼

CompletableFuture.supplyAsync()

返回一個CompletableFuture,它須要一個實現了Supplier接口的對象 ,有返回值。

public class CompletableFutureTest {

    public static void main(String[] args) throws InterruptedException {
        //建立一個CompletableFuture
        CompletableFuture<Double> cfture = CompletableFuture.supplyAsync(CompletableFutureTest::fetchPrice);//lambda語法簡化方法調用
//        cfture.thenAccept(result ->{// 若是執行成功
//            System.out.println(result);
//        }).exceptionally(e ->{// 若是執行異常
//            e.printStackTrace();
//            return null;
//        });

        cfture.whenComplete((r1,r2) ->{
            System.out.println("執行結果爲:"+r1); //輸出執行成功的結果
            System.out.println("異常信息:"+r2); //輸出異常信息
        }).exceptionally(e ->{// 若是執行異常
            e.printStackTrace();
            return null;
        });
        // 主線程不要馬上結束,不然CompletableFuture默認使用的線程池會馬上關閉
        TimeUnit.SECONDS.sleep(2);
        System.out.println("主線程執行完畢");
    }

    static Double fetchPrice() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        if (Math.random() < 0.3) {
            throw new RuntimeException("fetch price failed!");
        }
        return 5 + Math.random() * 20;
    }
}
複製代碼

無異常時結果爲:

執行結果爲:6.110276836465158
異常信息:null
主線程執行完畢
複製代碼

拋出異常結果爲:

相比FutureCompletableFuture更強大的功能是,多個CompletableFuture能夠串行執行。

public class CompletableFutureTest {

    public static void main(String[] args) throws Exception {
        // 第一個任務:
        CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
            return queryCode("中國石油");
        });
        cfQuery.thenAccept((result) -> {
            System.out.println("query result: " + result);
        });
        // cfQuery成功後繼續執行下一個任務:
        CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice(code);
        });
        // cfFetch成功後打印結果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主線程不要馬上結束,不然CompletableFuture默認使用的線程池會馬上關閉:
        TimeUnit.SECONDS.sleep(2);
    }

    static String queryCode(String name) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
        }
        return name;
    }

    static Double fetchPrice(String code) {
        try {
            TimeUnit.MILLISECONDS.sleep(600);
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}
複製代碼

除了串行執行外,多個CompletableFuture還能夠並行執行。例如,咱們考慮這樣的場景:

同時重新浪和網易查詢證券代碼,只要任意一個返回結果,就進行下一步查詢價格,查詢價格也同時重新浪和網易查詢,只要任意一個返回結果,就完成操做。

public class CompletableFutureTest {

    public static void main(String[] args) throws Exception {
        // 兩個CompletableFuture執行異步查詢:
        CompletableFuture<String> cfQueryFromBing = CompletableFuture.supplyAsync(() -> {
            return queryName("hresh""https://cn.bing.com/");
        });
        CompletableFuture<String> cfQueryFromBaidu = CompletableFuture.supplyAsync(() -> {
            return queryName("hresh2""https://cn.baidu.com/");
        });

        // 用anyOf合併爲一個新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromBing, cfQueryFromBaidu);

        // 並行執行結果多是兩個CompletableFuture中任意一個的返回結果
        cfQuery.thenAccept((result) -> {
            System.out.println("name: " + result);
        });

        // 主線程不要馬上結束,不然CompletableFuture默認使用的線程池會馬上關閉:
        Thread.sleep(200);
    }

    static String queryName(String name, String url) {
        System.out.println("query name from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return name;
    }
}
複製代碼

參考文獻

使用CompletableFuture

相關文章
相關標籤/搜索