提供Executor的工廠類
java
忽略了自定義的ThreadFactory、callable和unconfigurable相關的方法
newFixedxxx:在任意時刻,最多有nThreads個線程在處理task;若是全部線程都在運行時來了新的任務,它會被扔入隊列;若是有線程在執行期間因某種緣由終止了運行,若是須要執行後續任務,新的線程將取代它多線程
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newCachedxxx:新任務到來若是線程池中有空閒的線程就複用,不然新建一個線程。若是一個線程超過60秒沒有使用,它就會被關閉移除線程池併發
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
newSingleThreadExecutor:僅使用一個線程來處理任務,若是這線程掛了,會產生一個新的線程來代替它。每個任務被保證按照順序執行,並且一次只執行一個框架
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
使用newFixedxxx方法也能實現相似的做用,可是ThreadPoolExecutor會提供修改線程數的方法,FinalizableDelegatedExecutorService則沒有修改的途徑,它在DelegatedExecutorService的基礎
上僅提供了執行finalize時候去關閉線程,而DelegatedExecutorService僅暴漏ExecutorService自身的方法
newScheduledThreadPool:提供一個線程池來延遲或者按期執行任務socket
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }
newSingleThreadScheduledExecutor:提供單個線程來延遲或者按期執行任務,若是執行的線程掛了,會生成新的。性能
return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1));
一樣,它保證返回的Executor自身的線程數不可修改
從上述的實現能夠看出,核心在於三個部分網站
對應的,相應也有不一樣的隊列去實現不一樣的場景this
它僅僅是包裝了ExecutorService的方法,交由傳入的ExecutorService來執行,所謂的UnConfigurable實際也就是它沒有暴漏配置各類參數調整的方法spa
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
提供一系列的schedule方法,使得任務能夠延遲或者週期性的執行,對應schedule方法會返回ScheduledFuture以供確認是否執行以及是否要取消。它的實現ScheduledThreadPoolExecutor也支持當即執行由submit提交的任務線程
僅支持相對延遲時間,好比距離如今5分鐘後執行。相似Timer也能夠管理延遲任務和週期任務,可是存在一些缺陷:
- 全部的定時任務只有一個線程,若是某個任務執行時間長,將影響其它TimerTask的精確性。
ScheduledExecutorService的多線程機制可彌補
- TimerTask拋出未檢查的異常,將終止線程執行,此時會錯誤的認爲任務都取消了。
1:可使用try-catch-finally對相應執行快處理;2:經過execute執行的方法能夠設置UncaughtExceptionHandler來接收未捕獲的異常,並做出處理;3:經過submit執行的,將被封裝層ExecutionException從新拋出
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize、maximumPoolSize:ThreadPoolExecutor會根據這兩自動調整線程池的大小,當一個新任務經過execute提交的時候:
若是當前運行的線程數小於corePoolSize就新建線程;
若是當前線程數在corePoolSize與maximumPoolSize之間,則只有在隊列滿的時候纔會建立新的線程;
若是已經達到最大線程數,而且隊列都滿了,在這種飽和狀態下就會執行拒絕策略
默認狀況下,只有新任務到達的時候纔會啓動線程,可經過prestartCoreThread方法實現事先啓動
- corePoolSize:默認線程池所須要維護的最小的worker的數量,就算是worker過時了也會保留。若是想要不保留,則須要設置allowCoreThreadTimeOut,此時最小的就是0
- maximumPoolSize:線程池最大的線程數。java限制最多爲 2^29-1,大約5億個
workQueue:任何BlockingQueue均可以使用,基本上有三種
Executors.defaultThreadFactory
RejectedExecutionHandler:經過execute添加的任務,若是Executor已經關閉或者已經飽和了(線程數達到了maximumPoolSize,而且隊列滿了),就會執行,java提供了4種策略:
ThreadPoolExecutor可自定義beforeExecutor、afterExecutor能夠用來添加日誌統計、計時、件事或統計信息收集功能,不管run是正常返回仍是拋出異常,afterExecutor都會被執行。若是beforeExecutor拋出RuntimeException,任務和afterExecutor都不會被執行。terminated在全部任務都已經完成,而且全部工做者線程關閉後會調用,此時也能夠用來執行發送通知、記錄日誌等等。
計算密集型,一般在擁有$N_{cpu}$個處理器的系統上,線程池大小設置爲$N_{cpu}+1$可以實現最優的利用率;
$N_{cpu}$ cpu的個數
$$N_{threads}=N_{cpu}*U_{cpu}*(1+W/C)$$
將一個網站的業務抽象成以下幾塊
理論上,服務端經過實現約定的接口就能夠實現接收請求和處理接二連三的請求過來
ServerSocket socket = new ServerSocket(80); while(true){ Socket conn = socket.accept(); handleRequest(conn) }
缺點:每次只能處理一個請求,新請求到來時,必須等到正在處理的請求處理完成,才能接收新的請求
爲每一個請求建立新的線程提供服務
ServerSocket socket = new ServerSocket(80); while(true){ final Socket conn = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(conn); } } new Thread(task).start(); }
缺點:
使用java自帶的Executor框架。
private static final Executor exec = Executors.newFixedThreadPool(100); ... ServerSocket socket = new ServerSocket(80); while(true){ final Socket conn = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(conn); } } exec.execute(task); } ...
線程池策略經過實現預估好的線程需求,限制併發任務的數量,重用現有的線程,解決每次建立線程的資源耗盡、競爭過於激烈和頻繁建立的問題,也囊括了線程的優點,解耦了任務提交和任務執行。
renderText(source); List<ImageData> imageData = new ArrayList<ImageData>(); for(ImageInfo info:scaForImageInfo(source)){ imageData.add(info.downloadImage()); } for(ImageData data:imageData){ renderImage(data); }
缺點:圖像的下載大部分時間在等待I/O操做執行完成,這期間CPU幾乎不作任何工做,使得用戶看到最終頁面以前要等待過長的時間
渲染過程能夠分紅兩個部分,1是渲染文本,1是下載圖像
private static final ExecutorService exec = Executors.newFixedThreadPool(100); ... final List<ImageInfo> infos=scaForImageInfo(source); Callable<List<ImageData>> task=new Callable<List<ImageData>>(){ public List<ImageData> call(){ List<ImageData> r = new ArrayList<ImageData>(); for(ImageInfo info:infos){ r.add(info.downloadImage()); } return r; } }; Future<List<ImageData>> future = exec.submit(task); renderText(source); try{ List<ImageData> imageData = future.get(); for(ImageData data:imageData){ renderImage(data); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); future.cancel(true); }catche(ExecutionException e){ throw launderThrowable(e.getCause()); }
使用Callable來返回下載的圖片結果,使用future來得到下載的圖片,這樣將減小用戶所須要的等待時間。
缺點:圖片的下載很明顯時間要比文本要慢,這樣的並行化極可能速度可能只提高了1%
使用CompletionService。
private static final ExecutorService exec; ... final List<ImageInfo> infos=scaForImageInfo(source); CompletionService<ImageData> cService = new ExecutorCompletionService<ImageData>(exec); for(final ImageInfo info:infos){ cService.submit(new Callable<ImageData>(){ public ImageData call(){ return info.downloadImage(); } }); } renderText(source); try{ for(int i=0,n=info.size();t<n;t++){ Future<ImageData> f = cService.take(); ImageData imageData=f.get(); renderImage(imageData) } }catch(InterruptedException e){ Thread.currentThread().interrupt(); }catche(ExecutionException e){ throw launderThrowable(e.getCause()); }
核心思路爲爲每一幅圖像下載都建立一個獨立的任務,並在線程池中執行他們,從而將串行的下載過程轉換爲並行的過程
廣告展現若是在必定的時間之內沒有獲取,能夠再也不展現,並取消超時的任務。
ExecutorService exe = Executors.newFixedThreadPool(3); List<MyTask> myTasks = new ArrayList<>(); for (int i=0;i<3;i++){ myTasks.add(new MyTask(3-i)); } try { List<Future<String>> futures = exe.invokeAll(myTasks, 1, TimeUnit.SECONDS); for (int i=0;i<futures.size();i++){ try { String s = futures.get(i).get(); System.out.println("task execut "+myTasks.get(i).getSleepSeconds()+" s"); } catch (ExecutionException e) { System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute "); }catch (CancellationException e){ System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute ,because "+e); } } } catch (InterruptedException e) { e.printStackTrace(); } exe.shutdown();
invokeAll方法對於沒有完成的任務會被取消,經過CancellationException能夠捕獲,invokeAll返回的序列順序和傳入的task保持一致。結果以下:
task sleep 3 not execute ,because java.util.concurrent.CancellationException task sleep 2 not execute ,because java.util.concurrent.CancellationException task execut 1 s