java中的Executors簡介與多線程在網站上逐步優化的運用案例

提供Executor的工廠類
圖片描述java

忽略了自定義的ThreadFactory、callable和unconfigurable相關的方法
  • newFixedxxx:在任意時刻,最多有nThreads個線程在處理task;若是全部線程都在運行時來了新的任務,它會被扔入隊列;若是有線程在執行期間因某種緣由終止了運行,若是須要執行後續任務,新的線程將取代它多線程

    return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
  • newCachedxxx:新任務到來若是線程池中有空閒的線程就複用,不然新建一個線程。若是一個線程超過60秒沒有使用,它就會被關閉移除線程池併發

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
  • newSingleThreadExecutor:僅使用一個線程來處理任務,若是這線程掛了,會產生一個新的線程來代替它。每個任務被保證按照順序執行,並且一次只執行一個框架

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    使用newFixedxxx方法也能實現相似的做用,可是ThreadPoolExecutor會提供修改線程數的方法,FinalizableDelegatedExecutorService則沒有修改的途徑,它在DelegatedExecutorService的基礎
    上僅提供了執行finalize時候去關閉線程,而DelegatedExecutorService僅暴漏ExecutorService自身的方法
  • newScheduledThreadPool:提供一個線程池來延遲或者按期執行任務socket

    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                  new DelayedWorkQueue());
        }
  • newSingleThreadScheduledExecutor:提供單個線程來延遲或者按期執行任務,若是執行的線程掛了,會生成新的。性能

    return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
    一樣,它保證返回的Executor自身的線程數不可修改

從上述的實現能夠看出,核心在於三個部分網站

  • ThreadPoolExecutor:提供線程數相關的控制
  • DelegatedExecutorService:僅暴露ExecutorService自身的方法,保證線程數不變來實現語義場景
  • ScheduledExecutorService:提供延遲或者按期執行的功能

對應的,相應也有不一樣的隊列去實現不一樣的場景this

  • LinkedBlockingQueue:無界阻塞隊列
  • SynchronousQueue:沒有消費者消費時,新的任務就會被阻塞
  • DelayQueue:隊列中的任務過時以後才能夠執行,不然沒法查詢到隊列中的元素

DelegatedExecutorService

它僅僅是包裝了ExecutorService的方法,交由傳入的ExecutorService來執行,所謂的UnConfigurable實際也就是它沒有暴漏配置各類參數調整的方法spa

static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }

ScheduledExecutorService

提供一系列的schedule方法,使得任務能夠延遲或者週期性的執行,對應schedule方法會返回ScheduledFuture以供確認是否執行以及是否要取消。它的實現ScheduledThreadPoolExecutor也支持當即執行由submit提交的任務線程

