Java SDK 併發包全面總結

1、Lock 和 Condition

Java 併發包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問題,這二者的配合使用,至關於 synchronized、wait()、notify() 的使用。java

1. Lock 的優點

比起傳統的 synchronized 關鍵字,Lock 最大的不一樣(或者說優點)在於:編程

  • 阻塞的線程可以響應中斷,這樣可以有機會釋放本身持有的鎖,避免死鎖
  • 支持超時,若是線程在必定時間內未獲取到鎖,不是進入阻塞狀態,而是拋出異常
  • 非阻塞的獲取鎖,若是未獲取到鎖,不進入阻塞狀態,而是直接返回

三種狀況分別對應 Lock 的三個方法:void lockInterruptibly()boolean tryLock(long time, TimeUnit unit)boolean tryLock()安全

Lock 最經常使用的一個實現類是 ReentrantLock,表明可重入鎖,意思是能夠反覆獲取同一把鎖。
除此以外,Lock 的構造方法能夠傳入一個 boolean 值,表示是不是公平鎖。性能優化

2. Lock 和 Condition 的使用

前面實現的簡單的阻塞隊列就是使用 Lock 和 Condition ,如今其含義已經很是明確了:多線程

public class BlockingQueue<T> {
    private int capacity;
    private int size;

    //定義鎖和條件
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    /**
     * 入隊列
     */
    public void enqueue(T data){
        lock.lock();
        try {
            //若是隊列滿了,須要等待,直到隊列不滿
            while (size >= capacity){
                notFull.await();
            }
            //入隊代碼,省略
            //入隊以後,通知隊列已經不爲空了
            notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //在finally塊中釋放鎖,避免死鎖
            lock.unlock();
        }
    }

    /**
     * 出隊列
     */
    public T dequeue(){
        lock.lock();
        try {
            //若是隊列爲空,須要等待,直到隊列不爲空
            while (size <= 0){
                notEmpty.await();
            }
            //出隊代碼,省略
            //出隊列以後,通知隊列已經不滿了
            notFull.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        //實際應該返回出隊數據
        return null;
    }
}

能夠看到,Lock 須要手動的加鎖和解鎖,而且解鎖操做是放在 finally 塊中的,這是一種編程範式,儘可能遵照。併發

2、ReadWriteLock

ReadWriteLock 表示讀寫鎖,適用於讀多寫少的狀況,讀寫鎖通常有幾個特徵:app

  • 讀鎖與讀鎖之間不互斥,即容許多個線程同時讀變量。
  • 寫鎖與讀鎖之間互斥,一個線程在寫時,不容許讀操做。
  • 寫鎖與寫鎖之間互斥,只容許 一個線程寫操做。

讀寫鎖減少了鎖的粒度,在讀多寫少的場景下,對性能的提高較爲明顯。ReadWriteLock 的簡單使用示例以下:框架

public class ReadWriteLockTest {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock =lock.readLock();
    private final Lock writeLock =lock.writeLock();

    private int value;

    //加寫鎖
    private void addValue(){
        writeLock.lock();
        try {
            value += 1;
        }
        finally {
            writeLock.unlock();
        }
    }

    //加讀鎖
    private int getValue(){
        readLock.lock();
        try {
            return value;
        }
        finally {
            readLock.unlock();
        }
    }
}

讀寫鎖的升級與降級異步

Java 中不容許鎖的升級,即加寫鎖時必須釋放讀鎖。ide

可是容許鎖的降級,即加讀鎖時,能夠不釋放寫鎖,最後讀鎖和寫鎖一塊兒釋放。

3、StampedLock

1. StampedLock 的使用及特色

StampedLock 是 Java 1.8 版本中提供的鎖,主要支持三種鎖模式:寫鎖、悲觀讀鎖、樂觀讀。

其中寫鎖和悲觀讀鎖跟 ReadWriteLock 中的寫鎖和讀鎖的概念相似。StampedLock 在使用的時候不同,加鎖的時候會返回一個參數,解鎖的時候須要傳入這個參數,示例以下:

public class StampedLockTest {

    private final StampedLock lock = new StampedLock();
    private int value;

    private void addValue(){
        long stamp = lock.writeLock();
        try {
            value += 1;
        }
        finally {
            lock.unlockWrite(stamp);
        }
    }
}

StampedLock 最主要的特色是支持「樂觀讀」,即當進行讀操做的時候,並非全部的寫操做都被阻塞,容許一個線程獲取寫鎖。樂觀讀的使用示例以下:

public class StampedLockTest {

    private final StampedLock lock = new StampedLock();
    private int value;

