【併發編程】JUC組件擴展(Callable、FutureTask、Fork/Join 框架、BlockingQueue)

前言

以前咱們已經學習了建立線程的2種方式,一種是直接繼承Thread,另一種就是實現Runnable接口。java

這2種方式都有一個缺陷就是:在執行完任務以後沒法獲取執行結果。算法

若是要得到返回值就必須經過共享變量或者線程間通訊的方式,實現起來較複雜。數組

所以在Java5開始提供了CallableFuture,經過它們能夠在任務執行完畢以後獲得任務執行結果。框架

Callable

首先咱們來看一下Runnable的源碼ide

@FunctionalInterface
public interface Runnable {
    /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */
    public abstract void run();
}
複製代碼

能夠看到run方法的返回值爲void函數

下面來看一下Callable的源碼學習

@FunctionalInterface
public interface Callable<V> {
    /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */
    V call() throws Exception;
}
複製代碼

能夠看到,這是一個泛型接口,call()函數返回的類型就是傳遞進來的V類型。this

Callable的使用會在後面的代碼演示中給出。spa

Future

Future類位於java.util.concurrent包下,它是一個接口.線程

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼

Future接口中聲明瞭5個方法,下面依次解釋每一個方法的做用:

  • cancel方法:用來取消任務,若是取消任務成功則返回true,若是取消任務失敗則返回false。參數mayInterruptIfRunning表示是否容許取消正在執行卻沒有執行完畢的任務,若是任務已經完成,則不管mayInterruptIfRunningtrue仍是false,此方法確定返回false,即若是取消已經完成的任務會返回false;若是任務正在執行,若mayInterruptIfRunning設置爲true,則返回true,若mayInterruptIfRunning設置爲false,則返回false;若是任務尚未執行,則不管mayInterruptIfRunning爲true是什麼,確定返回true

  • isCancelled方法:表示任務是否被取消成功,若是在任務正常完成前被取消成功,則返回 true

  • isDone方法:表示任務是否已經完成,若任務完成,則返回true

  • get()方法:用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回;

  • get(long timeout, TimeUnit unit)用來獲取執行結果,若是在指定時間內,還沒獲取到結果,就直接返回null。   

也就是說Future提供了三種功能:

  • 1.判斷任務是否完成;

  • 2.可以中斷任務;

  • 3.可以獲取任務執行結果。

Future的使用

@Slf4j
public class FutureExample {

    static class MyCallable implements Callable<String> {

        @Override
        public String call() throws Exception {
            log.info("do sth in callable");
            Thread.sleep(5000);
            return "done";
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do sth in main");
        Thread.sleep(1000);
        String result = future.get();
        log.info("result: {}", result);
    }
}
複製代碼

運行結果

FutureTask

FutureTask位於JUC包內但不是AQS的子類。

FutureTask實現了FutureRunnable接口,能夠獲取線程的返回值。

FutureTask的使用

@Slf4j
public class FutureTaskExample {


    public static void main(String[] args) throws InterruptedException, ExecutionException {

        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do sth. in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });

        new Thread(futureTask).start();
        log.info("do sth. in main");
        Thread.sleep(1000);
        String result = futureTask.get();
        log.info("result: {}", result);
    }
}
複製代碼

運行結果與上面相同。

從使用例子中能夠看出FutureTask是很是方便的,何時想用就何時啓動線程就能夠了。

FutureTask還能夠傳入多種參數類型,咱們進入它的源碼看一下

/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * * @param runnable the runnable task * @param result the result to return on successful completion. If * you don't need a particular result, consider using * constructions of the form: * {@code Future<?> f = new FutureTask<Void>(runnable, null)} * @throws NullPointerException if the runnable is null */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
複製代碼

能夠看到FutureTask能夠傳入CallableRunnable,傳入Runnable時還能夠指定返回值類型。

Fork/Join 框架

Fork/Join 框架Java7提供的用於並行執行任務的框架,經過把大任務分紅多個小任務最終彙總每一個小任務結果來獲得最終結果。

它主要採用的是工做竊取算法,這個算法是指某個線程從其餘隊列裏竊取任務來執行,過程以下圖

線程的任務採用了雙端隊列,竊取任務時只能從尾部竊取。這個算法的優勢就是能夠充分利用線程進行並行計算,並減小了線程間的競爭。缺點是在極端狀況下仍是存在競爭,如隊列中只有一個任務時。

Fork/Join框架有必定的侷限性:

  • 任務只能使用fork和join操做做爲同步機制,若是使用了其餘同步機制,工做線程執行時就不能執行其餘任務了。
  • 任務不該該執行IO操做
  • 任務不能拋出檢查異常,必須經過必要的代碼來處理它們。

Coding演示

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample (int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //若是任務足夠小就計算任務
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            //若是任務大於閾值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            //執行任務
            leftTask.fork();
            rightTask.fork();

            //任務結束後合併結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            //合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        Future<Integer> result = forkJoinPool.submit(task);

        try{
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}
複製代碼

重寫的compute方法就是遞歸調用自身不斷將大任務拆分紅小任務,最後彙總結果獲得最終結果的。

BlockingQueue

BlockingQueue即阻塞隊列。

當阻塞隊列滿時進行入隊操做和當阻塞隊列空時進行出隊操做都會使線程進入阻塞狀態。

主要用於生產者消費者場景。

ArrayBlockingQueue

大小固定,內部實現是一個數組,在初始化時須要指定容量且不能改變。

先進先出的方式存儲數據,最新數據在尾部。

DelayQueue

阻塞的是內部元素。DelayQueue中的元素必須實現JUC裏的Delay接口(Delay接口繼承了Comparable接口),通常都以元素過時時間的優先級進行排序。

內部實現是優先隊列和lock。

LinkedBlockingQueue

大小配置可選,初始化時指定大小就是有邊界的,若不指定就是無邊界的。

內部實現是鏈表,其餘特色與ArrayBlockingQueue一致。

PriorityBlockingQueue

沒有邊界,有排序規則,容許插入null。

全部插入PriorityBlockingQueue的對象必須實現Comparable接口。

SynchronousQueue

同步隊列

內部僅容許容納一個元素,內部插入一個元素後就會被阻塞,除非這個元素被其餘線程消費。

Written by Autu

2019.7.21

相關文章
相關標籤/搜索