使用ThreadPoolExecutor並行執行獨立的單線程任務

Java SE 5.0中引入了任務執行框架,這是簡化多線程程序設計開發的一大進步。使用這個框架能夠方便地管理任務:管理任務的生命週期以及執行策略。html

在這篇文章中,咱們經過一個簡單的例子來展示這個框架所帶來的靈活與簡單。java

基礎

執行框架引入了Executor接口來管理任務的執行。Executor是一個用來提交Runnable任務的接口。這個接口將任務提交與任務執行隔離起來:擁有不一樣執行策略的executor都實現了同一個提交接口。改變執行策略不會影響任務的提交邏輯。算法

若是你要提交一個Runnable對象來執行,很簡單:api

Executor exec = …;

exec.execute(runnable);


如前所述,executor如何去執行提交的runnable任務並無在Executor接口中規定,這取決於你所用的executor的具體類型。這個框架提供了幾種不一樣的executor,執行策略針對不一樣的場景而不一樣。線程池

你可能會用到的最多見的executor類型就是線程池executor,也就是ThreadPoolExecutor類(及其子類)的實例。ThreadPoolExecutor管理着一個線程池和一個工做隊列,線程池存放着用於執行任務的工做線程。多線程

你確定在其餘技術中也瞭解過「池」的概念。使用「池」的一個最大的好處就是減小資源建立的開銷,用過並釋放後,還能夠重用。另外一個間接的好處是你能夠控制使用資源的多少。好比,你能夠調整線程池的大小達到你想要的負載,而不損害系統的資源。併發

這個框架提供了一個工廠類,叫Executors,來建立線程池。使用這個工程類你能夠建立不一樣特性的線程池。儘管底層的實現經常是同樣的(ThreadPoolExecutor),但工廠類可使你沒必要使用複雜的構造函數就能夠快速地設置一個線程池。工程類的工廠方法有:oracle

  • newFixedThreadPool:該方法返回一個最大容量固定的線程池。它會按需建立新線程,線程數量不大於配置的數量大小。當線程數達到最大之後,線程池會一直維持這麼多不變。
  • newCachedThreadPool:該方法返回一個無界的線程池,也就是沒有最大數量限制。但當工做量減少時,這類線程池會銷燬沒用的線程。
  • newSingleThreadedExecutor:該方法返回一個executor,它能夠保證全部的任務都在一個單線程中執行。
  • newScheduledThreadPool:該方法返回一個固定大小的線程池,它支持延時和定時任務的執行。

這僅僅是一個開端。Executor還有一些其餘用法已超出了這篇文章的範圍,我強烈推薦你研究如下內容:框架

  • 生命週期管理的方法,這些方法由ExecutorService接口聲明(好比shutdown()和awaitTermination())。
  • 使用CompletionService來查詢任務狀態、獲取返回值,若是有返回值的話。

ExecutorService接口特別重要,由於它提供了關閉線程池的方法,並確保清理了再也不使用的資源。使人欣慰的是,ExecutorService接口至關簡單、一目瞭然,我建議全面地學習下它的文檔。ide

大體來講,當你向ExecutorService發送了一個shutdown()消息後,它就不會接收新提交的任務,可是仍在隊列中的任務會被繼續處理完。你可使用isTerminated()來查詢ExecutorService終止狀態,或使用awaitTermination(…)方法來等待ExecutorService終止。若是傳入一個最大超時時間做爲參數,awaitTermination方法就不會永遠等待。函數

警告: 對JVM進程永遠不會退出的理解上,存在着一些錯誤和迷惑。若是你不關閉executorService,只是銷燬了底層的線程,JVM就不會退出。當最後一個普通線程(非守護線程)退出後,JVM也會退出。

配置ThreadPoolExecutor

若是你決定不使用Executor的工廠類,而是手動建立一個 ThreadPoolExecutor,你須要使用構造函數來建立並配置。下面是這個類使用最普遍的一個構造函數:

public ThreadPoolExecutor(

    int corePoolSize,

    int maxPoolSize,

    long keepAlive,

    TimeUnit unit,

    BlockingQueue<Runnable> workQueue,

    RejectedExecutionHandler handler);


核心池的大小(線程池將會使用的大小)如你所見,你能夠配置如下內容:

  • 最大池大小
  • 存活時間,空閒線程在這個時間後被銷燬
  • 存聽任務的工做隊列
  • 任務提交拒絕後要執行的策略

限制隊列中任務數

限制執行任務的併發數、限制線程池大小對應用程序以及程序執行結果的可預期性與穩定性有很大的好處。無盡地建立線程,最終會耗盡運行時資源。你的應用程序所以會產生嚴重的性能問題,甚至致使程序不穩定。

這隻解決了部分問題:限制了併發任務數,但並無限制提交到等待隊列的任務數。若是任務提交的速率一直高於任務執行的速率,那麼應用程序最終會出現資源短缺的情況。

