線程池系列--ExecutorService接口:任務提交


我關注的任務提交方法爲:invokeAll, submit, executor
java

1 單任務提交:submit , executor函數

AbstractExecutorService 類的方法實現this

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //1 生成一個 RunnableFuture 對象,並調用 Executor接口中的void execute(Runnable command) 函數
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    //2 生成一個 RunnableFuture 對象,並調用 Executor接口中的void execute(Runnable command) 函數
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    //3 生成一個 RunnableFuture 對象,並調用 Executor接口中的void execute(Runnable command) 函數
    execute(ftask);
    return ftask;
}

  不論何種形式的submit函數,都是把task封裝成一個RunnableFuture對象,並調用execute函數。spa

//4 newTaskFor 函數,把Runnable,Callable封裝成統一的FutureTask對象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

  那麼FutureTask和RunnableFuture的關係是什麼?線程

  可知,FutureTask 是 RunnableFuture 的實現類。FutureTask對象既有任務,又會有任務執行結果。code

//5  RunnableFuture 接口繼承了Runnable, Future
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

public class FutureTask<V> implements RunnableFuture<V> {
    //6 構造函數,把Callable 封裝成 FutureTask
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    //7構造函數,把Runnable封裝成FutrueTask
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
}

  2 批量提交 invokeAll對象

//整體思路: 一一提交任務, 等待返回結果
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    // 存放線程的執行結果
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            //不一樣的線程池,不一樣的實現方式
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            // 是否線程執行完畢, 若是沒有執行完畢,則調用get()
            if (!f.isDone()) {
                try {
                    f.get();//阻塞至執行完畢
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        // 只有全部的線程都執行完, 才返回
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}
相關文章
相關標籤/搜索