併發編程之Future&FutureTask深刻解析

點贊再看,養成習慣,公衆號搜一搜【一角錢技術】關注更多原創技術文章。本文 GitHub org_hejianhui/JavaStudy 已收錄,有個人系列文章。html

前言

Java線程實現方式主要有四種:java

  • 繼承Thread類
  • 實現Runnable接口
  • 實現Callable接口經過FutureTask包裝器來建立Thread線程
  • 使用ExecutorService、Callable、Future實現有返回結果的多線程。

其中前兩種方式線程執行完後都沒有返回值,後兩種是帶返回值的。node

Callable 和 Runnable 接口

Runnable接口

// 實現Runnable接口的類將被Thread執行,表示一個基本的任務
public interface Runnable {
    // run方法就是它全部的內容,就是實際執行的任務
    public abstract void run();
}
複製代碼

沒有返回值

run 方法沒有返回值,雖然有一些別的方法也能實現返回值得效果,好比編寫日誌文件或者修改共享變量等等,可是不只容易出錯,效率也不高。git

不能拋出異常

public class RunThrowExceptionDemo {

    /** * 普通方法能夠在方法簽名中拋出異常 * * @throws IOException */
    public void normalMethod() throws IOException {
        throw new IOException();
    }

    class RunnableImpl implements Runnable {

