從0學習java併發編程實戰-讀書筆記-線程池的使用(8)

# 在任務與執行策略之間的隱性耦合
Executror框架能夠將任務的提交與任務的執行解耦開。可是雖然Executor框架爲制定和修改執行策略提供了很大的靈活性,但並不是全部的任務都能適用全部的執行策略。有些類型的任務須要明確地制定執行策略,其中包括:html

  • 依賴性任務:大多數行爲正確的任務都是獨立的:它們不依賴於其餘任務的執行時序、執行結果或其餘效果。當在線程池中執行獨立任務時,能夠任意修改線程池大小和配置,這些修改只會對執行性能產生影響。若是提交給線程池的任務須要依賴於其餘任務,那麼隱含的對執行策略帶來了約束,此時必須當心地維持這些執行策略以免產生活躍性問題。
  • 使用線程封閉機制的任務:與線程池相比,單線程的Executor可以對併發性作出更強的承諾。它們能確保任務不會併發的執行。對象能夠封閉在任務線程中,使得在該線程執行的任務在訪問該對象時不須要同步。這種狀況將在任務與執行策略之間造成隱性的耦合:即任務要求其執行所在的Executor是單線程的。若是將Executor從單線程環境改成線程池環境,那麼將會失去線程安全。
  • 對響應時間敏感:若是將一個運行時間較長的任務提交到單線程的Executor中,或者將多個運行時間較長的任務提交到一個只包含少許線程的線程池中,那麼將會下降由該Executor管理的服務性。
  • 使用threadLocal的任務:ThreadLocal使每一個線程都擁有某個變量的一個私有版本。只要條件容許,Executor能夠自由的重用這些線程。若是從任務中拋出一個未受檢查的異常,那麼將用一個新的工做者線程來替代拋出異常的線程。只有線程本地值的生命週期受限於任務的生命週期時,在線程池中的線程使用ThreadLocal纔有意義,而在線程池中的線程中不該該使用ThreadLocal在任務之間傳遞值。
只有當任務都是同類型的而且互相獨立時,線程池的性能才能達到最佳。若是運行時間較長和運行時間較短的任務混合在一塊兒,除非線程池很大,不然很容易形成擁塞。

線程飢餓死鎖

在線程池中,若是任務依賴於其餘任務,那麼可能產生死鎖。
在單線程的Executor中,若是一個任務將另外一個任務提交到同一個Executor,而且等待這個被提交任務的結果,那麼一般會引起死鎖。
若是全部正在執行任務的線程都因爲等待其餘仍處於工做隊列的任務而阻塞,那麼會發生一樣的問題。這種現象被稱爲線程飢餓死鎖(Thread Starvation Deadlock)java

public class ThreadDeadLock{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    public class RenderPageTask implements Callable<String> {
        public String call throws Exception {
            Future<String> header,footer;
            header = exec.submit(new LoadFileTask("header.html"));
            footer = exec.submit(new LoadFileTask("footer.html"));
            String page = renderBody();
            // 這裏將發生死鎖:因爲當前任務在等待子任務的結果
            return header.get() + page + footer.get();
        }
    }
}
每當提交了一個有依賴性的Executor任務時,要清晰的知道可能會出現線程「飢餓」死鎖,所以須要在代碼或配置Executor的配置文件中記錄線程池的大小限制或配置限制。

運行時間較長的任務

若是任務阻塞時間過長,那麼即使不出現死鎖,任務的的響應性也不好。執行時間較長可能會形成線程池阻塞,增長執行時間較短任務的服務時間。若是線程數量遠小於在穩定狀態下執行時間較長任務的數量,那麼到最後可能全部的線程都會運行這些執行時間較長的任務,從而影響總體的響應性。
經過限定任務等待資源的時間,不要無限制的等待,來緩解執行時間任務較長任務的影響。平臺類庫的大多數阻塞方法都提供了限時版本和無限時版本,例如Thread.join,BlockingQueue.put,CountDownLatch.await以及Selector.select等。若是等待超時,那麼能夠把任務標爲失敗,而後終止任務或者將任務放回隊列以供隨後執行。這樣不管任務最終是否能執行成功,至少任務能順利繼續執行下去。不過若是線程池中老是充滿被阻塞的任務,那麼多是線程池的規模太小。node