解決方法是:

  • 爲Executor提供一個存放待執行任務的阻塞隊列。若是隊列填滿,之後提交的任務會被「拒絕」。
  • 當任務提交被拒絕時會觸發RejectedExecutionHandler,這也是爲何這個類名中引用動詞「rejected」。你能夠實現本身的拒絕策略,或者使用框架內置的策略。

默認的拒絕策略可讓executor拋出一個RejectedExecutionException異常。然而,還有其餘的內建策略:

  • 悄悄地丟棄一個任務
  • 丟棄最舊的任務,從新提交最新的
  • 在調用者的線程中執行被拒絕的任務

何時以及爲何咱們纔會這樣配置線程池?讓咱們看一個例子。

示例:並行執行獨立的單線程任務

最近,我被叫去解決一個好久之前的任務的問題,個人客戶以前就運行過這個任務。大體來講,這個任務包含一個組件,這個組件監聽目錄樹所產生的文件系統事件。每當一個事件被觸發,必須處理一個文件。一個專門的單線程執行文件處理。說真的,根據任務的特色,即便我能把它並行化,我也不想那麼作。一天的某些時候,事件到達率才很高,文件也不必實時處理,在次日以前處理完便可。

當前的實現採用了一些混合且匹配的技術,包括使用UNIX SHELL腳本掃描目錄結構,並檢測是否發生改變。實現完成後,咱們採用了雙核的執行環境。一樣,事件的到達率至關低:目前爲止,事件數以百萬計,總共要處理1~2T字節的原始數據。

運行處理程序的主機是12核的機器:很好機會去並行化這些舊的單線程任務。基本上,咱們有了食譜的全部原料,咱們須要作的僅僅是把程序創建起來並調節。在寫代碼前,咱們必須瞭解下程序的負載。我列一下我檢測到的內容:

  • 有很是多的文件須要被週期性地掃描:每一個目錄包含1~2百萬個文件
  • 掃描算法很快,能夠並行化
  • 處理一個文件至少須要1s,甚至上升到2s或3s
  • 處理文件時,性能瓶頸主要是CPU
  • CPU利用率必須可調,根據一天時間的不一樣而使用不一樣的負載配置。

我須要這樣一個線程池,它的大小在程序運行的時候經過負載配置來設置。我傾向於根據負載策略建立一個固定大小的線程池。因爲線程的性能瓶頸在CPU,它的核心使用率是100%,不會等待其餘資源,那麼負載策略就很好計算了:用執行環境的CPU核心數乘以一個負載因子(保證計算的結果在峯值時至少有一個核心):

int cpus = Runtime.getRuntime().availableProcessors();

int maxThreads = cpus * scaleFactor;

maxThreads = (maxThreads > 0 ? maxThreads : 1);


並且,我將使用ThreadPoolExecutor.CallerRunsPolicy做爲拒絕策略。爲何?由於當隊列已滿時,線程池的線程忙於處理文件,我讓提交任務的線程去執行它(被拒絕的任務)。這樣,掃面會中止,轉而去處理一個文件,處理結束後立刻又會掃描目錄。而後我須要使用阻塞隊列建立一個ThreadPoolExecutor,能夠限制提交的任務數。爲何?是這樣,掃描算法執行很快,很快就產生龐大數量須要處理的文件。數量有多龐大呢?很難預測,由於變更太大了。我不想讓executor內部的隊列不加選擇地填滿了要執行的任務實例(這些實例包含了龐大的文件描述符)。我寧願在隊列填滿時,拒絕這些文件。

下面是建立executor的代碼:

ExecutorService executorService =

    new ThreadPoolExecutor(

        maxThreads, // core thread pool size

        maxThreads, // maximum thread pool size

        1, // time to wait before resizing pool

        TimeUnit.MINUTES,

        new ArrayBlockingQueue<Runnable>(maxThreads, true),

        new ThreadPoolExecutor.CallerRunsPolicy());

下面是程序的框架(極其簡化版):

// scanning loop: fake scanning

while (!dirsToProcess.isEmpty()) {

    File currentDir = dirsToProcess.pop();



    // listing children

    File[] children = currentDir.listFiles();



    // processing children

    for (final File currentFile : children) {

        // if it's a directory, defer processing

        if (currentFile.isDirectory()) {

            dirsToProcess.add(currentFile);

            continue;

        }



        executorService.submit(new Runnable() {

            @Override

            public void run() {

                try {

                    // if it's a file, process it

                    new ConvertTask(currentFile).perform();

                } catch (Exception ex) {

                    // error management logic

                }

            }

        });

    }

}



// ...

// wait for all of the executor threads to finish

executorService.shutdown();

try {

    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {

        // pool didn't terminate after the first try

        executorService.shutdownNow();

    }



    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {

        // pool didn't terminate after the second try

    }

} catch (InterruptedException ex) {

    executorService.shutdownNow();

    Thread.currentThread().interrupt();

}

總結

看到了吧,Java併發API很是簡單易用,十分靈活,也很強大。真但願我多年前能夠多花點功夫寫一個這樣簡單的程序。這樣我就能夠在幾小時內解決由傳統單線程組件所引起的擴展性問題。

相關文章
相關標籤/搜索