java.util.concurrent.FutureTask 源碼

線程池相關

源碼:

package java.util.concurrent;

import java.util.concurrent.locks.LockSupport;

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;


    private Callable<V> callable;

    private Object outcome;

    private volatile Thread runner;

    private volatile WaitNode waiters;


    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;

        WaitNode() {
            thread = Thread.currentThread();
        }
    }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;
    }


    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally {
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }


    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }


    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    public void run() {
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {

            runner = null;

            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    }


    protected void done() {
    }


    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            finishCompletion();
        }
    }


    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
            finishCompletion();
        }
    }


    protected boolean runAndReset() {
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call();
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {

            runner = null;

            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield();
    }


    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null; ) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (; ; ) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null;
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }


    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (; ; ) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING)
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            } else
                LockSupport.park(this);
        }
    }


    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (; ; ) {
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null)
                            continue retry;
                    } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    // Unsafe方法
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}

類 FutureTask<V>

    類型參數:html

    V - 此 FutureTask 的 get 方法所返回的結果類型。java

    全部已實現的接口:node

    RunnableFuture<V>, RunnableFuture<V>api

    可取消的異步計算。利用開始和取消計算的方法、查詢計算是否完成的方法和獲取計算結果的方法,此類提供了對 Future 的基本實現。異步

    僅在計算完成時才能獲取結果;若是計算還沒有完成,則阻塞 get 方法。一旦計算完成,就不能再從新開始或取消計算。this

    可以使用 FutureTask 包裝 Callable 或 Runnable 對象。由於 FutureTask 實現了 Runnable,因此可將 FutureTask 提交給 Executor 執行。spa

    除了做爲一個獨立的類外,此類還提供了 protected 功能,這在建立自定義任務類時可能頗有用。.net

 

構造方法摘要線程

 

FutureTask(Callable<V> callable) 
          建立一個 FutureTask,一旦運行就執行給定的 Callable。
FutureTask(Runnable runnable, V result) 
          建立一個 FutureTask,一旦運行就執行給定的 Runnable,並安排成功完成時 get 返回給定的結果 。

 方法摘要設計

 boolean cancel(boolean mayInterruptIfRunning) 
          試圖取消對此任務的執行。
protected  void done() 
          當此任務轉換到狀態 isDone(無論是正常地仍是經過取消)時,調用受保護的方法。
 V get() 
          若有必要,等待計算完成,而後獲取其結果。
 V get(long timeout, TimeUnit unit) 
          若有必要,最多等待爲使計算完成所給定的時間以後,獲取其結果(若是結果可用)。
 boolean isCancelled() 
          若是在任務正常完成前將其取消,則返回 true。
 boolean isDone() 
          若是任務已完成,則返回 true。
 void run() 
          除非已將此 Future 取消,不然將其設置爲其計算的結果。
protected  boolean runAndReset() 
          執行計算而不設置其結果,而後將此 Future 重置爲初始狀態,若是計算遇到異常或已取消,則該操做失敗。
protected  void set(V v) 
          除非已經設置了此 Future 或已將其取消,不然將其結果設置爲給定的值。
protected  void setException(Throwable t) 
          除非已經設置了此 Future 或已將其取消,不然它將報告一個 ExecutionException,並將給定的 throwable 做爲其緣由。

 從類 java.lang.Object 繼承的方法

cloneequalsfinalizegetClasshashCodenotifynotifyAlltoStringwaitwaitwait

 

FutureTask

public FutureTask(Callable<V> callable)

    建立一個 FutureTask,一旦運行就執行給定的 Callable。

    參數:

    callable - 可調用的任務。

    拋出:

    NullPointerException - 若是 callable 爲 null。

 

FutureTask

public FutureTask(Runnable runnable,
                  V result)

    建立一個 FutureTask,一旦運行就執行給定的 Runnable,並安排成功完成時 get 返回給定的結果 。

    參數:

    runnable - 可運行的任務。

    result - 成功完成時要返回的結果。若是不須要特定的結果,則考慮使用下列形式的構造: Future<?> f = new FutureTask<Object>(runnable, null)

    拋出:

    NullPointerException - 若是 runnable 爲 null。

 

 

