我關注的任務提交方法爲:invokeAll, submit, executor
java
1 單任務提交:submit , executor函數
AbstractExecutorService 類的方法實現this
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); //1 生成一個 RunnableFuture 對象,並調用 Executor接口中的void execute(Runnable command) 函數 execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); //2 生成一個 RunnableFuture 對象,並調用 Executor接口中的void execute(Runnable command) 函數 execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); //3 生成一個 RunnableFuture 對象,並調用 Executor接口中的void execute(Runnable command) 函數 execute(ftask); return ftask; }
不論何種形式的submit函數,都是把task封裝成一個RunnableFuture對象,並調用execute函數。spa
//4 newTaskFor 函數,把Runnable,Callable封裝成統一的FutureTask對象 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
那麼FutureTask和RunnableFuture的關係是什麼?線程
可知,FutureTask 是 RunnableFuture 的實現類。FutureTask對象既有任務,又會有任務執行結果。code
//5 RunnableFuture 接口繼承了Runnable, Future public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); } public class FutureTask<V> implements RunnableFuture<V> { //6 構造函數,把Callable 封裝成 FutureTask public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } //7構造函數,把Runnable封裝成FutrueTask public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } }
2 批量提交 invokeAll對象
//整體思路: 一一提交任務, 等待返回結果 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); // 存放線程的執行結果 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); //不一樣的線程池,不一樣的實現方式 execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); // 是否線程執行完畢, 若是沒有執行完畢,則調用get() if (!f.isDone()) { try { f.get();//阻塞至執行完畢 } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; // 只有全部的線程都執行完, 才返回 return futures; } finally { if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }