Java 中提供的線程池 Api
Executors 裏面提供了幾個線程池的工廠方法,使用Executors 的工廠方法,就能夠使用線程池:
newFixedThreadPool:該方法返回一個固定數量的線程池,線程數不變,當有一個任務提交時,若線程池中空閒,則當即執行,若沒有,則會被暫緩在一個任務隊列中,等待有空閒的線程去執行。
newSingleThreadExecutor: 建立一個線程的線程池,若空閒則執行,若沒有空閒線程則暫緩在任務隊列中。
newCachedThreadPool:返回一個可根據實際狀況調整線程個數的線程池,不限制最大線程數量,若用空閒的線程則執行任務,若無任務則不建立線程。而且每個空閒線程會在 60 秒後自動回收
newScheduledThreadPool: 建立一個能夠指定線程的數量的線程池,可是這個線程池還帶有延遲和週期性執行任務的功能,相似定時器。數組
public ThreadPoolExecutor (int corePoolSize, //核心線程數量 int maximumPoolSize, //最大線程數 long keepAliveTime, // 超時時間,超出核心線程數量之外的線程空餘存活時間 TimeUnit unit, // 存活時間單位 BlockingQueue<Runnable> workQueue, // 保存執行任務的隊列 ThreadFactory threadFactory, // 建立新線程使用的工廠 RejectedExecutionHandler handler // 當任務沒法執行的時候的處理方式)
線程池原理分析(FixedThreadPool)
拒絕策略
當核心線程數等於設置的核心線程數,任務隊列已滿,非核心線程數已經等於達到最大線程數量,拒絕任務。緩存
- AbortPolicy:直接拋出異常,默認策略;
- CallerRunsPolicy:用調用者所在的線程來執行任務;
- DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
- DiscardPolicy:直接丟棄任務;
- 根據應用場景實現 RejectedExecutionHandler 接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務
線程池的大小
CPU 密集型,主要是執行計算任務,響應時間很快,cpu 一直在運行,這種任務 cpu的利用率很高,那麼線程數的配置應該根據 CPU 核心數來決定,CPU 核心數=最大同時執行線程數,假如 CPU 核心數爲 4,那麼服務器最多能同時執行 4 個線程。過多的線程會致使上下文切換反而使得效率下降。
線程池的最大線程數能夠配置爲 cpu 核心數+1 IO 密集型,主要是進行 IO 操做,執行 IO 操做的時間較長,這時 cpu 出於空閒狀態,致使 cpu 的利用率不高,這種狀況下能夠增長線程池的大小。這種狀況下能夠結合線程的等待時長來作判斷,等待時間越高,那麼線程數也相對越多。通常能夠配置 cpu 核心數的 2 倍。
公式:線程池設定最佳線程數目 = ((線程池設定的線程等待時間+線程 CPU 時間)/線程 CPU 時間 )* CPU 數目
這個公式的線程 cpu 時間是預估的程序單個線程在 cpu 上運行的時間服務器
線程初始化
默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:
prestartCoreThread():初始化一個核心線程;
prestartAllCoreThreads():初始化全部核心線程
ThreadPoolExecutor tpe=(ThreadPoolExecutor)service;
tpe.prestartAllCoreThreads();函數
線程池的關閉
ThreadPoolExecutor 提供了兩個方法,用於線程池的關閉, 分別是shutdown()和shutdownNow() shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務 shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務線程
線程池容量的動態調整
ThreadPoolExecutor 提 供 了 動 態 調 整 線 程 池 容 量 大 小 的 方 法 : setCorePoolSize() 和setMaximumPoolSize()
setCorePoolSize:設置核心池大小
setMaximumPoolSize:設置線程池最大能建立的線程數目大小
任務緩存隊列及排隊策略
任務緩存隊列,即 workQueue,用來存放等待執行的任務。
workQueue 的類型爲 BlockingQueue,一般能夠取下面三種類型:rest
- ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;
- LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲 Integer.MAX_VALUE;
- SynchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
線程池的監控
線程池提供了相應的擴展方法,咱們經過重寫線程池的 beforeExecute、afterExecute 和 shutdown 等方式就能夠實現對線程的監控日誌
Callable/Future
線程池的執行任務有兩種方法,一種是 submit、一種是 execute;code
execute 和 submit 區別
execute | submit |
---|---|
只能夠接收一個 Runnable 的參數 | 能夠接收 Runable 和 Callable 這兩種類型的參數 |
execute 若是出現異常會拋出 | 調用不會拋異常,除非調用 Future.get |
execute 沒有返回值 | 若是傳入一個 Callable,能夠獲得一個 Future 的返回值 |
Callable/Future 原理分析
Callable 是一個函數式接口,裏面就只有一個 call 方法。 Future 提供了get() get 方法就是阻塞獲取線程執行結果,這裏主要作了兩個事情blog
- 判斷當前的狀態,若是狀態小於等於 COMPLETING,表示 FutureTask 任務尚未完結,因此調用 awaitDone 方法,讓當前線程等待。
- report 返回結果值或者拋出異常
public V get() throws InterruptedException, ExecutionException { int s = state; if ( s <= COMPLETING ) s = awaitDone( false, 0L ); return(report( s ) ); }
awaitDone 若是當前的結果尚未被執行完,把當前線程線程和插入到等待隊列接口
run 方法執行完成以後喚醒阻塞線程,響應結果
AbstractExecutorService.submit
調用抽象類中的 submit 方法,這裏其實相對於 execute 方法來講,只多作了一步操做,就是封裝了一個 RunnableFuture
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
ThreadpoolExecutor.execute
調用 execute 方法,經過 worker 線程來調用過 ftask 的run 方法。而這個 ftask 其實就是 FutureTask 裏面最終實現的邏輯