設置線程池的大小

要想正確的設置線程池的大小,必須分析計算環境、資源預算和任務的特性。在部署的系統中有多少個CPU?多大的內存?任務是計算密集型、I/O密集型仍是兩者皆可?它們是否須要像JDBC鏈接這樣的稀缺資源?而且若是它們之間的行爲相差很大,那麼應該考慮使用多個線程池,從而使每一個線程池能夠根據各自的工做負載來調整。算法

  • 對於計算密集型的任務,在擁有N個Cpu的系統上,當線程池的大小爲N+1時,一般能有最優的利用率:即便當計算密集型的線程偶爾因爲頁缺失故障或者其餘緣由暫停時,這個「額外」的線程也能保證cpu的時鐘週期不會被浪費。
  • 對於包含IO操做或者其餘阻塞操做的任務,因爲線程並不會一直執行,所以線程池的規模應該更大。若是要正確的設置線程池的大小,你須要估算任務的等待時間和計算時間的比值。

CPU週期並非惟一影響線程池大小的資源,還包括內存、文件句柄、套接字句柄和數據庫鏈接等。數據庫

配置ThreadPoolExcecutor

ThreadPoolExecutor爲一些Executor提供了基本的實現,這些Executor是由Executors中的newCachedThreadPoolnewFixedThreadPool等工廠方法返回的。ThreadPoolExecutor是一個靈活的,穩定的線程池,且支持各類定製。
若是默認的構造函數不能知足需求,那麼能夠經過ThreadPoolExecutor的構造函數,而且根據本身的需求來定製。ThreadPoolExecutor定義了不少構造函數。數組

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

線程的建立和銷燬

線程池的基本大小(corePoolSize)最大大小(maximumPoolSize)存活時間等因素共同負責線程的建立與銷燬。安全

  • 基本大小就是線程池的目標大小,即沒有執行任務時線程池的大小。服務器

    建立ThreadPoolExecutor的初期,線程並不會當即啓動,而是等到有任務提交時才啓動,除非調用prestartAllCoreThread)
  • 最大大小表示可同時活動的線程數量的上限,若是某個線程的空閒時間超出了存活時間,那麼則會被標記爲可回收的,當線程池大小超過了基本大小時,那麼這個線程將被終止。

經過調節線程池的基本大小和存活時間,能夠幫助線程池回收空閒線程佔有的資源(回收線程時會產生額外的延遲,由於當需求增長時,必須建立新的線程來知足需求)。併發

  • newFixedThreadPool工廠方法將線程池的基本大小和最大大小設置爲參數中指定的值,而後建立的線程不會超時。
  • newCachedThreadPool工廠方法將線程池最大的大小設置爲Integer.MAX_VALUE,而基本大小設置爲0,超時時間設置爲1分鐘,這樣建立出來的線程能夠被無限擴展,當需求下降的時候自動收縮。

管理隊列任務

在有限的線程池中限制可併發執行的任務數量(單線程的Executor是一種特例:它們能確保不會有任務併發執行,由於它們經過線程封閉來實現線程安全性。)
若是無限制的建立線程,那麼將致使系統的不穩定性,而且經過固定大小的線程池(而不是收到一個請求就建立一個線程)來解決這樣的問題。然而這個方案並不完整。在高負載的狀況下,應用程序仍可能耗盡資源。若是新請求的到達速率超過了線程池的處理速率,那麼新來的請求將累積起來。在線程池中,這些請求會在一個由Executor管理的Runnable隊列中等待,而不會像線程那樣去競爭CPU資源。經過一個Runnable和一個鏈表節點來表示一個等待中的任務,固然比用線程來表示開銷低不少。可是若是客戶提交給服務器請求的速率超過了服務器的處理速度,那麼資源仍可能被耗盡。

即便請求到達的速率很穩定,也有可能出現請求突增的狀況。儘管隊列有足浴緩解任務的突增問題,可是若是任務持續高速的到來,那麼最終仍是會抑制請求的到達率以免耗盡內存,甚至在耗盡內存以前,響應性能也隨着任務隊列的增加而愈來愈糟。框架

ThreadPoolExecutor容許提供一個BlockingQueue來保存等待執行的任務。基本的任務排列方法有3種:

  • 無界隊列
  • 有界隊列
  • 同步移交(Synchronous Handoff)

