Java中的多線程與線程池——線程池篇

線程池

線程池是什麼?

簡單來講,線程池是指提早建立若干個線程,當有任務須要處理時,線程池裏的線程就會處理任務,處理完成後的線程並不會被銷燬,而是繼續等待下一個任務。因爲建立和銷燬線程都是消耗系統資源的,因此,當某個業務須要頻繁進行線程的建立和銷燬時,就能夠考慮使用線程池來提升系統的性能啦。java

線程池能夠作什麼?

藉由《Java併發編程的藝術》,使用線程池可以幫助 :git

  • 下降資源消耗。經過重複利用已經建立的線程,可以下降線程建立和銷燬形成的消耗。
  • 提升響應速度。當任務到達時,任務能夠不須要等待線程的建立就能當即執行。
  • 提升線程的可管理性。線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。

如何建立一個線程池

首先建立一個 Runnable 接口實現類。編程

package demo;

import java.util.Date;

/** * @author yuanyiwen * @create 2020-02-28 16:05 * @description */
public class DemoThread implements Runnable {

    private String command;

    public DemoThread(String command) {
        this.command = command;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " 開始時間 : " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " 結束時間 : " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "DemoThread{" +
                "command='" + command + '\'' +
                '}';
    }
}
複製代碼

這裏讓咱們使用 ThreadPoolExecutor 來建立一個線程池進行測試:緩存

package demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** * @author yuanyiwen * @create 2020-02-28 16:19 * @description */
public class DemoThreadPoolExecutor {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE  = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {
        // 使用線程池來建立線程
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                // 核心線程數爲 :5
                CORE_POOL_SIZE,
                // 最大線程數 :10
                MAX_POOL_SIZE,
                // 等待時間 :1L
                KEEP_ALIVE_TIME,
                // 等待時間的單位 :秒
                TimeUnit.SECONDS,
                // 任務隊列爲 ArrayBlockingQueue,且容量爲 100
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                // 飽和策略爲 CallerRunsPolicy
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        for(int i = 0; i < 15; i++) {
            // 建立WorkerThread對象,該對象須要實現Runnable接口
            Runnable worker = new DemoThread("任務" + i);
            // 經過線程池執行Runnable
            threadPoolExecutor.execute(worker);
        }
        // 終止線程池
        threadPoolExecutor.shutdown();
        while (!threadPoolExecutor.isTerminated()) {

        }
        System.out.println("所有線程已終止");
    }
}
複製代碼

最後讓咱們來看一下運行結果 :多線程

能夠看到,當核心線程數爲 5 時,即便總共要運行的線程有 15 個,每次也只會同時執行 5 個任務,剩下的任務則會被放入等待隊列,等待覈心線程空閒後執行。總的來講步驟以下 :併發

Executor框架

Executor 框架是 Java5 以後引進的。在 Java5 以後,經過 Executor 來啓動線程比使用 Thread 的 start 方法更好。除了更易管理,效率更好(用線程池實現,節約開銷)外,還有關鍵的一點 :有助於避免 this 逃逸問題。框架

this 逃逸

this 逃逸是指在構造函數返回以前其餘線程就持有該對象的引用,調用還沒有構造徹底的對象的方法時可能引起奇怪的錯誤。ide

引起 this 逃逸一般須要知足兩個條件 :一個是在構造函數中建立內部類,另外一個就是在構造函數中將這個內部類發佈了出去。函數

因爲發佈出去的內部類對象自帶對外部類 this 的訪問權限,這就致使在經過內部類對象訪問外部類 this 時,外部類可能並未構造完成,從而致使一些意想不到的問題。性能

典型的 this 逃逸情景以下 :

public class DemoThisEscape {

    private int a = 10;

    public DemoThisEscape() {
        // 在外部類的構造函數中調用內部類
        new Thread(new InnerClass()).start();
    }