        /** * run 方法內沒法拋出 checked Exception,除非使用 try catch 進行處理 */
        @Override
        public void run() {
            try {
                throw new IOException();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}
複製代碼

能夠看到普通方法 normalMethod 能夠在方法簽名上拋出異常,這樣上層接口就能夠捕獲這個異常進行處理,可是實現 Runnable 接口的類,run 方法沒法拋出 checked Exception,只能在方法內使用 try catch 進行處理,這樣上層就沒法得知線程中的異常。github

設計致使

其實這兩個缺陷主要緣由就在於 Runnable 接口設計的 run 方法,這個方法已經規定了 run() 方法的返回類型是 void,並且這個方法沒有聲明拋出任何異常。因此,當實現並重寫這個方法時,咱們既不能改返回值類型,也不能更改對於異常拋出的描述,由於在實現方法的時候,語法規定是不容許對這些內容進行修改的。算法

Runnable 爲何設計成這樣?

假設 run() 方法能夠返回返回值,或者能夠拋出異常,也無濟於事,由於咱們並無辦法在外層捕獲並處理,這是由於調用 run() 方法的類(好比 Thread 類和線程池)是 Java 直接提供的,而不是咱們編寫的。 因此就算它能有一個返回值,咱們也很難把這個返回值利用到,而 Callable 接口就是爲了解決這兩個問題。sql

Callable接口

public interface Callable<V> {
    //返回接口,或者拋出異常
    V call() throws Exception;
}
複製代碼

能夠看到 Callable 和 Runnable 接口其實比較相識,都只有一個方法,也就是線程任務執行的方法,區別就是 call 方法有返回值,並且聲明瞭 throws Exception。數據庫

Callable 和 Runnable 的不一樣之處

  • 方法名 :Callable 規定的執行方法是 call(),而 Runnable 規定的執行方法是 run();
  • 返回值 :Callable 的任務執行後有返回值,而 Runnable 的任務執行後是沒有返回值的;
  • 拋出異常 :call() 方法可拋出異常,而 run() 方法是不能拋出受檢查異常的;

與 Callable 配合的有一個 Future 接口,經過 Future 能夠了解任務執行狀況,或者取消任務的執行,還可獲取任務執行的結果,這些功能都是 Runnable 作不到的,Callable 的功能要比 Runnable 強大。安全

Future接口

Future的做用

簡單來講就是利用線程達到異步的效果,同時還能夠獲取子線程的返回值。markdown

好比當作必定運算的時候,運算過程可能比較耗時,有時會去查數據庫,或是繁重的計算,好比壓縮、加密等,在這種狀況下,若是咱們一直在原地等待方法返回,顯然是不明智的,總體程序的運行效率會大大下降。

咱們能夠把運算的過程放到子線程去執行,再經過 Future 去控制子線程執行的計算過程,最後獲取到計算結果。這樣一來就能夠把整個程序的運行效率提升,是一種異步的思想。

Future的方法

Future 接口一共有5個方法,源代碼以下:

public interface Future<V> {

  /** * 嘗試取消任務,若是任務已經完成、已取消或其餘緣由沒法取消,則失敗。 * 一、若是任務還沒開始執行,則該任務不該該運行 * 二、若是任務已經開始執行,由參數mayInterruptIfRunning來決定執行該任務的線程是否應該被中斷,這只是終止任務的一種嘗試。若mayInterruptIfRunning爲true,則會當即中斷執行任務的線程並返回true,若mayInterruptIfRunning爲false,則會返回true且不會中斷任務執行線程。 * 三、調用這個方法後,之後對isDone方法調用都返回true。 * 四、若是這個方法返回true,之後對isCancelled返回true。 */
    boolean cancel(boolean mayInterruptIfRunning);

   /** * 判斷任務是否被取消了,若是調用了cance()則返回true */
    boolean isCancelled();

   /** * 若是任務完成,則返回ture * 任務完成包含正常終止、異常、取消任務。在這些狀況下都返回true */
    boolean isDone();

   /** * 線程阻塞,直到任務完成,返回結果 * 若是任務被取消,則引起CancellationException * 若是當前線程被中斷,則引起InterruptedException * 當任務在執行的過程當中出現異常,則拋出ExecutionException */
    V get() throws InterruptedException, ExecutionException;

   /** * 線程阻塞必定時間等待任務完成,並返回任務執行結果,若是則超時則拋出TimeoutException */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼

get方法(獲取結果)

get 方法最主要的做用就是獲取任務執行的結果,該方法在執行時的行爲取決於 Callable 任務的狀態,可能會發生如下 7 種狀況。

  • 任務已經執行完,執行 get 方法能夠馬上返回,獲取到任務執行的結果。

  • 任務尚未開始執行,好比咱們往線程池中放一個任務,線程池中可能積壓了不少任務,還沒輪到我去執行的時候,就去 get 了,在這種狀況下,至關於任務還沒開始,咱們去調用 get 的時候,會當前的線程阻塞,直到任務完成再把結果返回回來。

  • 任務正在執行中,可是執行過程比較長,因此我去 get 的時候,它依然在執行的過程當中。這種狀況調用 get 方法也會阻塞當前線程,直到任務執行完返回結果。

  • 任務執行過程當中拋出異常,咱們再去調用 get 的時候,就會拋出 ExecutionException 異常,無論咱們執行 call 方法時裏面拋出的異常類型是什麼,在執行 get 方法時所得到的異常都是 ExecutionException。

  • 任務被取消了,若是任務被取消,咱們用 get 方法去獲取結果時則會拋出 CancellationException。

  • 任務被中斷了,若是任務被當前線程中斷,咱們用 get 方法去獲取結果時則會拋出InterruptedException。

  • 任務超時,咱們知道 get 方法有一個重載方法,那就是帶延遲參數的,調用了這個帶延遲參數的 get 方法以後,若是 call 方法在規定時間內正常順利完成了任務,那麼 get 會正常返回;可是若是到達了指定時間依然沒有完成任務,get 方法則會拋出 TimeoutException,表明超時了。

參考示例:

package com.niuh.future;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new FutureTask());
        try {
            Integer res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future線程返回值:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    static class FutureTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(new Random().nextInt(3000));
            return new Random().nextInt(10);
        }
    }
}
複製代碼

isDone方法(判斷是否執行完畢)

isDone() 方法,該方法是用來判斷當前這個任務是否執行完畢了

package com.niuh.future;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureIsDoneDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new FutureTask());
        try {
            for (int i = 0; i < 3; i++) {
                Thread.sleep(1000);
                System.out.println("線程是否完成:" + future.isDone());
            }
            Integer res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future 線程返回值:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    static class FutureTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(2000);
            return new Random().nextInt(10);
        }
    }
}
複製代碼

執行結果:

線程是否完成:false
線程是否完成:false
線程是否完成:true
Future 線程返回值:9
複製代碼

能夠看到前兩次 isDone 方法的返回結果是 false,由於線程任務尚未執行完成,第三次 isDone 方法的返回結果是 ture。

注意:這個方法返回 true 則表明執行完成了,返回 false 則表明還沒完成。但返回 true,並不表明這個任務是成功執行的,好比說任務執行到一半拋出了異常。那麼在這種狀況下,對於這個 isDone 方法而言,它其實也是會返回 true 的,由於對它來講,雖然有異常發生了,可是這個任務在將來也不會再被執行,它確實已經執行完畢了。因此 isDone 方法在返回 true 的時候,不表明這個任務是成功執行的,只表明它執行完畢了。

