Java線程池之ThreadPoolExecutor

前言java

  線程池能夠提升程序的併發性能(固然是合適的狀況下),由於對於沒有線程的狀況下,咱們每一次提交任務都新建一個線程,這種方法存在很多缺陷:安全

1.  線程的建立和銷燬的開銷很是高,線程的建立須要時間,會延遲任務的執行,會消耗大量的系統資源。併發

2.  活躍的線程會消耗系統資源,而大量的空閒線程會佔用許多內存,給垃圾回收器帶來很大的壓力,而大量線程在競爭CPU資源的時間還會產生氣體的性能開銷。框架

3.  系統在可建立的線程上存在一個限制,若是超過了這個限制,極可能拋出OOM。ide

  咱們不難發現,在必定範圍下,增長線程可以提升系統的吞吐量,而當線程數超過合理值後,增長的線程反而會下降程序的執行速度。函數

Executor的引入性能

  在JDK1.5中,咱們引入了一個線程池框架,Executor框架,它可以分解任務的建立和執行過程。它包括Executor、ExecutorService、Callable等接口和Executors、ThreadPoolExecutor實現類等。this

注意點atom

  固然了看待事物須要辯證,是否使用了Executor框架就能很好地將複雜的任務執行解耦開來的。這邊咱們其實須要限制一下它的使用範圍。spa

  •          對於依賴型的任務而言,不是很適合使用線程池去操做,容易引起死鎖,由於這種狀況下咱們須要當心維持這些任務的執行順序,以保證不會觸發死鎖。
  •          對於經過線程封閉實現線程安全的任務而言,使用單線程的Executor可以保證更安全的併發。
  •          使用ThreadLocal的任務,由於線程池會複用線程,這將致使任務的ThreadLocal值失去意義(除非線程本地值受限於任務的生命週期)。
  •          對響應時間敏感的任務,假設咱們將一個執行時間很長的任務,或者多個執行時間很長的任務放到一個單線程的Executor中或者一個包含少許線程的線程池中,都會下降程序的響應速度。

合適的線程數

  線程池在對處理同一類型的任務且相互獨立的時候,能達到性能上的最佳,不然任務時長不一致很容易引發擁塞或是飢餓。

那線程池的線程數以多少爲合適呢,對於計算密集型的任務而言,咱們最好設置的線程數 = CPU數+1;(+1是爲了保證當某個線程由於缺頁故障或其餘緣由而暫停時,這個+1的線程可以確保CPU的時鐘週期不會被浪費);而對於包含IO操做或者其餘阻塞操做的任務時,因爲線程不會一直執行,因此線程池的規模應該更大點。

線程池的使用 

 在實際使用過程當中,咱們通常藉助於ThreadPoolExecutor來完成線程池的建立。ThreadPoolExecutor具備極好的擴展性,除了系統提供的四種經常使用的線程池,如CachedThreadPoolExecutor,FixedThreadPoolExecutor,SingleThreadPoolExecutor,ScheduledThreadExecutor。咱們能夠自定義線程池的構造函數,如線程池的基本線程數、最大線程數、線程池的超時時間(時間+時間單位),線程池的任務隊列,線程池的線程工廠,線程池的飽和策略。

固然,咱們在使用系統提供的四種線程池的時候,一樣能夠在後來修改線程池的配置。

線程的任務隊列

LinkedBlockQueue:無界   CachedThreadPoolExecutor  FixedThreadPoolExecutor的默認任務隊列

ArrayBlockQueue:有界

PriorityQueue:優先級隊列,有界   ScheduledThreadPoolExecutor的默認任務隊列

SynchronousQueue:同步移交,隊列容量爲0,僅當有線程準備好時纔會將任務放到隊列中。

飽和策略

飽和策略的設置

  對於有界隊列而言,當有界隊列被填滿後,這時候咱們須要用到線程的飽和策略,前面提到咱們能夠在後面配置線程池的設置,而飽和策略的修改就是經過ThreadPoolExecutor的setRejectedExecutionHandler方法來進行修改的(當某個任務唄提交到一個已經被關閉的Executor中,也會用到飽和策略)

         飽和策略主要有如下幾種:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。

  1.  AbortPolicy:停止策略,該策略將拋出未受檢的RejectedExecution,調用者能夠捕獲這個異常根據實際須要編寫直接的處理代碼。
  2. CallerRunsPolicy:調用者運行,該策略不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者中,從而下降新任務的流量。它不會在線程池的某個線程中執行新提交的任務,而是調用了execute的線程中執行任務,當線程池的任務隊列被填滿後,下一個任務會在調用execute時在主線程中執行,因爲執行任務須要必定時間,這段時間內主線程顯然不能提交任務,從而保證線程池在這段時間內處理現有任務。
  3. DiscardPolicy:拋棄策略,捨棄任務。
  4. DiscardOldestPolicy:拋棄下一個將被執行的任務,而後嘗試從新提交新的任務,(不適用於和優先隊列合用,由於這樣將拋棄的將是優先級最高的等待任務)

         固然,咱們可使用Semaphore(信號量)來控制任務的提交速率。

線程工廠 

  咱們能夠經過自定義線程工廠,來實現咱們本身的線程。具體示例以下所示:

  自定義的線程工廠,記錄了線程池的名字。

import java.util.concurrent.ThreadFactory;

/**
 * Created by DB on 2017/9/1.
 */
public class MyThreadFactory implements ThreadFactory {
    private final String poolName;

    public MyThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new MyAPPThread(r,poolName);
    }
}

自定義的線程類:實現了指定線程的名字,設置自定義UncaughtExecptionHandler。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Created by DB on 2017/9/1.
 */
public class MyAPPThread extends Thread {
    public static  final  String DEFALUT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAPPThread(Runnable r){
        this(r,DEFALUT_NAME);
    }
    public MyAPPThread(Runnable r,String name){
        super(r,name +"-"+created.incrementAndGet());
        setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.log(Level.SEVERE,"Uncaught in thread"+t.getName(),e);
            }
        });
    }

    public void run(){
        boolean debug = debugLifecycle;
        if(debug){
            log.log(Level.FINE,"Created" +getName());
        }
        try {
            alive.incrementAndGet();
            super.run();
        }finally {
            alive.decrementAndGet();
            if(debug){
                log.log(Level.FINE,"Exiting"+getName());
            }
        }
    }
    public static int getThreadsCreated(){
        return  created.get();
    }
    public static int getThreadsAlive(){
        return alive.get();
    }
    public static boolean getDebug(){
        return  debugLifecycle;
    }
    public static  void setDebug(Boolean b){
        debugLifecycle=b;
    }
}

擴展ThreadPoolExecutor

  ThreadPoolExecutor是可擴展的,它提供了幾個在子類中能夠改寫的方法:beforeExecute、afterExecute和terminated,經過實現這些方法咱們能夠實現擴展。

         在執行任務的線程中將調用beforeExecute和afterExecute方法,在這些方法中咱們能夠添加日誌、計時。監視或者統計信息收集的功能。不管任務是從run中正常返回仍是拋出一個異常而返回,afterExecute都會被調用。而beforeExecute拋出RuntimeException時,任務將不被執行,afterExecute固然也不會被調用。

         在線程池完成關閉操做時,會調用terminated,也就在全部任務都已經完成而且全部工做者線程也已經關閉後。在這個方法中咱們能夠實現Executor在其生命週期中分配的各類資源,還有執行發送通知、記錄日誌或者收集finalize統計信息等操做。

  具體的框架以下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by DB on 2017/9/1.
 */
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
    }
}
相關文章
相關標籤/搜索