isCancelled

public boolean isCancelled()

    從接口 Future 複製的描述

        若是在任務正常完成前將其取消,則返回 true。

    指定者:

        接口 Future<V> 中的 isCancelled

    返回:

        若是任務完成前將其取消,則返回 true

 

 

isDone

public boolean isDone()

    從接口 Future 複製的描述

        若是任務已完成,則返回 true。 可能因爲正常終止、異常或取消而完成,在全部這些狀況中,此方法都將返回 true。

    指定者:

        接口 Future<V> 中的 isDone

    返回:

        若是任務已完成,則返回 true

 

 

cancel

public boolean cancel(boolean mayInterruptIfRunning)

    從接口 Future 複製的描述

        試圖取消對此任務的執行。若是任務已完成、或已取消,或者因爲某些其餘緣由而沒法取消,則此嘗試將失敗。當調用 cancel 時,若是調用成功,而此任務還沒有啓動,則此任務將永不運行。若是任務已經啓動,則 mayInterruptIfRunning 參數肯定是否應該以試圖中止任務的方式來中斷執行此任務的線程。

    此方法返回後,對 Future.isDone() 的後續調用將始終返回 true。若是此方法返回 true,則對 Future.isCancelled() 的後續調用將始終返回 true。

    指定者:

        接口 Future<V> 中的 cancel

    參數:

    mayInterruptIfRunning - 若是應該中斷執行此任務的線程,則爲 true;不然容許正在運行的任務運行完成

    返回:

        若是沒法取消任務,則返回 false,這一般是因爲它已經正常完成;不然返回 true

 

get

public V get()
      throws InterruptedException,
             ExecutionException

    從接口 Future 複製的描述

        若有必要,等待計算完成,而後獲取其結果。

    指定者:

        接口 Future<V> 中的 get

    返回:

        計算的結果

    拋出:

    CancellationException - 若是計算被取消

    InterruptedException - 若是當前的線程在等待時被中斷

    ExecutionException - 若是計算拋出異常

 

get

public V get(long timeout,
             TimeUnit unit)
      throws InterruptedException,
             ExecutionException,
             TimeoutException

    從接口 Future 複製的描述

        若有必要,最多等待爲使計算完成所給定的時間以後,獲取其結果(若是結果可用)。

    指定者:

        接口 Future<V> 中的 get

    參數:

    timeout - 等待的最大時間

    unit - timeout 參數的時間單位

    返回:

        計算的結果

    拋出:

    CancellationException - 若是計算被取消

    InterruptedException - 若是當前的線程在等待時被中斷

    ExecutionException - 若是計算拋出異常

    TimeoutException - 若是等待超時

 

 

done

protected void done()

    當此任務轉換到狀態 isDone(無論是正常地仍是經過取消)時,調用受保護的方法。默認實現不執行任何操做。子類能夠重寫此方法,以調用完成回調或執行簿記。注意,能夠查詢此方法的實現內的狀態,從而肯定是否已取消了此任務。

 

 

set

protected void set(V v)

    除非已經設置了此 Future 或已將其取消,不然將其結果設置爲給定的值。在計算成功完成時經過 run 方法內部調用此方法。

    參數:

    v - 值

 

setException

protected void setException(Throwable t)

    除非已經設置了此 Future 或已將其取消,不然它將報告一個 ExecutionException,並將給定的 throwable 做爲其緣由。在計算失敗時經過 run 方法內部調用此方法。

    參數:

    t - 失敗的緣由

 

 

run

public void run()

    除非已將此 Future 取消,不然將其設置爲其計算的結果。

    指定者:

        接口 Runnable 中的 run

    指定者:

        接口 RunnableFuture<V> 中的 run

    另請參見:

    Thread.run()

 

runAndReset

protected boolean runAndReset()

    執行計算而不設置其結果,而後將此 Future 重置爲初始狀態,若是計算遇到異常或已取消,則該操做失敗。本操做被設計用於那些本質上要執行屢次的任務。

    返回:

        若是成功運行並重置,則返回 true。

相關文章
相關標籤/搜索