咱們將上面的示例稍做修改再來看下結果,修改 FutureTask 代碼以下:

static class FutureTask implements Callable<Integer> {
	@Override
	public Integer call() throws Exception {
		Thread.sleep(2000);
		throw new Exception("故意拋出異常");
    }
}
複製代碼

執行結果: 雖然拋出了異常,可是 isDone 方法的返回結果依然是 ture。

這段代碼說明了:

  • 即使任務拋出異常,isDone 方法依然會返回 true。
  • 雖然拋出的異常是 IllegalArgumentException,可是對於 get 而言,它拋出的異常依然是 ExecutionException。
  • 雖然在任務執行到2秒的時候就拋出了異常,可是真正要等到咱們執行 get 的時候,纔看到了異常。

cancel方法(取消任務的執行)

若是不想執行某個任務了,則可使用 cancel 方法,會有如下三種狀況:

  • 第一種狀況最簡單,那就是當任務尚未開始執行時,一旦調用 cancel,這個任務就會被正常取消,將來也不會被執行,那麼 cancel 方法返回 true。

  • 第二種狀況也比較簡單。若是任務已經完成,或者以前已經被取消過了,那麼執行 cancel 方法則表明取消失敗,返回 false。由於任務不管是已完成仍是已經被取消過了,都不能再被取消了。

  • 第三種狀況比較特殊,就是這個任務正在執行,這個時候執行 cancel 方法是不會直接取消這個任務的,而是會根據咱們傳入的參數作判斷。cancel 方法是必須傳入一個參數,該參數叫做 mayInterruptIfRunning,它是什麼含義呢?

    • 若是傳入的參數是 true,執行任務的線程就會收到一箇中斷的信號,正在執行的任務可能會有一些處理中斷的邏輯,進而中止,這個比較好理解。
    • 若是傳入的是 false 則就表明不中斷正在運行的任務,也就是說,本次 cancel 不會有任何效果,同時 cancel 方法會返回 false。

參考示例:

package com.niuh.future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureCancelDemo {

    static ExecutorService executorService = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        // 當任務尚未開始執行
        // demo1();

        // 若是任務已經執行完
        // demo2();

        // 若是任務正在進行中
        demo3();
    }

    private static void demo1() {
        for (int i = 0; i < 1000; i++) {
            executorService.submit(new FutureTask());
        }

        Future<String> future = executorService.submit(new FutureTask());
        try {
            boolean cancel = future.cancel(false);
            System.out.println("Future 任務是否被取消:" + cancel);
            String res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future 線程返回值:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }


    private static void demo2() {
        Future<String> future = executorService.submit(new FutureTask());
        try {
            Thread.sleep(1000);
            boolean cancel = future.cancel(false);
            System.out.println("Future 任務是否被取消:" + cancel);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    private static void demo3() {
        Future<String> future = executorService.submit(new FutureInterruptTask());
        try {
            Thread.sleep(1000);
            boolean cancel = future.cancel(true);
            System.out.println("Future 任務是否被取消:" + cancel);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }


    static class FutureTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            return "正常返回";
        }
    }

    static class FutureInterruptTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("循環執行");
                Thread.sleep(500);
            }
            System.out.println("線程被中斷");
            return "正常返回";
        }
    }
}
複製代碼

這裏,咱們來分析下第三種狀況(任務正在進行中),當咱們設置 true 時,線程中止

循環執行
循環執行
Future 任務是否被取消:true
複製代碼

當咱們設置 false 時,任務雖然也被取消成功,可是線程依然執行。

循環執行
循環執行
Future 任務是否被取消:true
循環執行
循環執行
循環執行
循環執行
......
複製代碼

那麼如何選擇傳入 true 仍是 false 呢?

  • 傳入 true 適用的狀況是,明確知道這個任務可以處理中斷。
  • 傳入 false 適用於什麼狀況呢?
    • 若是咱們明確知道這個線程不能處理中斷,那應該傳入 false。
    • 咱們不知道這個任務是否支持取消(是否能響應中斷),由於在大多數狀況下代碼是多人協做的,對於這個任務是否支持中斷,咱們不必定有十足的把握,那麼在這種狀況下也應該傳入 false。
    • 若是這個任務一旦開始運行,咱們就但願它徹底的執行完畢。在這種狀況下,也應該傳入 false。

