如何優雅的使用線程池

線程池不只在項目中是很是經常使用的一項技術並且在面試中基本上也是必問的知識點,接下來跟着我一塊兒來鞏固一下線程池的相關知識。在瞭解線程池以前咱們先了解一下什麼是進程什麼是線程java

進程

  • 程序:通常是一組CPU指令的集合構成的文件,靜態存儲在諸如硬盤之類的存儲設備上
  • 進程:當一個程序要被計算機運行時,就是在內存中產生該程序的一個運行時實例,咱們就把這個實例叫作進程

用戶下達運行程序的命令之後,就會產生一個進程,同一個程序能夠產生多個進程(一對多的關係),以容許同時有多個用戶運行同一個程序,卻不會相沖突。git

進程須要一些資源才能工做,如CPU的使用時間、存儲器、文件、以及I/O設備,且爲依序逐一執行,也就是每一個CPU核心任什麼時候間內僅能運行一項進程。可是在一個應用程序中通常不會是隻有一個任務單條線執行下去,確定會有多個任務,而建立進程又是耗費時間和資源的,稱之爲重量級操做。github

  1. 建立進程佔用資源太多
  2. 進程之間的通訊須要數據在不一樣的內存空間傳來傳去,因此進程間通訊會更加耗費時間和資源

線程

線程是操做系統可以進行運算調度的最小單位,大部分狀況下它被包含在進程之中,是進程中實際的運做單位。一個進程能夠併發多個線程,每一個線程執行不一樣的任務。同一個進程中的多條線程共享該進程中的所有虛擬資源,例如虛擬地址空間、文件描述符、信號處理等等。可是同一個進程中的多個線程各自有各自的調用棧。面試

> 一個進程能夠有不少線程,每條線程並行執行不一樣的任務。數據庫

線程中的數據

  1. 線程棧上的本地數據:好比函數執行過程的局部變量,咱們知道在Java中線程模型是使用棧的模型。每一個線程都有本身的棧空間。
  2. 在整個進程裏共享的全局數據:咱們知道在Java程序中,Java就是一個進程,咱們能夠經過ps -ef | grep java能夠看到在程序中運行了多少個Java進程,例如咱們Java中的全局變量,在不一樣進程之間是隔離的,可是在線程之間是共享的。
  3. 線程的私有數據:在Java中咱們能夠經過ThreadLocal來建立線程間私有的數據變量。

> 線程棧上的本地數據只能在本方法內有效,而線程的私有數據是在線程間多個函數共享的。編程

CPU密集型和IO密集型

理解是服務器是CPU密集型仍是IO密集型可以幫助咱們更好的設置線程池中的參數。具體如何設置咱們在後面講到線程池的時候再分析,這裏你們先知道這兩個概念。緩存

  • IO密集型:大部分時間CPU閒着,在等待磁盤的IO操做
  • CPU(計算)密集型:大部分時間磁盤IO閒着,等着CPU的計算操做

線程池

線程池實際上是池化技術的應用一種,常見的池化技術還有不少,例如數據庫的鏈接池、Java中的內存池、常量池等等。而爲何會有池化技術呢?程序的運行本質,就是經過使用系統資源(CPU、內存、網絡、磁盤等等)來完成信息的處理,好比在JVM中建立一個對象實例須要消耗CPU的和內存資源,若是你的程序須要頻繁建立大量的對象,而且這些對象的存活時間短就意味着須要進行頻繁銷燬,那麼頗有可能這段代碼就成爲了性能的瓶頸。總結下來其實就如下幾點。服務器

  • 複用相同的資源,減小浪費,減小新建和銷燬的成本;
  • 減小單獨管理的成本,統一交由"池";
  • 集中管理,減小"碎片";
  • 提升系統響應速度,由於池中有現成的資源,不用從新去建立;

因此池化技術就是爲了解決咱們這些問題的,簡單來講,線程池就是將用過的對象保存起來,等下一次須要這個對象的時候,直接從對象池中拿出來重複使用,避免頻繁的建立和銷燬。在Java中萬物皆對象,那麼線程也是一個對象,Java線程是對於操做系統線程的封裝,建立Java線程也須要消耗操做系統的資源,所以就有了線程池。可是咱們該如何建立呢?網絡

Java提供的四種線程池

