Java併發之任務的描述和執行

簡單概念java

《Java編程思想》對併發概念的重要性闡述:編程

Java是一種多線程語言,而且提出了併發問題,無論你是否意識到了。所以,有不少使用中的Java程序,要麼只是偶爾工做,要麼是在大多數時間裏工做,而且會因爲未發現的併發缺陷而時不時地神祕崩潰。有事這種崩潰是溫和的,但有時卻意味着重要數據的丟失,而且若是沒有意識到併發問題,你能最終會認爲問題出如今其餘什麼地方,而不是你的軟件中。若是程序被遷移到多處理器系統中,這些種類的問題還會被暴露或放大。基本上,瞭解併發可使你意識到明顯正確的程序可能展現出不正確的行爲。數組

因此在你編寫任何複雜程序以前,應該學習一下專門討論併發主題的數據。緩存

使用併發解決的問題能夠分爲兩種多線程

  • 更快的速度
    更快的速度是針對阻塞(一般是I/O)來說的,其實若是沒有阻塞使用併發是沒有任何意義,反而比順序執行還增長了「上下文切換(從一個任務切換到另外一個任務)」的開銷。由於有了阻塞,才把程序斷開爲多個片斷,而後在單處理器上運行每一個片斷。
    架構

  • 改進代碼的設計
    線程一般使你可以建立更加鬆散耦合的設計如用戶交互,不然,你的代碼中各個部分都必須顯示地關注那些一般能夠由線程來處理的任務。
    併發

進程與線程框架

  • 進程:
    進程是運行在它本身的地址空間內的自包容的程序,進程是自願分配的基本單位,它也是搶佔處理器的調度單。多任務操做系統能夠經過週期性地將CPU從一個進程切換到另外一個進程,來實現同時運行多個進程(程序)。異步

  • 線程:
    臺灣稱爲"執行者",我認爲更確切些,它是進程中某個單一順序的控制流,是程序(進程)執行流的最小單位,也是被系統獨立調度和分派的基本單位,線程本身不擁有系統資源,只擁有一點兒在運行中必不可少的資源,但它可與同屬於一個進程的其餘線程共享進程所擁有的所有資源。ide

發生進程切換與發生線程切換時相比較,進程切換時涉及到有關資源指針的保存以及地址空間的變化等問題;線程切換時,因爲同進程內的線程共享資源和地址 空間,將不涉及資源信息的保存和地址變化問題,從而減小了操做系統的開銷時間。並且,進程的調度與切換都是由操做系統內核完成,而線程則既可由操做系統內 核完成,也可由用戶程序進行。

任務描述

線程能夠驅動,所以能夠說線程是任務的執行載體,而任務是真正的業務邏輯。而描述任務可使用繼承Thread類或使用Runnablecallable兩個接口。

實現Runnable接口便可定義爲任務,而後使用線程驅動,Callable 接口也相似於 Runnable,二者都是爲那些其實例可能被另外一個線程執行的類設計的。可是 Runnable 不會返回結果,而且沒法拋出通過檢查的異常,而Callable能夠。

使用Runable定義任務和執行簡單示例:

