Java併發編程基礎(五)之Executor

線程池與Executor框架

線程池的做用

線程池提供了一種限制和管理資源(包括執行一個任務)。每一個線程池還維護一些基本統計信息,例如完成任務的數量。整體上講,他有如下好處:編程

  • 下降資源消耗:經過反覆利用已經建立的線程下降線程建立和銷燬形成的額消耗.小程序

  • 提升響應速度:當任務到達時,任五能夠不須要的等到線程建立就能當即執行。服務器

  • 提升線程的可管理性:線程時稀缺資源,若是無限的建立,會形成計算機資源的浪費,還會下降系統的穩定性,使用線程池能夠實現統一的管理分配。併發

Executor框架

Executor框架是Java5引入的,使用Executor能夠更好的使用線程,使用效率更高,除此以外還有助於避免this逃逸(指構造函數在返回該對象引用以前就被其餘的線程持有)框架

Executor的組成部分

任務

執行的任務須要實現Runnble接口或者Callable接口(Runnable接口不會返回結果可是Callable接口能夠返回結果)。Runnable接口或者Callable接口實現類能夠被ThreadPoolExector或者ScheduledThreadPoolExecutor自行異步

任務執行

經過上面的圖能夠知道任務的執行的核心是Executor和繼承Executor接口的ExecutorService接口,ScheduledThreadPoolExecutor和ThreadPoolExecutor這兩個關鍵類實現了ExecutorService接口.函數

注意:經過查看ScheduledThreadPoolExecutor源碼能夠發現ScheduledThreadPoolExecutor其實是繼承了ThreadPoolExeecutor並實現了ScheduledExecutorService,而ScheduledExecutorService又實現了ExecutorService。

結果返回

Future接口以及Future接口的實現類FutureTask類。當咱們把Runnable接口或者Callable接口的實現類提交(調用usubmit方法)給ThreadPoolExector時,會返回一個FutureTask對象.工具

public <T> Future<T> submit(Runnable task, T result) {
		return schedule(Executors.callable(task, result), 0, NANOSECONDS);
	}
 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
										   long delay,
										   TimeUnit unit) {
		if (callable == null || unit == null)
			throw new NullPointerException();
		RunnableScheduledFuture<V> t = decorateTask(callable,
			new ScheduledFutureTask<V>(callable,
									   triggerTime(delay, unit)));
		delayedExecute(t);
		return t;
	}

Executor框架使用示意圖

1.主線程首先須要建立實現Runnable或者Callable接口的任務對象.備註:工具類Executors能夠實現Runnable對象和Callable對象之間的相互轉換。(Executors.callable(Runnable task)或者Executor.callable(Runnable task,Object result))this

2.將建立的任務Runnable或者Callable對象直接交給ExecutorService執行 (ExecutorService.execute(Runnable command));湖泊這也能夠把Runnable對象或者Callable對象提交給E小ecotrService執行(ExecutorService.submit(Callable task))..net

3.執行ExecutorService.submit(Callable task),將返回一個實現Future接口的對象。

4.主線程能夠執行FutureTask.get()方法來等待任務執行完成。主線程也能夠執行FutureTask.cancel(boolean mayInterruptIfRunning)來取消任務的執行。

ThreadPoolExecutor源碼解讀

根據源碼能夠知道,ThreadPoolExecutor有四個構造方法,可是其餘幾個都時創建在下面解析的這個構造方法上。

