005-多線程-JUC線程池-Future、FutureTask、CompletionService 、CompletableFuture

1、概述

  建立線程的兩種方式,一種是直接繼承Thread,另一種就是實現Runnable接口。這兩種方式都有一個缺陷就是:在執行完任務以後沒法獲取執行結果。若是須要獲取執行結果,就必須經過共享變量或者使用線程通訊的方式來達到效果,這樣使用起來就比較麻煩。而自從Java 1.5開始,就提供了Callable和Future,經過它們能夠在任務執行完畢以後獲得任務執行結果。html

  詳述:https://www.cnblogs.com/bjlhx/p/7588971.htmljava

1.一、Runnable接口

它是一個接口,裏面只聲明瞭一個run()方法:編程

public interface Runnable {
    public abstract void run();
}

因爲run()方法返回值爲void類型,因此在執行完任務以後沒法返回任何結果。設計模式

1.二、Callable接口

Callable接口位於java.util.concurrent包下,在它裏面也只聲明瞭一個方法,只不過這個方法叫作call()。數據結構

public interface Callable<V> {   
    V call() throws Exception;
}

是一個泛型接口,call()函數返回的類型就是傳遞進來的V類型。Callable接口能夠看做是Runnable接口的補充,call方法帶有返回值,而且能夠拋出異常。多線程

1.三、Future接口

  Future的核心思想是:併發

    一個方法,計算過程可能很是耗時,等待方法返回,顯然不明智。能夠在調用方法的時候,立馬返回一個Future,能夠經過Future這個數據結構去控制方法f的計算過程。dom

  Future類位於java.util.concurrent包下,它是一個接口:這裏的控制包括:異步

    get方法:獲取計算結果(若是還沒計算完,也是必須等待的)這個方法會產生阻塞,會一直等到任務執行完畢才返回;ide

    get(long timeout, TimeUnit unit)用來獲取執行結果,若是在指定時間內,還沒獲取到結果,就直接返回null。

    cancel方法:還沒計算完,能夠取消計算過程,若是取消任務成功則返回true,若是取消任務失敗則返回false。參數mayInterruptIfRunning表示是否容許取消正在執行卻沒有執行完畢的任務,若是設置true,則表示能夠取消正在執行過程當中的任務。若是任務已經完成,則不管mayInterruptIfRunning爲true仍是false,此方法確定返回false,即若是取消已經完成的任務會返回false;若是任務正在執行,若mayInterruptIfRunning設置爲true,則返回true,若mayInterruptIfRunning設置爲false,則返回false;若是任務尚未執行,則不管mayInterruptIfRunning爲true仍是false,確定返回true。

    isDone方法:判斷是否計算完

    isCancelled方法:判斷計算是否被取消,方法表示任務是否被取消成功,若是在任務正常完成前被取消成功,則返回 true。

  也就是說Future提供了三種功能:

    1)判斷任務是否完成;

    2)可以中斷任務;

    3)可以獲取任務執行結果。

  Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時能夠經過get方法獲取執行結果,該方法會阻塞直到任務返回結果。

  由於Future只是一個接口,因此是沒法直接用來建立對象使用的,所以就有了下面的FutureTask。