隊列的選擇與其餘配置參數有關,例如線程池的大小等。

newFixedThreadPool和newSingleThreadExecutor在默認狀況下將使用一個無界的LinkedBlockingQueue。若是全部線程都處於忙碌,那麼任務將在隊列中等待,若是任務快速的到達,超過了cpu處理任務的速度,那麼隊列將無限制的增長。

更穩妥的策略是使用有界隊列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue等。有界隊列能夠避免資源耗盡。可是帶來了一個新問題:當隊列填滿之後該怎麼辦?(飽和策略能夠解決這個問題)。在使用有界的工做隊列時,隊列的大小與線程池的大小必須一塊兒調節:

  • 若是線程池小而隊列較大,那麼有助於減小內存的使用量,下降CPU的使用率,同時能夠減小上下文切換,可是代價就是可能會限制吞吐量。

對於很是大的或者無界的線程池,能夠經過使用SynchronousQueue來避免任務排隊,以及直接將任務從生產者移交給工做者線程。SynchronousQueue並非一個真正的隊列,而是一種在線程間進行移交的機制。要將一個元素放入SynchronousQueue中,必須有另外一個線程正在等待接收這個元素。若是沒有線程正在等待,而且線程池的當前大小小於最大值,那麼ThreadPoolExecutor將會建立一個新的線程。不然根據飽和策略,這個任務將被拒絕。
使用直接移交將更加高效,由於任務會直接交給執行它的線程,而不是被首先放入隊列裏,而後由工做線程從隊列中提取任務。只有當線程池是無界的或者是能夠拒絕的時候,SynchronousQueue纔有實際價值。在newCachedThreadPool中就使用了SynchronousQueue。
當使用像LinkedBlockingQueueArrayBlockingQueue這樣的FIFO隊列,任務的執行順序和它們的到達順序相同,若是想進一步控制任務的執行順序,可使用PriorityBlockingQueue,內容根據天然順序或者Comparable定義。

對於Executor,newCachedThreadPool工廠方法是一種很好的默認選擇,它能提供比固定大小更好的排隊性能(因爲使用了SynchronousQueue而不是LinkedBlockingQueue),當須要限制當前任務的數量以知足資源管理器需求時,能夠選擇固定大小的線程池,避免過載問題。

飽和策略