/**
*用給定的初始值建立一個新的[@code](https://my.oschina.net/codeo) threadpoolExecutor
*參數。
*
*[@param](https://my.oschina.net/u/2303379) corepoolsize保留在池中的線程數,偶數
*若是它們是空閒的,除非設置了[@code](https://my.oschina.net/codeo) allowcorethreadTimeout
*[@param](https://my.oschina.net/u/2303379) maximumpoolsize容許的最大線程數
*池
*[@param](https://my.oschina.net/u/2303379) keepalivetime當線程數大於
*核心,這是多餘空閒線程的最長時間
*將在終止前等待新任務。
*@param unit@code keepalivetime參數的時間單位
*@param workqueue用於在任務以前保存任務的隊列
*執行。此隊列將只包含@代碼可運行
*由@code execute方法提交的任務。
*@param threadfactory執行器時要使用的工廠
*建立新線程
*@param handler執行被阻止時要使用的處理程序
*由於達到了線程邊界和隊列容量
*@throws illegalargumentexception if one of the following holds:<br>
*@code corepoolsize<0<br>
*@code keepaliveTime<0<br>
*@code maximumpoolsize<=0<br>
*@code maximumpoolsize<corepoolsize
*@throws nullpointerException if@code workqueue
*或@code threadFactory或@code handler爲空
*/
public ThreadPoolExecutor(int corePoolSize,
							  int maximumPoolSize,
							  long keepAliveTime,
								  TimeUnit unit,
				 BlockingQueue<Runnable> workQueue,
							  ThreadFactory threadFactory,
		  RejectedExecutionHandler handler) {
		if (corePoolSize < 0 ||
			maximumPoolSize <= 0 ||
			maximumPoolSize < corePoolSize ||
			keepAliveTime < 0)
			throw new IllegalArgumentException();
		if (workQueue == null || threadFactory == null || handler == null)
			throw new NullPointerException();
		this.acc = System.getSecurityManager() == null ?
				null :
				AccessController.getContext();
		this.corePoolSize = corePoolSize;
		this.maximumPoolSize = maximumPoolSize;
		this.workQueue = workQueue;
		this.keepAliveTime = unit.toNanos(keepAliveTime);
		this.threadFactory = threadFactory;
		this.handler = handler;
	}

經過上面的圖片和代碼片斷的解析能夠知道該方法的參數比較多,並且都比較麻煩,因此不建議使用構造函數去建立ThreadPoolExecutor對象,可是可使用下面的幾種方式建立ThreadPoolExecutor對象.

FixedThreadPool源碼解讀

/**
	  *	建立重用固定數量線程的線程池
	  *	使用提供的在須要時建立新線程。在任什麼時候候,
	  *最多N個線程將處於活動處理狀態
	  *任務。若是在全部線程
	  *活動,它們將在隊列中等待,直到線程
	  *可用。若是任何線程在
	  *在關閉前執行,若是
	  *須要執行後續任務。池中的線程將
	  *存在,直到它顯式執行服務關閉
	 * @param nThreads 線程池的線程數
	 * @param threadFactory 建立線程工廠類
	 * @return 返回新建立的線程池
	 * @throws NullPointerException if threadFactory is null
	 * @throws IllegalArgumentException if {@code nThreads <= 0}
	 */
	public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
		return new ThreadPoolExecutor(nThreads, nThreads,
									  0L, TimeUnit.MILLISECONDS,
									  new LinkedBlockingQueue<Runnable>(),
									  threadFactory);
	}

或者

	public static ExecutorService newFixedThreadPool(int nThreads) {
		return new ThreadPoolExecutor(nThreads, nThreads,
									  0L, TimeUnit.MILLISECONDS,
									  new LinkedBlockingQueue<Runnable>());
	}

FixedThreadPool的執行的方式大體如《併發編程藝術的》的示意圖所示:

說明:

  1. 若是當前的運行的線程小於corePoolSize,則建立新的線程來執行任務

  2. 當前運行的線程等於corePoolSize後,將任務加入LinkedBlockingQueue<Runnable>

  3. 線程執行完成步驟1的任務以後,會在循環中反覆的從LinkedBlockingQueue<Runnable>中獲取任務來執行。

FixedThreadPool使用的LinkedBlockingQueue(隊列容量爲Integr.MAX_VALUE,若是構造函數沒有設置大小的時候),有些文章說FixedThreadPool是無界隊列是不正確的。除此以外還會有以下的影響:

1.當線程中的線程達到了CorePoolSize後,新任務將在無界對壘中等待,所以線程不會超過corePoolSize;

2.因爲1,使用的沒設置大小的LinkedBlockingQueue,將使得maxinumPoolSize將是一個無效的參數。