須要注意的是,雖然示例中寫了 !Thread.currentThread().isInterrupted() 方法來判斷中斷,可是實際上並非經過咱們的代碼來進行中斷,而是 Future#cancel(true) 內部調用 t.interrupt 方法修改線程的狀態以後,Thread.sleep 會拋出 InterruptedException 異常,線程池中會執行異常的相關邏輯,並退出當前任務。 sleep 和 interrupt 會產生意想不到的效果。

好比咱們將 FutureInterruptTask 代碼修改成 while(true) 形式,調用 cancel(true) 方法線程仍是會被中斷。

static class FutureInterruptTask implements Callable<String> {
	@Override
	public String call() throws Exception {
		while (true) {
            System.out.println("循環執行");
            Thread.sleep(500);
		}
	}
}
複製代碼

isCancelled方法(判斷是否被取消)

isCancelled 方法,判斷是否被取消,它和 cancel 方法配合使用,比較簡單,能夠參考上面的示例。

Callable 和 Future 的關係

Callable 接口相比於 Runnable 的一大優點是能夠有返回結果,返回結果就能夠用 Future 類的 get 方法來獲取 。所以,Future 至關於一個存儲器,它存儲了 Callable 的 call 方法的任務結果。

除此以外,咱們還能夠經過 Future 的 isDone 方法來判斷任務是否已經執行完畢了,還能夠經過 cancel 方法取消這個任務,或限時獲取任務的結果等,總之 Future 的功能比較豐富。

FutureTask

Future只是一個接口,不能直接用來建立對象,其實現類是FutureTask,JDK1.8修改了FutureTask的實現,JKD1.8再也不依賴AQS來實現,而是經過一個volatile變量state以及CAS操做來實現。FutureTask結構以下所示:

咱們來看一下 FutureTask 的代碼實現:

public class FutureTask implements RunnableFuture {...}
複製代碼

能夠看到,它實現了一個接口,這個接口叫做 RunnableFuture。

RunnableFuture接口

咱們來看一下 RunnableFuture 接口的代碼實現:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /** * Sets this Future to the result of its computation * unless it has been cancelled. */
    void run();
}

複製代碼

既然 RunnableFuture 繼承了 Runnable 接口和 Future 接口,而 FutureTask 又實現了 RunnableFuture 接口,因此 FutureTask 既能夠做爲 Runnable 被線程執行,又能夠做爲 Future 獲得 Callable 的返回值。

FutureTask源碼分析

成員變量

/* * 當前任務運行狀態 * NEW -> COMPLETING -> NORMAL(正常結束,返回結果) * NEW -> COMPLETING -> EXCEPTIONAL(返回異常結果) * NEW -> CANCELLED(任務被取消,無結果) * NEW -> INTERRUPTING -> INTERRUPTED(任務被打斷,無結果) */
private volatile int state;
private static final int NEW          = 0; // 新建 0
private static final int COMPLETING   = 1; // 執行中 1
private static final int NORMAL       = 2; // 正常 2
private static final int EXCEPTIONAL  = 3; // 異常 3
private static final int CANCELLED    = 4; // 取消 4
private static final int INTERRUPTING = 5; // 中斷中 5
private static final int INTERRUPTED  = 6; // 被中斷 6

/** 將要被執行的任務 */
private Callable<V> callable;
/** 存放執行結果,用於get()方法獲取結果,也可能用於get()方法拋出異常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 執行任務Callable的線程; */
private volatile Thread runner;
/** 棧結構的等待隊列,該節點是棧中最頂層的節點 */
private volatile WaitNode waiters;
複製代碼

