java 線程池 理解

1.  前言

無限制建立線程的不足:html

1) 線程生命週期開銷高;java

2) 資源消耗大,尤爲是內存。若是可運行的線程數量多於可用處理器的數量,那麼有些線程將閒置。大量空閒的線程佔用許多內存,給垃圾回收器帶來壓力(頻繁 stop the world)。因此,若是已經擁有足夠多的線程使全部CPU保持忙碌狀態,那麼建立再多的線程反而會下降性能。web

3) 穩定性。可建立線程的數量存在必定限制。每一個都會維護兩個執行棧,一個用於java代碼,另外一個用於原生代碼。一般JVM在默認狀況下生成一個複合棧,大約爲0.5MB。若是無限制地建立線程,破壞了系統對線程的限制,就極可能拋出OutOfMemoryError異常,使得系統處於不穩定狀態。編程

2.   Executor框架

2.1.      簡單使用

public class ExecutorTest {

 

       private static final int NUMBERS = 100;

       private static final Executor EXECUTOR =

                     Executors.newFixedThreadPool(NUMBERS);

      

       public void processRequst() throws IOException {

             

              ServerSocket serverSocket = new ServerSocket(80);

              while (true) {

                     final Socket conn = serverSocket.accept();

                     Runnable task = new Runnable() {

                           

                            @Override

                            public void run() {

                                   System.out.println("-----------process request----------");

                            }

                     };

                     EXECUTOR.execute(task);

              }

       }

}
View Code

2.2.      線程池

線程池經過調用Executors的靜態工廠方法的四種建立方式:緩存

1) newFixedThreadPool。建立一個固定長度的線程池,每當提交一個任務時就建立一個線程,直到達到線程池的最大數量,若是某個線程發生異常而結束那麼線程池會補充一個線程。服務器

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
View Code

2) newCachedThreadPool。建立一個可緩存的線程池。若是線程池的當前規模超過了處理需求時,將回收空閒的線程,而當需求增長時,則會添加新的線程,線程池的規模不存在任何限制。網絡

 /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
View Code

3) newSingleThreadExecutor。這是一個單線程的executor,它建立單個工做者線程來執行任務,若是這個線程異常結束,會建立另外一個線程替代。該方式能確保依照任務在隊列中的順序來串行執行(FIFO,LIFO,優先級)。多線程

