Java併發框架: Executor

介紹

隨着當今處理器中可用的核心數量的增長, 隨着對實現更高吞吐量的需求的不斷增加,多線程 API 變得很是流行。 Java 提供了本身的多線程框架,稱爲 Executor 框架.html

1. Executor 框架是什麼?

Executor 框架包含一組用於有效管理工做線程的組件。Executor API 經過 Executors 將任務的執行與要執行的實際任務解耦。 這是 生產者-消費者 模式的一種實現。java

java.util.concurrent.Executors 提供了用於建立工做線程的線程池的工廠方法。git

爲了使用 Executor 框架,咱們須要建立一個線程池並提交任務給它以供執行。Executor 框架的工做是調度和執行已提交的任務並從線程池中拿到返回的結果。github

浮現於腦海中的一個基本的問題是,當咱們建立 java.lang.Thread 的對象或調用實現了 Runnable/Callable 接口來達到的程序的並行性時,爲何須要線程池?api

答案來源於兩個基本面:緩存

  1. 爲新任務建立新的線程會存在額外的線程建立以及銷燬的開銷。管理這些線程的生命週期會明顯增長 CPU 的執行時間。
  2. 不進行任何限制地爲每一個進程建立線程會致使建立大量線程。這些線程會佔用大量內存並引發資源的浪費。當一個線程利用完 CPU 的時間片後另外一個線程即將利用CPU的時間片時,CPU 會花費大量的時間來切換線程的上下文。

全部的這些因素都會致使系統的吞吐量降低。線程池經過保持線程一直存活並重用這些線程來克服這個問題。當提交到線程池中的任務多於正在執行的線程時,那些多餘的任務將被放到隊列中。 一旦執行任務的線程有空閒的了,它們會從隊列中取下一個任務來執行。對於 JDK 提供的現成的 executors 此任務隊列基本是無界的。bash

2. Executors 的類型

如今咱們已經瞭解了 executors 是什麼, 讓咱們來看看不一樣類型的 executors。多線程

2.1 SingleThreadExecutor

此線程池 executor 只有一個線程。它用於以順序方式的形式執行任務。若是此線程在執行任務時因異常而掛掉,則會建立一個新線程來替換此線程,後續任務將在新線程中執行。oracle

ExecutorService executorService = Executors.newSingleThreadExecutor()
複製代碼

2.2 FixedThreadPool(n)

顧名思義,它是一個擁有固定數量線程的線程池。提交給 executor 的任務由固定的 n 個線程執行,若是有更多的任務,它們存儲在 LinkedBlockingQueue 裏。這個數字 n 一般跟底層處理器支持的線程總數有關。框架

ExecutorService executorService = Executors.newFixedThreadPool(4);
複製代碼

2.3 CachedThreadPool

該線程池主要用於執行大量短時間並行任務的場景。與固定線程池不一樣,此線程池的線程數不受限制。若是全部的線程都在忙於執行任務而且又有新的任務到來了,這個線程池將建立一個新的線程並將其提交到 executor。只要其中一個線程變爲空閒,它就會執行新的任務。 若是一個線程有 60 秒的時間都是空閒的,它們將被結束生命週期並從緩存中刪除。

可是,若是管理得不合理,或者任務不是很短的,則線程池將包含大量的活動線程。這可能致使資源紊亂並所以致使性能降低。

ExecutorService executorService = Executors.newCachedThreadPool();
複製代碼

2.4 ScheduledExecutor

當咱們有一個須要按期運行的任務或者咱們但願延遲某個任務時,就會使用此類型的 executor。

ScheduledExecutorService scheduledExecService = Executors.newScheduledThreadPool(1);
複製代碼

可使用 scheduleAtFixedRatescheduleWithFixedDelayScheduledExecutor 中按期的執行任務。

scheduledExecService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduledExecService.scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit)
複製代碼

這兩種方法的主要區別在於它們對連續執行按期任務之間的延遲的應答。

scheduleAtFixedRate:不管前一個任務什麼時候結束,都以固定間隔執行任務。

scheduleWithFixedDelay:只有在當前任務完成後纔會啓動延遲倒計時。

3. 對於 Future 對象的理解

可使用 executor 返回的 java.util.concurrent.Future 對象訪問提交給 executor 的任務的結果。 Future 能夠被認爲是 executor 對調用者的響應。