    private class InnerClass implements Runnable {
        @Override
        public void run() {
            // 在這裏經過 DemoThisEscape.this 引用還沒有構造完畢的對象,好比這樣 :
            System.out.println(DemoThisEscape.this.a);
        }
    }
}
複製代碼

經過使用線程池進行統一的線程調度,省去了在程序中手動啓動線程的步驟,從而避免了在構造器中啓動一個線程的狀況,所以可以有效規避 this 逃逸。

ThreadPoolExecutor經常使用參數

1. corePoolSize :核心線程線程數

定義了最小能夠同時運行的線程數量。

2. maximumPoolSize :最大線程數

當隊列中存放的任務達到隊列容量時,當前能夠同時運行的線程數量會擴大到最大線程數。

3. keepAliveTime :等待時間

當線程數大於核心線程數時,多餘的空閒線程存活的最長時間。

4. unit :時間單位。

keepAliveTime 參數的時間單位,包括 TimeUnit.SECONDSTimeUnit.MINUTESTimeUnit.HOURSTimeUnit.DAYS 等等。

5. workQueue :任務隊列

任務隊列,用來儲存等待執行任務的隊列。

6. threadFactory :線程工廠

線程工廠,用來建立線程,通常默認便可。

7. handler :拒絕策略

也稱飽和策略;當提交的任務過多而不能及時處理時,能夠經過定製策略來處理任務。

ThreadPoolExecutor 飽和策略 : 指當前同時運行的線程數量達到最大線程數量而且隊列也已經被放滿時,ThreadPoolTaskExecutor 所執行的策略。

經常使用的拒絕策略包括 :

  • ThreadPoolExecutor.AbortPolicy: 拋出 RejectedExecutionException 來拒絕新任務的處理,是 Spring 中使用的默認拒絕策略。
  • ThreadPoolExecutor.CallerRunsPolicy: 線程調用運行該任務的 execute 自己,也就是直接在調用 execute 方法的線程中運行 (run) 被拒絕的任務,若是執行程序已關閉,則會丟棄該任務。此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度,但可能形成延遲。若應用程序能夠承受此延遲且不能丟棄任何一個任務請求,能夠選擇這個策略。
  • ThreadPoolExecutor.DiscardPolicy: 不處理新任務,直接丟棄掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略將丟棄最先的未處理的任務請求。

爲何推薦使用 ThreadPoolExecutor 來建立線程?

規約一 :線程資源必須經過線程池提供,不容許在應用中自行顯示建立線程。

使用線程池的好處是減小在建立和銷燬線程上所消耗的時間以及系統資源開銷,解決資源不足的問題。若是不使用線程池,有可能會形成系統建立大量同類線程而致使消耗完內存或者「過分切換」的問題。

規約二 :強制線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 構造函數的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。

Executors 返回線程池對象的弊端以下:

FixedThreadPoolSingleThreadExecutor : 容許請求的隊列長度爲 Integer.MAX_VALUE,可能會堆積大量請求,從而致使 OOM。

CachedThreadPoolScheduledThreadPool : 容許建立的線程數量爲 Integer.MAX_VALUE,可能會建立大量線程,從而致使 OOM。

幾種常見的線程池

FixThreadPool 固定線程池

FixThreadPool :可重用固定線程數的線程池。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(
            nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
    }
複製代碼

執行機制 :

  • 若當前運行的線程數小於 corePoolSize,來新任務時,就建立新的線程來執行任務;
  • 當前運行的線程數等於 corePoolSize 後,若是再來新任務的話,會將任務加到 LinkedBlockingQueue
  • 線程池中的線程執行完手頭的工做後,會在循環中反覆從 LinkedBlockingQueue 中獲取任務來執行。

