ThreadPoolExecutor

1、概述

2、參數列表

參數名 做用
corePoolSize 核心線程池大小
maximumPoolSize 最大線程池大小
keepAliveTime 線程池中超過corePoolSize數目的空閒線程最大存活時間;能夠allowCoreThreadTimeOut(true)使得核心線程有效時間
TimeUnit keepAliveTime時間單位
workQueue 阻塞任務隊列
threadFactory 新建線程工廠
RejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理

重點講解:java

其中比較容易讓人誤解的是:corePoolSize,maximumPoolSize,workQueue之間關係。 
    1.當線程池小於corePoolSize時,新提交任務將建立一個新線程執行任務,即便此時線程池中存在空閒線程。 
    2.當線程池達到corePoolSize時,新提交任務將被放入workQueue中,等待線程池中任務調度執行 
    3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新線程執行任務 
    4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理 
    5.當線程池中超過corePoolSize線程,空閒時間達到keepAliveTime時,關閉空閒線程(只會關閉超過corePoolSize的數) 
    6.當設置allowCoreThreadTimeOut(true)時,線程池中corePoolSize線程空閒時間達到keepAliveTime也將關閉ide

線程管理機制圖示:spa

3、Executors配置方案

一、構造一個固定線程數目的線程池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,所以多餘的任務將存在再阻塞隊列,不會由RejectedExecutionHandler處理。線程

public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());  
    }

二、構造一個緩衝功能的線程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個無容量的阻塞隊列 SynchronousQueue,所以任務提交以後,將會建立新的線程執行;線程空閒超過60s將會銷燬。debug

public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                      60L, TimeUnit.SECONDS,  
                                      new SynchronousQueue<Runnable>());  
    }

三、構造一個只支持一個線程的線程池,配置corePoolSize=maximumPoolSize=1,無界阻塞隊列LinkedBlockingQueue;保證任務由一個線程串行執行。code

public static ExecutorService newSingleThreadExecutor() {  
        return new FinalizableDelegatedExecutorService  
            (new ThreadPoolExecutor(1, 1,  
                                    0L, TimeUnit.MILLISECONDS,  
                                    new LinkedBlockingQueue<Runnable>()));  
    }

四、 構造有定時功能的線程池,配置corePoolSize,無界延遲阻塞隊列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,因爲DelayedWorkQueue是無界隊列,因此這個值是沒有意義的。隊列

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
        return new ScheduledThreadPoolExecutor(corePoolSize);  
    }  
  
public static ScheduledExecutorService newScheduledThreadPool(  
            int corePoolSize, ThreadFactory threadFactory) {  
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);  
    }  
  
public ScheduledThreadPoolExecutor(int corePoolSize,  
                             ThreadFactory threadFactory) {  
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,  
              new DelayedWorkQueue(), threadFactory);  
    }

4、定製本身的線程池

public class ThreadPoolManager {

	private static final Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);
    private static ThreadPoolExecutor threadpool = 
new ThreadPoolExecutor(10, 100, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(50),new MessageThreadFactory(),new MessageRejectedExecutionHandler());
     // 是指線程池一啓動的時候,就會直接建立核心數線程10個,而後當再有任務進來的時候,在入隊到阻塞隊列裏,若是
     // 隊列數超過50個了,纔會開始繼續建立額外的線程,直到達到最大線程數100個
    static class MessageThreadFactory implements ThreadFactory {
        private AtomicInteger count = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = ThreadPoolManager.class.getSimpleName() + count.addAndGet(1);
            logger.debug("新建線程:"+logger);
            t.setName(threadName);
            return t;
        }
    }
    static class MessageRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);  // 加了這句,則就是阻塞線程池了
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void submit(Runnable task){
        threadpool.submit(task);
    }

    public static Future<String> submit(Callable<String> task){
        return threadpool.submit(task);
    }
}
相關文章
相關標籤/搜索