    private void getValue(){
        //樂觀讀,讀入變量
        long stamp = lock.tryOptimisticRead();
        int a = value;
        //若是驗證失敗
        if (!lock.validate(stamp)){
            //升級爲悲觀讀鎖,繼續讀入變量
            stamp = lock.readLock();
            try {
                a = value;
            }
            finally {
                lock.unlockRead(stamp);
            }
        }
    }
}

須要注意的是,這裏使用 validate() 方法進行驗證,若是樂觀讀失敗,則升級爲悲觀讀鎖,繼續獲取變量。

2. StampedLock 的注意事項

StampedLock 不支持重入,即不可反覆獲取同一把鎖。

在使用 StampedLock 的時候,不要調用中斷操做。若是須要支持中斷,能夠調用 readLockInterruptibly 和 writeLockInterruptibly 方法。

4、Semaphore

Semaphore 表示信號量,初始化對象的時候,須要傳一個參數,表示信號量的計數器值。acquire() 方法將計數器加 1,release() 方法減 1,這兩個方法都可以保證原子性。

信號量的簡單示例:

public class SemaphoreTest {

    private final Semaphore semaphore = new Semaphore(1);
    private int value;

    public void addValue() {
        try {
            semaphore.acquire();
            value += 1;
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            semaphore.release();
        }
    }

程序中使用信號量實現了一個線程安全的方法,初始值設爲了 1,當多個方法訪問 addValue 方法的時候,因爲 acquire 方法保證原子性,因此只能有一個線程將計數器減 1 並進入臨界區,另外一個線程等待。

一個線程執行完後,調用 release 方法,計數器加 1,另外一個等待的線程被喚醒。

Semaphore 與 Lock 的一個不一樣點即是信號量容許多個線程同時進入臨界區,例如將初始值設置的更大一些。例以下面這個例子:

public class SemaphoreTest {

    //初始值 2,表示 2 個線程可同時進入臨界區
    private final Semaphore semaphore = new Semaphore(2);

    public void test() {
        try {
            semaphore.acquire();
            System.out.println("線程" + Thread.currentThread().getName() + " 進入臨界區 : " + System.currentTimeMillis());
            Thread.sleep(1000);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            semaphore.release();
        }
    }
}

5、CountDownLatch

CountDownLatch 是一個線程同步的工具,主要實現一個線程等待多個線程的功能。在原始的 Thread 中,能夠調用 join() 方法來等待線程執行完畢,而 CountDownLatch 則能夠用在線程池中的線程等待。

下面是 CountDownLatch 的使用示例:

public class CountDownLatchTest {

    //實際生產中不推薦使用這種建立線程的方式
    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        threadPool.execute(() -> {
            System.out.println("線程1執行完畢");
            latch.countDown();
        });
        threadPool.execute(() -> {
            System.out.println("線程2執行完畢");
            latch.countDown();
        });
        latch.await();
        System.out.println("兩個線程都執行完畢");

        threadPool.shutdown();
    }
}

CountDownLatch 的初始值爲 2,線程執行完畢則調用 countDown 方法,計數器減 1。減到 0 的時候,會喚醒主線程繼續執行。

6、CyclicBarrier

CyclicBarrier 也是一個線程同步工具類,主要實現多個線程之間的互相等待。

CyclicBarrier 有兩個構造函數,能夠傳一個計數器的初始值,還能夠加上一個 Runnable,表示計數器執行減到 0 的時候,須要執行的回調方法。

public class CyclicBarrierTest {

    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
    private final CyclicBarrier barrier = new CyclicBarrier(2, this::note);

