從0學習java併發編程實戰-讀書筆記-結構化併發應用程序(6)

在線程中執行任務

在理想狀況下,任務之間都是相互獨立的:任務並不依賴於其餘任務的狀態,結果或邊界效應。獨立性有助於實現併發。java

大多數服務器的應用程序都提供了一個天然的任務邊界:以獨立的客戶請求爲邊界。web

串行地執行任務

最簡單的方式就是在單個線程中串行的執行各項任務。可是現實中的web服務器的狀況卻並不是如此。在web請求中包含了一組不一樣的運算和I/O操做。服務器必須處理套接字I/O以讀取請求和寫回響應,這些操做一般會因爲網絡或者連通性問題而被阻塞。數據庫

在服務器應用中,串行機制一般沒法提供高吞吐率和快速響應性。緩存

顯示地爲任務建立線程

若是爲每一個請求建立一個新的線程來提供服務,特色有:安全

  • 任務處理過程將主線程中分離出來,使主循環可以更快的接受下一個到來的鏈接,使得程序在完成前面的請求以前能夠接受更多新的請求,從而提升響應性。
  • 任務能夠並行處理,從而能夠同時服務多個請求。程序的吞吐量將會提升。
  • 任務處理代碼必須是線程安全的,由於將會有多個任務併發的調用這段代碼。

只要請求的到達速率不超過服務器的請求處理能力,那麼這種方法能夠同時帶來更快的響應性和更高的吞吐率。服務器

無限制創造線程的不足

在生產環境中,若是爲每一個任務分配一個線程,這種方法有着一些缺陷,尤爲是當須要建立大量線程的時候:網絡

  • 線程生命週期的開銷很是高:若是請求的到達率很是高,且處理過程是輕量級的,那麼沒建立和銷燬一個新線程將消耗大量的計算資源。
  • 資源消耗:活躍的線程會消耗系統資源,尤爲是內存。若是可運行的線程數量大於可用處理器的數量,那麼有些線程將會閒置,並且大量線程在競爭cpu資源還會產生其餘的性能開銷。
  • 穩定性:在可建立線程的數量上存在一個限制。這個限制隨着平臺的不一樣的,受到多個制約因素,包括:數據結構

    • JVM的啓動參數。
    • Thread構造函數中請求棧的大小
    • 以及底層的操做系統對線程的限制等。若是破壞了這些限制,可能拋出OOM。

在必定範圍內,增長線程能夠提高系統的吞吐率,可是若是超過了這個範圍,再建立線程只會下降系統的執行速度,而且若是過多地建立一個線程,那麼整個應用也許都會崩潰。多線程

Executor框架

任務是一組邏輯工做單元,而線程是使任務異步執行的機制。併發

線程池優化了線程管理工做,而且java.util.concurrent提供了一種靈活的線程池實現做爲Executor框架的一部分。在java類庫中,任務執行的主要抽象不是Thread,而是Executor.

public interface Executor{
    void execute(Runnable command);
}

它提供了一種標準的方法將任務的提交過程與執行過程解耦,並用Runnable表示任務。

Executor的實現還提供了對生命週期的支持,以及統計信息收集、應用程序管理機制和性能堅實等機制。

Executor基於生產者和消費者模式,提交任務的操做至關於生產者(生產待完成的工做單元),執行任務的操做至關於消費者。

基於Executor的web服務器

標準的Executor實現:

// 建立線程池
static final Executor executor = Executors.newFixedThreadPool(num);

// 建立任務 
Runnable task = new Runnable(){
    public void run(){
        doSomething();
    }
}

// 執行線程
executor.execute(task);

經過使用Executor,將請求任務的提交與任務的實際行爲解耦,並只須要採用另外一種不一樣的Executor實現,就能夠改變服務器行爲。

執行策略

經過將任務的提交和執行解耦開,就能夠無需太大的困難爲某種類型的任務指定或修改執行策略。最佳策略取決於可用的計算資源以及對服務質量的需求。經過限制併發任務的數量,能夠確保應用程序不會因爲資源耗盡而失敗,或者因爲在稀缺資源的競爭而影響性能。

每當看到 new Thread(Runnable).start() 時,而且你但願得到一個更加靈活的執行策略時,請使用Executor來代替Thread

線程池

線程池是指管理同一組同構工做線程的資源池。線程池是與工做隊列密切相關的,工做隊列中保存了全部等待執行的任務。工做者線程的任務很簡單:從工做隊列中獲取一個任務,執行任務,而後回到線程池等待下一個任務。

經過重用現有線程而不是建立新線程,能夠在處理多個請求的時候分攤掉建立和銷燬線程的成本。並且在請求到達的時候,工做線程通常已經存在,就不須要等待線程建立的時間,提升了響應性。經過限制線程池大小,還能夠避免多線程之間過分競爭資源,致使程序耗盡內存。

