Java8 新特性併發篇(一) | 線程與執行器

本文翻譯整理自 winterbe.com/posts/2015/…html

歡迎進入 Java8 併發篇系列。此係列大概包括三篇教程:java

本篇博文是此係列的第一篇,接下來的 15 分鐘裏,我將會經過一些簡單易懂的示例代碼來教會你,如何在 Java8 中進行併發編程,學會如何經過 Thread, RunableExecutor 來並行執行代碼。git

關於 JDK 中 併發 API 是在 JDK1.5 中被首次引入的,而且在後續的版本中獲得不斷地加強。這篇文章中介紹的大部分名詞概念一樣適用於老版本,這一點你不用擔憂。本文的着重點在代碼演示,如何使用 lambda 表達式以及新特性相關。github

若是你對 lambda 表達式不是很熟悉,我推薦你先閱讀我以前的文章:Java8 新特性指導手冊編程


★★★ 若是此教程有幫助到你, 去小哈的 GitHub 幫忙 Star 一下吧, 謝謝啦! 傳送門 ★★★api


目錄

Threads 與 Runnables

能夠確定的是,全部的現代操做系統支持併發的手段無外乎進程線程。進程一般能夠理解爲獨立運行的程序實例,打個比方,你啓動一個 Java 程序,這個時候操做系統就會建立一個新的進程,它能夠與其餘程序並行運行。而在這些進程的內部,咱們能夠經過線程來併發執行一段代碼,一段業務邏輯。這樣作有啥好處?好處就是,咱們能夠最大程度的發揮機器多核 CPU 的優點。

Java 從 1.0 版本就開始支持線程了。能夠說是最基本的功能了,在老版本中,咱們一般會實現 Runnable 接口,重寫 run() 方法, 在方法內編寫一段業務代碼:

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");
複製代碼

由於 Runnable 接口是一個函數式接口,咱們直接採用 lambda 表達式來書寫它,內部的業務邏輯是打印當前的線程名,控制檯的輸出可能存在兩種狀況:

Hello main
Hello Thread-0
Done!
複製代碼

或者:

Hello main
Done!
Hello Thread-0
複製代碼

咱們沒法預測 runnable 是在主線程執行完成後執行仍是以前執行,由於這種順序的不肯定性,使得在一個體量龐大的應用中,併發編程變得異常複雜。

咱們能夠在線程內部設置休眠時間,來模擬一個長時間運行的任務:

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        // 休眠一秒
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();
複製代碼

運行上面的代碼,你會發現兩條打印語句之間會間隔一秒。TimeUnit 在處理單位時間的時候,是一個很實用的枚舉類,你也能夠經過調用 Thread.sleep(1000) 來達到一樣的目的。

咱們不得不認可,使用 Thread 類是很乏味的且容易出錯的。因爲 Java 併發相關的 API 是在 2004 年 Java5 發佈的時候才被正式引入的。它們被放置在 java.util.concurrent 包下,裏面包含了不少處理併發編程有用的類。

這些併發 API 被引入後,在後續的 Java 版本中,又獲得不斷的加強,Java8 甚至提供了新的併發類和方法來處理併發。

廢話很少說,接下來,讓咱們進入併發 API 中最重要的執行器: Executor.

執行器 Executor

設計 ExecutorService 的目的是用來替代咱們直接手動建立 ThreadExecutors 支持運行異步任務,一般管理着一個線程池,線程池內部線程會獲得複用,避免了頻繁建立線程,銷燬線程而帶來的額外的系統開銷。

// 建立一個只包含一個線程的線程池
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
	String threadName = Thread.currentThread().getName();
	System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1
複製代碼

Executors 類提供了便利的工廠方法用來不一樣類型的線程池。上面的示例中咱們建立了一個只包含單線程的線程池 executor

代碼的輸出結果與上面手動 new Runnable() 輸出一致,惟一不一樣的是,Java 進程沒有終止

注意:Executors 必須顯示的終止它,不然它們將持續監聽是否有新的任務須要執行。這也是爲何我在研發組中推薦小組人員建立線程池務必要交給 Spring Ioc 容器管理的緣由。

那麼,如何優雅的關閉 ExecutorService 呢?

ExecutorService 提供了兩個方法:

  • 1.shutdwon() : 它會等待正在執行的任務執行完成後,再銷燬線程池;
  • 2.shutdownNow(): 它會馬上終止正在執行的任務,並銷燬線程池;

關閉 ExecutorService 最佳實踐:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    // 指定關閉以前的等待時間 5s,達到「溫柔滴」關閉
    executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}
複製代碼

Callable 和 Future

除了 Runnable, executor 還支持另一種任務類型 —— CallableCallableRunnable 相似,惟一不一樣的是,Callable 有返回值。

下面的示例代碼中,咱們經過 lambda 表達式定義了一個 Callable:休眠 1s 鍾後,返回一個整數。

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};
複製代碼

Callable 也能夠像 Runnable 同樣,做爲入參,提交給 executor。這樣的話,問題來了,咱們怎麼取到返回值呢?由於 submit() 方法不會阻塞等待任務完成的。

雖然 executor 不能直接返回 Callable 結果,不過,executor 能夠返回一個 Future 類型的結果,他能夠在稍後的某個時機取出實際的返回值。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

// 這裏會阻塞等待返回結果
Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);
複製代碼

上面的這段代碼,Callable 提交給 executor 後,咱們先經過調用 isDone() 檢查 future 是否執行完成,結果固然是 false, 由於 future 在返回那個整數以前,會休眠 1s。