爲了後面更好的分析FutureTask的實現,這裏有必要解釋下各個狀態。

  • NEW :表示是個新的任務或者還沒被執行完的任務。這是初始狀態。
  • COMPLETING :任務已經執行完成或者執行任務的時候發生異常,可是任務執行結果或者異常緣由尚未保存到outcome字段(outcome字段用來保存任務執行結果,若是發生異常,則用來保存異常緣由)的時候,狀態會從NEW變動到COMPLETING。可是這個狀態會時間會比較短,屬於中間狀態。
  • NORMAL :任務已經執行完成而且任務執行結果已經保存到outcome字段,狀態會從COMPLETING轉換到NORMAL。這是一個最終態。
  • EXCEPTIONAL :任務執行發生異常而且異常緣由已經保存到outcome字段中後,狀態會從COMPLETING轉換到EXCEPTIONAL。這是一個最終態。
  • CANCELLED :任務還沒開始執行或者已經開始執行可是尚未執行完成的時候,用戶調用了cancel(false)方法取消任務且不中斷任務執行線程,這個時候狀態會從NEW轉化爲CANCELLED狀態。這是一個最終態。
  • INTERRUPTING :任務還沒開始執行或者已經執行可是尚未執行完成的時候,用戶調用了cancel(true)方法取消任務而且要中斷任務執行線程可是尚未中斷任務執行線程以前,狀態會從NEW轉化爲INTERRUPTING。這是一箇中間狀態。
  • INTERRUPTED :調用interrupt()中斷任務執行線程以後狀態會從INTERRUPTING轉換到INTERRUPTED。這是一個最終態。

有一點須要注意的是,全部值大於COMPLETING的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異常或者任務被取消)。

構造方法

// Callable 構造方法
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

// Runnable 構造方法
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
複製代碼

Runnable的構造器,只有一個目的,就是經過Executors.callable把入參轉化爲RunnableAdapter,主要是由於Callable的功能比Runnable豐富,Callable有返回值,而Runnable沒有。

/** * A callable that runs given task and returns given result */
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
複製代碼

這是一個典型的適配模型,咱們要把 Runnable 適配成 Callable,首先要實現 Callable 的接口,接着在 Callable 的 call 方法裏面調用被適配對象(Runnable)的方法。

內部類

static final class WaitNode {
	volatile Thread thread;
	volatile WaitNode next;
	WaitNode() { thread = Thread.currentThread(); }
}
複製代碼

run方法

/** * run方法能夠直接被調用 * 也能夠開啓新的線程調用 */
public void run() {
	// 狀態不是任務建立,或者當前任務已經有線程在執行了,直接返回
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // Callable 不爲空,而且已經初始化完成
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
            	//調用執行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;//執行失敗
                //經過CAS算法設置返回值(COMPLETING)和狀態值(EXCEPTIONAL)
                setException(ex);
            }
            //執行成功經過CAS(UNSAFE)設置返回值(COMPLETING)和狀態值(NORMAL)
            if (ran)
            	//將result賦值給outcome
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        //將任務runner設置爲null,避免發生併發調用run()方法
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        //須從新讀取任務狀態,避免不可達(泄漏)的中斷
        int s = state;
        //確保cancle(ture)操做時,運行中的任務能接收到中斷指令
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

複製代碼
  1. run方法是沒有返回值的,經過給outcome屬性賦值(set(result)),get時就能從outcome屬性中拿到返回值。
  2. FutureTask 兩種構造器,最終都轉化成了 Callable,因此在 run 方法執行的時候,只須要執行 Callable 的 call 方法便可,在執行 c.call()代碼時,若是入參是 Runnable 的話, 調用路徑爲 c.call() -> RunnableAdapter.call() -> Runnable.run(),若是入參是 Callable 的話,直接調用。

setException(Throwable t)方法

//發生異常時,將返回值設置到outcome(=COMPLETING)中,並更新任務狀態(EXCEPTIONAL)
protected void setException(Throwable t) {
	//調用UNSAFE類封裝的CAS算法,設置值
	if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    	outcome = t;
    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    //喚醒因等待返回值而阻塞的線程
    finishCompletion();
    }
}
複製代碼

set(V v)方法

//任務正常完成,將返回值設置到outcome(=COMPLETING)中,並更新任務狀態(=NORMAL)
protected void set(V v) {
	if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
		outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
複製代碼

 finishCompletion方法

//移除全部等待線程併發出信號,調用done(),以及將任務callable清空
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            //循環喚醒阻塞線程,直到阻塞隊列爲空
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                //一直到阻塞隊列爲空,跳出循環
                if (next == null)
                    break;
                q.next = null; // unlink to help gc 方便gc在適當的時候回收
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}
複製代碼

handlePossibleCancellationInterrupt方法

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us. Let's spin-wait patiently.
    //自旋等待cancle(true)結束(中斷結束)
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
             Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true). However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}
複製代碼

cancle方法