Future<String> result = executorService.submit(callableTask);
複製代碼

如上所述,提交給 executor 的任務是異步的,即程序不會等待當前任務執行完成,而是直接進入下一步。相反,每當任務執行完成時,executor 在此 Future 對象中設置它。

調用者能夠繼續執行主程序,當須要提交任務的結果時,他能夠在這個 Future 對象上調用.get() 方法來獲取。若是任務完成,結果將當即返回給調用者,不然調用者將被阻塞,直到 executor 完成此操做的執行並計算出結果。

若是調用者不能無限期地等待任務執行的結果,那麼這個等待時間也能夠設置爲定時地。能夠經過 Future.get(long timeout,TimeUnit unit) 方法實現,若是在規定的時間範圍內沒有返回結果,則拋出 TimeoutException。調用者能夠處理此異常並繼續執行該程序。

若是在執行任務時出現異常,則對 get 方法的調用將拋出一個ExecutionException

對於 Future.get()方法返回的結果,一個重要的事情是,只有提交的任務實現了java.util.concurrent.Callable接口時才返回 Future。若是任務實現了Runnable接口,那麼一旦任務完成,對 .get() 方法的調用將返回 null

另外一個關注點是 Future.cancel(boolean mayInterruptIfRunning) 方法。此方法用於取消已提交任務的執行。若是任務已在執行,則 executor 將嘗試在mayInterruptIfRunning 標誌爲 true 時中斷任務執行。

4. Example: 建立和執行一個簡單的 Executor

咱們如今將建立一個任務並嘗試在 fixed pool executor 中執行它:

public class Task implements Callable<String> {

    private String message;

    public Task(String message) {
        this.message = message;
    }

    @Override
    public String call() throws Exception {
        return "Hello " + message + "!";
    }
}
複製代碼

Task 類實現 Callable 接口並有一個 String 類型做爲返回值的方法。 這個方法也能夠拋出 Exception。這種向 executor 拋出異常的能力以及 executor 將此異常返回給調用者的能力很是重要,由於它有助於調用者知道任務執行的狀態。

如今讓咱們來執行一下這個任務:

public class ExecutorExample {  
    public static void main(String[] args) {

        Task task = new Task("World");

        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Future<String> result = executorService.submit(task);

        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Error occured while executing the submitted task");
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}
複製代碼

咱們建立了一個具備4個線程數的 FixedThreadPool executors,由於這個 demo 是在四核處理器上開發的。若是正在執行的任務執行大量 I/O 操做或花費較長時間等待外部資源,則線程數可能超過處理器的核心數。

咱們實例化了 Task 類,並將它提交給 executors 執行。 結果由 Future 對象返回,而後咱們在屏幕上打印。

讓咱們運行 ExecutorExample 並查看其輸出:

Hello World!
複製代碼

正如所料,任務追加了問候語 Hello 並經過 Future object 返回結果。

最後,咱們調用 executorService 對象上的 shutdown 來終止全部線程並將資源返回給 OS。

.shutdown() 方法等待 executor 完成當前提交的任務。 可是,若是要求是當即關閉 executor 而不等待,那麼咱們可使用 .shutdownNow() 方法。

任何待執行的任務都將結果返回到 java.util.List 對象中。

咱們也能夠經過實現 Runnable 接口來建立一樣的任務:

public class Task implements Runnable{

    private String message;

    public Task(String message) {
        this.message = message;
    }

    public void run() {
        System.out.println("Hello " + message + "!");
    }
}
複製代碼

當咱們實現 Runnable 時,這裏有一些重要的變化。

  1. 沒法從 run() 方法獲得任務執行的結果。 所以,咱們直接在這裏打印。
  2. run() 方法不可拋出任何已受檢的異常。

5. 總結

隨着處理器時鐘速度難以提升,多線程正變得愈來愈主流。 可是,因爲涉及複雜性,處理每一個線程的生命週期很是困難。

在本文中,咱們展現了一個高效而簡單的多線程框架,即 Executor Framework,並解釋了它的不一樣組件。 咱們還看了一下在 executor 中建立提交和執行任務的不一樣示例。

與往常同樣,此示例的代碼能夠在 GitHub上找到。

原文:stackabuse.com/concurrency…

做者:Chandan Singh

譯者:KeepGoingPawn

相關文章
相關標籤/搜索