當有界隊列被填滿後,飽和策略開始發揮做用。
ThreadPoolExecutor的飽和策略能夠經過調用setRejectedExecutionHandle來修改。若是某個任務被提交到已經關閉的Executor時,也會觸發飽和策略。
JDK提供了幾種不一樣的RejectedExecutionHandle實現,每種實現都包含不一樣的策略:

  • AbortPolicy:停止策略,是默認的飽和策略。該策略將會拋出未檢查的RejectedExecutionException,調用者能夠捕獲這個異常,而後根據需求來編寫本身的處理代碼。
  • DiscardPolicy:拋棄策略,會悄悄的拋棄該任務。
  • DiscardOldestPolicy:拋棄最舊的策略,會拋棄下一個將被執行的任務,而後嘗試提交當前任務。(若是是優先隊列,則會拋棄優先級最高的任務,所以不要將DiscardOldestPolicy和優先隊列一塊兒使用
  • CallerRunsPolicy:調用者運行策略,實現了一種調節機制,既不會拋棄任務,也不會拋出異常,而是將某些任務回退給調用者,從而下降新任務的流量。它不會在線程池的某個線程中執行新提交的任務,而是在一個調用了execute的線程中執行該任務。當線程池中的全部線程都被佔用,而且工做隊列被填滿的時候,下一個任務會在調用execute時在主線執行。因爲執行須要必定時間,所以主線至少在一段時間內不能提交任何任務,從而使得工做者線程有時間來處理完正在執行的任務。在這期間,主線程不會調用accept,所以請求將會被保存到TCP層的隊列中而不是在應用程序的隊列中,若是持續過載,TCP層最終發現它的請求隊列被填滿,所以一樣會開始拋棄請求。從線程池 -> 工做隊列 -> 應用程序 -> TCP層,最終到達客戶端,這種策略可以實現一種平緩的性能下降。
/**
 * 建立一個固定大小的線程池,同時使用「調用者運行」的飽和策略
 */
ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandle(new ThreadPoolExecutor.CallerRunsPolicy);

當工做隊列被填滿後,沒有預約義的飽和策略來阻塞execute。能夠經過使用Semaphore信號量來限制任務的到達率。

線程工廠

每當線程池須要建立一個線程時,都是經過線程工廠方法來完成的。默認的線程工廠方法將建立一個新的、非守護的線程。
ThreadFactory接口只定義了一個方法Thread new Thread(Runnable r),每當線程池須要建立一個新線程時都會調用這個方法。若是在應用程序中須要利用安全策略來控制對某些特殊代碼庫的訪問權限,那麼能夠經過Executors中的privilegedThreadFactory工廠來定製本身的線程工廠。經過這樣的方式建立出來的線程,將於privilegedThreadFactory擁有一樣的訪問權限。若是不使用privilegedThreadFactory,線程池建立的線程將從在須要更新線程時調用execute或submit的客戶端程序中繼承訪問權限,從而致使一些使人困惑的安全問題。

在調用構造函數後再定製ThreadPoolExecutor

在調用完成ThreadPoolExecutor的構造函數以後,仍然能夠設置大多數傳遞給它的構造函數的參數。若是Executor是經過Executors中的某個(newSingleThreadExecutor除外)工廠方法建立的,那麼能夠將結果的類型轉化爲ThreadPoolExecutor。

ExecutorService exec = Executors.newCachedThreadPool();
if(exec instanceof ThreadPoolExecutor){
    ((ThreadPoolExecutor) exec).setCorePool(10);
}else {
    throw new AssertionError("Oops,bad assumpion");
}

在Executors中包含一個unconfigurableExecutorService工廠方法,該方法能夠對ExecutorService進行包裝,若是你將ExecutorService暴露給不信任的代碼,又不指望其被修改,就能夠經過unconfigurableExecutorService來包裝它。

拓展ThreadPoolExecutor

ThreadPoolExecutor是可拓展的,它提供了幾個能夠在子類化中改寫的方法:

  • beforeExecute
  • afterExecute
  • terminated

這幾個方法有利於拓展ThreadPoolExecutor的行爲。在執行任務的線程池中將調用beforeExecute和afterExecute方法,以便與添加日誌,計時。不管是從run中正常返回,仍是拋出一個異常而返回,afterExcute都會被調用,若是beforeExecute拋出一個RuntimeException,那麼任務將不被執行,afterExecute也不被調用。
在線程池關閉操做時執行terminated,能夠用來釋放Executor在其生命週期裏分配的各類資源,還能夠發送通知,記錄日誌等。

遞歸算法的並行化

若是循環中的迭代操做都是獨立的,而且不須要等待全部的迭代操做都完成再繼續執行,那麼就可使用Executor將串行方法轉化爲並行循環。

void processSequentially(List<Element> elements){
    for(Element e : elements){
        process(e);
    }
}

void processInparallet(Executor exec, List<Element> elements){
    for(final Element e : elements){
        exec.excute(new Runnable(){
            public void run(){
                process(e);
            };
        });
    }
}

調用processInparallet比processSequentially能更好的返回,由於一當列表中的任務提交完成,就會當即返回,而不會等待這些任務執行完成。

在每一個迭代中都不須要來自後續遞歸迭代的結果

public<T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
    for(Node<T> n : nodes){
        results.add(n.compute());
        sequentialRecursive(n.getChildren(), results);
    }
}

public<T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results){
    for(final Node<T> n : nodes){
        exec.execute(new Runnbale(){
            public void run(){
                results.add(n.compute());
            }
        });
        parallelRecursive(exec, n.getChildren(), results);
    }
}

當parallelRecursive返回的時候,樹中的各個節點都已經訪問過了(遍歷過程還是串行,compute()調用纔是並行),而且每一個節點的計算任務也已經放入了Executor的工做隊列。

小結

對於併發執行的任務,Executor框架是一種強大且靈活的框架。它提供了大量可調節的選項,例如建立線程和關閉線程的策略,處理隊列任務的策略,處理過多任務的策略,而且提供了幾個鉤子方法來拓展它的行爲。固然,其中有些參數不能很好的工做,某些類型的任務須要特定的執行策略,而一些參數組合可能會產生想象以外的結果。

相關文章
相關標籤/搜索