Java爲咱們提供了四種建立線程池的方法。多線程

  • Executors.newCachedThreadPool:建立可緩存無限制數量的線程池,若是線程中沒有空閒線程池的話此時再來任務會新建線程,若是超過60秒此線程無用,那麼就會將此線程銷燬。簡單來講就是忙不來的時候無限制建立臨時線程,閒下來再回收

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<runnable>());
    }
  • Executors.newFixedThreadPool:建立固定大小的線程池,可控制線程最大的併發數,超出的線程會在隊列中等待。簡單來講就是忙不來的時候會將任務放到無限長度的隊列裏。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<runnable>());
    }
  • Executors.newSingleThreadExecutor:建立線程池中線程數量爲1的線程池,用惟一線程來執行任務,保證任務是按照指定順序執行

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<runnable>()));
    }
  • Executors.newScheduledThreadPool:建立固定大小的線程池,支持定時及週期性的任務執行

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

線程池的建立原理

咱們點擊去這四種實現方式的源碼中咱們能夠看到其實它們的底層建立原理都是同樣的,只不過是所傳的參數不一樣組成的四個不一樣類型的線程池。都是使用了ThreadPoolExecutor來建立的。咱們能夠看一下ThreadPoolExecutor 建立所傳的參數。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

那麼這些參數都具體表明什麼意思呢?

  • corePoolSize :線程池中核心線程數的數量
  • maximumPoolSize :在線程池中容許存在的最大線程數
  • keepAliveTime :當存在的線程數大於corePoolSize ,那麼會找到空閒線程去銷燬,此參數是設置空閒多久的線程才被銷燬。
  • unit :時間單位
  • workQueue :工做隊列,線程池中的當前線程數大於核心線程的話,那麼接下來的任務會放入到隊列中
  • threadFactory :在建立線程的時候,經過工廠模式來生產線程。這個參數就是設置咱們自定義的線程建立工廠。
  • handler :若是超過了最大線程數,那麼就會執行咱們設置的拒絕策略

接下來咱們將這些參數合起來看一下他們的處理邏輯是什麼。

  1. corePoolSize 個任務時,來一個任務就建立一個線程
  2. 若是當前線程池的線程數大於了corePoolSize 那麼接下來再來的任務就會放入到咱們上面設置的workQueue 隊列中
  3. 若是此時workQueue 也滿了,那麼再來任務時,就會新建臨時線程,那麼此時若是咱們設置了keepAliveTime 或者設置了allowCoreThreadTimeOut ,那麼系統就會進行線程的活性檢查,一旦超時便銷燬線程
  4. 若是此時線程池中的當前線程大於了maximumPoolSize 最大線程數,那麼就會執行咱們剛纔設置的handler 拒絕策略

爲何建議不用Java提供的線程池建立方法

理解了上面設置的幾個參數之後,咱們再來看一下爲何在《阿里巴巴Java手冊》中有一條這樣規定。

相信你們看到上面提供四種建立線程池的實現原理,應該知道爲何阿里巴巴會有這麼規定了。

  • FixedThreadPoolSingleThreadExecutor:這兩個線程池的實現方式,咱們能夠看到它設置的工做隊列都是LinkedBlockingQueue,咱們知道此隊列是一個鏈表形式的隊列,此隊列是沒有長度限制的,是一個無界隊列,那麼此時若是有大量請求,就有可能形成OOM
  • CachedThreadPoolScheduledThreadPool:這兩個線程池的實現方式,咱們能夠看到它設置的最大線程數都是Integer.MAX_VALUE,那麼就至關於容許建立的線程數量爲Integer.MAX_VALUE。此時若是有大量請求來的時候也有可能形成OOM

如何設置參數

因此咱們在項目中若是要使用線程池的話,那麼就推薦根據本身項目和機器的狀況進行個性化建立線程池。那麼這些參數如何設置呢?爲了正確的定製線程池的長度,須要理解你的計算機配置、所需資源的狀況以及任務的特性。好比部署的計算機安裝了多少個CPU?多少的內存?任務主要執行是IO密集型仍是CPU密集型?所執行任務是否須要數據庫鏈接這樣的稀缺資源?

> 若是你有多個不一樣類別的任務,它們的行爲有很大的差異,那麼應該考慮使用多個線程池。這樣也能根據每一個任務不一樣定製不一樣的線程池,也不至於由於一種類型的任務失敗而託垮另外一個任務。

  • CPU密集型任務:說明包含了大量的運算操做,好比有N個CPU,那麼就配置線程池的容量大小爲N+1,這樣能得到最優的利用率。由於CPU密集型的線程剛好在某時由於發生一個頁錯誤或者由於其餘的緣由而暫停,恰好有一個額外的線程,能夠確保在這種狀況下CPU週期不會中斷工做。

  • IO密集任務:說明CPU大部分時間都是在等待IO的阻塞操做,那麼此時就能夠將線程池的容量大小配置的大一些。此時能夠根據一些參數進行計算大概你的線程池的數量多少合適。

    • N:CPU的數量
    • U:目標CPU的使用率,0<=U<=1
    • W/C:等待時間與計算時間的比率
    • 那麼最優的池的大小就是N*U*(1+W/C)

