本文翻譯整理自 winterbe.com/posts/2015/…html
歡迎進入 Java8 併發篇系列。此係列大概包括三篇教程:java
本篇博文是此係列的第一篇,接下來的 15 分鐘裏,我將會經過一些簡單易懂的示例代碼來教會你,如何在 Java8 中進行併發編程,學會如何經過 Thread
, Runable
和 Executor
來並行執行代碼。git
關於 JDK 中 併發 API 是在 JDK1.5 中被首次引入的,而且在後續的版本中獲得不斷地加強。這篇文章中介紹的大部分名詞概念一樣適用於老版本,這一點你不用擔憂。本文的着重點在代碼演示,如何使用 lambda
表達式以及新特性相關。github
若是你對 lambda
表達式不是很熟悉,我推薦你先閱讀我以前的文章:Java8 新特性指導手冊編程
★★★ 若是此教程有幫助到你, 去小哈的 GitHub 幫忙 Star 一下吧, 謝謝啦! 傳送門 ★★★api
3、Callable 和 Futureoracle
4、超時設置異步
能夠確定的是,全部的現代操做系統支持併發的手段無外乎進程和線程。進程一般能夠理解爲獨立運行的程序實例,打個比方,你啓動一個 Java 程序,這個時候操做系統就會建立一個新的進程,它能夠與其餘程序並行運行。而在這些進程的內部,咱們能夠經過線程來併發執行一段代碼,一段業務邏輯。這樣作有啥好處?好處就是,咱們能夠最大程度的發揮機器多核 CPU 的優點。
Java 從 1.0 版本就開始支持線程了。能夠說是最基本的功能了,在老版本中,咱們一般會實現 Runnable
接口,重寫 run()
方法, 在方法內編寫一段業務代碼:
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
};
task.run();
Thread thread = new Thread(task);
thread.start();
System.out.println("Done!");
複製代碼
由於 Runnable 接口是一個函數式接口,咱們直接採用 lambda 表達式來書寫它,內部的業務邏輯是打印當前的線程名,控制檯的輸出可能存在兩種狀況:
Hello main
Hello Thread-0
Done!
複製代碼
或者:
Hello main
Done!
Hello Thread-0
複製代碼
咱們沒法預測 runnable
是在主線程執行完成後執行仍是以前執行,由於這種順序的不肯定性,使得在一個體量龐大的應用中,併發編程變得異常複雜。
咱們能夠在線程內部設置休眠時間,來模擬一個長時間運行的任務:
Runnable runnable = () -> {
try {
String name = Thread.currentThread().getName();
System.out.println("Foo " + name);
// 休眠一秒
TimeUnit.SECONDS.sleep(1);
System.out.println("Bar " + name);
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
Thread thread = new Thread(runnable);
thread.start();
複製代碼
運行上面的代碼,你會發現兩條打印語句之間會間隔一秒。TimeUnit
在處理單位時間的時候,是一個很實用的枚舉類,你也能夠經過調用 Thread.sleep(1000)
來達到一樣的目的。
咱們不得不認可,使用 Thread
類是很乏味的且容易出錯的。因爲 Java 併發相關的 API 是在 2004 年 Java5 發佈的時候才被正式引入的。它們被放置在 java.util.concurrent
包下,裏面包含了不少處理併發編程有用的類。
這些併發 API 被引入後,在後續的 Java 版本中,又獲得不斷的加強,Java8 甚至提供了新的併發類和方法來處理併發。
廢話很少說,接下來,讓咱們進入併發 API 中最重要的執行器: Executor
.
設計 ExecutorService
的目的是用來替代咱們直接手動建立 Thread
。Executors
支持運行異步任務,一般管理着一個線程池,線程池內部線程會獲得複用,避免了頻繁建立線程,銷燬線程而帶來的額外的系統開銷。
// 建立一個只包含一個線程的線程池
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
});
// => Hello pool-1-thread-1
複製代碼
Executors
類提供了便利的工廠方法用來不一樣類型的線程池。上面的示例中咱們建立了一個只包含單線程的線程池 executor
。
代碼的輸出結果與上面手動 new Runnable() 輸出一致,惟一不一樣的是,Java 進程沒有終止!
注意:
Executors
必須顯示的終止它,不然它們將持續監聽是否有新的任務須要執行。這也是爲何我在研發組中推薦小組人員建立線程池務必要交給 Spring Ioc 容器管理的緣由。
那麼,如何優雅的關閉 ExecutorService
呢?
ExecutorService
提供了兩個方法:
shutdwon()
: 它會等待正在執行的任務執行完成後,再銷燬線程池;shutdownNow()
: 它會馬上終止正在執行的任務,並銷燬線程池;關閉 ExecutorService
最佳實踐:
try {
System.out.println("attempt to shutdown executor");
executor.shutdown();
// 指定關閉以前的等待時間 5s,達到「溫柔滴」關閉
executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
System.err.println("tasks interrupted");
}
finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
複製代碼
除了 Runnable
, executor
還支持另一種任務類型 —— Callable
。Callable
和 Runnable
相似,惟一不一樣的是,Callable
有返回值。
下面的示例代碼中,咱們經過 lambda 表達式定義了一個 Callable
:休眠 1s 鍾後,返回一個整數。
Callable<Integer> task = () -> {
try {
TimeUnit.SECONDS.sleep(1);
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
};
複製代碼
Callable
也能夠像 Runnable
同樣,做爲入參,提交給 executor
。這樣的話,問題來了,咱們怎麼取到返回值呢?由於 submit()
方法不會阻塞等待任務完成的。
雖然 executor
不能直接返回 Callable
結果,不過,executor
能夠返回一個 Future
類型的結果,他能夠在稍後的某個時機取出實際的返回值。
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);
System.out.println("future done? " + future.isDone());
// 這裏會阻塞等待返回結果
Integer result = future.get();
System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);
複製代碼
上面的這段代碼,Callable
提交給 executor
後,咱們先經過調用 isDone()
檢查 future
是否執行完成,結果固然是 false
, 由於 future
在返回那個整數以前,會休眠 1s。
後面到執行 get()
方法,線程會阻塞等待返回結果。整個流程走完,咱們再來看下控制檯的輸出:
future done? false
future done? true
result: 123
複製代碼
你須要注意,若是你關閉 executor
, 全部爲關閉的 future
都會拋出異常。
executor.shutdownNow();
future.get();
複製代碼
還有一點,你可能也注意到了,此次咱們建立線程池並不是使用 newSingleThreadExecutor()
。而是經過newFixedThreadPool(1)
來建立一個單線程線程池的 executor
。 它等同於使用newSingleThreadExecutor
,不過使用第二種方式咱們動態的調整入參,好比傳入一個比 1 大的值來增長線程池的大小。
上面提到了,future.get()
那裏會阻塞等待,直到 callable
返回值。咱們試想一個最糟糕的狀況,callable
持續運行致使你的程序沒有響應,這顯然是不能忍受的。咱們能夠經過傳入一個超時時間來避免發生這種狀況:
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(2);
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
});
// 設置超時時間爲 1s
future.get(1, TimeUnit.SECONDS);
複製代碼
執行上面這段代碼,會拋出一個 TimeoutException:
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
複製代碼
爲何拋出這個異常,緣由很顯然,咱們指定的超時時間是 1s, 而任務中光休眠就是 2s 了。
Executors
支持經過 invokeAll
方法一次性批量提交多個 callable
。這個方法的返回結果是一個 future
集合:
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "task1",
() -> "task2",
() -> "task3");
executor.invokeAll(callables)
.stream()
.map(future -> {
try {
// 拿到每一個 future 的返回值
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println); // for 循環輸出結果
複製代碼
上面這段示例代碼,咱們先利用 Java8 的 Stream 流來處理 invokeAll()
方法返回的 future
集合,取出每一個 future
的返回值將其映射到一個 List<String>
集合中,最後循環輸出結果。
批量提交的另一種方式是 invokeAny()
, 一樣是批量提交,它與 invokeAll()
不一樣點在於:它會阻塞等待,當某個 callable
第一個執行完成,它會馬上返回執行結果,而再也不等待那些正在執行中的 callable
了。
爲了測試,咱們定義一個方法,用來模擬建立擁有不一樣執行時間的 callable
:
Callable<String> callable(String result, long sleepSeconds) {
return () -> {
TimeUnit.SECONDS.sleep(sleepSeconds);
return result;
};
}
複製代碼
而後,咱們建立一組 callable
, 他們擁有不一樣的執行時間,1s 到 3s 的,經過invokeAny()
將這些 callable 提交給executor
,看看返回最快的 callable
的字符串結果是哪一個:
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));
String result = executor.invokeAny(callables);
System.out.println(result);
// => task2
複製代碼
最快的是 task2, 它的執行時間最短,因此輸出是它。
上面這段示例代碼中,建立 ExecutorService
的方式又變了,經過 Executors.newWorkStealingPool()
。這個工廠方法是 Java8 才引入的,返回值是一個 ForkJoinPool
類型的 executor
,它和指定固定大小的線程池不一樣,ForkJoinPools
容許指定一個並行因子來建立,默認的值爲物理機可用的 CPU 核心數。
ForkJoinPools
是在 Java7 中才被引入的,這將會在後面系列的教程中作詳細介紹。敬請期待。
上面咱們已經學習瞭如何在一個 executor
中提交和運行一次任務。還有中場景是,咱們須要屢次運行一個常見的任務,這個時候,咱們能夠利用調度線程池。
ScheduledExecutorService
支持任務調度,持續執行或者延遲一段時間後再執行。
下面的代碼示例,指定一個任務延遲 3 分鐘再執行:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(1337);
long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
複製代碼
ScheduledExecutorService
會返回一個加強類型的 future
類型 —— ScheduleFuture
,它除了保有原有 Future
提供的全部方法外,還提供了 getDelay()
方法, 用來獲取距離目標時間的時間差,也就是剩餘的延遲時間。
爲了保證調度任務的持續執行,executors
提供了兩個 API:
scheduleAtFixedRate()
:固定週期執行一個任務, 不考慮任務的耗時,即便上個任務尚未執行完畢,到了時間,下個任務依然會去執行;ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
// 第一次執行的延遲時間
int initialDelay = 0;
// 週期頻率
int period = 1;
// 指定任務馬上執行,且每次之間間隔爲 1s
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
複製代碼
注意:
initialDelay
表示第一次執行的延遲時間,0 表示當即執行
scheduleWithFixedDelay()
:固定延遲執行任務,就是說,必需要等到上個任務執行完成後纔開始計時,達到設定的延遲時間後,下個任務纔會被觸發;ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("Scheduling: " + System.nanoTime());
}
catch (InterruptedException e) {
System.err.println("task interrupted");
}
};
executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
複製代碼
總結一下:
scheduleWithFixedDelay()
和scheduleAtFixedRate()
最大的區別就是,scheduleWithFixedDelay()
須要等到前一個任務執行完成後纔開始計延時,再觸發下一個任務。
如上面的示例代碼中這個調度任務,設置了 1s 的延時,初始化延時爲 0,假設理想狀況下任務的耗時就是 2s, 則任務的調度週期爲: 0s -> 3s -> 6s -> 9s ....
, 它更適用於當你沒法預測任務的執行時長的場景中使用。
這篇文章主要探討了如何在用 lambda
表達式,在 Java8 中使用線程和執行器,並演示了相關示例代碼。除此以外,咱們還學習了 Callable
和 Future
, 超時設置,任務的批量提交 invokeAll()
和 invokeAny()
API。最後, 瞭解瞭如何在 Java8 中使用任務調度 ScheduledExecutor
,但願你能有所收穫。