Java 8 併發: Threads 和 Executors

原文地址: Java 8 Concurrency Tutorial: Threads and Executorshtml

Java 5 初次引入了Concurrency API,並在隨後的發佈版本中不斷優化和改進。這篇文章的大部分概念也適用於老的版本。個人代碼示例主要聚焦在Java 8上,並大量適用 lambda 表達式和一些新特性。若是你還不熟悉 lambda 表達式,建議先閱讀 Java 8 Tutorialjava

ThreadsRunnables

全部現代操做系統都是經過進程線程來支持併發的。進程一般是相互獨立運行的程序實例。例如,你啓動一個 Java 程序,操做系統會產生一個新的進程和其餘程序並行運行。在這些進程中能夠利用線程同時執行代碼。這樣咱們就能夠充分利用 CPUshell

JavaJDK 1.0 開始就支持線程。在開始一個新線程以前,必須先指定運行的代碼,一般稱爲 Task。下面是經過實現 Runnable 接口來啓動一個新線程的例子:編程

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");
複製代碼

因爲 Runnable 是一個函數式接口,咱們可使用 lambda 表達式來打印線程的名字到控制檯。咱們直接在主線程上執行Runnable,而後開始一個新線程。在控制檯你將看到這樣的結果:api

Hello main
Hello Thread-0
Done!
複製代碼

或者:併發

Hello main
Done!
Hello Thread-0
複製代碼

因爲是併發執行,咱們沒法預測 Runnable 是在打印 Done 以前仍是以後調用,順序不是不肯定的,所以併發編程成爲大型應用程序開發中一項複雜的任務。oracle

線程也能夠休眠一段時間,例以下面的例子:異步

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();
複製代碼

執行上面的代碼會在兩個打印語句之間停留1秒鐘。TimeUnit 是一個時間單位的枚舉,或者能夠經過調用 Thread.sleep(1000) 實現。函數

使用 Thread 類可能很是繁瑣且容易出錯。因爲這個緣由,在2004年,Java 5版本引入了 Concurrency APIAPI 位於 java.util.concurrent 包下,包含了許多有用的有關併發編程的類。從那時起,每一個新發布的 Java 版本都增長了併發 APIJava 8 也提供了新的類和方法來處理併發。post

如今咱們來深刻了解一下Concurrency API中最重要的部分 - executor services

Executors

Concurrency API 引入了 ExecutorService 的概念,做爲處理線程的高級別方式用來替代 ThreadsExecutors 可以異步的執行任務,而且一般管理一個線程池。這樣咱們就不用手動的去建立線程了,線程池中的全部線程都將被重用。從而能夠在一個 executor service 的整個應用程序生命週期中運行儘量多的併發任務。

下面是一個簡單的 executors 例子:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1
複製代碼

Executors 類提供了方便的工廠方法來建立不一樣類型的 executor services 。在這個例子中使用了只執行一個線程的 executor

執行結果看起來和上面的示例相似,可是你會注意到一個重要區別:Java 進程永遠不會中止,執行者必須明確的中止它,不然它會不斷的接受新的任務。

ExecutorService 爲此提供了兩種方法:shutdown() 等待當前任務執行完畢,而 shutdownNow() 則中斷全部正在執行的任務,並當即關閉執行程序。在 shudown 以後不能再提交任務到線程池。

下面是我關閉程序的首選方式:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}
複製代碼

執行者調用 shutdown 關閉 executor,在等待 5 秒鐘鍾後,無論任務有沒有執行完畢都調用 shutdownNow 中斷正在執行的任務而關閉。

Callables 和 Futures

除了 Runnable 之外,executors 還支持 Callable 任務,和 Runnable 同樣是一個函數式接口,但它是有返回值的。

下面是一個使用 lambda 表達式定義的 Callable ,在睡眠 1 秒後返回一個整形值。

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};
複製代碼

Runnable 同樣,Callable 也能夠提交到 executor services,可是執行的結果是什麼?因爲 submit() 不等待任務執行完成,executor service 不能直接返回調用的結果。相對應的,它返回一個 Future 類型的結果,使用 Future 能夠檢索實際執行結果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);
複製代碼