> 頁缺失(英語:Page fault,又名硬錯誤、硬中斷、分頁錯誤、尋頁缺失、缺頁中斷、頁故障等)指的是當軟件試圖訪問已映射在虛擬地址空間中,可是當前並未被加載在物理內存中的一個分頁時,由中央處理器的內存管理單元所發出的中斷

其實線程池大小的設置仍是要根據本身業務類型來設置,好比當前任務須要池化的資源的時候,好比數據庫的鏈接池,俺麼線程池的長度和資源池的長度會相互的影響。若是每個任務都須要一個數據庫鏈接,那麼鏈接池的大小就會限制了線程池的有效大小,相似的,當線程池中的任務是鏈接池的惟一消費者時,那麼線程池的大小反而又會限制了鏈接池的有效大小。

線程池中的線程銷燬

線程池的核心線程數(corePoolSize)、最大線程數(maximumPoolSize)、線程的存活時間(keepAliveTime)共同管理的線程的建立與銷燬。接下來咱們再複習一下線程池是如何建立和銷燬線程的

  • 當前線程數 < 核心線程數:來一個任務建立一個線程
  • 當前線程數 = 核心線程數:來一個任務就會將其加入到隊列中
  • 當前線程數 > 核心線程數:此時有一個前提條件就是隊列已滿,纔會新建線程,此時就會開啓線程的活性檢查,對於設置爲keepAliveTime 時間沒有活動的線程將會被回收

那麼這裏可能有人會想到將corePoolSize 核心線程數設置爲0(若是你們還記得上面講的CachedThreadPool 的話應該還會記得它的核心線程數就是0),由於這樣設置的話線程就會動態的進行建立了,閒的時候沒有線程,忙的時候再在線程池中建立線程。這樣想法當然是好,可是若是咱們自定義參數設置了此參數爲0,而正好又設置了等待隊列不是SynchronousQueue,那麼其實就會有問題,由於只有在隊列滿的狀況下才會新建線程。下面代碼我使用了無界隊列LinkedBlockingQueue ,其實你們看一下輸出

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,Integer.MAX_VALUE,1, TimeUnit.SECONDS,new LinkedBlockingQueue&lt;&gt;());
for (int i = 0; i &lt; 10; i++) {
    threadPoolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("1");
        }
    });
}

你們能夠看一下演示的效果,其實1是每隔一秒打印一次,其實這就和咱們使用線程池初衷相悖了,由於咱們這個至關因而單線程在運行。

可是若是咱們將工做隊列換成SynchronousQueue 呢,咱們發現這些1是一塊輸出出來的。

> SynchronousQueue 並非一個真正的隊列,而是一種管理直接在線程間移交信息的機制,這裏能夠簡單將其想象成一個生產者生產消息交給SynchronousQueue ,而消費者這邊若是有線程來接收,那麼此消息就會直接交給消費者,反之會阻塞。

因此咱們在設置線程池中某些參數的時候應該想一想其建立和銷燬線程流程,否則咱們自定義的線程池還不如使用Java提供的四種線程池了。

線程池中的拒絕策略

ThreadPoolExecutor爲咱們提供了四種拒絕策略,咱們能夠看下Java提供的四種線程池建立所提供的拒絕策略都是其定義的默認的拒絕策略。那麼除了這個拒絕策略其餘的拒絕策略都是什麼呢?

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

咱們能夠到拒絕策略是一個接口RejectedExecutionHandler ,這也就意味我着咱們能夠本身訂本身的拒絕策略,咱們先看一下Java提供四種拒絕策略是什麼。

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

AbortPolicy

這個拒絕策略就是Java提供的四種線程池建立方法提供的默認拒絕策略。咱們能夠看下它的實現。

