Executor框架是指java5中引入的一系列併發庫中與executor相關的功能類,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(圖片引用自http://www.javaclubcn.com/a/jichuzhishi/2012/1116/170.html) html
本篇博文分析Executor中幾個比較重要的接口和類。 java
Executor 閉包
1 public interface Executor { 2 void execute(Runnable command); 3 }
Executor接口是Executor框架中最基礎的部分,定義了一個用於執行Runnable的execute方法。它沒有直接的實現類,有一個重要的子接口ExecutorService。 併發
ExecutorService app
1 //繼承自Executor接口 2 public interface ExecutorService extends Executor { 3 /** 4 * 關閉方法,調用後執行以前提交的任務,再也不接受新的任務 5 */ 6 void shutdown(); 7 /** 8 * 從語義上能夠看出是當即中止的意思,將暫停全部等待處理的任務並返回這些任務的列表 9 */ 10 List<Runnable> shutdownNow(); 11 /** 12 * 判斷執行器是否已經關閉 13 */ 14 boolean isShutdown(); 15 /** 16 * 關閉後全部任務是否都已完成 17 */ 18 boolean isTerminated(); 19 /** 20 * 中斷 21 */ 22 boolean awaitTermination(long timeout, TimeUnit unit) 23 throws InterruptedException; 24 /** 25 * 提交一個Callable任務 26 */ 27 <T> Future<T> submit(Callable<T> task); 28 /** 29 * 提交一個Runable任務,result要返回的結果 30 */ 31 <T> Future<T> submit(Runnable task, T result); 32 /** 33 * 提交一個任務 34 */ 35 Future<?> submit(Runnable task); 36 /** 37 * 執行全部給定的任務,當全部任務完成,返回保持任務狀態和結果的Future列表 38 */ 39 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 40 throws InterruptedException; 41 /** 42 * 執行給定的任務,當全部任務完成或超時期滿時(不管哪一個首先發生),返回保持任務狀態和結果的 Future 列表。 43 */ 44 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 45 long timeout, TimeUnit unit) 46 throws InterruptedException; 47 /** 48 * 執行給定的任務,若是某個任務已成功完成(也就是未拋出異常),則返回其結果。 49 */ 50 <T> T invokeAny(Collection<? extends Callable<T>> tasks) 51 throws InterruptedException, ExecutionException; 52 /** 53 * 執行給定的任務,若是在給定的超時期滿前某個任務已成功完成(也就是未拋出異常),則返回其結果。 54 */ 55 <T> T invokeAny(Collection<? extends Callable<T>> tasks, 56 long timeout, TimeUnit unit) 57 throws InterruptedException, ExecutionException, TimeoutException; 58 }
ExecutorService接口繼承自Executor接口,定義了終止、提交任務、跟蹤任務返回結果等方法。 框架
ExecutorService涉及到Runnable、Callable、Future接口,這些接口的具體內容以下。 工具
1 // 實現Runnable接口的類將被Thread執行,表示一個基本的任務 2 public interface Runnable { 3 // run方法就是它全部的內容,就是實際執行的任務 4 public abstract void run(); 5 } 6 // Callable一樣是任務,與Runnable接口的區別在於它接收泛型,同時它執行任務後帶有返回內容 7 public interface Callable<V> { 8 // 相對於run方法的帶有返回值的call方法 9 V call() throws Exception; 10 }
ExecutorService有一個子接口ScheduledExecutorService和一個抽象實現類AbstractExecutorService。 this
ScheduledExecutorService spa
1 // 能夠安排指定時間或週期性的執行任務的ExecutorService 2 public interface ScheduledExecutorService extends ExecutorService { 3 /** 4 * 在指定延遲後執行一個任務,只執行一次 5 */ 6 public ScheduledFuture<?> schedule(Runnable command, 7 long delay, TimeUnit unit); 8 /** 9 * 與上面的方法相同,只是接受的是Callable任務 10 */ 11 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 12 long delay, TimeUnit unit); 13 /** 14 * 建立並執行一個週期性的任務,在initialDelay延遲後每間隔period個單位執行一次,時間單位都是unit 15 * 每次執行任務的時間點是initialDelay, initialDelay+period, initialDelay + 2 * period... 16 */ 17 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 18 long initialDelay, 19 long period, 20 TimeUnit unit); 21 /** 22 * 建立並執行一個週期性的任務,在initialDelay延遲後開始執行,在執行結束後再延遲delay個單位開始執行下一次任務,時間單位都是unit 23 * 每次執行任務的時間點是initialDelay, initialDelay+(任務運行時間+delay), initialDelay + 2 * (任務運行時間+delay)... 24 */ 25 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 26 long initialDelay, 27 long delay, 28 TimeUnit unit); 29 }
ScheduledExecutorService定義了四個方法,已經在上面給出基本的解釋。ScheduledExecutorService有兩個實現類,分別是DelegatedScheduledExecutorService和ScheduledThreadPoolExecutor,將在後面介紹。還須要解釋的是ScheduledFuture。 線程
ScheduledFuture繼承自Future和Delayed接口,自身沒有添加方法。Delayed接口定義了一個獲取剩餘延遲的方法。
AbstractExecutorService
1 // 提供ExecutorService的默認實現 2 public abstract class AbstractExecutorService implements ExecutorService { 3 /* 4 * 爲指定的Runnable和value構造一個FutureTask,value表示默認被返回的Future 5 */ 6 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 7 return new FutureTask<T>(runnable, value); 8 } 9 /* 10 * 爲指定的Callable建立一個FutureTask 11 */ 12 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 13 return new FutureTask<T>(callable); 14 } 15 /* 16 * 提交Runnable任務 17 */ 18 public Future<?> submit(Runnable task) { 19 if (task == null) throw new NullPointerException(); 20 // 經過newTaskFor方法構造RunnableFuture,默認的返回值是null 21 RunnableFuture<Object> ftask = newTaskFor(task, null); 22 // 調用具體實現的execute方法 23 execute(ftask); 24 return ftask; 25 } 26 /* 27 * 提交Runnable任務 28 */ 29 public <T> Future<T> submit(Runnable task, T result) { 30 if (task == null) throw new NullPointerException(); 31 // 經過newTaskFor方法構造RunnableFuture,默認的返回值是result 32 RunnableFuture<T> ftask = newTaskFor(task, result); 33 execute(ftask); 34 return ftask; 35 } 36 /* 37 * 提交Callable任務 38 */ 39 public <T> Future<T> submit(Callable<T> task) { 40 if (task == null) throw new NullPointerException(); 41 RunnableFuture<T> ftask = newTaskFor(task); 42 execute(ftask); 43 return ftask; 44 } 45 46 /* 47 * doInvokeAny的具體實現(核心內容),其它幾個方法都是重載方法,都對這個方法進行調用 48 * tasks 是被執行的任務集,timed標誌是否認時的,nanos表示定時的狀況下執行任務的限制時間 49 */ 50 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 51 boolean timed, long nanos) 52 throws InterruptedException, ExecutionException, TimeoutException { 53 // tasks空判斷 54 if (tasks == null) 55 throw new NullPointerException(); 56 // 任務數量 57 int ntasks = tasks.size(); 58 if (ntasks == 0) 59 throw new IllegalArgumentException(); 60 // 建立對應數量的Future返回集 61 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); 62 ExecutorCompletionService<T> ecs = 63 new ExecutorCompletionService<T>(this); 64 try { 65 // 執行異常 66 ExecutionException ee = null; 67 // System.nanoTime()根據系統計時器當回當前的納秒值 68 long lastTime = (timed)? System.nanoTime() : 0; 69 // 獲取任務集的遍歷器 70 Iterator<? extends Callable<T>> it = tasks.iterator(); 71 72 // 向執行器ExecutorCompletionService提交一個任務,並將結果加入futures中 73 futures.add(ecs.submit(it.next 74 // 修改任務計數器 75 --ntasks; 76 // 活躍任務計數器 77 int active = 1; 78 for (;;) { 79 // 獲取並移除表明已完成任務的Future,若是不存在,返回null 80 Future<T> f = ecs.poll(); 81 if (f == null) { 82 // 沒有任務完成,且任務集中還有未提交的任務 83 if (ntasks > 0) { 84 // 剩餘任務計數器減1 85 --ntasks; 86 // 提交任務並添加結果 87 futures.add(ecs.submit(it.next())); 88 // 活躍任務計數器加1 89 ++active; 90 } 91 // 沒有剩餘任務,且沒有活躍任務(全部任務可能都會取消),跳過這一次循環 92 else if (active == 0) 93 break; 94 else if (timed) { 95 // 獲取並移除表明已完成任務的Future,若是不存在,會等待nanos指定的納秒數 96 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 97 if (f == null) 98 throw new TimeoutException(); 99 // 計算剩餘可用時間 100 long now = System.nanoTime(); 101 nanos -= now - lastTime; 102 lastTime = now; 103 } 104 else 105 // 獲取並移除表示下一個已完成任務的將來,等待,若是目前不存在。 106 // 執行到這一步說明已經沒有任務任務能夠提交,只能等待某一個任務的返回 107 f = ecs.take(); 108 } 109 // f不爲空說明有一個任務完成了 110 if (f != null) { 111 // 已完成一個任務,因此活躍任務計數減1 112 --active; 113 try { 114 // 返回該任務的結果 115 return f.get(); 116 } catch (InterruptedException ie) { 117 throw ie; 118 } catch (ExecutionException eex) { 119 ee = eex; 120 } catch (RuntimeException rex) { 121 ee = new ExecutionException(rex); 122 } 123 } 124 } 125 // 若是沒有成功返回結果則拋出異常 126 if (ee == null) 127 ee = new ExecutionException(); 128 throw ee; 129 130 } finally { 131 // 不管執行中發生異常仍是順利結束,都將取消剩餘未執行的任務 132 for (Future<T> f : futures) 133 f.cancel(true); 134 } 135 } 136 137 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 138 throws InterruptedException, ExecutionException { 139 try { 140 // 非定時任務的doInvokeAny調用 141 return doInvokeAny(tasks, false, 0); 142 } catch (TimeoutException cannotHappen) { 143 assert false; 144 return null; 145 } 146 } 147 // 定時任務的invokeAny調用,timeout表示超時時間,unit表示時間單位 148 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 149 long timeout, TimeUnit unit) 150 throws InterruptedException, ExecutionException, TimeoutException { 151 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 152 } 153 // 無超時設置的invokeAll方法 154 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 155 throws InterruptedException { 156 // 空任務判斷 157 if (tasks == null) 158 throw new NullPointerException(); 159 // 建立大小爲任務數量的結果集 160 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 161 // 是否完成全部任務的標記 162 boolean done = false; 163 try { 164 // 遍歷並執行任務 165 for (Callable<T> t : tasks) { 166 RunnableFuture<T> f = newTaskFor(t); 167 futures.add(f); 168 execute(f); 169 } 170 // 遍歷結果集 171 for (Future<T> f : futures) { 172 // 若是某個任務沒完成,經過f調用get()方法 173 if (!f.isDone()) { 174 try { 175 // get方法等待計算完成,而後獲取結果(會等待)。因此調用get後任務就會完成計算,不然會等待 176 f.get(); 177 } catch (CancellationException ignore) { 178 } catch (ExecutionException ignore) { 179 } 180 } 181 } 182 // 標誌全部任務執行完成 183 done = true; 184 // 返回結果 185 return futures; 186 } finally { 187 // 假如沒有完成全部任務(多是發生異常等狀況),將任務取消 188 if (!done) 189 for (Future<T> f : futures) 190 f.cancel(true); 191 } 192 } 193 // 超時設置的invokeAll方法 194 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 195 long timeout, TimeUnit unit) 196 throws InterruptedException { 197 // 須要執行的任務集爲空或時間單位爲空,拋出異常 198 if (tasks == null || unit == null) 199 throw new NullPointerException(); 200 // 將超時時間轉爲納秒單位 201 long nanos = unit.toNanos(timeout); 202 // 建立任務結果集 203 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 204 // 是否所有完成的標誌 205 boolean done = false; 206 try { 207 // 遍歷tasks,將任務轉爲RunnableFuture 208 for (Callable<T> t : tasks) 209 futures.add(newTaskFor(t)); 210 // 記錄當前時間(單位是納秒) 211 long lastTime = System.nanoTime(); 212 // 獲取迭代器 213 Iterator<Future<T>> it = futures.iterator(); 214 // 遍歷 215 while (it.hasNext()) { 216 // 執行任務 217 execute((Runnable)(it.next())); 218 // 記錄當前時間 219 long now = System.nanoTime(); 220 // 計算剩餘可用時間 221 nanos -= now - lastTime; 222 // 更新上一次執行時間 223 lastTime = now; 224 // 超時,返回保存任務狀態的結果集 225 if (nanos <= 0) 226 return futures; 227 } 228 229 for (Future<T> f : futures) { 230 // 若是有任務沒完成 231 if (!f.isDone()) { 232 // 時間已經用完,返回保存任務狀態的結果集 233 if (nanos <= 0) 234 return futures; 235 try { 236 // 獲取計算結果,最多等待給定的時間nanos,單位是納秒 237 f.get(nanos, TimeUnit.NANOSECONDS); 238 } catch (CancellationException ignore) { 239 } catch (ExecutionException ignore) { 240 } catch (TimeoutException toe) { 241 return futures; 242 } 243 // 計算可用時間 244 long now = System.nanoTime(); 245 nanos -= now - lastTime; 246 lastTime = now; 247 } 248 } 249 // 修改是否所有完成的標記 250 done = true; 251 // 返回結果集 252 return futures; 253 } finally { 254 // 假如沒有完成全部任務(多是時間已經用完、發生異常等狀況),將任務取消 255 if (!done) 256 for (Future<T> f : futures) 257 f.cancel(true); 258 } 259 } 260 }
AbstractExecutor實現了ExecutorService接口的部分方法。具體代碼的分析在上面已經給出。
AbstractExecutor有兩個子類:DelegatedExecutorService、ThreadPoolExecutor。將在後面介紹。
下面是AbstractExecutor中涉及到的RunnableFuture、FutureTask、ExecutorCompletionService。
RunnableFuture繼承自Future和Runnable,只有一個run()方法(Runnable中已經有一個run方法了,爲何RunnableFuture還要從新寫一個run方法呢?求高手指教)。RunnableFuture接口看上去就像是Future和Runnable兩個接口的組合。
FutureTask實現了RunnableFuture接口,除了實現了Future和Runnable中的方法外,它還有本身的方法和一個內部類Sync。
ExecutorCompletionService實現了CompletionService接口,將結果從複雜的一部分物種解耦出來。這些內容後續會介紹,不過這裏先介紹框架中的其它內容,弄清總體框架。
下面看繼承自AbstractExecutorService的ThreadPoolExecutor。
ThreadPoolExecutor
能夠參考http://xtu-xiaoxin.iteye.com/blog/647744
從上面的框架結構圖中能夠能夠看出剩下的就是ScheduledThreadPoolExecutor和Executors。Executors是一個工具類,提供一些工廠和實用方法。
下面看ScheduledThreadPoolExecutor,它繼承自ThreadPoolExecutor並實現了ScheduledExecutorService接口。
ScheduledThreadPoolExecutor
在代碼中都加了註釋,我想大體能解釋清楚吧。
Executor涉及的類仍是比較多的,到此爲止剩下的還有Executors
Executors
Executors中所定義的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 類的工廠和實用方法。此類支持如下各類方法:
Executors提供的都是工具形式的方法,因此都是static的,而且這個類也沒有必要實例化,因此它的構造方法時private的。下面主要看一下幾個內部類。
RunnableAdapter
1 static final class RunnableAdapter<T> implements Callable<T> { 2 final Runnable task; 3 final T result; 4 RunnableAdapter(Runnable task, T result) { 5 this.task = task; 6 this.result = result; 7 } 8 public T call() { 9 task.run(); 10 return result; 11 } 12 }
適配器。以Callable的形式執行Runnable而且返回給定的result。
PrivilegedCallable
1 static final class PrivilegedCallable<T> implements Callable<T> { 2 private final AccessControlContext acc; 3 private final Callable<T> task; 4 private T result; 5 private Exception exception; 6 PrivilegedCallable(Callable<T> task) { 7 this.task = task; 8 this.acc = AccessController.getContext(); 9 } 10 11 public T call() throws Exception { 12 AccessController.doPrivileged(new PrivilegedAction<T>() { 13 public T run() { 14 try { 15 result = task.call(); 16 } catch (Exception ex) { 17 exception = ex; 18 } 19 return null; 20 } 21 }, acc); 22 if (exception != null) 23 throw exception; 24 else 25 return result; 26 } 27 }
在訪問控制下運行的Callable。涉及到Java.security包中的內容。
PrivilegedCallableUsingCurrentClassLoader類與上面的PrivilegedCallable相似,只是使用的是CurrentClassLoader。
DefaultThreadFactory
1 static class DefaultThreadFactory implements ThreadFactory { 2 static final AtomicInteger poolNumber = new AtomicInteger(1); 3 final ThreadGroup group; 4 final AtomicInteger threadNumber = new AtomicInteger(1); 5 final String namePrefix; 6 7 DefaultThreadFactory() { 8 SecurityManager s = System.getSecurityManager(); 9 group = (s != null)? s.getThreadGroup() : 10 Thread.currentThread().getThreadGroup(); 11 namePrefix = "pool-" + 12 poolNumber.getAndIncrement() + 13 "-thread-"; 14 } 15 16 public Thread newThread(Runnable r) { 17 // 調用Thread構造方法建立線程 18 Thread t = new Thread(group, r, 19 namePrefix + threadNumber.getAndIncrement(), 20 0); 21 // 取消守護線程設置 22 if (t.isDaemon()) 23 t.setDaemon(false); 24 // 設置默認優先級 25 if (t.getPriority() != Thread.NORM_PRIORITY) 26 t.setPriority(Thread.NORM_PRIORITY); 27 return t; 28 } 29 }
DefaultThreadFactory 是默認的線程工程,提供建立線程的方法。
PrivilegedThreadFactory繼承自DefaultThreadFactory,區別在於線程執行的run方法指定了classLoader並受到權限的控制。
DelegatedExecutorService繼承自AbstractExecutorService,是一個包裝類,暴露ExecutorService的方法。
DelegatedScheduledExecutorService繼承自DelegatedExecutorService,實現了ScheduledExecutorService接口。它也是一個包裝類,公開ScheduledExecutorService方法。