public class App {		public static void main(String[] args) {		for(int i=0;i<10;i++){//開10個線程			new Thread(new Task()).start();		}	}}/** * 定義任務 * @author Administrator */class Task implements Runnable{	@Override	public void run() {		System.out.println("線程ID:"+Thread.currentThread().getId());	}}

輸出:

線程ID:8
線程ID:10
線程ID:12
線程ID:14
線程ID:16
線程ID:9
線程ID:11
線程ID:13
線程ID:15
線程ID:17

使用繼承Thread類的方式,與此相似就不作舉例了。

Executor框架

Runnable 的任務能夠直接使用Thread類來執行,但Callable卻不能,須要使用Executor框架來執行,其實最好不要直接使用Thread來執行Runnable任務,而是使用Executor框架。

Executor是JAVA SE5的java.util.concurrent包中的執行器,它爲你管理Thread對象。Executor框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架。Executor存在的目的是提供一種將"任務提交"與"任務如何運行"分離開來的機制。架構以下圖: 

Executor接口定義以下:

public interface Executor {      void execute(Runnable command);  }

雖然只有一個方法,但卻爲靈活強大的異步任務執行框架提供了基礎。它將任務的提交過程與執行過程解耦:用Runnable/Callable來表示任務,執行的任務放入run/call方法中便可,將Runnable/Callable接口的實現類交給線程池的execute方法來執行。實際上咱們並非直接使用Execuotr接口的,而是使用更爲方便的ExecutorService接口,由於它對任務的生命週期作了管理:

public interface ExecutorService extends Executor {      void shutdown();      List<Runnable> shutdownNow();      boolean isShutdown();      boolean isTerminated();      boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;      ......}

對於上述框架圖:

ExecutorService: 真正的線程池接口。
ScheduledExecutorService接口: 能和Timer/TimerTask相似,解決那些須要任務重複執行的問題。
ThreadPoolExecutor: ExecutorService的默認實現。
ScheduledThreadPoolExecutor: 繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,週期性任務調度的類實現。

線程池

那麼線程池從而而來呢?這時Executors就出場了,它爲Executor,ExecutorService,ScheduledExecutorService,ThreadFactory和Callable類提供了一些工具方法,相似於集合中的Collections類的功能。

Executors能夠很方便的建立線程池

static ExecutorService newSingleThreadExecutor();static ExecutorService newFixedThreadPool(int nThreads);static ExecutorService newCachedThreadPool();static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
  • newSingleThreadExecutor:建立一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。

  • newFixedThreadPool:建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。

  • newCachedThreadPool:建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。

  • newScheduledThreadPool:建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。

其實還有不少與線程池相關的方法,具體看參考JDK API文檔,其實上面列出的方法都是使用ThreadPoolExecutor類來實現的,由於ThreadPoolExecutor是ExecutorService的默認實現。咱們來看看ThreadPoolExecutor的幾個構造函數:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

各參數含義以下:

  • corePoolSize(基本線程池的大小):當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到須要執行的任務數大於線程池基本大小時就再也不建立。若是調用了線程池的prestartAllCoreThreads方法,線程池會提早建立並啓動全部基本線程。

  • maximumPoolSize(線程池最大大小):線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是若是使用了無界(不限制線程數)的任務隊列這個參數就沒什麼效果。

  • keepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存活的時間。因此若是任務不少,而且每一個任務執行的時間比較短,能夠調大這個時間,提升線程的利用率。

  • TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

  • workQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。 能夠選擇如下幾個阻塞隊列。

    • ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。

    • LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。

    • SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。

    • PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。

  • ThreadFactory:用於設置建立線程的工廠,能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字或後臺線程等等。

  • RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常。如下是JDK1.5提供的四種策略。

    • AbortPolicy:直接拋出異常。

    • CallerRunsPolicy:只用調用者所在線程來運行任務。

    • DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務。

    • DiscardPolicy:不處理,丟棄掉。

    • 固然也能夠根據應用場景須要來實現RejectedExecutionHandler接口自定義策略。如記錄日誌或持久化不能處理的任務。

如Executors的「newCachedThreadPool()」方法的實現使用的ThreadPoolExecutor的構造函數如:

public static ExecutorService newCachedThreadPool() {	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,			      60L, TimeUnit.SECONDS,			      new SynchronousQueue<Runnable>());}

因爲ThreadPoolExecutor 將根據 corePoolSize和 maximumPoolSize設置的邊界自動調整池大小,當新任務在方法 execute(java.lang.Runnable) 中提交時

  1. 若是運行的線程少於 corePoolSize,則建立新線程來處理請求,即便其餘輔助線程是空閒的;

  2. 若是設置的corePoolSize 和 maximumPoolSize相同,則建立的線程池是大小固定的,若是運行的線程與corePoolSize相同,當有新請求過來時,若workQueue未滿,則將請求放入workQueue中,等待有空閒的線程去從workQueue中取任務並處理