//取消任務執行
public boolean cancel(boolean mayInterruptIfRunning) {
    //對NEW狀態的任務進行中斷,並根據參數設置state
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        //任務已完成(已發出中斷或已取消)
        return false;       
    //中斷線程
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {//cancel(true)
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //經過CAS算法,更新狀態
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //喚醒阻塞線程
        finishCompletion();
    }
    return true;
}
複製代碼

get方法

/** * 獲取執行結果 * @throws CancellationException {@inheritDoc} */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //假如任務尚未執行完,則阻塞則塞線程,直至任務執行完成(結果已經存放到對應變量中)
        s = awaitDone(false, 0L);
    //返回結果
    return report(s);
}

/** * 獲取任務執行結果,指定時間結束,則超時返回,再也不阻塞 * @throws CancellationException {@inheritDoc} */
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
複製代碼

awaitDone方法

/** * Awaits completion or aborts on interrupt or timeout. * 如英文註釋:等待任務執行完畢或任務中斷或任務超時 * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    //循環等待
    for (;;) {
        //線程已經中斷,則移除等待任務
        if (Thread.interrupted()) {
            removeWaiter(q);
            //移除當前任務後,拋出中斷異常
            throw new InterruptedException();
        }

        //任務已經完成,則返回任務狀態,並對當前任務清場處理
        int s = state;
        if (s > COMPLETING) {
            if (q != null) //任務不爲空,則將執行線程設爲null,避免併發下重複執行
                q.thread = null;
            return s;
        }
        //設置結果,很快就能完成,自旋等待
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();  //任務提早退出
        //正在執行或者還沒開始,則構建新的節點
        else if (q == null)
            q = new WaitNode();
        //判斷是否入隊,新節點通常在下一次循環入隊列阻塞
        else if (!queued)
            //沒有入隊列,設置q.next=waiters,並將waiters設爲q
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
        //假若有超時限制,則判斷是否超時
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //超時則將任務節點從阻塞隊列中移除,並返回狀態
                removeWaiter(q);
                return state;
            }
            //阻塞調用get方法的線程,有超時限制
            LockSupport.parkNanos(this, nanos);
        }
        else
            //阻塞調用get方法的線程,無超時限制
            LockSupport.park(this);
    }
}
複製代碼

removeWaiter方法

//移除任務節點
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                    }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                    continue retry;
            }
            break;
        }
    }
}
複製代碼

done()方法

protected void done() { }
複製代碼

默認實現不起任何做用。子類能夠重寫,此方法調用完成回調或執行。注意:也能夠在實現此方法來肯定此任務是否已取消。

Future的使用

FutureTask可用於異步獲取執行結果或取消執行任務的場景。經過傳入Runnable或者Callable的任務給FutureTask,直接調用其run方法或者放入線程池執行,以後能夠在外部經過FutureTask的get方法異步獲取執行結果,所以,FutureTask很是適合用於耗時的計算,主線程能夠在完成本身的任務後,再去獲取結果。另外,FutureTask還能夠確保即便調用了屢次run方法,它都只會執行一次Runnable或者Callable任務,或者經過cancel取消FutureTask的執行等。

FutureTask執行多任務計算的使用場景

利用FutureTask和ExecutorService,能夠用多線程的方式提交計算任務,主線程繼續執行其餘任務,當主線程須要子線程的計算結果時,在異步獲取子線程的執行結果。

package com.niuh.future;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/** * <p> * FutureTask執行多任務計算的使用場景 * </p> */
public class FutureTaskForMultiCompute {
    public static void main(String[] args) {

        FutureTaskForMultiCompute inst = new FutureTaskForMultiCompute();

        // 建立任務集合
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();

        // 建立線程池
        ExecutorService exec = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10; i++) {
            // 傳入Callable對象建立FutureTask對象
            FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask(i, "" + i));

            taskList.add(ft);
            