FixThreadPool 使用的是無界隊列 LinkedBlockingQueue(隊列容量爲 Integer.MAX_VALUE),而它會給線程池帶來以下影響 :

  • 當線程池中的線程數達到 corePoolSize 後,新任務將在無界隊列中等待,所以線程池中的線程數不會超過 corePoolSize
  • 因爲使用的是一個無界隊列,因此 maximumPoolSize 將是一個無效參數,由於不可能存在任務隊列滿的狀況,因此 FixedThreadPool 的 corePoolSizemaximumPoolSize 被設置爲同一個值,且 keepAliveTime 將是一個無效參數;
  • 運行中的 FixedThreadPool(指未執行 shutdown()shutdownNow() 的)不會拒絕任務,所以在任務較多的時候可能會致使 OOM。

SingleThreadExecutor 單一線程池

SingleThreadExecutor 是隻有一個線程的線程池。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(
                    1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    threadFactory));
}
複製代碼

除了池中只有一個線程外,其餘和 FixThreadPool 是基本一致的。

CachedThreadPool 緩存線程池

CachedThreadPool 是一個會根據須要建立新線程的線程池,但會在先前構建的線程可用時重用它。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(
            0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            threadFactory);
}
複製代碼

corePoolSize 被設置爲 0,maximumPoolSize 被設置爲 Integer.MAX.VALUE,也就是無界的。雖然是無界,但因爲該線程池還存在一個銷燬機制,即若是一個線程 60 秒內未被使用過,則該線程就會被銷燬,這樣就節省了不少資源。

可是,若是主線程提交任務的速度高於 maximunPool 中線程處理任務的速度,CachedThreadPool 將會源源不斷地建立新的線程,從而依然可能致使 CPU 耗盡或內存溢出。

執行機制 :

  • 首先執行 offer 操做,提交任務到任務隊列。若當前 maximumPool 中有空閒線程正在執行 poll 操做,且主線程的 offer 與空閒線程的 poll 配對成功時,主線程將把任務交給空閒線程執行,此時視做 execute() 方法執行完成;不然,將執行下面的步驟。
  • 當初始 maximum 爲空,或 maximumPool 中沒有空閒線程時,將沒有線程執行 poll 操做。此時,CachedThreadPool 會建立新線程執行任務,execute() 方法執行完成。

如何擬定線程池的大小?

上下文切換

多線程變編程中通常線程的個數都大於 CPU 核心的個數,而一個 CPU 核心在任意時刻只能被一個線程使用。爲了讓這些線程都能獲得有效執行,CPU 採起的策略是爲每一個線程分配時間片並輪轉的形式。當一個線程的時間片用完的時候就會從新處於就緒狀態讓給其餘線程使用,這個過程就屬於一次上下文切換。

歸納來講就是,當前任務在執行完 CPU 時間片切換到另外一個任務以前,會先保存本身的狀態,以便下次再切換回這個任務時,能夠直接加載到上次的狀態。任務從保存到再加載的過程就是一次上下文切換。

上下文切換一般是計算密集型的。也就是說,它須要至關可觀的處理器時間,在每秒幾十上百次的切換中,每次切換都須要納秒量級的時間。因此,上下文切換對系統來講意味着消耗大量的 CPU 時間,事實上,多是操做系統中時間消耗最大的操做。

Linux 相比與其餘操做系統(包括其餘類 Unix 系統)有許多,其中有一項就是,其上下文切換和模式切換的時間消耗很是少。

簡單的擬定判斷

CPU 密集型任務(N+1):

這種任務消耗的主要是 CPU 資源,能夠將線程數設置爲 N(CPU 核心數)+1,比 CPU 核心數多出來的一個線程是爲了防止線程偶發的缺頁中斷,或者其它緣由致使的任務暫停而帶來的影響。一旦任務暫停,CPU 就會處於空閒狀態,而在這種狀況下多出來的一個線程就能夠充分利用 CPU 的空閒時間。

I/O 密集型任務(2N):

這種任務應用起來,系統會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內不會佔用 CPU 來處理,這時就能夠將 CPU 交出給其它線程使用。所以在 I/O 密集型任務的應用中,咱們能夠多配置一些線程,具體的計算方法是 2N。


參考文章 :JavaGuide

相關文章
相關標籤/搜索