3.因爲1,2使用不設置大小的LinkedBlockingQueue,將使得keepAliveTime將是無效的參數

4.運行中FixedThreadPool(未執行shudown()或者shudownNow())就不會拒絕任務.

SigleThreadExecutor

1.當前的運行的線程數少雲corePoolSize,則建立一個新的線程執行任務。

2.當前線程池中有一個運行的線程後,將任務加入LinkedBlockQueue

3.線程執行完1中的任務後,會在循環中反覆的從LinkedBlockingQueue中獲取任務來執行。

CacheThreadPool

CacheThreadPool是一個會根據須要建立新線程的線程池。

/**
	 * 建立一個線程池,根據須要建立新線程,但會在先前構建的線程可用時重用它,
	 *並在須要時使用提供的ThreadFactory建立新線程。
	 * @param threadFactory 建立新線程使用的factory
	 * @return 新建立的線程池
	 * @throws NullPointerException 若是threadFactory爲空
	 */
	public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
	//CachedThreadPool的corePoolSize被設置爲空(0),maximumPoolSize被設置爲Integer.MAX.VALUE,即它是無界的,這也就意味着若是主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CachedThreadPool會不斷建立新的線程。極端狀況下,這樣會致使耗盡cpu和內存資源。
		return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
									  60L, TimeUnit.SECONDS,
									  new SynchronousQueue<Runnable>(),
									  threadFactory);
	}

	public static ExecutorService newCachedThreadPool() {
		return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
									  60L, TimeUnit.SECONDS,
									  new SynchronousQueue<Runnable>());
	}

ScheduledThreadPoolExecutor

scheduledThreadPoolExecutor主要用來在給定的延遲運行任務,或者按期執行任務。ScheduledThreadPoolExecutor使用任務隊列DelayQueue封裝了一個PriorityQueue,PrioprityQueue會對隊列中的任務進行排序,執行所需時間短的放到最前面執行,若是執行時間一樣則使先提交的先執行。

ScheduledThreadPoolExecutor和Time的比較

  • Time對系統時鐘的變化比較敏感,ScheduledThreadPoolExecutor則相反。

  • Timer只有一個執行執行現場,所以長時間運行任務能夠延遲其餘任務。ScheduledThreadPoolExecutor能夠配置任意現場,此外,若是你想(通過ThreadFactory),你能夠徹底控制創建的綫程。

  • 在TimerTask中拋出的運行異常會殺死一個線程,從而致使司機:(計劃任務將再也不運行,ScheduledThreadExecutor不只捕獲運行異常,還容許您再須要時處理他們(經過重寫afterExecute方法)。拋出的異常將被取消,其餘的任務能夠繼續運行。

ScheduledThreadPoolExecutor運行機制

ScheduledThreadPoolExecutor的執行主要包括兩部分:

1.當調用ScheduledThreadPoolExecutor的scheduleAtFixRate()或者scheduleWirhFixedDelay()時,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實現了RunnableScheduledFuture接口的ScheduledFutureTask.

2.線程池中的線程從DelayQueue中獲取ScheduledFutureTask,而後執行任務.

ScheduledThreadPoolExecutor未了實現週期性的執行任務,對ThreadPoolExecutor作了以下的修改:

  • 使用DelayQueue做爲任務隊列
  • 獲取任務的方法不一樣
  • 執行週期任務後,增長了額外的處理.

各類線程的使用場景

  • FixedThreadPool: 適用於爲了知足資源管理需求,限制當前線程數量的應用場景。它適用於負載比較重的服務器;

  • SingleThreadExecutor: 適用於保證順序地執行各個任務而且在任意時間點,不會有多個線程是活動的應用場景。

  • CachedThreadPool: 適用於執行不少的短時間異步任務的小程序,或者是負載較輕的服務器;

  • ScheduledThreadPoolExecutor: 適用於須要多個後臺執行週期任務,同時爲了知足資源管理需求而須要限制後臺線程的數量的應用場景,

  • SingleThreadScheduledExecutor: 適用於須要單個後臺線程執行週期任務,同時保證順序地執行各個任務的應用場景。

相關文章
相關標籤/搜索