            // 提交給線程池執行任務,也能夠經過exec.invokeAll(taskList)一次性提交全部任務;
            exec.submit(ft);
        }

        System.out.println("全部計算任務提交完畢, 主線程接着幹其餘事情!");

        // 開始統計各計算線程計算結果
        Integer totalResult = 0;
        for (FutureTask<Integer> ft : taskList) {
            try {
                //FutureTask的get方法會自動阻塞,直到獲取計算結果爲止
                totalResult = totalResult + ft.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 關閉線程池
        exec.shutdown();
        System.out.println("多任務計算後的總結果是:" + totalResult);

    }

    private class ComputeTask implements Callable<Integer> {

        private Integer result = 0;
        private String taskName = "";

        public ComputeTask(Integer iniResult, String taskName) {
            result = iniResult;
            this.taskName = taskName;
            System.out.println("生成子線程計算任務: " + taskName);
        }

        public String getTaskName() {
            return this.taskName;
        }

        @Override
        public Integer call() throws Exception {
            // TODO Auto-generated method stub

            for (int i = 0; i < 100; i++) {
                result = +i;
            }
            // 休眠5秒鐘,觀察主線程行爲,預期的結果是主線程會繼續執行,到要取得FutureTask的結果是等待直至完成。
            Thread.sleep(5000);
            System.out.println("子線程計算任務: " + taskName + " 執行完成!");
            return result;
        }
    }
}
複製代碼

執行結果:

生成子線程計算任務: 0
生成子線程計算任務: 1
生成子線程計算任務: 2
生成子線程計算任務: 3
生成子線程計算任務: 4
生成子線程計算任務: 5
生成子線程計算任務: 6
生成子線程計算任務: 7
生成子線程計算任務: 8
生成子線程計算任務: 9
全部計算任務提交完畢, 主線程接着幹其餘事情!
子線程計算任務: 0 執行完成!
子線程計算任務: 1 執行完成!
子線程計算任務: 3 執行完成!
子線程計算任務: 4 執行完成!
子線程計算任務: 2 執行完成!
子線程計算任務: 5 執行完成!
子線程計算任務: 7 執行完成!
子線程計算任務: 9 執行完成!
子線程計算任務: 8 執行完成!
子線程計算任務: 6 執行完成!
多任務計算後的總結果是:990
複製代碼

FutureTask在高併發環境下確保任務只執行一次

在不少高併發的環境下,每每咱們只須要某些任務只執行一次。這種使用情景FutureTask的特性恰能勝任。舉一個例子,假設有一個帶key的鏈接池,當key存在時,即直接返回key對應的對象;當key不存在時,則建立鏈接。對於這樣的應用場景,一般採用的方法爲使用一個Map對象來存儲key和鏈接池對應的對應關係,典型的代碼以下面所示:

package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** * @program: 錯誤示例 * @description: 在不少高併發的環境下,每每咱們只須要某些任務只執行一次。 * 這種使用情景FutureTask的特性恰能勝任。舉一個例子,假設有一個帶key的鏈接池, * 當key存在時,即直接返回key對應的對象;當key不存在時,則建立鏈接。對於這樣的應用場景, * 一般採用的方法爲使用一個Map對象來存儲key和鏈接池對應的對應關係,典型的代碼以下 * 在例子中,咱們經過加鎖確保高併發環境下的線程安全,也確保了connection只建立一次,然而卻犧牲了性能。 */
public class FutureTaskConnection1 {
    private static Map<String, Connection> connectionPool = new HashMap<>();
    private static ReentrantLock lock = new ReentrantLock();

    public static Connection getConnection(String key) {
        try {
            lock.lock();
            Connection connection = connectionPool.get(key);
            if (connection == null) {
                Connection newConnection = createConnection();
                connectionPool.put(key, newConnection);
                return newConnection;
            }
            return connection;
        } finally {
            lock.unlock();
        }
    }