Executor的靜態工廠方法

  • newFixedThreadPool:newFixedThreadPool將建立一個固定長度的線程池,每當提交一個任務時就建立一個線程,直到達到線程的最大數量,這時線程池的規模將再也不變化。
  • newCacheThreadPool:newCacheThreadPool將建立一個可緩存的線程池,若是線程池的當前規模超過了處理需求時,那麼將回收空閒的線程。而當需求增長的時候,會添加新的線程,線程池規模不受限制。
  • newSingleThreadExecutor:newSingleThreadExecutor是一個單線程Executor,它建立單個工做線程來執行任務,若是這個線程異常結束,會建立另外一個線程來替代,能確保依照任務在隊列中的順序來串行執行。
  • newScheduledThreadPool:newScheduledThreadPool建立了一個固定長度的線程池,並且以延遲或者定時的方式來執行任務,相似Timer。

newFixedThreadPool和newCacheThreadPool這兩個方法返回通用的ThreadPoolExecutor實例,能夠用來構建專門用途的executor。

Executor的生命週期

Executor的實現一般會建立線程來執行任務。但JVM只有在全部的非守護線程所有終止後纔會退出。若是沒法正確的關閉Executor,那麼JVM將沒法結束。
爲了解決執行服務生命週期的問題,ExecutorService拓展了Executor接口,添加了一些生命週期的管理方法。

public interface ExecutorService extends Executor{
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException;
}

ExecutorService的生命週期有3種狀態:運行關閉終止

  • shutdown()將執行平緩的關閉過程:再也不接受新的任務,同時等待已提交的任務執行完成,包括那些還未開始執行的任務。
  • shutdownNow()將執行粗暴的關閉過程:它將嘗試取消全部運行中的任務,而且再也不啓動隊列中還沒有開始執行的任務。

在ExecutorService關閉後提交的任務將由拒絕執行處理器(Rejected Execution Handle)來處理,它會拋棄任務,或使execute方法拋出一個未檢查的RejectedExecutionException。等全部任務完成後,ExecutorService將轉入終止狀態。

延遲任務與週期任務

Timer類負責管理延遲任務(「在100ms後執行該任務」)以及週期任務(「每100ms執行一次該任務」)。然而,Timer存在一些缺陷,所以應該考慮使用ScheduledThreadPoolExecutor來替代它(Timer支持的是絕對時間而不是相對時間的調度制度,所以任務的執行對系統時間很是敏感。而ScheduledThreadPoolExecutor只支持系統相對時間)。

Timer在執行全部定時任務時只會建立一個線程。若是某個任務的執行時間過長,那麼會破壞其餘timerTask的定時精準性。例如某個週期TimerTask須要每10ms執行一次,而另外一個Task執行了50ms,那麼TimerTask會在50ms之後快速的連續調用5次,或者直接丟掉這5次執行。(取決於Timer是基於固定速率仍是說基於固定延時來調度)。

Timer的另外一個問題是,若是TimerTask拋出了一個未檢查的異常,Timer線程並不捕獲異常。所以當TimerTask拋出未檢查的異常時,將終止定時線程。這種狀況下,Timer不會恢復線程的執行,而是會錯誤地認爲整個Timer都被取消了。所以已經被調度可是未執行的Task不會被執行,新的任務也不會被調度。

如今基本不會使用Timer

找出可利用的並行性

Executor框架幫助指定執行策略,可是若是要使用Executor,必須將任務表述爲一個Runnable。在大多數服務器應用程序中都存在一個明顯的任務邊界:單個客戶請求。可是不少邊界並不是明顯可見的,即便是服務器應用程序,在用戶請求中仍存在可發掘的並行性,例如數據庫服務器。

攜帶結果的任務Callable和Future

Executor框架使用Runnable做爲其基本的任務表示形式。Runnable是一種有很大侷限的抽象,雖然run能寫入到日誌文件或將結果放入某個共享的數據結構,可是它不能返回一個值或拋出一個受檢查的異常

對於某些存在延遲的計算,Callable是一種更好的抽象:它認爲主入口點(即call)將返回一個值,並可能拋出一個異常。在Executor中包含了一些輔助方法,能將其餘類型的任務封裝爲一個Callable,例如Runnable和java.security.PrivilegedAction。

Runnable和Callable描述的都是抽象的計算任務。這些任務一般是有範圍的,即都有一個明確的起始點,而且最終都會結束。

Executor執行的任務有4個生命週期階段:

  • 建立
  • 提交
  • 開始
  • 完成