  3. 若是運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才建立新線程才建立新的線程去處理請求;

  4. 若是運行的線程多於corePoolSize 而且等於maximumPoolSize,若隊列已經滿了,則經過handler所指定的策略來處理新請求;

  5. 若是將 maximumPoolSize 設置爲基本的無界值(如 Integer.MAX_VALUE),則容許池適應任意數量的併發任務

也就是說,處理任務的優先級爲: 

  1. 核心線程corePoolSize > 任務隊列workQueue > 最大線程maximumPoolSize,若是三者都滿了,使用handler處理被拒絕的任務。

  2. 當池中的線程數大於corePoolSize的時候,多餘的線程會等待keepAliveTime長的時間,若是無請求可處理就自行銷燬。

使用Executors來提交和執行任務

  1. 使用Runnable來表示的無返回值的任務:


    1. import java.util.concurrent.ExecutorService;

    2. import java.util.concurrent.Executors;

    3.  

    4. public class App {

    5. public static void main(String[] args) {

    6. ExecutorService exec=Executors.newFixedThreadPool(10);

    7. for(int i=0;i<10;i++){

    8. exec.execute(new Task());

    9. }

    10. exec.shutdown();

    11. }

    12. }

    13. /**

    14. * 定義任務

    15. * @author Administrator

    16. */

    17. class Task implements Runnable{

    18.  

    19. @Override

    20. public void run() {

    21. System.out.println(Thread.currentThread().getId());

    22. }

    23. }

  2. 使用Callable來表示的有返回值的任務:


    Callable接口只能使用submit來提交任務,submit會產生Future對象結果,能夠用阻塞的get()來獲取這個結果,但你使用isDone方法來查詢是否已經產生了Future結果,而後再獲取。

    1. import java.util.ArrayList;

    2. import java.util.concurrent.Callable;

    3. import java.util.concurrent.ExecutorService;

    4. import java.util.concurrent.Executors;

    5. import java.util.concurrent.Future;

    6.  

    7. public class App {

    8. public static void main(String[] args) throws Exception {

    9. ExecutorService exec=Executors.newFixedThreadPool(10);

    10. ArrayList<Future<Long>> results=new ArrayList<Future<Long>>();

    11. for(int i=0;i<10;i++){

    12. results.add(exec.submit(new Task()));

    13. }

    14. for(Future<Long> fs :results){

    15. System.out.println(fs.get());

    16. }

    17. exec.shutdown();

    18. }

    19. }

    20. /**

    21. * 定義任務

    22. * @author Administrator

    23. */

    24. class Task implements Callable<Long>{

    25.  

    26. @Override

    27. public Long call() throws Exception {

    28. return Thread.currentThread().getId();

    29. }

    30. }

關閉線程池:

  1. shutdown():平緩的關閉線程池。線程池中止接受新的任務,同時等待已經提交的任務執行完畢,包括那些進入隊列尚未開始的任務;

  2. shutdownNow():當即關閉線程池。線程池中止接受新的任務,同時線程池取消全部執行的任務和已經進入隊列可是尚未執行的任務;

向任務傳值

Runnable的run和Callable的call都是無參接口,那麼在運行狀態如何傳遞值呢?網上說又三種方式,其實本質是一種:向Runnable/Callable的實現類傳值,因此你有多種方式好比構造函數,屬性等等:

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; public class App {		public static void main(String[] args) {		ExecutorService exec=Executors.newFixedThreadPool(10);		for(int i=0;i<10;i++){			exec.execute(new Task("線程ID"));		}		exec.shutdown();	}}/** * 定義任務 * @author Administrator */class Task implements Runnable{	private String outVal;		public Task(String outVal){		this.outVal=outVal;	}	@Override	public void run() {		System.out.println(outVal+":"+Thread.currentThread().getId());	}}

輸出:

線程ID:8線程ID:10線程ID:12線程ID:14線程ID:16線程ID:9線程ID:11線程ID:13線程ID:15線程ID:17

相關文章
相關標籤/搜索