在將 Callable 提交給 executor 後,首先經過 isDone() 來檢查 future 是否執行完畢。我敢確定,狀況並不是如此,由於上面的調用在返回整數以前睡眠了 1 秒鐘。

調用方法 get() 會阻塞當前線程,直到 callable 執行完成返回結果,如今 future 執行完成,並在控制檯輸出下面的結果:

future done? false
future done? true
result: 123
複製代碼

Futureexecutor service 緊密結合,若是關閉 executor service, 每一個 Future 都會拋出異常。

executor.shutdownNow();
future.get();
複製代碼

這裏建立 executor 的方式與前面的示例不一樣,這裏使用 newFixedThreadPool(1) 來建立一個線程數量爲 1 的線程池來支持 executor, 這至關於 newSingleThreadExecutor() ,稍後咱們咱們會經過傳遞一個大於 1 的值來增長線程池的大小。

Timeouts

任何對 future.get()的調用都會阻塞並等待 Callable 被終止。 在最壞的狀況下,一個可調用函數將永遠運行,從而使應用程序沒法響應。能夠簡單地經過超時來抵消這些狀況:

ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

future.get(1, TimeUnit.SECONDS);
複製代碼

執行上面的代碼會拋出 TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
複製代碼

指定了 1 秒鐘的最長等待時間,可是在返回結果以前,可調用事實上須要 2 秒鐘的時間。

InvokeAll

Executors 支持經過 invokeAll() 批量提交多個 Callable 。這個方法接受一個 Callable 類型集合的參數,並返回一個 Future 類型的 List

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);
複製代碼

在這個例子中,咱們利用 Java 8 的流來處理 invokeAll 調用返回的全部 Future。 咱們首先映射每一個 Future 的返回值,而後將每一個值打印到控制檯。 若是還不熟悉流,請閱讀Java 8 Stream Tutorial

InvokeAny

批量提交可調用的另外一種方法是 invokeAny(),它與 invokeAll() 略有不一樣。 該方法不會返回全部的 Future 對象,它只返回第一個執行完畢任務的結果。

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}
複製代碼

咱們使用這種方法來建立一個有三個不一樣睡眠時間的 Callable。 經過 invokeAny()將這些可調用對象提交給 executor,返回最快執行完畢結果,在這種狀況下,task2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    callable("task1", 2),
    callable("task2", 1),
    callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2
複製代碼

上面的例子使用經過 newWorkStealingPool() 建立的另外一種類型的 executor。 這個工廠方法是 Java 8 的一部分,而且返回一個類型爲 ForkJoinPoolexecutor,它與正常的 executor 略有不一樣。 它不使用固定大小的線程池,默認狀況下是主機CPU的可用內核數。

Scheduled Executors

咱們已經學會了如何在 Executors 上提交和運行任務。 爲了屢次按期運行任務,咱們可使用 scheduled thread pools

ScheduledExecutorService 可以安排任務按期運行或在一段時間事後運行一次。

下面代碼示例一個任務在三秒鐘後運行:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
複製代碼

調度任務產生一個類型爲 ScheduledFuture的值,除了 Future 以外,它還提供getDelay() 方法來檢索任務執行的剩餘時間。

爲了定時執行的任務,executor 提供了兩個方法 scheduleAtFixedRate()scheduleWithFixedDelay() 。 第一種方法可以執行具備固定時間間隔的任務,例如, 每秒一次:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
複製代碼

此外,此方法還能夠設置延遲時間,該延遲描述了首次執行任務以前的等待時間。

scheduleWithFixedDelay() 方法與 scheduleAtFixedRate() 略有不一樣,不一樣之處是它們的等待時間,scheduleWithFixedDelay() 的等待時間是在上一個任務結束和下一個任務開始之間施加的。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
複製代碼

本示例在執行結束和下一次執行開始之間延遲 1 秒。 初始延遲爲 0,任務持續時間爲 2 秒。 因此咱們結束了一個0s,3s,6s,9s等的執行間隔。

相關文章
相關標籤/搜索