/**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
View Code

4) newScheduledThreadPool。建立一個固定長度的線程池,而且以延遲或定時的方式執行任務併發

 /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

 /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
View Code

 

建立線程池的方法最終都是建立了一個ThreadPoolExecutors實例,該類的構造方法以下app

 /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
View Code

參數說明(摘抄自JDK1.6參考文檔)

1) 核心和最大池大小(corePoolSize和maximumPoolSize)

ThreadPoolExecutor 將根據 corePoolSiz和 maximumPoolSize設置的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,若是運行的線程少於 corePoolSize,則建立新線程來處理請求,即便其餘輔助線程是空閒的。若是運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才建立新線程。若是設置的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池。若是將 maximumPoolSize 設置爲基本的無界值(如 Integer.MAX_VALUE),則容許池適應任意數量的併發任務。在大多數狀況下,核心和最大池大小僅基於構造來設置,不過也可使用 setCorePoolSize(int)setMaximumPoolSize(int) 進行動態更改。

2) 保持活動時間(keepAliveTime)

若是池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閒時間超過 keepAliveTime 時將會終止。這提供了當池處於非活動狀態時減小資源消耗的方法。若是池後來變爲活動,則能夠建立新的線程。也可使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態地更改此參數。默認狀況下,保持活動策略只在有多於 corePoolSizeThreads 的線程時應用。可是隻要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可將此超時策略應用於核心線程。

TimeUnit爲超時時間單位。

3)阻塞隊列(BlockingQueue)

全部 BlockingQueue 均可用於傳輸和保持提交的任務。可使用此隊列與池大小進行交互:

  • 若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
  • 若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
  • 若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。

排隊有三種通用策略:

  1. 直接提交。工做隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,若是不存在可用於當即運行任務的線程,則試圖把任務加入隊列將失敗,所以會構造一個新的線程。此策略能夠避免在處理可能具備內部依賴性的請求集時出現鎖。直接提交一般要求無界 maximumPoolSizes 以免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。SynchronousQueue 內部沒有容量,可是因爲一個插入操做老是對應一個移除操做,反過來一樣須要知足。那麼一個元素就不會再SynchronousQueue 裏面長時間停留,一旦有了插入線程和移除線程,元素很快就從插入線程移交給移除線程。也就是說這更像是一種信道(管道),資源從一個方向快速傳遞到另外一方向。
  2. 無界隊列。使用無界隊列(例如,不具備預約義容量的 LinkedBlockingQueue)將致使在全部 corePoolSize 線程都忙時,新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(所以,maximumPoolSize 的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
  3. 有界隊列。當使用有限的maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O 邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。

4) 建立新線程(ThreadFactory)

使用 ThreadFactory 建立新線程。若是沒有另外說明,則在同一個 ThreadGroup 中一概使用 Executors.defaultThreadFactory() 建立線程,而且這些線程具備相同的 NORM_PRIORITY 優先級和非守護進程狀態。經過提供不一樣的 ThreadFactory,能夠改變線程的名稱、線程組、優先級、守護進程狀態,等等。若是從 newThread 返回 null 時 ThreadFactory 未能建立線程,則執行程序將繼續運行,但不能執行任何任務。

5) 被拒絕的任務(RejectedExecutionHandler)

當 Executor 已經關閉,而且 Executor 將有限邊界用於最大線程和工做隊列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種狀況下,execute 方法都將調用其 RejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預約義的處理程序策略:

  1. 在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時 RejectedExecutionException
  2. ThreadPoolExecutor.CallerRunsPolicy 中,線程調用運行該任務的 execute 自己。此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度。
  3. ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。
  4. ThreadPoolExecutor.DiscardOldestPolicy 中,若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後重試執行程序(若是再次失敗,則重複此過程)。

定義和使用其餘種類的 RejectedExecutionHandler 類也是可能的,但這樣作須要很是當心,尤爲是當策略僅用於特定容量或排隊策略時。

2.3.     Executor生命週期

咱們知道,JVM只有在全部線程所有終止後纔會退出。全部,若是咱們沒法正確地關閉Executor,JVM將沒法結束。爲了解決執行服務的生命週期問題,ExecutorService繼承Executor接口,添加了一些用於生命週期管理的方法。ExecutorService的生命週期有三種狀態:運行、關閉和已終止。

public interface ExecutorService extends Executor {

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     * 執行平緩的關閉過程,再也不接受新的任務,同時等待已經提交的任務執行完成——包括哪些還未開始執行的任務
     */
    void shutdown();

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     * 嘗試取消全部運行中的任務,而且再也不啓動隊列中還沒有開始執行的任務
     */
    List<Runnable> shutdownNow();

    /**
     * Returns {@code true} if this executor has been shut down.
     * 查詢 ExecutorService 是否已經關閉
     */
    boolean isShutdown();

    /**
     * Returns {@code true} if all tasks have completed following shut down.
     * Note that {@code isTerminated} is never {@code true} unless
     * either {@code shutdown} or {@code shutdownNow} was called first.
     *
     * @return {@code true} if all tasks have completed following shut down
     * 
     * 查詢 ExecutorService 是否已經終止
     */
    boolean isTerminated();

    /**
     * Blocks until all tasks have completed execution after a shutdown
     * request, or the timeout occurs, or the current thread is
     * interrupted, whichever happens first.
     *
     * 等待ExecutorService進入終止狀態
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /*其餘用於提交任務的方法……*/
}
View Code

3.   可利用的並行性

3.1.     異構任務中的並行

採用future和callable攜帶任務結果,長時間執行的任務能夠先進行計算,在以後經過future取得計算結果。

 

示例程序

/*
 * 網頁數據加載並行的可能性,假設網頁只包含文本和圖像兩種數據
 */
public class CompletionTest {
    
    class ImageData{
        // 屬性....
    }
    
    // 從序列化的數據中加載圖像
    public ImageData loadFrom(String source) {
        return new ImageData();
    }