僅支持相對延遲時間,好比距離如今5分鐘後執行。相似Timer也能夠管理延遲任務和週期任務,可是存在一些缺陷:

  • 全部的定時任務只有一個線程,若是某個任務執行時間長,將影響其它TimerTask的精確性。ScheduledExecutorService的多線程機制可彌補
  • TimerTask拋出未檢查的異常,將終止線程執行,此時會錯誤的認爲任務都取消了。1:可使用try-catch-finally對相應執行快處理;2:經過execute執行的方法能夠設置UncaughtExceptionHandler來接收未捕獲的異常,並做出處理;3:經過submit執行的,將被封裝層ExecutionException從新拋出

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize、maximumPoolSize:ThreadPoolExecutor會根據這兩自動調整線程池的大小,當一個新任務經過execute提交的時候:
    若是當前運行的線程數小於corePoolSize就新建線程;
    若是當前線程數在corePoolSize與maximumPoolSize之間,則只有在隊列滿的時候纔會建立新的線程;
    若是已經達到最大線程數,而且隊列都滿了,在這種飽和狀態下就會執行拒絕策略

    默認狀況下,只有新任務到達的時候纔會啓動線程,可經過prestartCoreThread方法實現事先啓動

    1. corePoolSize:默認線程池所須要維護的最小的worker的數量,就算是worker過時了也會保留。若是想要不保留,則須要設置allowCoreThreadTimeOut,此時最小的就是0
    2. maximumPoolSize:線程池最大的線程數。java限制最多爲 2^29-1,大約5億個
  • keepAliveTime、unit:若是當前線程池有超過corePoolSize的線程數,只要有線程空閒時間超過keepAliveTime的設定,就會被終止;unit則是它的時間單位
  • workQueue:任何BlockingQueue均可以使用,基本上有三種

    1. Direct handoffs,直接交付任務。好比 SynchronousQueue,若是沒有線程消費,提交任務會失敗,固然能夠新建一個線程來處理。它適合處理有依賴關係的任務,通常它的maximumPoolSizes會被設置成最大的
    2. Unbounded queues,無界隊列。好比LinkedBlockingQueue,這意味着若是有corePoolSize個線程在執行,那麼其餘的任務都只能等待。它適合於處理任務都是互相獨立的,
    3. Bounded queues,有界隊列。好比ArrayBlockingQueue,須要考慮隊列大小和最大線程數之間的關係,來達到更好的資源利用率和吞吐量
  • threadFactory:沒有指定的時候,使用Executors.defaultThreadFactory
  • RejectedExecutionHandler:經過execute添加的任務,若是Executor已經關閉或者已經飽和了(線程數達到了maximumPoolSize,而且隊列滿了),就會執行,java提供了4種策略:

    1. AbortPolicy,拒絕的時候拋出運行時異常RejectedExecutionException;
    2. CallerRunsPolicy,若是executor沒有關閉,那麼由調用execute的線程來執行它;
    3. DiscardPolicy,直接扔掉新的任務;
    4. DiscardOldestPolicy,若是executor沒有關閉,那麼扔掉隊列頭部的任務,再次嘗試;
ThreadPoolExecutor可自定義beforeExecutor、afterExecutor能夠用來添加日誌統計、計時、件事或統計信息收集功能,不管run是正常返回仍是拋出異常,afterExecutor都會被執行。若是beforeExecutor拋出RuntimeException,任務和afterExecutor都不會被執行。terminated在全部任務都已經完成,而且全部工做者線程關閉後會調用,此時也能夠用來執行發送通知、記錄日誌等等。

如何估算線程池的大小

  1. 計算密集型,一般在擁有$N_{cpu}$個處理器的系統上,線程池大小設置爲$N_{cpu}+1$可以實現最優的利用率;

    $N_{cpu}$ cpu的個數
  2. I/O密集型或者其它阻塞型的任務,定義 $N_{cpu}$爲CPU的個數,$U_{cpu}$爲CPU的利用率,$W/C$爲等待時間與計算時間的比率,此時線程池的最優大小爲

$$N_{threads}=N_{cpu}*U_{cpu}*(1+W/C)$$

場景說明

將一個網站的業務抽象成以下幾塊

  • 接收客戶端請求與處理請求
  • 頁面渲染返回的文本和圖片
  • 獲取頁面的廣告

接收請求與處理請求

理論模型

理論上,服務端經過實現約定的接口就能夠實現接收請求和處理接二連三的請求過來

ServerSocket socket = new ServerSocket(80);
while(true){
    Socket conn = socket.accept();
    handleRequest(conn)
}

缺點:每次只能處理一個請求,新請求到來時,必須等到正在處理的請求處理完成,才能接收新的請求

顯示的建立多線程

爲每一個請求建立新的線程提供服務

ServerSocket socket = new ServerSocket(80);
while(true){
    final Socket conn = socket.accept();
    Runnable task = new Runnable(){
        public void run(){
            handleRequest(conn);        
        }
    }
    new Thread(task).start();
}

缺點:

  • 線程的建立和銷燬都有必定的開銷,延遲對請求的處理;
  • 建立後的線程多於可用處理器的數量,形成線程閒置,這會給垃圾回收帶來壓力
  • 存活的大量線程競爭CPU資源會產生不少性能開銷
  • 系統上對可建立的線程數存在限制

使用線程池

使用java自帶的Executor框架。

