java 經常使用線程池講解

1.線程池的好處

  1. 重用線程池中的線程,避免由於線程的建立和銷燬所帶來的性能開銷。
  2. 能有效控制線程池的最大併發數,避免大量的線程之間由於互相搶奪系統資源而致使的阻塞現象。
  3. 能對線程進行簡單管理,並提供定時執行以及指定間隔循環執行等功能。

相關連接

2. 原理分析

系統組織結構圖

2.1 Executor

只有一個excute方法java

/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */
    void execute(Runnable command);
複製代碼

2.2 public interface ExecutorService extends Executor

ExecutorService是Executor的子接口,增長了一些經常使用的對線程的控制方法,以後使用線程池主要也是使用這些方法。緩存

2.3 AbstractExecutorService

AbstractExecutorService是一個抽象類。安全

2.4 ThreadPoolExecutor

是AbstractExcutor的具體實現類。併發

先來看看它的構造器

*
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
複製代碼

這是第一種方式,能夠傳入blockingQueue.

接下來是第二種方式

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default rejected execution handler. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} is null */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }


複製代碼

第三種方式

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code handler} is null */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
複製代碼

第四種方式

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

複製代碼

構造方法參數說明

corePoolSize

核心線程數,默認狀況下核心線程會一直存活,即便處於閒置狀態也不會受存keepAliveTime限制。除非將allowCoreThreadTimeOut設置爲trueless

maximumPoolSize

線程池所能容納的最大線程數。超過這個數的線程將被阻塞。當任務隊列爲沒有設置大小的LinkedBlockingDeque時,這個值無效。post

keepAliveTime

非核心線程的閒置超時時間,超過這個時間就會被回收。性能

unit

指定keepAliveTime的單位,如TimeUnit.SECONDS。當將allowCoreThreadTimeOut設置爲true時對corePoolSize生效。this

workQueue

線程池中的任務隊列. 經常使用的有三種隊列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。spa

threadFactory

線程工廠,提供建立新線程的功能。ThreadFactory是一個接口,只有一個方法.net

public interface ThreadFactory {
  Thread newThread(Runnable r);
}
複製代碼

RejectedExecutionHandler

RejectedExecutionHandler也是一個接口,只有一個方法

public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}
複製代碼

線程池規則

線程池使用的隊列

BlockingQueue

先進先出隊列

synchronousQueue

線程安全的隊列,裏面是沒有固定的緩存的。 ==SynchronousQueue沒有數量限制==。由於他根本不保持這些任務,而是直接交給線程池去執行。當任務數量超過最大線程數時會直接拋異常
也是okhttp使用的

PriorityBlokingQueue

無序的,能夠根據優先級進行排序。 執行的對象須要實現compareable

okhttp使用的是SynchronousQueue。

下面都假設任務隊列沒有大小限制:

  1. 若是線程數量<=核心線程數量,那麼直接啓動一個核心線程來執行任務,不會放入隊列中。
  2. 若是線程數量>核心線程數,但<=最大線程數,而且任務隊列是LinkedBlockingDeque的時候,超過核心線程數量的任務會放在任務隊列中排隊。
  3. 若是線程數量>核心線程數,但<=最大線程數,而且任務隊列是SynchronousQueue的時候,線程池會建立新線程執行任務,這些任務也不會被放在任務隊列中。這些線程屬於非核心線程,在任務完成後,閒置時間達到了超時時間就會被清除。
  4. 若是線程數量>核心線程數,而且>最大線程數,當任務隊列是LinkedBlockingDeque,會將超過核心線程的任務放在任務隊列中排隊。也就是當任務隊列是LinkedBlockingDeque而且沒有大小限制時,線程池的最大線程數設置是無效的,他的線程數最多不會超過核心線程數。
  5. 若是線程數量>核心線程數,而且>最大線程數,當任務隊列是SynchronousQueue的時候,會由於線程池拒絕添加任務而拋出異常

任務隊列大小有限時

  1. ==當LinkedBlockingDeque塞滿時==,新增的任務會直接建立新線程來執行,當建立的線程數量超過最大線程數量時會拋異常。
  2. ==SynchronousQueue沒有數量限制==。由於他根本不保持這些任務,而是直接交給線程池去執行。當任務數量超過最大線程數時會直接拋異常。

線程池的分類

1. FixedThreadPool

經過Executors的newFIxedThreadPool方法來建立,他是一種線程數量固定的線程池,當線程處於空閒狀態時,她們並不會回收,除非線程關閉了。當全部的線程都處於活動狀態時,新任務都會處於等待狀態,直到有線程空閒出來。因爲FixedThreadPool只有核心線程而且這些核心線程都不會回收,這意味着它能更加快速地響應外界的請求。
FixedTHreadPool中只有核心線程,而且這些核心線程沒有超時機制,另外任務隊列是沒有大小限制的

public static ExecutorService newFixedThreadPool(int nThreads){
     return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISENDS,new LinkedBlockingQueue<Runnable>());
 }
複製代碼

2. CachedTHreadPool

  • 這是一種線程數量不定的線程池,它只有非核心線程,而且其最大線程數爲Integer.MAX_VALUE。因爲Integer.MAX_VALUE是一個最大的數,實際上就至關於最大線程數能夠任意大,當線程池中的線程都處於活動狀態時,線程池會建立新的線程來處理任務。不然就會利用空閒的線程來處理新任務。
  • 線程池中的空閒線程都有超時機制,這個超時時長爲60秒,超過60秒閒置線程就會被回收。和FixedTHreadPool不一樣的是,CacheThreadPool的任務隊列其實至關於一個空集合,這將致使任何任務都會當即被執行。
  • 這類線程池比較適合執行大量的耗時較少的任務。當整個線程池都處於閒置狀態時,線程池中的線程都會超時而被中止這個時候CachethreadPool之中其實是沒有任何線程的。它幾乎不佔任何系統資源。
public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(0,INterger.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
複製代碼

3. ScheduledThreadPool

  • 他的核心線程數量是固定的,而非核心線程數是沒有限制的,而且當非核心線程閒置時會被當即回收,ScheduledThreadPool這類線程池主要用於執行定時任務和具備固定週期的重複任務。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){
    return new ScheduledThreadPoolExecutor(corePoolSize);
    
}
public ScheduledThreadPoolExecutor(int corePoolSize){
    super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,new DelayedWorkQueue());
}
複製代碼

4 SingleThreadExecutor

  • 經過Executor的newSingleThreadExecutor方法來建立。這類線程池內部只有一個核心線程,他確保全部的任務都在同一個線程中按順序執行。SingleThreadExector的

參考文獻

csdn

相關文章
相關標籤/搜索