線程池很容易理解的

  • 線程池介紹
  • 併發隊列
  • 線程池原理分析
  • 自定義線程池

文中部分代碼使用 lambda 表達式以簡化代碼。html

線程池

什麼是線程池?

Java中的線程池是運用場景最多的併發框架,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。在開發過程當中,合理地使用線程池可以帶來3個好處。java

  • 下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  • 提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。
  • 提升線程的可管理性。線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控

線程狀態

線程生命週期數組

新建狀態

當用new操做符建立一個線程時, 例如new Thread(r),線程尚未開始運行,此時線程處在新建狀態。緩存

new Thread(() -> System.out.println("run 任務執行"))

就緒狀態

一個新建立的線程並不自動開始運行,要執行線程,必須調用線程的start()方法。處於就緒狀態的線程並不必定當即運行run()方法,線程還必須同其餘線程競爭CPU時間,只有得到CPU時間才能夠運行線程。安全

new Thread(() -> System.out.println("run 任務執行")).start();

運行狀態

當線程得到CPU時間後,它才進入運行狀態,真正開始執行run()方法。也就是 System.out.println("run 任務執行")多線程

run 任務執行

阻塞狀態

線程運行過程當中,可能因爲各類緣由進入阻塞狀態 :併發

  • 線程執行了Thread.sleep(int n)方法,線程放棄CPU,睡眠n毫秒,而後恢復運行。框架

  • 線程要執行一段同步代碼,因爲沒法得到相關的同步鎖,只好進入阻塞狀態,等到得到了同步鎖,才能恢復運行。異步

  • 線程執行了一個對象的wait()方法,進入阻塞狀態,只有等到其餘線程執行了該對象的notify()或notifyAll()方法,纔可能將其喚醒。ide

更多緣由參考

死亡狀態

有兩個緣由會致使線程死亡:

  • run方法正常退出而天然死亡
  • 一個未捕獲的異常終止了run方法而使線程死亡

使用 isAlive() 方法可判斷線程在當前是否存活着

併發隊列

在併發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue爲表明的高性能隊列非阻塞隊列,一個是以BlockingQueue接口爲表明的阻塞隊列,不管哪一種都繼承自Queue

ConcurrentLinkedDeque 非阻塞式隊列

  • 無界線程安全隊列
  • 先進先出的原則
  • 不容許null元素

重要方法:

  • add() 和 offer() 都是加入元素的方法
  • poll() 和 peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會

適用於高併發場景下的隊列,經過無鎖的方式,實現了高併發狀態下的高性能,一般ConcurrentLinkedQueue性能好於BlockingQueue.它是一個基於連接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最早
加入的,尾是最近加入的,該隊列不容許null元素

public static void main(String[] args) {
        // 非阻塞式用法 無界隊列 - 先進先出的原則
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue<String>();
        concurrentLinkedQueue.offer("第一個進隊列");
        concurrentLinkedQueue.offer("第二個進隊列");
        // 獲取單個隊列信息 peek 獲取隊列
        System.out.println(concurrentLinkedQueue.peek());
        System.out.println(concurrentLinkedQueue.size());
        // 獲取單個隊列信息 poll 獲取隊列以後 會刪除隊列信息 移除
        System.out.println(concurrentLinkedQueue.poll());
        System.out.println(concurrentLinkedQueue.size());
    }
第一個進隊列
2
第一個進隊列
1

BlockingQueue 阻塞式隊列

被阻塞的狀況主要有以下兩種:

  • 當隊列滿了的時候進行入隊列操做
  • 當隊列空了的時候進行出隊列操做

如下簡單介紹 BlockingQueue 成員

ArrayBlockingQueue

有邊界的阻塞隊列,它的內部實現是一個數組,先進先出原則。使用阻塞必須加加超時時間

public static void main(String[] args) throws InterruptedException {
        // 阻塞式隊列
        ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        // 沒有加超時時間都是爲非阻塞式
        arrayBlockingQueue.offer("張三");
        // 獲取單個隊列信息,而且會刪除該隊列
        System.out.println(arrayBlockingQueue.poll(3, TimeUnit.SECONDS));
        System.out.println(arrayBlockingQueue.poll(3, TimeUnit.SECONDS));
        System.out.println(arrayBlockingQueue.size());

    }

new ArrayBlockingQueue<>(3) 設定(隊列)有界大小3。
offer("張三") - 「張三「 進入隊列
第一個 poll("張三") 取出數據,「張三」從隊列中移除
第二個 poll("張三") ,此時隊列已經沒有數據了會阻塞等待3秒,若是隊列在這3秒內有數據入隊列會當即取出數據並結束阻塞,3秒阻塞超時就返回null

張三
null
0

以上示例是 當隊列空了的時候進行出隊列阻塞

