【知識總結】Java線程池

引出

獲取多線程的方法,咱們都知道有三種,還有一種是實現Callable接口java

  • 實現Runnable接口
  • 實例化Thread類
  • 實現Callable接口
  • 使用線程池獲取

Callable接口

Runnable和Callable的區別

  1. Runnable接口沒有返回值,Callable接口有返回值
  2. Runnable接口不會拋異常,Callable接口能夠拋異常
  3. 接口的方法不同,一個run方法,一個call方法
  4. Callable方法支持泛型

Callable接口實現多線程

  • Callable接口,是一種讓線程執行完成後,可以返回結果的

當咱們實現Ruannable接口的時候,須要重寫run方法,也就是線程啓動的時候,會自動調用的方法。算法

同理,咱們實現Callable接口,也須要實現call方法,可是這個時候咱們還須要有返回值。這個Callable接口的應用場景通常在於批處理業務,好比轉帳的時候,須要給返回結果的狀態碼回來,表明本次操做成功仍是失敗。數據庫

public class MyThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("進入了Callable方法");
        return 1024;
    }
}

而後經過Thread線程,將MyThread實現Callable接口的類包裝起來。小程序

這裏須要用到FutureTask類,他實現了Runnable接口,類的構造函數須要傳遞一個實現Callable接口的類緩存

public static void main(String[] args){
    // 一、實例化FutureTask類,傳進去一個實現了Callable的類
    FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());

    // 二、而後在用Thread進行實例化,傳入實現Runnable接口的FutureTask的類
    Thread t1 = new Thread(futureTask, "A線程");
    t1.start();
    
    // 三、最後經過 futureTask.get() 獲取到返回值
    System.out.println("FutureTask的返回值爲:" + futureTask.get());
}

原來咱們的方式是一個main方法冰糖葫蘆似的串下去,引入Callable後,對於執行比較久的線程,能夠單獨新開一個線程進行執行,最後再進行彙總輸出服務器

若是咱們用get()獲取Callable的計算結果,可是若是並無計算完成,會致使阻塞,直到計算完成爲止。也就是說,futureTask.get() 須要放在最後執行,這樣不會致使主線程阻塞。網絡

//也可使用下面算法,使用相似自旋鎖的方式來進行判斷是否運行完畢
while(!futureTask.isDone()){
    
}

注意

多個線程執行一個FutureTask對象的時候,只會計算一次。多線程

FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());

// 開啓兩個線程計算futureTask
new Thread(futureTask, "A線程").start();
new Thread(futureTask, "B線程").start();

若是咱們須要兩個線程同時計算任務的話,那麼須要定義兩個FutureTask對象架構

FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread);
FutureTask<Integer> futureTask2 = new FutureTask<>(new MyThread);

// 開啓兩個線程計算futureTask
new Thread(futureTask1, "A線程").start();
new Thread(futureTask2, "B線程").start();

ThreadPoolExecutor

爲何要用線程池?

線程池作的主要工做就是控制運行的線程的數量,處理過程當中,將任務放入到阻塞隊列中,而後線程建立後,啓動這些任務,若是線程數量超過了最大數量的線程排隊等候,等其它線程執行完畢,再從隊列中取出任務來執行。併發

它的主要特色爲:線程複用、控制最大併發數、管理線程

線程池的好處

  1. 下降資源消耗。經過重複利用已建立的線程,下降線程建立和銷燬形成的消耗。
  2. 提升響應速度。當任務到達時,任務能夠不須要等到線程建立就當即執行。
  3. 提升線程的可管理性。使用線程池能夠進行統一的分配,調優和監控。

線程池的實現原理

說白了就是一個線程集合(workerSet)和一個阻塞隊列(workQueue)。當用戶向線程池提交一個任務時,線程池會先將任務放入阻塞隊列中。線程集合中的線程會不斷的從阻塞隊列中獲取任務執行,當阻塞隊列中沒有任務的時候,就會阻塞,直到隊列中有任務了就取出來繼續執行。

任務:客戶。 線程集合:辦理窗口。 阻塞隊列:候客區。

架構說明

Java中線程池是經過Executor框架實現的,該框架中用到了ExecutorExecutors(輔助工具類),ExecutorServiceThreadPoolExecutor這幾個類。

建立線程池

  • 咱們經過Executors工具類來建立線程池