在Executor框架中,已提交但還沒有開始的任務能夠取消,但對於那些已經開始執行的任務,只有當它們能響應中斷時,才能取消。取消一個已經完成的任務不會有任何的影響。
Future表示一個任務的生命週期,並提供了相應的方法來判斷是否已經完成取消,以及獲取任務的結果和取消任務。
Future規範中包含的隱含含義是:任務的生命週期只能前進,不能後退,當某個任務完成後,它將永遠的停留在完成狀態上。
get方法的行爲取決於任務的狀態(還沒有開始,正在運行,已完成)。

  • 若是任務已經完成,那麼get會當即返回或者拋出一個Exception。
  • 若是任務沒有完成,那麼get將阻塞並直到任務完成。
  • 若是任務拋出了異常,那麼get將該異常封裝爲ExecutionException並從新拋出。
  • 若是任務被取消,那麼get將拋出CancellationException。
  • 若是get拋出了ExecutorException,那麼能夠經過getCause獲取被封裝的初始異常。
public interface Callable<V>{
    V call() throws Exception;
}

public interface Future<V>{
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCanceled();
    boolean isDone();
    V get() throws InterruptedException, ExcutionException, CancellationException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExcutionException,
                 CancellationException, TimeoutException;
}

能夠經過不少方法建立一個Future來描述任務。ExecutorService中的全部submit方法均可以返回一個Future,從而將一個Runnable和Callable提交給Executor,並獲得一個Future用來得到任務的執行結果或取消任務。還能夠顯式的爲某個指定的Runnable或者Callable實例化一個FutureTask。

Future和Callable例子:

Callable<List<A>> task = new Callable<List<A>>(){
    public List<A> call(){
        List<A> list  = new ArrayList();
        return list;
    }
}

Future<List<A>> future = executor.submit(task);

List<A> list = future.get();

get方法擁有狀態依賴的內在特性,於是調用者不須要知道任務的狀態,此外在任務提交和得到結果中包含的安全發佈屬性也確保了這個方法是線程安全的。

在異構任務並行化中存在的侷限

經過對異構任務進行並行化來得到重大的性能提高是很困難的。若是沒有在類似的任務之間找出細粒度的並行性,那麼這種方法帶來的好處就回減小。只有當大量相互獨立且同構的任務能夠併發進行處理時,才能體現出將程序的工做負載分配到多個任務中帶來的真正的性能提高。

CompletionService:Executor與BlockingQueue

CompletionService將Executor和BlockingQueue的功能融合在一塊兒。能夠將Callable任務提交給他來執行,而後使用相似於隊列操做的take和poll等方法來得到已完成的結果,這個結果將會封裝爲Future。

ExecutorCompletionService實現了CompletionService,並將計算部分委託給一個Executor。

ExecutorCompletionService的實現

  • 在構造函數中建立一個BlockingQueue來保存計算完成的結果。
  • 當計算完成時,調用Future-Task中的done方法。
  • 當提交某個任務時,該任務首先包裝爲一個QueueingFuture,是FutureTask的一個子類。
  • 改寫子類的done()方法,並將結果放入BlockingQueue中。take和poll方法委託BlockingQueue,這些方法將會在出結果以前阻塞。
private class QueueingFuture<V> extends FutureTask<V> {
    QueueingFuture(Callable<V> c) { super(c); }
    QueueingFuture(Runnable t , V r) { super(t,r); }

    protected void done() {
        completionQueue.add(this);
    }
}

爲任務設置時限

若是某個任務沒法在指定時間內完成,若是將再也不須要它的結果,此時能夠放棄這個任務。但可能只會在指定的時間內等待數據,若是超出了時間,那麼只顯示已經得到的數據。
要實現這個功能,能夠由任務自己來管理它的限定時間,而且在超時之後停止執行或取消任務。
此時可再次使用Future,若是一個限時的get方法拋出了TimeoutException,那麼能夠提早停止它,避免消耗更多資源。

long endNanos = System.nanoTime() + TIME_BUDGET;
Future<A> f = exec.submit(task);
long timeLeft = endNanos - System.nanoTime();
try{
    A a = f.get(timeLeft,NANOSECONDS);// timeLeft若是<=0,就會中斷
} catch(ExecutionException e){

} catch(TimeoutException){
    f.cancel(true);
}

小結

經過圍繞任務執行來設計應用程序,能夠簡化開發過程,並有助於實現併發。Executor框架將任務提交與執行策略解耦開,還支持許多不一樣類型的執行策略。當須要建立線程來執行任務時,能夠考慮使用Executor。要想在將應用程序分解爲不一樣的任務時得到最大的好處,必須定義清晰的任務邊界。某些應用程序有比較明顯的邊界,而在其餘一些程序中則須要進一步分析才能揭示出粒度更細的並行性。

相關文章
相關標籤/搜索