    private static Connection createConnection() {
        try {
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}
複製代碼

在上面的例子中,咱們經過加鎖確保高併發環境下的線程安全,也確保了connection只建立一次,然而卻犧牲了性能。改用ConcurrentHash的狀況下,幾乎能夠避免加鎖的操做,性能大大提升。

package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;

/** * @description: 改用ConcurrentHash的狀況下,幾乎能夠避免加鎖的操做,性能大大提升。 * <p> * 可是在高併發的狀況下有可能出現Connection被建立屢次的現象。 * 爲何呢?由於建立Connection是一個耗時操做,假設多個線程涌入getConnection方法,都發現key對應的鍵不存在, * 因而全部涌入的線程都開始執行conn=createConnection(),只不過最終只有一個線程能將connection插入到map裏。 * 可是這樣以來,其它線程建立的的connection就沒啥價值,浪費系統開銷。 */
public class FutureTaskConnection2 {
    private static ConcurrentHashMap<String, Connection> connectionPool = new ConcurrentHashMap<>();

    public static Connection getConnection(String key) {
        Connection connection = connectionPool.get(key);
        if (connection == null) {
            connection = createConnection();
            //根據putIfAbsent的返回值判斷是否有線程搶先插入了
            Connection returnConnection = connectionPool.putIfAbsent(key, connection);
            if (returnConnection != null) {
                connection = returnConnection;
            }
        } else {
            return connection;
        }
        return connection;
    }

    private static Connection createConnection() {
        try {
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

}
複製代碼

可是在高併發的狀況下有可能出現Connection被建立屢次的現象。 爲何呢?

由於建立Connection是一個耗時操做,假設多個線程涌入getConnection方法,都發現key對應的鍵不存在,因而全部涌入的線程都開始執行conn=createConnection(),只不過最終只有一個線程能將connection插入到map裏。可是這樣以來,其它線程建立的的connection就沒啥價值,浪費系統開銷。

這時最須要解決的問題就是當key不存在時,建立Connection的動做(conn=createConnection();)能放在connectionPool.putIfAbsent()以後執行,這正是FutureTask發揮做用的時機,基於ConcurrentHashMap和FutureTask的改造代碼以下:

package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/** * @description: FutureTask在高併發環境下確保任務只執行一次 * 這時最須要解決的問題就是當key不存在時,建立Connection的動做(conn=createConnection();) * 能放在connectionPool.putIfAbsent()以後執行,這正是FutureTask發揮做用的時機, * 基於ConcurrentHashMap和FutureTask的改造代碼以下: */
public class FutureTaskConnection3 {
    private static ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();

    public static Connection getConnection(String key) {
        FutureTask<Connection> connectionFutureTask = connectionPool.get(key);
        try {
            if (connectionFutureTask != null) {
                return connectionFutureTask.get();
            } else {
                Callable<Connection> callable = new Callable<Connection>() {
                    @Override
                    public Connection call() throws Exception {
                        return createConnection();
                    }
                };
                FutureTask<Connection> newTask = new FutureTask<>(callable);
                FutureTask<Connection> returnFt = connectionPool.putIfAbsent(key, newTask);
                if (returnFt == null) {
                    connectionFutureTask = newTask;
                    newTask.run();
                }
                return connectionFutureTask.get();
            }
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    private static Connection createConnection() {
        try {
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}
複製代碼

FutureTask任務執行完回調

FutureTask有一個方法 void done()會在每一個線程執行完成return結果時回調。 假設如今須要實現每一個線程完成任務執行後主動執行後續任務。

package com.niuh.future;

import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/** * FutureTask#done() */
@Slf4j
public class FutureTaskDemo1 {

    public static void main(String[] args) throws InterruptedException {
        // 月餅生產者
        final Callable<Integer> productor = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("月餅製做中。。。。");
                Thread.sleep(5000);
                return (Integer) new Random().nextInt(1000);
            }
        };

        // 月餅消費者
        Runnable customer = new Runnable() {
            @Override
            public void run() {
                ExecutorService es = Executors.newCachedThreadPool();
                log.info("老闆給我來一個月餅");
                for (int i = 0; i < 3; i++) {
                    FutureTask<Integer> futureTask = new FutureTask<Integer>(productor) {
                        @Override
                        protected void done() {
                            super.done();
                            try {
                                log.info(String.format(" 編號[%s]月餅已打包好", get()));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (ExecutionException e) {
                                e.printStackTrace();
                            }
                        }
                    };
                    es.submit(futureTask);
                }
            }
        };
        new Thread(customer).start();
    }
}
複製代碼

執行結果:

11:01:37.134 [Thread-0] INFO com.niuh.future.FutureTaskDemo1 - 老闆給我來一個月餅
11:01:37.139 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - 月餅製做中。。。。
11:01:37.139 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - 月餅製做中。。。。
11:01:37.139 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - 月餅製做中。。。。
11:01:42.151 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 -  編號[804]月餅已打包好
11:01:42.151 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 -  編號[88]月餅已打包好
11:01:42.151 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 -  編號[166]月餅已打包好
複製代碼

參考

PS:以上代碼提交在 Githubgithub.com/Niuh-Study/…

PS:這裏有一個技術交流羣(扣扣羣:1158819530),方便你們一塊兒交流,持續學習,共同進步,有須要的能夠加一下。

文章持續更新,能夠公衆號搜一搜「 一角錢技術 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經收錄,歡迎 Star。

相關文章
相關標籤/搜索