一、FixedThreadPool(固定線程的線程池)
ExecutorService threadPool = Executors.newFixedThreadPool(5);
  1. 執行長期的任務,性能好不少
  2. 可控制線程數最大併發數,超出的線程會在隊列中等待
  3. 使用場景:執行長期的任務
二、SingleThreadExecutor(一個線程的 單線程池)
ExecutorService threadPool = Executors.newSingleThreadExecutor();
  1. 一個任務一個任務執行的場景
  2. 用惟一的工做線程來執行任務,保證全部任務按照指定順序執行
  3. 執行場景:一個任務一個任務執行的場景
三、newCacheThreadPool(可擴容的線程池)
ExecutorService threadPool = Executors.newCacheThreadPool();
  1. 執行不少短時間異步的小程序或者負載較輕的服務器
  2. 線程長度超過處理須要,可靈活回收空閒線程,如無可回收,則新建新線程
  3. 執行不少短時間異步的小程序或者負載較輕的服務器
代碼演示
  • 模擬10個用戶來辦理業務,每一個用戶就是一個來自外部請求線程
ExecutorService threadPool = Executors.newFixedThreadPool(5);
    // 模擬10個用戶來辦理業務,每一個用戶就是一個來自外部請求線程
    try {
        // 循環十次,模擬業務辦理,讓5個線程處理這10個請求
        for (int i = 0; i < 10; i++) {
            final int tempInt = i;

            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 給用戶:" + tempInt + "辦理業務");
            });
        }

    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        threadPool.shutdown(); // 用池化技術,必定要記得關閉
    }

結果:

咱們可以看到,一共有5個線程,在給10個用戶辦理業務。

底層實現

咱們經過查看源碼,發現底層都是使用了ThreadPoolExecutor

爲何單線程池和固定線程池使用的任務阻塞隊列是LinkedBlockingQueue(),而緩存線程池使用的是SynchronousQueue()呢?

由於在單線程池和固定線程池中,線程數量是有限的,所以提交的任務須要在隊列中等待空閒的線程。

而在緩存線程池中,線程數量幾乎無限,所以提交的任務在 SynchronousQueue 隊列中同步給空餘線程便可。

Tips:SynchronousQueue是一個沒有存儲空間的隊列,生產者進行 put 操做時,必需要等待消費者的 take 操做。

線程池的重要參數

線程池在建立的時候,一共有7大參數:

  • corePoolSize:核心線程數,線程池中的常駐核心線程數
  • maximumPoolSize:線程池可以容納同時執行的最大線程數,此值必須大於等於1
  • keepAliveTime:多餘的空閒線程存活時間
  • unit:keepAliveTime的單位
  • workQueue:任務隊列,被提交的但未被執行的任務(相似於銀行裏面的候客區)

    • LinkedBlockingQueue:鏈表阻塞隊列
    • SynchronousBlockingQueue:同步阻塞隊列
  • threadFactory:表示生成線程池中工做線程的線程工廠,用於建立線程池 通常用默認便可
  • handler:拒絕策略,表示當隊列滿了而且工做線程大於線程池的最大線程數(maximumPoolSize)時,如何來拒絕請求執行的Runnable的策略

線程池底層工做原理

整個線程池的工做就像銀行辦理業務同樣。

  1. 最開始假設來了兩個顧客,由於corePoolSize爲2,所以這兩個顧客直接可以去窗口辦理。
  2. 後面又來了三個顧客,由於corePool已經被顧客佔用了,所以只有去候客區,也就是阻塞隊列中等待
  3. 後面人愈來愈多,候客區可能不夠用了,這時須要申請增長處理請求的窗口,假如maximumPoolSize爲5,就會建立這3個非核心線程運行這個任務。
  4. 假設受理窗口已經達到最大數,而且請求數仍是不斷遞增,此時候客區和線程池都已經滿了,爲了防止大量請求沖垮線程池,已經須要開啓拒絕策略
  5. 臨時增長的線程若是超過了最大存活時間,就會銷燬,最後從最大數縮容到核心線程數
ps:臨時增長的業務窗口,會先處理那些後面來的,沒位置坐的客戶。(候客區客戶os:憑什麼= =)

四種拒絕策略的解析

