java多線程之ThreadPoolExcutor

1、爲何須要使用線程池

  • 線程建立和銷燬的開銷很是高:頻繁的建立和銷燬線程須要消耗時間,會使響應變慢;同時消耗計算資源。
  • 資源耗盡:空閒的線程會佔用內存,會給垃圾回收帶來壓力,線程競爭CPU也會產生性能開銷,線程池可合理管理空閒線程
  • 穩定性:在必定範圍內,增長線程能夠提高系統的處理能力,若是超過這個範圍,繼續建立線程只會下降執行速度,甚至致使系統OOM,線程池便可以經過程序控制線程數。

2、幾個經常使用線程池

首先看下線程池的構造函數:緩存

/**
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

複製代碼
  1. newFixedThreadPool:將建立一個長度固定的線程池,每當新提交一個任務時就建立一個新線程,直到達到線程池的最大數量,這時線程池的大小不在變化(若是某個線程發生未預期的Exception而結束,那麼線程池會補充一個新線程)。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
複製代碼
  1. newCachedThreadPool:將建立一個可緩存的線程池,若是有線程空閒,會回收空閒線程;若是任務增長時,可添加新線程,線程池的規模不存在任何界限。
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
複製代碼
  1. newSingleThreadPool:將建立一個單線程的線程池,若是線程異常結束,將建立另外一個線程來替代;阻塞隊列爲LinkedBlockingQueue,提交的任務可按照順序執行
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
複製代碼
  1. newScheduledThreadPool: 建立了一個固定長度的線程池,並且以延遲或定時的方式來執行任務,相似於Timer。
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
複製代碼

3、線程池的六個重要參數:

  1. corePoolSize:核心線程數
    • 核心線程會一直存活,及時沒有任務須要執行
    • 線程數 < corePoolSize時,即便有線程空閒,線程池也會優先建立新線程處理
    • 設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
  2. queueCapacity:任務隊列容量(阻塞隊列) 當核心線程數達到最大時,新任務會放在隊列中排隊等待執行
  3. maxPoolSize:最大線程數
    • 線程數>=corePoolSize,且任務隊列已滿時。線程池會建立新線程來處理任務
    • 線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常
  4. keepAliveTime:線程空閒時間
    • 當線程空閒時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize
    • 若是allowCoreThreadTimeout=true,則會直到線程數量=0
  5. allowCoreThreadTimeout:容許核心線程超時
  6. rejectedExecutionHandler:任務拒絕處理器,兩種狀況會拒絕處理任務:
    • 當線程數已經達到maxPoolSize,切隊列已滿,會拒絕新任務
    • 當線程池被調用shutdown()後,會等待線程池裏的任務執行完畢,再shutdown。若是在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
    • 線程池會調用rejectedExecutionHandler來處理這個任務。若是沒有設置默認是AbortPolicy,會拋出異常
  7. 拒絕策略:
    • AbortPolicy 直接拋出一個RejectedExecutionException,這也是JDK默認的拒絕策略
    • CallerRunsPolicy 嘗試直接運行被拒絕的任務,若是線程池已經被關閉了,任務就被丟棄了
    • DiscardPolicy 不能執行的任務將被刪除
    • DiscardOldestPolicy 移除最晚的那個沒有被處理的任務,而後執行被拒絕的任務。一樣,若是線程池已經被關閉了,任務就被丟棄了
    • 自定義:實現RejectedExecutionHandler接口,可自定義處理器

4、ThreadPoolExecutor執行順序:

線程池按如下行爲執行任務
複製代碼
  1. currentThreadNum < corePoolSize->建立線程。
  2. corePoolSize =< currentThreadNum ,對列未滿,加入隊列。
  3. corePoolSize =< currentThreadNum <maxPoolSize ,且任務隊列已滿,建立線程
  4. corePoolSize =< currentThreadNum &&currentThreadNum>=maxPoolSize ,且任務隊列已滿,建立線程,拋出異常,拒絕任務

5、如何設置參數

  • 默認值 corePoolSize=1 queueCapacity=Integer.MAX_VALUE maxPoolSize=Integer.MAX_VALUE keepAliveTime=60s allowCoreThreadTimeout=false rejectedExecutionHandler=AbortPolicy()
  • 如何來設置
    1. 須要根據幾個值來決定
    • tasks :每秒的任務數,假設爲500~1000
    • taskcost:每一個任務花費時間,假設爲0.1s
    • responsetime:系統容許容忍的最大響應時間,假設爲1s
    1. 作幾個計算
    • corePoolSize = 每秒須要多少個線程處理? threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個線程。corePoolSize設置應該大於50 根據8020原則,若是80%的每秒任務數小於800,那麼corePoolSize設置爲80便可
    • queueCapacity = (coreSizePool/taskcost)responsetime 計算可得 queueCapacity = 80/0.11 = 80。意思是隊列裏的線程能夠等待1s,超過了的須要新開線程來執行 切記不能設置爲Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。
    • maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost) 計算可得 maxPoolSize = (1000-80)/10 = 92 (最大任務數-隊列容量)/每一個線程每秒處理能力 = 最大線程數
    • rejectedExecutionHandler:根據具體狀況來決定,任務不重要可丟棄,任務重要則要利用一些緩衝機制來處理
    • keepAliveTime和allowCoreThreadTimeout採用默認一般能知足
相關文章
相關標籤/搜索