使用Callable+Future獲取執行結果:

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主線程在執行任務");
         
        try {
            System.out.println("task運行結果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("全部任務執行完畢");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子線程在進行計算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.四、FutureTask類

FutureTask繼承體系中的核心接口是Future。事實上,FutureTask是Future接口的一個惟一實現類。

如何獲取Callable的返回結果:通常是經過FutureTask這個中間媒介來實現的。總體的流程是這樣的:

把Callable實例看成參數,生成一個FutureTask的對象,而後把這個對象看成一個Runnable,做爲參數另起線程。

1.4.一、FutureTask結構

  

1.4.二、FutureTask使用

方式1、使用thread方式

  FutureTask實現了Runnable,所以它既能夠經過Thread包裝來直接執行,也能夠提交給ExecuteService來執行。如下使用Thread包裝線程方式啓動

    public static void main(String[] args) throws Exception {
        Callable<Integer> call = () -> {
            System.out.println("計算線程正在計算結果...");
            Thread.sleep(3000);
            return 1;
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();

        System.out.println("main線程乾點別的...");

        Integer result = task.get();
        System.out.println("從計算線程拿到的結果爲:" + result);
    }

方式2、使用 ExecutorService

   ExecutorService executor = Executors.newFixedThreadPool(2);線程池方式

    public static void main(String[] args) {
        Callable<String> callable1=()->{
            Thread.sleep(2000);
            return Thread.currentThread().getName();
        };
        Callable<String> callable2=()->{
            Thread.sleep(3000);
            return Thread.currentThread().getName();
        };
        FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 將Callable寫的任務封裝到一個由執行者調度的FutureTask對象
        FutureTask<String> futureTask2 = new FutureTask<>(callable2);

        ExecutorService executor = Executors.newFixedThreadPool(2);        // 建立線程池並返回ExecutorService實例
        executor.execute(futureTask1);  // 執行任務
        executor.execute(futureTask2);
        //同時開啓了兩個任務
        long startTime = System.currentTimeMillis();
        while (true) {
            try {
                if(futureTask1.isDone() && futureTask2.isDone()){//  兩個任務都完成
                    System.out.println("Done");
                    executor.shutdown();                          // 關閉線程池和服務
                    return;
                }

                if(!futureTask1.isDone()){ // 任務1沒有完成,會等待,直到任務完成
                    System.out.println("FutureTask1 output="+futureTask1.get());
                }

                System.out.println("Waiting for FutureTask2 to complete");
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
                if(s !=null){
                    System.out.println("FutureTask2 output="+s);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }catch(TimeoutException e){
                //do nothing
            }
            System.out.println((System.currentTimeMillis()-startTime));
        }
    }
View Code

使用Callable+FutureTask獲取執行結果

public class Test {
    public static void main(String[] args) {
        //第一種方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二種方式,注意這種方式和第一種方式效果是相似的,只不過一個使用的是ExecutorService,一個使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主線程在執行任務");
         
        try {
            System.out.println("task運行結果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("全部任務執行完畢");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子線程在進行計算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.五、CompletionService

原理:內部經過阻塞隊列+FutureTask,實現了任務先完成可優先獲取到,即結果按照完成前後順序排序。

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceDemo {
    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //開啓5個線程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            int taskCount = 10;
            //結果集
            List<Integer> list = new ArrayList<>();
            //1.定義CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            List<Future<Integer>> futureList = new ArrayList<>();
            //2.添加任務
            for(int i=0;i<taskCount;i++){
                futureList.add(completionService.submit(new Task(i+1)));
            }
            //==================結果歸集===================
            //方法1:future是提交時返回的,遍歷queue則按照任務提交順序,獲取結果
//            for (Future<Integer> future : futureList) {
//                System.out.println("====================");
//                Integer result = future.get();//線程在這裏阻塞等待該任務執行完畢,按照
//                System.out.println("任務result="+result+"獲取到結果!"+new Date());
//                list.add(result);
//            }

//            //方法2.使用內部阻塞隊列的take()
            for(int i=0;i<taskCount;i++){
                Integer result = completionService.take().get();//採用completionService.take(),內部維護阻塞隊列,任務先完成的先獲取到
                System.out.println(LocalDateTime.now()+"---任務i=="+result+"完成!");
                list.add(result);
            }
            System.out.println("list="+list);
            System.out.println("總耗時="+(System.currentTimeMillis()-start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();//關閉線程池
        }

    }


    static class Task implements Callable<Integer>{
        Integer i;

        public Task(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==5){
                Thread.sleep(5000);
            }else{
                Thread.sleep(1000);
            }
            System.out.println("線程:"+Thread.currentThread().getName()+"任務i="+i+",執行完成!");
            return i;
        }

    }
}
View Code

建議:使用率也挺高,並且能按照完成前後排序,建議若是有排序需求的優先使用。只是多線程併發執行任務結果歸集,也可使用。

2、CompletableFuture

2.一、對標Futrue

  Future 接口,用於描述一個異步計算的結果。雖然 Future 以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。

  阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的 CPU 資源,並且也不能及時地獲得計算結果,爲何不能用觀察者設計模式呢?即當計算結果完成及時通知監聽者。

    Future侷限性,它很難直接表述多個Future 結果之間的依賴性。

2.二、類圖

  

2.2.一、CompletionStage

  • CompletionStage表明異步計算過程當中的某一個階段,一個階段完成之後可能會觸發另一個階段

  • 一個階段的計算執行能夠是一個Function,Consumer或者Runnable。好比:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())

  • 一個階段的執行多是被單個階段的完成觸發,也多是由多個階段一塊兒觸發

2.2.二、Future

2.三、建立CompletableFuture對象

  CompletableFuture.compleatedFuture是一個靜態輔助方法,用來返回一個已經計算好的CompletableFuture.

  如下四個靜態方法用來爲一段異步執行的代碼建立CompletableFuture對象:

public static CompletableFuture<Void>     runAsync(Runnable runnable)
public static CompletableFuture<Void>     runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier, Executor executor)

  以Async結尾而且沒有指定Executor的方法會使用ForkJoinPool.commonPool() 做爲它的線程池執行異步代碼。

  runAsync方法:它以Runnabel函數式接口類型爲參數,因此CompletableFuture的計算結果爲空。

  supplyAsync方法以Supplier<U>函數式接口類型爲參數,CompletableFuture的計算結果類型爲U。

  注意:這些線程都是Daemon線程,主線程結束Daemon線程不結束,只有JVM關閉時,生命週期終止。

示例:簡單同步用法

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            //長時間的計算任務
            try {
                System.out.println("計算型任務開始");
                Thread.sleep(2000);
                return "計算型任務結束";
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "·00";
        });
        System.out.println(future.get());
    }
View Code

2.四、計算結果完成時的處理

當CompletableFuture的計算結果完成,或者拋出異常的時候,能夠執行特定的Action。主要是下面的方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)  

  能夠看到Action的類型是BiConsumer<? super T,? super Throwable>它能夠處理正常的計算結果,或者異常狀況。

  方法不以Async結尾,意味着Action使用相同的線程執行,而Async可能會使用其餘線程執行(若是是使用相同的線程池,也可能會被同一個線程選中執行)

示例:

package com.lhx.cloud.futruetask;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BasicFuture {

    private static Random rand = new Random();
    private static long t = System.currentTimeMillis();

    static int getMoreData()  {
        System.out.println("begin to start compute");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end to compute,passed " + (System.currentTimeMillis()-t));
        return rand.nextInt(1000);
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(v);
            System.out.println(e);
        });
        System.out.println(f.get());
    }}