public static void main(String[] args) throws InterruptedException {
        // 阻塞式隊列
        ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        // 沒有加超時時間都是爲非阻塞式
        arrayBlockingQueue.offer("張一", 3, TimeUnit.SECONDS);
        arrayBlockingQueue.offer("張二", 3, TimeUnit.SECONDS);
        arrayBlockingQueue.offer("張三", 3, TimeUnit.SECONDS);
        arrayBlockingQueue.offer("張四", 3, TimeUnit.SECONDS);
        // 獲取單個隊列信息,而且會刪除該隊列
        System.out.println(arrayBlockingQueue.poll(3, TimeUnit.SECONDS));
        System.out.println(arrayBlockingQueue.size());

    }

new ArrayBlockingQueue<>(3) 設定(隊列)有界大小3。
offer("張一") - 「張一「 進入隊列
offer("張二") - 「張二「 進入隊列
offer("張三") - 「張三「 進入隊列 , 此時隊列已滿
offer("張四",3, TimeUnit.SECOND) 此時隊列已滿 , 這裏給了3秒阻塞時間等待數據出列纔有位置,期間若是有數據出列當即添加「張四」入隊列並結束阻塞,若是沒數據出列阻塞超時會廢棄當前數據「張四」

張一
2

以上示例是 當隊列滿了的時候進行入隊列阻塞

常見的阻塞式隊列以下:

  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

有興趣能夠去實踐下

ThreadPoolExecutor

  • Executor - execute 方法接口
  • Executors - 工廠類
  • ThreadPoolExecutor - Executor 框架的最頂層實現類

Executor框架的最頂層實現是ThreadPoolExecutor類,Executors工廠類中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool 靜態方法其實也只是ThreadPoolExecutor的構造函數參數不一樣而已。經過傳入不一樣的參數,就能夠構造出適用於不一樣應用場景下的線程池,來看下構造的參數。

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue) {   
     ...   
}
  • corePoolSize
    線程池核心線程數,默認狀況下,核心線程會在線程池中一直存活,即便它們處於閒置狀態。若是ThreadPoolExecutor的allowCoreThreadTimeOut屬性設置爲true,那麼閒置的核心線程在等待新任務到來時會有超時策略,超過keepAliveTime 時間後,核心線程就會被終止。
  • maximumPoolSize
    線程池所能容納的最大線程數,當活動線程達到這個數值後,後續的新任務將被阻塞。
  • keepAliveTime
    非核心線程超過這個時長就會被回收。當ThreadPoolExecutor的allowCoreThreadTimeOut屬性設置爲true時,keepAliveTime一樣會做用於核心線程
  • unit
    keepAliveTime 參數的時間單位
  • workQueue
    執行前用於保持任務的隊列, 此隊列僅由 execute 方法提交的 Runnable 任務。

newCachedThreadPool

這裏用 可緩存的 newCachedThreadPool 線程池來分析

ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
        0,
        Integer.MAX_VALUE,
        60L, 
        TimeUnit.SECONDS, 
        new SynchronousQueue<Runnable>());
    }

newCachedThreadPool的corePoolSize被設置爲0, maximumPoolSize被設置爲Integer.MAX_VALUE, 即maximum是無界的。這裏keepAliveTime設置爲60秒,意味着空閒的線程最多能夠等待任務60秒,不然將被回收。

newCachedThreadPool使用沒有容量的SynchronousQueue做爲主線程池的工做隊列,它是一個不存儲元素阻塞隊列。

線程池中的線程是被線程池緩存了的,也就是說,線程沒有任務要執行時,便處於空閒狀態,處於空閒狀態的線程並不會被當即銷燬(會被緩存住),只有當空閒時間超出一段時間(默認爲60s)後,線程池纔會銷燬該線程(至關於清除過期的緩存)。新任務到達後,線程池首先會讓被緩存住的線程(空閒狀態)去執行任務,若是沒有可用線程(無空閒線程),便會建立新的線程。

根據特性可知 :

  • 長時間不提交任務的CachedThreadPool不會佔用系統資源
  • 極端狀況下,CachedThreadPool會由於建立過多線程而耗盡CPU資源及內存

代碼示例

public static void main(String[] args) {
        // 建立線程(四種方式) 1.可緩存、定長、定時、單例
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            final int temp = i;
            // execute方法做用: 執行任務
            newCachedThreadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + ",i:" + temp);
            });
        }
    }

newCachedThreadPool 線程池執行 100 次任務

pool-1-thread-3,i:2
pool-1-thread-1,i:0
pool-1-thread-2,i:1
pool-1-thread-4,i:3
pool-1-thread-5,i:4
pool-1-thread-7,i:6
pool-1-thread-6,i:5
pool-1-thread-9,i:8
pool-1-thread-10,i:9
pool-1-thread-8,i:7
pool-1-thread-11,i:10
pool-1-thread-1,i:21
pool-1-thread-3,i:20

能夠看到線程 pool-1-thread-三、pool-1-thread-1 被重複使用了

newFixedThreadPool

定長的 newFixedThreadPool 線程池

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(
          nThreads, nThreads,
          0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());
}