    public void print(){
        threadPool.execute(() -> {
            System.out.println("線程1執行完畢");
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        threadPool.execute(() -> {
            System.out.println("線程2執行完畢");
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        threadPool.shutdown();
    }
    public void note(){
        System.out.println("兩個線程執行完畢");
    }
}

示例中設置 CyclicBarrier 的初始值爲 2,線程執行完畢調用 await 方法,計數器減 1。print() 方法中的兩個線程執行完後,計數器減到 0,就會調用 note 方法。

7、ThreadPoolExecutor

1. 線程池的工做原理

因爲線程是一種重量級對象,頻繁的建立和銷燬比較消耗系統資源,所以線程池的優點就顯現出來了。線程池可有下降資源消耗,由於不用頻繁建立和銷燬線程;提升響應速度,須要執行任務時,可直接使用線程池中的線程資源;還可以有效的管理、監控線程池中的線程。

Java 中的線程池的實現是一種很典型的生產者-消費者模式,使用線程的一方是生產者,主要提供須要執行的任務,線程池是消費者,消費生產者提供的任務。

下面這段代碼可以幫助理解線程池的實現原理(僅用於幫助理解,實際執行結果有出入):

public class ThreadPool {
    //保存任務的阻塞隊列
    private BlockingQueue<Runnable> workQueue;
    //保存工做線程的列表
    private List<WorkThread> threadList = new ArrayList<>();

    //構造方法
    public ThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        //根據poolSize的數量建立工做線程,並執行線程
        for (int i = 0; i < poolSize; i++) {
            WorkThread thread = new WorkThread();
            thread.start();
            threadList.add(thread);
        }
    }

    //執行任務的方法,主要是將任務添加到隊列中
    public void execute(Runnable task) {
        try {
            workQueue.put(task);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //工做線程
    class WorkThread extends Thread{
        @Override
        public void run() {
            //循環取出任務執行
            while (!workQueue.isEmpty()) {
                try {
                    Runnable task = workQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上面的代碼註釋很詳細了,主要是使用了一個阻塞隊列,用來存儲生產者的任務。而後在構造器中建立線程,並循環從隊列中取出任務執行。

2. Java 中的線程池

Java 中提供了 Executors 這個類來快速建立線程池,簡單使用示例以下:

Executors.newSingleThreadExecutor();//建立一個線程的線程池
Executors.newFixedThreadPool(5);//建立固定數量線程
Executors.newCachedThreadPool();//建立可調整數量的線程
Executors.newScheduledThreadPool(5);//建立定時任務線程池

可是在《阿里巴巴Java開發手冊》中,明確禁止使用 Executors 建立線程池(甚至也不建議使用 Thread 顯式建立線程),主要緣由是 Executors 的默認方法都是使用的無界隊列,在高負載的狀況下,很容易致使 OOM(Out Of Memory)。

因此在 Java 中建立線程池的正確姿式是使用 ThreadPoolExecutor ,其構造函數有七個:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,//可選
                          RejectedExecutionHandler handler//可選
                          ) { ...
  • corePoolSize:線程池中最少的線程數
  • maximumPoolSize:線程池中建立的最大的線程數
  • keepAliveTime:表示線程池中線程的活躍時間,若是線程在這個活躍時間內沒有執行任務,而且線程數量超過了 corePoolSize,那麼線程池就會回收多餘的線程。
  • TimeUnit:上一個參數的時間單位
  • workQueue:保存任務的隊列,爲了不 OOM,建議使用有界隊列
  • threadFactory:可選參數,不傳的話就是默認值。也能夠本身傳一個實現了 ThreadFactory 接口的類,表示自定義線程,例如給線程指定名字,線程組等。
  • handler:可選參數。定義任務的拒絕策略,表示無空閒線程時,而且隊列中的任務滿了的,怎麼拒絕新的任務。目前的拒絕策略有四種:

    • AbortPolicy:默認的拒絕策略,拋出 RejectedExecutionException 異常
    • CallerRunsPolicy:讓提交任務的線程本身去執行這個任務
    • DiscardOldestPolicy:丟棄最老的任務,及最早加入隊列中的任務,並添加新的任務
    • DiscardPolicy:直接丟棄任務,而且不會拋出任何異常

調用 ThreadPoolExecutor

線程池建立好了以後,就須要執行任務,ThreadPoolExecutor 提供了兩個方法,一是 execute,二是 submit。execute 沒有返回值,也就是說沒法獲取執行結果。使用示例以下:

public static void main(String[] args) {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
    threadPool.execute(() -> {
        System.out.println("In this world");
    });

    threadPool.shutdown();
}

而 submit 方法有一個 Future 接口的返回值,Future 接口有五個方法:

  • cancle:取消任務
  • isCancled:任務是否已取消
  • isDone:任務是否已執行完
  • get:獲取任務執行結果
  • get(long timeout, TimeUnit unit):支持超時獲取任務執行結果

下面代碼展現了取消任務的方法:

public static void main(String[] args) {
    
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);

    Future<?> future = threadPool.submit(() -> {
        System.out.println("I am roseduan");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    future.cancel(false);
    threadPool.shutdown();
}

程序的本意是打印語句而後休眠 5 秒,但因爲調用了 cancle 方法 ,所以程序直接結束,不會有任何輸出。

8、FutureTask

FutureTask 也是一個支持獲取任務執行結果的工具類,FutureTask 實現了 Runnable 和 Future 接口。

因此能夠將 FutureTask 做爲任務提交給 ThreadPoolExecutor 或者 Thread 執行,而且能夠獲取執行結果。簡單的使用以下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    //建立任務
    FutureTask<String> task = new FutureTask<>(() -> "Java and " + "Python");

    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
    
    threadPool.execute(task);
    //獲取執行結果
    System.out.println(task.get());

    threadPool.shutdown();
}

傳給 Thread 做爲參數的使用示例以下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
        Thread thread = new Thread(task);
        thread.start();
        System.out.println(task.get());//輸出3
    }

9、CompletableFuture

CompletableFuture 是一個異步編程的工具類,異步化可以最大化並行程序的執行,是多線程性能優化的基礎。

1. 建立 CompletableFuture 對象

Completable 有四個靜態方法,能夠用來建立對象:

runAsync(Runnable runnable);//無返回值
runAsync(Runnable runnable, Executor executor);//無返回值,可指定線程池

supplyAsync(Supplier<U> supplier);//有返回值
supplyAsync(Supplier<U> supplier, Executor executor);//有返回值,可指定線程池

能夠看到,四個方法分爲了是否有返回值,和是否自定義線程池。若是不自定義線程池,那麼 CompletableFuture 會使用公共的線程池,默認建立 CPU 核數的數量的線程池,當有多個任務的時候,仍是建議根據每一個任務自定義線程池。

一個簡單的使用示例以下,其中 task3 會等待兩個任務都執行完畢:

public static void main(String[] args) {
    CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
        System.out.println("任務1執行完畢");
    });
    CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任務2執行完畢");
    });
    CompletableFuture<String> task3 = task1.thenCombine(task2, (__, res) -> "兩個任務執行完畢");

    System.out.println(task3.join());
}

CompletableFuture 實現了 Future 接口,所以能夠查看任務執行的狀況,而且能夠獲取返回值。

2. CompletionStage 接口中的方法

CompletableFuture 還實現了 CompletionStage 接口。這個接口描述了任務之間的時序關係,分別有串行、並行、聚合三種關係。須要注意的是,並行本就是其所具備的特性,因此再也不探討了,而且聚合關係又分爲了 AND 聚合關係和 OR 聚合關係。下面依次介紹串行、AND 聚合、OR 聚合這三種關係。

首先是串行關係,串行很簡單,一個任務執行完後再執行另外一個任務,例以下圖:

在這裏插入圖片描述

描述串行關係的幾個方法是:thenApply、thenAccept、thenRun、thenCompose。

thenApply 既支持接收參數,又可以支持返回值。

thenAccept 支持接收參數,可是不支持返回值。

thenRun 既不能接收參數,也不能有返回值。

CompletionStage 中的大部分方法都有帶有 Async 後綴的方法,表示可能會使用其餘的線程來執行主體中的內容,後面介紹的方法都相似這樣,再也不贅述。

簡單的使用示例以下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任務1執行完畢");
        return "Task1";
    }).thenApply((s) -> "接收到的參數 : " + s);;

    System.out.println(future.get());
}

其次是 AND 匯聚關係,典型的場景即是一個線程等待兩個線程都執行完後再執行,例以下圖:

在這裏插入圖片描述

描述 AND 聚合關係的有三個方法:thenCombine、thenAcceptBoth、runAfterBoth,其是否接收參數和支持返回值,和上面的三個方法對應。一個簡單的使用示例以下:

public static void main(String[] args) {
    CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任務1執行完畢");
        return "task1";
    });
    CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任務2執行完畢");
        return "task2";
    });

    CompletableFuture<String> task3 = task1.thenCombine(task2, (r,s) -> r + " " + s);
    System.out.println(task3.join());
}

