併發編程--線程池

Java 中提供的線程池 Api

Executors 裏面提供了幾個線程池的工廠方法,使用Executors 的工廠方法,就能夠使用線程池:
newFixedThreadPool:該方法返回一個固定數量的線程池,線程數不變,當有一個任務提交時,若線程池中空閒,則當即執行,若沒有,則會被暫緩在一個任務隊列中,等待有空閒的線程去執行。
newSingleThreadExecutor: 建立一個線程的線程池,若空閒則執行,若沒有空閒線程則暫緩在任務隊列中。
newCachedThreadPool:返回一個可根據實際狀況調整線程個數的線程池,不限制最大線程數量,若用空閒的線程則執行任務,若無任務則不建立線程。而且每個空閒線程會在 60 秒後自動回收
newScheduledThreadPool: 建立一個能夠指定線程的數量的線程池,可是這個線程池還帶有延遲和週期性執行任務的功能,相似定時器。數組

public ThreadPoolExecutor (int corePoolSize,                    //核心線程數量 
			   int maximumPoolSize,                 //最大線程數  
			   long keepAliveTime,                  // 超時時間,超出核心線程數量之外的線程空餘存活時間  
			   TimeUnit unit,                       // 存活時間單位  
			   BlockingQueue<Runnable> workQueue,   // 保存執行任務的隊列  
			   ThreadFactory threadFactory,         // 建立新線程使用的工廠  
			   RejectedExecutionHandler handler     // 當任務沒法執行的時候的處理方式)

線程池原理分析(FixedThreadPool)

拒絕策略

當核心線程數等於設置的核心線程數,任務隊列已滿,非核心線程數已經等於達到最大線程數量,拒絕任務。緩存

  1. AbortPolicy:直接拋出異常,默認策略;
  2. CallerRunsPolicy:用調用者所在的線程來執行任務;
  3. DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
  4. DiscardPolicy:直接丟棄任務;
  5. 根據應用場景實現 RejectedExecutionHandler 接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務

線程池的大小

CPU 密集型,主要是執行計算任務,響應時間很快,cpu 一直在運行,這種任務 cpu的利用率很高,那麼線程數的配置應該根據 CPU 核心數來決定,CPU 核心數=最大同時執行線程數,假如 CPU 核心數爲 4,那麼服務器最多能同時執行 4 個線程。過多的線程會致使上下文切換反而使得效率下降。
線程池的最大線程數能夠配置爲 cpu 核心數+1 IO 密集型,主要是進行 IO 操做,執行 IO 操做的時間較長,這時 cpu 出於空閒狀態,致使 cpu 的利用率不高,這種狀況下能夠增長線程池的大小。這種狀況下能夠結合線程的等待時長來作判斷,等待時間越高,那麼線程數也相對越多。通常能夠配置 cpu 核心數的 2 倍。
公式:線程池設定最佳線程數目 = ((線程池設定的線程等待時間+線程 CPU 時間)/線程 CPU 時間 )* CPU 數目
這個公式的線程 cpu 時間是預估的程序單個線程在 cpu 上運行的時間服務器

線程初始化

默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:
prestartCoreThread():初始化一個核心線程;
prestartAllCoreThreads():初始化全部核心線程
ThreadPoolExecutor tpe=(ThreadPoolExecutor)service;
tpe.prestartAllCoreThreads();函數

線程池的關閉

ThreadPoolExecutor 提供了兩個方法,用於線程池的關閉, 分別是shutdown()和shutdownNow() shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務 shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務線程

線程池容量的動態調整

ThreadPoolExecutor 提 供 了 動 態 調 整 線 程 池 容 量 大 小 的 方 法 : setCorePoolSize() 和setMaximumPoolSize()
setCorePoolSize:設置核心池大小
setMaximumPoolSize:設置線程池最大能建立的線程數目大小
任務緩存隊列及排隊策略
任務緩存隊列,即 workQueue,用來存放等待執行的任務。
workQueue 的類型爲 BlockingQueue,一般能夠取下面三種類型:rest

  1. ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;
  2. LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲 Integer.MAX_VALUE;
  3. SynchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

線程池的監控

線程池提供了相應的擴展方法,咱們經過重寫線程池的 beforeExecute、afterExecute 和 shutdown 等方式就能夠實現對線程的監控日誌

Callable/Future

線程池的執行任務有兩種方法,一種是 submit、一種是 execute;code

execute 和 submit 區別

execute submit
只能夠接收一個 Runnable 的參數 能夠接收 Runable 和 Callable 這兩種類型的參數
execute 若是出現異常會拋出 調用不會拋異常,除非調用 Future.get
execute 沒有返回值 若是傳入一個 Callable,能夠獲得一個 Future 的返回值

Callable/Future 原理分析

Callable 是一個函數式接口,裏面就只有一個 call 方法。 Future 提供了get() get 方法就是阻塞獲取線程執行結果,這裏主要作了兩個事情blog

  1. 判斷當前的狀態,若是狀態小於等於 COMPLETING,表示 FutureTask 任務尚未完結,因此調用 awaitDone 方法,讓當前線程等待。
  2. report 返回結果值或者拋出異常
public V get() throws InterruptedException, ExecutionException
{
	int s = state;
	if ( s <= COMPLETING )
		s = awaitDone( false, 0L );
	return(report( s ) );
}

awaitDone 若是當前的結果尚未被執行完,把當前線程線程和插入到等待隊列接口

run 方法執行完成以後喚醒阻塞線程,響應結果

AbstractExecutorService.submit

調用抽象類中的 submit 方法,這裏其實相對於 execute 方法來講,只多作了一步操做,就是封裝了一個 RunnableFuture

public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 RunnableFuture<T> ftask = newTaskFor(task);
 execute(ftask);
 return ftask;
}

ThreadpoolExecutor.execute

調用 execute 方法,經過 worker 線程來調用過 ftask 的run 方法。而這個 ftask 其實就是 FutureTask 裏面最終實現的邏輯

相關文章
相關標籤/搜索