能夠看到 newFixedThreadPool 的核心線程和最大的線程都是同樣的,最多3個線程將處於活動狀態。若是提交了3個以上的線程,那麼它們將保持在隊列中,直到線程可用。

代碼示例

public static void main(String[] args) {
        // 可固定長度的線程池 核心線程數 爲3 最多建立3個線程
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            // execute方法做用: 執行任務
            newFixedThreadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + ",i:" + temp);
            });
        }
    }
pool-1-thread-3,i:2
pool-1-thread-2,i:1
pool-1-thread-1,i:0
pool-1-thread-3,i:3
pool-1-thread-2,i:5
pool-1-thread-3,i:6
pool-1-thread-1,i:4
pool-1-thread-3,i:8
pool-1-thread-2,i:7
pool-1-thread-1,i:9

能夠看到始終只有pool-1-thread-一、pool-1-thread-二、pool-1-thread-3 三個線程

newScheduledThreadPool

可定時的 newScheduledThreadPool 線程池

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

newScheduledThreadPool 的核心線程爲 傳入的corePoolSize,最大線程爲Integer.MAX_VALUE , DelayedWorkQueue隊列實現BlockingQueue接口,因此使用阻塞式的BlockingQueue隊列。

代碼示例

public static void main(String[] args) {
        // 可定時線程池 3核心線程數
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(3);
        newScheduledThreadPool.scheduleAtFixedRate(
                    // Runnable 任務
                () -> System.out.println(Thread.currentThread().getName() + "run "), 
                1000,  // 初次執行需等待時間
                100, TimeUnit.MILLISECONDS);  // 週期執行時間(3個線程搶cpu時間)

    }
pool-1-thread-1run 
pool-1-thread-2run 
pool-1-thread-3run 
pool-1-thread-3run 
pool-1-thread-2run

每 100 毫秒執行一次

newSingleThreadExecutor

單例的 newSingleThreadExecutor 線程池

代碼示例

public static void main(String[] args) {
        // 單線線程
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            // execute方法做用: 執行任務
            newSingleThreadExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ",i:" + temp));
        }
    }
pool-1-thread-1,i:0
pool-1-thread-1,i:1
pool-1-thread-1,i:2
pool-1-thread-1,i:3
pool-1-thread-1,i:4
pool-1-thread-1,i:5
pool-1-thread-1,i:6
pool-1-thread-1,i:7
pool-1-thread-1,i:8
pool-1-thread-1,i:9

始終只有一個線程執行任務

線程池原理剖析

提交一個任務到線程池中,線程池的處理流程以下:

  • 1 .判斷線程池裏的核心線程是否都在執行任務,若是不是(核心線程空閒或者還有核心線程沒有被建立)則建立一個新的工做線程來執行任務。若是核心線程都在執行任務,則進入下個流程。

  • 2 .線程池判斷工做隊列是否已滿,若是工做隊列沒有滿,則將新提交的任務存儲在這個工做隊列裏。若是工做隊列滿了,則進入下個流程。

  • 3 . 判斷線程池裏的線程是否都處於工做狀態,若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略來處理這個任務。

  • 給你們推薦一個交流學習qq羣:698581634 進羣便可免費獲取資料。

自定義線程線程池

上面列舉到 newCachedThreadPool、newFixedThreadPool ... 底層都是對ThreadPoolExecutor的構造器進行包裝使用。在來看下

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

因此我們若是要實現一個自定義線程池就 直接 newThreadPoolExecutor(...) 就就行,如今來自定義一個。

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4));

這裏自定義線程池 :核心線程1個,最大建立2個線程,60s等待超時銷燬,使用阻塞式有界ArrayBlockingQueue 隊列,最多緩存4個任務。

public class CustomThread {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4));
        for (int i = 1; i <= 6; i++) {
            TaskThred t1 = new TaskThred("任務" + i);
            executor.execute(t1);
        }
        executor.shutdown();
    }
}

class TaskThred implements Runnable {
    private String taskName;

    public TaskThred(String taskName) {
        this.taskName = taskName;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + taskName);
    }
}

execute 執行6個任務

pool-1-thread-1任務1
pool-1-thread-2任務6
pool-1-thread-2任務2
pool-1-thread-1任務4
pool-1-thread-1任務5
pool-1-thread-2任務3

相關的異常信息

RejectedExecutionException
緣由:

  • 提交任務量大, 隊列緩存較小
  • 確保不要在shutdown()以後在執行任務
java.util.concurrent.RejectedExecutionException: Task com.snail.demo.TaskThred@74a14482 rejected from java.util.concurrent.ThreadPoolExecutor@1540e19d[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)

要出現這個異常只需將上邊 new ArrayBlockingQueue<>(4) 大小 4 改成 2,以下

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));

原文連接:https://www.jianshu.com/p/3a3b00bb6ab9?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io

相關文章
相關標籤/搜索