    /**
     * 單線程模式
     * @param sequence 序列化後的網頁數據
     */
    public void loadWithSingleThread(CharSequence source) {
        System.out.print("加載文本");
        List<ImageData> list = new ArrayList<>();
        
        // 從 source 解析圖像數據並加入到 list 中  .....
        // loadImage(source)
        
        for (ImageData imageData : list) {
            System.out.println("圖像加載完成" + imageData);
        }
    }
    
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    
    /*
     * 單線程加載CPU利用率低,若是程序依賴於長時間的 io(當前從網絡加載圖像就是)那麼將很費時間
     * 結合 futureTask 預加載圖像 
     */
    public void loadInFuture(CharSequence source) {
        
        Callable<List<ImageData>> task = new Callable<List<ImageData>>() {

            @Override
            public List<ImageData> call() throws Exception {
                List<ImageData> result = new ArrayList<>();
                
                /*
                 * 加載圖像數據到 result
                 * loadImageFrom source to list.....
                 */
                
                return result;
            }
        };
        
        Future<List<ImageData>> future = executorService.submit(task);
        
        /*
         * loadText from source.....
         */
        System.out.println("loading text");
        
        try {
            List<ImageData> imageDatas = future.get();
            for (ImageData imageData : imageDatas) {
                System.out.println("圖像數據" + imageData);
            }
        } catch (InterruptedException e) {
            // 拋出中斷異常,從新設置線程的中斷狀態
            Thread.currentThread().interrupt();
            // 中斷了,結果已不須要
            future.cancel(true);
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    
    /*    
     * 採用 future 來預加載圖像在必定程度上提供了併發性,但在本問題中效率仍比較低,由於咱們採用的是一次性加載完圖像
     * 再返回,而相對於加載文原本說,圖像加載速度要低不少,在本問題中幾乎能夠說效率與串行差異不大,那怎麼改進?
     * 爲每一圖片設置一個相應的 future計算任務,而後循環操做,每計算完就直接加載,那樣用戶看到的頁面是一張張加載
     * 出來的,這可行,但比較繁瑣,咱們能夠直接使用CompletionService。
     * 結合 completionService 加載圖像
     */
    public void loadWithCompeletionSevice(CharSequence source) {

        List<String> imageInfo = new ArrayList<>();
        // imageInfo = load from source.......
        
        CompletionService<ImageData> service = 
                new ExecutorCompletionService<>(executorService);
        for (String string : imageInfo) {    
            service.submit(new Callable<CompletionTest.ImageData>() {

                @Override
                public ImageData call() throws Exception {
                    ImageData imageData = loadFrom(string);
                    // imageDate = loadFromSource();
                    return imageData;
                }
            });
        }
        
        // loadText(source)
        System.out.println("loading text");
        
        try {
            for (int i = 0; i < imageInfo.size(); i++) {
                // 在得出結果以前阻塞
                Future<ImageData> future = service.take();
                ImageData imageData = future.get();
                
                // loading image ...
                System.out.println("loading image" + imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }        
    }    
}
View Code

3.2.     爲任務設定時限

有時候,咱們可能沒法在指定的時間內完成某個任務,那麼咱們將不須要它的結果,此時咱們能夠放棄這個任務。例如,一個web應用程序須要從外部的廣告服務器獲取廣告,當若是該應用程序在2秒內得不到響應,那麼將顯示一個默認的廣告,這樣即便沒法獲取廣告信息,也不會下降站點的性能。

 

程序示例

public class GetAd {

    class Ad{
        /*
         * 屬性....
         */
    }
    
    private final ExecutorService execute = Executors.newFixedThreadPool(100);
    private final Ad DEFAULT_AD = new Ad();
    
    private final long TIME_LOAD = 2000;
    public void loadAd() {
        
        long endTime = System.currentTimeMillis() + TIME_LOAD;
        Future<Ad> future = execute.submit(new FetchAdTask());
        
        // loading page text.....
        
        Ad ad;
        try {
            // leftTime 有可能爲負數,但 future.get 方法會把負數視爲0
            long leftTime = endTime - System.currentTimeMillis();
            ad = future.get(leftTime, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            
            Thread.currentThread().interrupt();
            future.cancel(true);
            ad = DEFAULT_AD;
            
        } catch (TimeoutException e) {
            
            future.cancel(true);
            ad = DEFAULT_AD;
            
        } catch (ExecutionException e) {
            ad = DEFAULT_AD;
        }
        
        System.out.println("load complete" + ad);
    }
    
    class FetchAdTask implements Callable<Ad>{

        @Override
        public Ad call() throws Exception {
            // load ad from ad server
            return new Ad();
        }
    
    }
}
View Code

 

總結自《java併發編程實戰》

相關文章
相關標籤/搜索