public static class AbortPolicy implements RejectedExecutionHandler {
 
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

因此此拒絕策略就是拋RejectedExecutionException異常

CallerRunsPolicy

此拒絕策略簡單來講就是將此任務交給調用者直接執行。

public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

這裏爲何是交給了調用者來執行呢?咱們能夠看到它是調用了run()方法,而不是start()方法。

DiscardOldestPolicy

從源碼中應該能看出來,此拒絕策略是丟棄隊列中最老的任務,而後再執行。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

DiscardPolicy

從源碼中應該能看出來,此拒絕策略是對於當前任務不作任何操做,簡單來講就是直接丟棄了當前任務不執行。

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

線程池的拒絕策略給咱們默認提供了這四種的實現方式,固然咱們也可以自定義拒絕策略使線程池更加符合咱們當前的業務,在後面講解Tomcat自定義本身的線程池時也會講解它本身實現的拒絕策略。

線程飢餓死鎖

線程池爲「死鎖」這一律念帶來了一種新的可能:線程飢餓死鎖。在線程池中,若是一個任務另外一個任務提交到同一個Executor,那麼一般會引起死鎖。第二個線程停留在工做隊列中等待第一個提交的任務執行完成,可是第一個任務又沒法執行完成,由於它在等待第二個任務執行完成。用圖表示以下

用代碼表示的話以下,這裏注意咱們這裏定義的線程池是SingleThreadExecutor,線程池中只有一個線程,這樣好模擬出這樣的狀況,若是在更大的線程池中,若是全部線程都在等待其餘仍處於工做隊列的任務而阻塞,那麼這種狀況被稱爲線程飢餓死鎖。因此儘可能避免在同一個線程池中處理兩種不一樣類型的任務。

public class AboutThread {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    public static void main(String[] args) {
        AboutThread aboutThread = new AboutThread();
        aboutThread.threadDeadLock();
    }

    public void threadDeadLock(){
        Future<string> taskOne  = executorService.submit(new TaskOne());
        try {
            System.out.printf(taskOne.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public class TaskOne implements Callable{

        @Override
        public Object call() throws Exception {
            Future<string> taskTow = executorService.submit(new TaskTwo());
            return "TaskOne" + taskTow.get();
        }
    }

    public class TaskTwo implements Callable{

        @Override
        public Object call() throws Exception {
            return "TaskTwo";
        }
    }
}

拓展ThreadPoolExecutor

若是咱們想要對線程池進行一些擴展,那麼可使用ThreadPoolExecutor 給我預留的一些接口可使咱們進行更深層次話的定製線程池。

線程工廠

若是咱們想要給咱們的線程池中的每一個線程自定義一些名稱,那麼咱們就可使用線程工廠來實現一些自定義化的一些操做。只要咱們將咱們自定義的工廠傳給ThreadPoolExecutor,那麼不管什麼時候線程池須要建立一個線程,都要經過咱們定義的工廠來進行建立。接下來咱們看一下接口ThreadFactory,只要咱們實現了此接口就能自定義本身線程獨有的信息。

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

接下來咱們能夠看咱們本身寫的線程池工廠類

class CustomerThreadFactory implements ThreadFactory{

    private String name;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    CustomerThreadFactory(String name){
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r,name+threadNumber.getAndIncrement());
        return thread;
    }
}

只須要在進行線程池實例化的時候將此工廠類加上去便可

public static void customerThread(){
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,Integer.MAX_VALUE,1, TimeUnit.SECONDS,new SynchronousQueue&lt;&gt;(),
                new CustomerThreadFactory("customerThread"));

        for (int i = 0; i &lt; 10; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.printf(Thread.currentThread().getName());
                    System.out.printf("\n");
                }
            });
        }
    }

接下來咱們執行此語句,發現每一個線程的名字已經變了

customerThread1
customerThread10
customerThread9
customerThread8
customerThread7
customerThread6
customerThread5
customerThread4
customerThread3
customerThread2

經過繼承ThreadPoolExecutor擴展

咱們查看ThreadPoolExecutor 源碼能夠發現源碼中有三個方法都是protected

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

> 被protected修飾的成員對於本包和其子類可見

咱們能夠經過繼承來覆寫這些方法,那麼就能夠進行咱們獨有的擴展了。執行任務的線程會調用beforeExecute afterExecute 方法,能夠經過它們添加日誌、時序、監視器或者同級信息收集的功能。不管任務是正常從run中返回,仍是拋出一個異常,afterExecute 都會被調用(若是任務完成後拋出一個Error,則afterExecute 不會被調用)。若是beforeExecute 拋出一個RuntimeException,任務將不會被執行,afterExecute 也不會被調用。

在線程池完成關閉時調用terminated,也就是在全部任務都已經完成而且全部工做者線程也已經關閉後,terminated能夠用來釋放Executor在其生命週期裏分配的各類資源,此外還能夠執行發送通知、記錄日誌或者手機finalize統計等操做。

本篇文章代碼地址

參考

相關文章
相關標籤/搜索