private static final Executor exec = Executors.newFixedThreadPool(100);
...
ServerSocket socket = new ServerSocket(80);
while(true){
    final Socket conn = socket.accept();
    Runnable task = new Runnable(){
        public void run(){
            handleRequest(conn);        
        }
    }
    exec.execute(task);
}
...

線程池策略經過實現預估好的線程需求,限制併發任務的數量,重用現有的線程,解決每次建立線程的資源耗盡、競爭過於激烈和頻繁建立的問題,也囊括了線程的優點,解耦了任務提交和任務執行。

頁面渲染返回的文本和圖片

串行渲染

renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for(ImageInfo info:scaForImageInfo(source)){
    imageData.add(info.downloadImage());
}
for(ImageData data:imageData){
    renderImage(data);
}

缺點:圖像的下載大部分時間在等待I/O操做執行完成,這期間CPU幾乎不作任何工做,使得用戶看到最終頁面以前要等待過長的時間

並行化

渲染過程能夠分紅兩個部分,1是渲染文本,1是下載圖像

private static final ExecutorService exec = Executors.newFixedThreadPool(100);
...
final List<ImageInfo> infos=scaForImageInfo(source);
Callable<List<ImageData>> task=new Callable<List<ImageData>>(){
    public List<ImageData> call(){
        List<ImageData> r = new ArrayList<ImageData>();
        for(ImageInfo info:infos){
            r.add(info.downloadImage());
        }
        return r;
    }
};
Future<List<ImageData>> future = exec.submit(task);
renderText(source);
try{
    List<ImageData> imageData = future.get();
    for(ImageData data:imageData){
        renderImage(data);
    }
}catch(InterruptedException e){
    Thread.currentThread().interrupt();
    future.cancel(true);
}catche(ExecutionException e){
    throw launderThrowable(e.getCause());
}

使用Callable來返回下載的圖片結果,使用future來得到下載的圖片,這樣將減小用戶所須要的等待時間。
缺點:圖片的下載很明顯時間要比文本要慢,這樣的並行化極可能速度可能只提高了1%

並行性能提高

使用CompletionService。

private static final ExecutorService exec;
...
final List<ImageInfo> infos=scaForImageInfo(source);
CompletionService<ImageData> cService =  new ExecutorCompletionService<ImageData>(exec);
for(final ImageInfo info:infos){
    cService.submit(new Callable<ImageData>(){
        public ImageData call(){
            return info.downloadImage();
        }
    });
}
renderText(source);
try{
    for(int i=0,n=info.size();t<n;t++){
        Future<ImageData> f = cService.take();
        ImageData imageData=f.get();
        renderImage(imageData)
    }
}catch(InterruptedException e){
    Thread.currentThread().interrupt();
}catche(ExecutionException e){
    throw launderThrowable(e.getCause());
}

核心思路爲爲每一幅圖像下載都建立一個獨立的任務,並在線程池中執行他們,從而將串行的下載過程轉換爲並行的過程

獲取頁面的廣告

廣告展現若是在必定的時間之內沒有獲取,能夠再也不展現,並取消超時的任務。

ExecutorService exe = Executors.newFixedThreadPool(3);
        List<MyTask> myTasks = new ArrayList<>();
        for (int i=0;i<3;i++){
          myTasks.add(new MyTask(3-i));
        }

        try {

            List<Future<String>> futures = exe.invokeAll(myTasks, 1, TimeUnit.SECONDS);
            for (int i=0;i<futures.size();i++){
                try {
                    String s = futures.get(i).get();
                    System.out.println("task execut "+myTasks.get(i).getSleepSeconds()+" s");
                } catch (ExecutionException e) {
                    System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute ");
                }catch (CancellationException e){
                    System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute ,because "+e);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        exe.shutdown();

invokeAll方法對於沒有完成的任務會被取消,經過CancellationException能夠捕獲,invokeAll返回的序列順序和傳入的task保持一致。結果以下:

task sleep 3 not execute ,because java.util.concurrent.CancellationException
task sleep 2 not execute ,because java.util.concurrent.CancellationException
task execut 1 s
相關文章
相關標籤/搜索