如下全部拒絕策略都實現了RejectedExecutionHandler接口

  • AbortPolicy(默認):直接拋出RejectedExcutionException異常,阻止系統正常運行
  • DiscardPolicy:直接丟棄任務,不予任何處理也不拋出異常,若是運行任務丟失,這是一種好方案
  • CallerRunsPolicy:用調用者所在的線程處理任務,可以減緩新任務的提交速度。
  • DiscardOldestPolicy:丟棄最老的任務,執行當前任務。

爲何不用默認建立的線程池?

工做中應該用哪個方法來建立線程池呢?答案是一個都不用,咱們生產上只能使用自定義的。

阿里巴巴開發手冊:【強制】線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。

弊端以下:

  1. FixedThreadPool 和 SingleThreadPool 使用了無界隊列(LinkedBlockingQueue),可能會堆積大量的請求,從而致使OOM
  2. CacheThreadPool 和 ScheduledThreadPool 的最大線程爲 Integer.MAX_VALUE,可能會建立大量的線程,從而致使OOM

手寫線程池

採用默認拒絕策略(AbortPolicy)
ExecutorService threadPool = new ThreadPoolExecutor(
                                2, 
                                5, 
                                1L, 
                                TimeUnit.SECONDS,
                                new LinkedBlockingQueue<Runnable>(3),//候客區3個座位
                                Executors.defaultThreadFactory(),
                                new ThreadPoolExecutor.AbortPolicy());
//而後使用for循環,模擬10個用戶來進行請求
for (int i = 0; i < 10; i++) {
    
    final int tempInt = i;
    threadPool.execute(() -> {
        System.out.println(Thread.currentThread().getName()+" 給用戶:"+tempInt+"辦理業務");
    });
    
}

可是用戶執行到第九個的時候,觸發了異常,程序中斷。

這是由於觸發了AbortPolicy的拒絕策略:直接報異常。

觸發條件是,請求的線程大於 阻塞隊列大小 + 最大線程數 = 8的時候,也就是說第9個線程來獲取線程池中的線程時,就會拋出異常。

採用CallerRunsPolicy拒絕策略

也稱爲回退策略,就是用調用者所在的線程處理任務。

咱們看運行結果:

咱們發現,輸出的結果裏面出現了main線程,由於線程池觸發了拒絕策略,把任務回退到main線程,而後main線程對任務進行處理。

DiscardPolicy拒絕策略、DiscardOldestPolicy拒絕策略

這兩種策略都是把任務丟棄。

前者丟棄的是,進來排隊排不上的任務。

後者丟棄的是當前隊列中最老的任務,即排隊下一個就到你了,可是由於有人進來,致使你被丟棄了(爲何這麼慘?)。處理邏輯以下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 if (!e.isShutdown()) {    
     e.getQueue().poll();        //用 poll() 移除隊列中的首個元素
     e.execute(r);                //執行當前任務
 }
}

線程池的合理參數

生產環境中如何配置 corePoolSize 和 maximumPoolSize?。這個是根據具體業務來配置的,分爲CPU密集型和IO密集型。

//能夠看CPU是幾核的
Runtime.getRuntime().availableProcessors();
  • CPU密集型

CPU密集型也叫計算密集型,指的是系統的硬盤、內存性能比CPU要好,CPU的IO操做很快,可是CPU還有不少運算要處理,致使系統的CPU大部分都是 100%。

須要儘量少的線程數量,通常爲:CPU核數 + 1

  • IO密集型

即該任務須要大量的IO,即大量的阻塞。IO包括:數據庫交互,文件上傳下載,網絡傳輸等

IO密集型指的是系統的CPU性能相對硬盤、內存要好不少。此時大部分的情況是CPU在等IO操做,則應配置儘量多的線程,如 CPU核數 * 2

參考公式:CPU核數 / (1 - 阻塞系統)。

通常阻塞係數是0.9,好比 8 核CPU,應該配置 8 / 0.1 = 80 個線程數

一個線程池中的線程異常了,那麼線程池會怎麼處理這個線程?

若是執行方式是 execute 時,會看到堆棧異常的輸出。

當執行方式是 submit 時,堆棧異常沒有輸出。而且調用 Future.get() 方法時,能夠捕獲到異常。不會影響線程池裏面其餘線程的正常執行,線程池會把這個異常的線程移除掉,並建立一個新的線程放入線程池中。?

相關文章
相關標籤/搜索