任務 1 休眠了 2 秒,任務 3 會等待前面兩個任務執行完成以後再執行。

最後是 OR 聚合關係,表示線程等待其中一個線程知足條件以後,就能夠繼續執行了,不用等待所有的線程。

在這裏插入圖片描述

描述 OR 聚合關係的是 applyToEither、acceptEither、runAfterEither。使用示例和上面的相似,只須要將方法改一下就是了,這裏再也不贅述了。

3. 處理異常

在異步編程中,CompletionStage 接口還提供了幾個能夠處理異常的方法,和 try() catch() finally() 相似。

這幾個方法分別是 :

  • exceptionally:至關於 catch
  • whenComplete:至關於 finally
  • handle:至關於 finally ,支持返回值

使用示例以下:

public static void main(String[] args) {
    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
        String str = null;
        return str.length();
        //至關於catch
    }).exceptionally((e) -> {
        System.out.println("發生異常");
        return 0;
    });

    //至關於 finally
    task.whenComplete((s, r) -> {
        System.out.println("執行結束");
    });
    System.out.println(task.join());
}

10、CompletionService

CompletionService 是一個批量執行異步任務的工具類,先來看一個例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    StringBuffer sb = new StringBuffer();

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 5,
            10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5));

    Future<String> task1 = threadPool.submit(() -> {
        Thread.sleep(2000);
        return "Task1";
    });
    Future<String> task2 = threadPool.submit(() -> "Task2");
    Future<String> task3 = threadPool.submit(() -> "Task3");
    
    sb.append(task1.get());
    sb.append(task2.get());
    sb.append(task3.get());
}

