Callable和Future

目的

咱們日常使用線程的方式bash

  1. 繼承Thread
  2. 實現Runnable 可是這兩種方式都有一個問題,那就是不能拿到運行的返回值,除非使用共享變量等複雜的方式來間接的實現。

因此,jdk5開始就提供了CallableFutureide

什麼是callable(可調用而且有返回值的一種類)

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
複製代碼

爲了解決Runnable不能作返回值的限制,因此有了Callable。
可是我以爲,Callable並非和Runnable或者Thread一個層面的,它僅僅表示一個能夠調用,而且會有返回值的一種類。
想實現有返回值的線程還必須依靠FutureTaskui

什麼是 FutureTask、Future

public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable, Future<V>


public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

複製代碼

從上面的代碼能夠看出來,
FutureTask實現了RunnableFuture ---》
RunnableFuture又是繼承了Runnable和Future ---》
因此FutureTask就是Runnable和Future接口的實現類。
因此,她既是咱們原先使用的線程又能夠對這個線程作取消,獲取返回值等操做this

具體是怎麼實現的

經常使用的使用方式:

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        System.out.println("主線程在執行任務");

        try {
            System.out.println("task運行結果" + result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("全部任務執行完畢");
    }
}

class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("子線程在進行計算");
        Thread.sleep(3000);
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += i;
        }
        return sum;
    }
}
複製代碼

上面這段代碼是咱們使用Future和Callable的通常使用方式,其中用到了spa

//根據阿里的開發手冊不推薦直接使用Executors.newCachedThreadPool()
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(Callable callable)
複製代碼

具體調用的代碼內容以下:線程

1. 將線程丟進線程池,獲取到線程的引用。

建立一個線程池執行器
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
    
public class ThreadPoolExecutor extends AbstractExecutorService
....


//調用submit將線程丟進線程池
public abstract class AbstractExecutorService implements ExecutorService {
...
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

 
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

   
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
...
    
}



public class FutureTask<V> implements RunnableFuture<V> {
。。。
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
。。。
}


public class Executors {
...
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
...

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
    
}
複製代碼

咱們能夠在AbstractExecutorService中提交Callable或者Runnable; 最終,會經過Executors.callable用RunnableAdapter將Runnable也轉化成Callablecode

在上面的代碼裏面能夠看到,在submit的時候,繼承

  1. 不管是Runnable仍是Callable最後都被封裝成了RunnableFuture(一個能夠獲取到返回值的線程)
  2. 調用execute -- > 將這個任務丟進線程池,用線程池來協調線程的運行。
  3. 將RunnableFuture作爲返回值返回出去---->咱們可使用這個引用獲取到線程的執行結果或者取消這個線程。
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
複製代碼

執行線程

要獲取線程的執行結果,就得講到FutureTask,仔細看上面的代碼,其實最終的不管是Callable仍是Runnable最終轉化的都是FutureTask接口

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}


public class FutureTask<V> implements RunnableFuture<V> {

/**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
}
複製代碼
屬性 意義
callable 本線程須要執行的任務,會在最後有一個返回值
outcome callable執行的返回值會被放在outcome裏面
runner 在調用FutureTask的run方法是,會將當前線程的放在這個runner中,會在取消線程時有做用
waiters 等待中的線程,在本線程執行完成以後,會喚醒下一個等待線程
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}


protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
    
複製代碼
  1. 若是當前state仍是NEW的話,現將runnerOffset指向當前線程
  2. 調用callable的call方法
  3. 若是調用成功,調用set方法,狀態設置爲COMPLETING -》將返回值設置進去,-》狀態設置爲NORMAL
  4. 若是調用失敗,將狀態設置成COMPLETING-》將異常放入返回值中-》將狀態設置成EXCEPTIONAL

獲取結果

FutureTask 中:
/**
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

 /**
 * Returns result or throws exception for completed task.
 *
 * @param s completed state value
 */
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
複製代碼
  1. 若是當前狀態比COMPLETING小,也就是NEW,任務還在執行中。等待任務執行完成
  2. 若是執行成功,狀態爲NORMAL,將outcome做爲返回值返回
  3. 若是狀態爲取消,拋出CancellationException
  4. 以上都不是,將outcome做爲異常拋出
相關文章
相關標籤/搜索