View Code

2.五、轉換

CompletableFuture能夠做爲monad(單子)和functor. 因爲回調風格的實現,咱們沒必要由於等待一個計算完成而阻塞着調用線程,而是告訴CompletableFuture當計算完成的時候請執行某個Function. 還能夠串聯起來。

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

2.六、異常處理completeExceptionally

  爲了能獲取任務線程內發生的異常,須要使用 CompletableFuture的completeExceptionally方法將致使CompletableFuture內發生問題的異常拋出。

  這樣,當執行任務發生異常時,調用get()方法的線程將會收到一個 ExecutionException異常,該異常接收了一個包含失敗緣由的Exception 參數。

    /**
     * 任務沒有異常 正常執行,而後結束
     */
    @Test
    public void test1() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模擬執行耗時任務
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告訴completableFuture任務已經完成
            completableFuture.complete("ok");
        }).start();
        // 獲取任務結果,若是沒有完成會一直阻塞等待
        String result = completableFuture.get();
        System.out.println("計算結果:" + result);
    }

    /**
     * 線程有異常  正常執行,而後沒法結束,主線程會一直等待
     */
    @Test
    public void test2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模擬執行耗時任務
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i=1/0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告訴completableFuture任務已經完成
            completableFuture.complete("ok");
        }).start();
        // 獲取任務結果,若是沒有完成會一直阻塞等待
        String result = completableFuture.get();
        System.out.println("計算結果:" + result);
    }
View Code
    /**
     * 線程有異常  正常執行,而後經過completableFuture.completeExceptionally(e);告訴completableFuture任務發生異常了
     * 主線程接收到 程序繼續處理,至結束
     */
    @Test
    public void test3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模擬執行耗時任務
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i = 1/0;
            } catch (Exception e) {
                // 告訴completableFuture任務發生異常了
                completableFuture.completeExceptionally(e);
            }
            // 告訴completableFuture任務已經完成
            completableFuture.complete("ok");
        }).start();
        // 獲取任務結果,若是沒有完成會一直阻塞等待
        String result = completableFuture.get();
        System.out.println("計算結果:" + result);
    }

2.七、多任務組合方法allOf和anyOf

allOf是等待全部任務完成,構造後CompletableFuture完成

anyOf是隻要有一個任務完成,構造後CompletableFuture就完成

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        // 結果集
        List<String> list = new ArrayList<>();

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        // 全流式處理轉換成CompletableFuture[]+組裝成一個無返回值CompletableFuture,join等待執行完畢。返回結果whenComplete獲取
        CompletableFuture[] cfs = taskList.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService)
                        .thenApply(h -> Integer.toString(h))
                        .whenComplete((s, e) -> {
                            System.out.println(LocalDateTime.now()+"---任務" + s + "完成!result=" + s + ",異常 e=" + e);
                            list.add(s);
                        })
                ).toArray(CompletableFuture[]::new);
        // 封裝後無返回值,必須本身whenComplete()獲取
        CompletableFuture.allOf(cfs).join();
        System.out.println("list=" + list + ",耗時=" + (System.currentTimeMillis() - start));
    }

    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(3000);//任務1耗時3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任務5耗時5秒
            } else {
                Thread.sleep(1000);//其它任務耗時1秒
            }
            System.out.println(LocalDateTime.now()+"---task線程:" + Thread.currentThread().getName()
                    + "任務i=" + i + ",完成!" );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}
View Code

2.八、經常使用多線程併發,取結果歸集的幾種實現方案

描述 Future FutureTask CompletionService CompletableFuture
原理 Future接口 接口RunnableFuture的惟一實現類,RunnableFuture接口繼承自Future+Runnable 內部經過阻塞隊列+FutureTask接口 JDK8實現了Future, CompletionStage兩個接口
多任務併發執行 支持 支持 支持 支持
獲取任務結果的順序 按照提交順序獲取結果 未知 支持任務完成的前後順序 支持任務完成的前後順序
異常捕捉 本身捕捉 本身捕捉 本身捕捉 原生API支持,返回每一個任務的異常
建議 CPU高速輪詢,耗資源,或者阻塞,可使用,但不推薦 功能不對口,併發任務這一塊多套一層,不推薦使用 推薦使用,沒有JDK8CompletableFuture以前最好的方案 API極端豐富,配合流式編程,推薦使用!

上表來源:http://www.javashuo.com/article/p-evadfian-gq.html

相關文章
相關標籤/搜索