後面到執行 get() 方法,線程會阻塞等待返回結果。整個流程走完,咱們再來看下控制檯的輸出:

future done? false
future done? true
result: 123
複製代碼

你須要注意,若是你關閉 executor, 全部爲關閉的 future 都會拋出異常。

executor.shutdownNow();
future.get();
複製代碼

還有一點,你可能也注意到了,此次咱們建立線程池並不是使用 newSingleThreadExecutor()。而是經過newFixedThreadPool(1)來建立一個單線程線程池的 executor。 它等同於使用newSingleThreadExecutor,不過使用第二種方式咱們動態的調整入參,好比傳入一個比 1 大的值來增長線程池的大小。

超時 Timeouts

上面提到了,future.get() 那裏會阻塞等待,直到 callable 返回值。咱們試想一個最糟糕的狀況,callable 持續運行致使你的程序沒有響應,這顯然是不能忍受的。咱們能夠經過傳入一個超時時間來避免發生這種狀況:

ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

// 設置超時時間爲 1s
future.get(1, TimeUnit.SECONDS);
複製代碼

執行上面這段代碼,會拋出一個 TimeoutException:

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
複製代碼

爲何拋出這個異常,緣由很顯然,咱們指定的超時時間是 1s, 而任務中光休眠就是 2s 了。

invokeAll

Executors 支持經過 invokeAll 方法一次性批量提交多個 callable。這個方法的返回結果是一個 future 集合:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            // 拿到每一個 future 的返回值
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println); // for 循環輸出結果
複製代碼

上面這段示例代碼,咱們先利用 Java8 的 Stream 流來處理 invokeAll() 方法返回的 future 集合,取出每一個 future 的返回值將其映射到一個 List<String> 集合中,最後循環輸出結果。

invokeAny

批量提交的另一種方式是 invokeAny() , 一樣是批量提交,它與 invokeAll() 不一樣點在於:它會阻塞等待,當某個 callable 第一個執行完成,它會馬上返回執行結果,而再也不等待那些正在執行中的 callable 了。

爲了測試,咱們定義一個方法,用來模擬建立擁有不一樣執行時間的 callable

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}
複製代碼

而後,咱們建立一組 callable, 他們擁有不一樣的執行時間,1s 到 3s 的,經過invokeAny()將這些 callable 提交給executor,看看返回最快的 callable 的字符串結果是哪一個:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2
複製代碼

最快的是 task2, 它的執行時間最短,因此輸出是它。

上面這段示例代碼中,建立 ExecutorService 的方式又變了,經過 Executors.newWorkStealingPool()。這個工廠方法是 Java8 才引入的,返回值是一個 ForkJoinPool類型的 executor,它和指定固定大小的線程池不一樣,ForkJoinPools 容許指定一個並行因子來建立,默認的值爲物理機可用的 CPU 核心數。

ForkJoinPools 是在 Java7 中才被引入的,這將會在後面系列的教程中作詳細介紹。敬請期待。

任務調度 ScheduledExecutor

上面咱們已經學習瞭如何在一個 executor 中提交和運行一次任務。還有中場景是,咱們須要屢次運行一個常見的任務,這個時候,咱們能夠利用調度線程池。

ScheduledExecutorService支持任務調度,持續執行或者延遲一段時間後再執行。

下面的代碼示例,指定一個任務延遲 3 分鐘再執行:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
複製代碼

ScheduledExecutorService 會返回一個加強類型的 future 類型 —— ScheduleFuture,它除了保有原有 Future 提供的全部方法外,還提供了 getDelay() 方法, 用來獲取距離目標時間的時間差,也就是剩餘的延遲時間。

爲了保證調度任務的持續執行,executors 提供了兩個 API:

  • 1.scheduleAtFixedRate()固定週期執行一個任務, 不考慮任務的耗時,即便上個任務尚未執行完畢,到了時間,下個任務依然會去執行;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

// 第一次執行的延遲時間
int initialDelay = 0;
// 週期頻率
int period = 1;
// 指定任務馬上執行,且每次之間間隔爲 1s
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
複製代碼

注意:initialDelay 表示第一次執行的延遲時間,0 表示當即執行

  • 2.scheduleWithFixedDelay()固定延遲執行任務,就是說,必需要等到上個任務執行完成後纔開始計時,達到設定的延遲時間後,下個任務纔會被觸發
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
複製代碼

總結一下:scheduleWithFixedDelay()scheduleAtFixedRate() 最大的區別就是,scheduleWithFixedDelay()須要等到前一個任務執行完成後纔開始計延時,再觸發下一個任務。

如上面的示例代碼中這個調度任務,設置了 1s 的延時,初始化延時爲 0,假設理想狀況下任務的耗時就是 2s, 則任務的調度週期爲: 0s -> 3s -> 6s -> 9s ...., 它更適用於當你沒法預測任務的執行時長的場景中使用。

總結

這篇文章主要探討了如何在用 lambda 表達式,在 Java8 中使用線程和執行器,並演示了相關示例代碼。除此以外,咱們還學習了 CallableFuture, 超時設置,任務的批量提交 invokeAll()invokeAny() API。最後, 瞭解瞭如何在 Java8 中使用任務調度 ScheduledExecutor,但願你能有所收穫。

GitHub 地址

github.com/weiwosuoai/…

小哈的微信公衆號,歡迎關注

相關文章
相關標籤/搜索