程序的意思是,依次執行三個任務,並將其結果存儲到 StringBuffer 中,因爲 task1 休眠了 2 秒,因此 sb 會在這裏阻塞。

因爲這三個任務之間沒有關聯,因此等待的消耗徹底是不必的,解決的辦法即是利用一個阻塞隊列,先執行完的任務將結果保存在隊列中,sb 從隊列中取出就好了。

CompletionService 實際上就是將線程池和阻塞隊列的功能整合了起來,解決了相似上面的問題。CompletionService 的實現類是 ExecutorCompletionService,這個類有兩個構造方法:

public ExecutorCompletionService(Executor executor) {}
public ExecutorCompletionService(Executor executor,
    BlockingQueue<Future<V>> completionQueue) {}

若是不傳一個阻塞隊列,則會使用默認的無界隊列。

CompletionService 主要有這幾個方法:

submit() 提交任務、take() 從阻塞隊列中獲取執行結果(若是隊列爲空,線程阻塞)、poll() 也是從隊列中獲取執行結果(若是隊列爲空,則返回 null),另外 poll 還支持超時獲取。

使用 CompletionService 改造後的程序示例以下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    StringBuffer sb = new StringBuffer();

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 5,
            10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5));

    CompletionService<String> service = new ExecutorCompletionService<>(threadPool);
    service.submit(() -> {
        Thread.sleep(2000);
        return "Task1";
    });
    service.submit(() -> "Task2");
    service.submit(() -> "Task3");

    System.out.println(sb.append(service.take().get()).toString());
    System.out.println(sb.append(service.take().get()).toString());
    System.out.println(sb.append(service.take().get()).toString());
}

11、Fork/Join

1. Fork/Join 使用

Fork/Join 是一個處理分治任務的計算框架,所謂分治,即分而治之,將一個任務分解成子任務,求解子任務,而後將子任務的結果合併,就獲得了最後的結果。分治思想的應用十分的普遍,例如常見的快速排序、歸併排序,還有流行的大數據計算框架 MapReduce,都應用了分治思想。

Java 中,Fork 對應的是 任務分解,Join 則表示 子任務的結果合併。

Fork/Join 主要包含兩個主要的實現類:

  • 一是線程池 ForkJoinPool,默認會建立 CPU核數數量的線程
  • 二是 ForkJoinTask,這是一個抽象類,主要的方法有 fork() 和 join(),前者表示執行子任務,後者表示阻塞等待子任務的執行結果。ForkJoinTask 還有兩個子類:

    • RecursiveTask
    • RecursiveAction

這兩個類也是抽象的,咱們須要自定義並繼承這個類,並覆蓋其 compute 方法。其中 RecursiveTask 有返回值,而 RecursiveAction 沒有返回值。

下面是一個使用 ForkJoin 的示例,實現了 n 的階乘,註釋寫得比較詳細。

public class ForkJoinTest {
    public static void main(String[] args) {
        //建立線程池
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        //建立任務
        Factorial task = new Factorial(6);
        //invoke 方法執行任務(還可使用 execute、submit),獲得執行的結果
        Integer res = forkJoinPool.invoke(task);
        System.out.println(res);
    }

    static class Factorial extends RecursiveTask<Integer> {
        private final int n;
        Factorial(int n) {
            this.n = n;
        }
        @Override
        protected Integer compute() {
            if (n == 0){
                return 1;
            }
            Factorial f = new Factorial(n - 1);
            //執行子任務
            f.fork();
            //等待子任務結果
            return n * factorial.join();
        }
    }
}

2. ForkJoinPool 原理

和普通的線程池相似,ForkJoinPool 是一個特殊的線程池,而且也採用的是生產者 - 消費者模式。跟普通線程池共享一個隊列不一樣,ForkJoinPool 其中維護了多個雙端隊列,當一個線程對應的任務隊列爲空的時候,線程並不會空閒,而是「竊取」其餘隊列的任務執行。

因爲是雙端隊列,正常執行任務和「竊取任務」能夠從兩端進行出隊,這樣避免了數據競爭。

採用「任務竊取」這種模式,也是 ForkJoinPool 比普通線程池更加智能的體現。

相關文章
相關標籤/搜索