線程池參數原理及應用

線程池原理

    Java建立一個線程很方便,只需new Thread()就能夠, 可是當有多個任務須要進行進行處理時,頻繁的進行建立和啓用線程一樣須要系統開銷,也不利於管理,因而同mysql的鏈接池同樣,天然有對線程的管理池即線程池。java

    作個比喻,線程池比如一個公司,那麼線程自己就是一個個的員工,來對線程的建立和銷燬進行管理,最大化的進行資源的合理調度。mysql

    Java的線程池建立也很簡單,concurrent這個併發包下有Executors能夠很方便的進行四種經常使用線程的建立:spring

    newFixedThreadPool:建立固定數量的線程的線程池,能夠控制最大併發數,經常使用於知道具體任務的數量,須要進行多線程的操做,如批量插入數據庫任務,須要進行10萬條數據分頁,每1萬條數據一頁,配置一個線程處理,一共配置10個線程,進行並行批量插入,就可使用這個線程池來進行,大大減小響應時間sql

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    newCachedThreadPool: 建立可一段時間內重複利用的線程池,經常使用於不知道具體的任務數量,可是還須要進行並行處理的狀況,如springboot @Aysnc就能夠指定使用這個線程池,來進行一些埋點等的各類業務的異步處理數據庫

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    newSingleThreadExecutor: 建立單個線程的線程池,這個線程池能夠在線程死後(或發生異常時)從新啓動一個線程來替代原來的線程繼續執行下去!編程

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    newScheduledThreadPool: 建立一個能夠定時和重複執行的線程池,經常使用於定時任務和延時任務的執行線程池數組

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    固然線程池還能夠自定義,Java只是提供了幾種經常使用的靜態線程池的建立方法,以上也已經將4種線程池的建立源碼顯示出來了,能夠發現線程池的建立都是經過new ThreadPoolExecutor()來實現的,如今主要介紹下幾個重要的參數和接口:緩存

    首先ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,AbstractExecutorService又實現了ExecutorService接口,ExecutorService接口繼承了只有一個方法execute的Executor。springboot

   下面解釋下一下構造器中各個參數的含義:多線程

  • corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實現原理有很是大的關係。在建立了線程池後,默認狀況下,線程池中並無任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
  • maximumPoolSize:線程池最大線程數,這個參數也是一個很是重要的參數,它表示在線程池中最多能建立多少個線程;
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime纔會起做用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。可是若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0;
  • unit:參數keepAliveTime的時間單位,有7種取值,分別表明一種時間的單位,秒,分,小時等:
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,通常來講,這裏的阻塞隊列有如下幾種選擇:  
ArrayBlockingQueue; 有界阻塞隊列,由數組實現,須要指定數組大小
LinkedBlockingQueue; 無界阻塞隊列,由鏈表實現,最大值是Integer的最大值 
SynchronousQueue; 這個隊列不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
  • threadFactory:線程工廠,主要用來建立線程;
  • handler:表示當拒絕處理任務時的策略,有如下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

    其中注意這幾個參數都是volatile修飾的,用來保證多線程下的可見性,咱們也能夠根據這些參數的不一樣配置,來產生咱們須要的線程池。

    有了線程池後,咱們須要關注幾個線程池的狀態:

        

    下圖代表幾個狀態之間的轉化關係:

        

    接下來就是舉個栗子來代表如何使用:

    ExecutorService executorService = Executors.newFixedThreadPool(15);

    在執行完上述代碼後,咱們其實就建立了一個有15個核心線程數量,最大也是15個線程數量,空閒線程保存時間爲1分鐘,採用無限阻塞隊列,任務拒絕採用AbortPolicy:丟棄任務並拋出RejectedExecutionException異常的線程池。在建立後,並無進行活躍的線程工人產生,可用線程數爲0,好比接下來有10個任務進來,就會建立10個線程工人來進行工做,而且工做完不會銷燬,以後又來了10個任務,以前的10個線程尚未處理完他們本身的任務,這個時候就又會建立5個線程工人來進行任務的處理,有小夥伴有疑問了,那剩下的5個任務怎麼辦呢,對了,還有阻塞隊列,這些沒有工人處理的任務會進入待辦事項般的阻塞隊列,先進先出,待15個工人將手頭的活辦完以後進行依次處理,由於阻塞隊列是無界阻塞隊列,所以,任務會不斷的丟到這個隊列中,因此,並不會建立由於隊列過小,而不得已建立幾個個臨時工來處理,這個幾個數量即在最大線程和核心線程之間的差值數量,這些臨時線程的有效時間只有keepAliveTime的時間,此外在來了多個任務以後,若是隊列是有界的,且任務數超過了最大可以建立的線程數,即工人不能再招了,待辦事項列表也滿了,這個時候公司舊不幹了,拋出異常,任務拒絕策略。

    接下了是實戰,結合CompletableFuture進行展現:

    簡單介紹下CompletableFuture:CompletableFuture提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,而且提供了函數式編程的能力,能夠經過回調的方式處理計算結果,也提供了轉換和組合 CompletableFuture 的方法,結合線程池能夠達到併發編程的目的    

package cn.chinotan;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.*;

/**
 * @program: test
 * @description: 多線程測試
 * @author: xingcheng
 * @create: 2019-03-23 17:27
 **/
@Slf4j
public class ExecutorTest {

    @Test
    public void test() {
        ExecutorService executorService = Executors.newFixedThreadPool(15);

        CompletableFuture[] completableFutures = new CompletableFuture[15];
        List<Integer> integers = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 15; i++) {
            int finalI = i;
            CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> costMethod(finalI), executorService)
                    .whenComplete((r, e) -> {
                        if (null != e) {
                           e.printStackTrace(); 
                        } else {
                            integers.add(r);
                        }
                    });

            completableFutures[i] = integerCompletableFuture;
        }

        CompletableFuture.allOf(completableFutures).join();
        long count = integers.stream().count();
        log.info("一共處理成功:{}", count);
    }

    /**
     * 耗時的操做
     *
     * @param i
     * @return
     */
    public int costMethod(int i) {
        try {
            TimeUnit.SECONDS.sleep(5);
            log.info("耗時的操做 {}", i);
            return 1;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return 0;
        }
    }
}

    運行結果:

    能夠看到15個耗時的操做很快就並行執行完成,而且還能返回執行的成功結果數

    以上就是我對線程池的理解和應用,歡迎你們關注和瀏覽提問,謝謝你們

相關文章
相關標籤/搜索