Java 8 併發教程:線程和執行器

Java 8 併發教程:線程和執行器

原文:Java 8 Concurrency Tutorial: Threads and Executorsjava

譯者:BlankKellygit

來源:Java8併發教程:Threads和Executorsgithub

歡迎閱讀個人Java8併發教程的第一部分。這份指南將會以簡單易懂的代碼示例來教給你如何在Java8中進行併發編程。這是一系列教程中的第一部分。在接下來的15分鐘,你將會學會如何經過線程,任務(tasks)和 exector services來並行執行代碼。編程

併發在Java5中首次被引入並在後續的版本中不斷獲得加強。在這篇文章中介紹的大部分概念一樣適用於之前的Java版本。不過個人代碼示例聚焦於Java8,大量使用lambda表達式和其餘新特性。若是你對lambda表達式不屬性,我推薦你首先閱讀個人Java 8 教程post

ThreadRunnable

全部的現代操做系統都經過進程和線程來支持併發。進程是一般彼此獨立運行的程序的實例,好比,若是你啓動了一個Java程序,操做系統產生一個新的進程,與其餘程序一塊兒並行執行。在這些進程的內部,咱們使用線程併發執行代碼,所以,咱們能夠最大限度的利用CPU可用的核心(core)。性能

Java從JDK1.0開始執行線程。在開始一個新的線程以前,你必須指定由這個線程執行的代碼,一般稱爲task。這能夠經過實現Runnable——一個定義了一個無返回值無參數的run()方法的函數接口,以下面的代碼所示:學習

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");

由於Runnable是一個函數接口,因此咱們利用lambda表達式將當前的線程名打印到控制檯。首先,在開始一個線程前咱們在主線程中直接運行runnable。

控制檯輸出的結果可能像下面這樣:

Hello main
Hello Thread-0
Done!

或者這樣:

Hello main
Done!
Hello Thread-0

因爲咱們不能預測這個runnable是在打印'done'前執行仍是在以後執行。順序是不肯定的,所以在大的程序中編寫併發程序是一個複雜的任務。

咱們能夠將線程休眠肯定的時間。在這篇文章接下來的代碼示例中咱們能夠經過這種方法來模擬長時間運行的任務。

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();

當你運行上面的代碼時,你會注意到在第一條打印語句和第二條打印語句之間存在一分鐘的延遲。TimeUnit在處理單位時間時一個有用的枚舉類。你能夠經過調用Thread.sleep(1000)來達到一樣的目的。

使用Thread類是很單調的且容易出錯。因爲併發API在2004年Java5發佈的時候才被引入。這些API位於java.util.concurrent包下,包含不少處理併發編程的有用的類。自從這些併發API引入以來,在隨後的新的Java版本發佈過程當中獲得不斷的加強,甚至Java8提供了新的類和方法來處理併發。

接下來,讓咱們走進併發API中最重要的一部——executor services。

Executor

併發API引入了ExecutorService做爲一個在程序中直接使用Thread的高層次的替換方案。Executos支持運行異步任務,一般管理一個線程池,這樣一來咱們就不須要手動去建立新的線程。在不斷地處理任務的過程當中,線程池內部線程將會獲得複用,所以,在咱們可使用一個executor service來運行和咱們想在咱們整個程序中執行的同樣多的併發任務。

下面是使用executors的第一個代碼示例:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1

Executors類提供了便利的工廠方法來建立不一樣類型的 executor services。在這個示例中咱們使用了一個單線程線程池的 executor。

代碼運行的結果相似於上一個示例,可是當運行代碼時,你會注意到一個很大的差異:Java進程從沒有中止!Executors必須顯式的中止-不然它們將持續監聽新的任務。

ExecutorService提供了兩個方法來達到這個目的——shutdwon()會等待正在執行的任務執行完而shutdownNow()會終止全部正在執行的任務並當即關閉execuotr。

這是我喜歡的一般關閉executors的方式:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    }
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}

executor經過等待指定的時間讓當前執行的任務終止來「溫柔的」關閉executor。在等待最長5分鐘的時間後,execuote最終會經過中斷全部的正在執行的任務關閉。

CallableFuture

除了Runnable,executor還支持另外一種類型的任務——Callable。Callables也是相似於runnables的函數接口,不一樣之處在於,Callable返回一個值。

下面的lambda表達式定義了一個callable:在休眠一分鐘後返回一個整數。

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};

Callbale也能夠像runnbales同樣提交給 executor services。可是callables的結果怎麼辦?由於submit()不會等待任務完成,executor service不能直接返回callable的結果。不過,executor 能夠返回一個Future類型的結果,它能夠用來在稍後某個時間取出實際的結果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

在將callable提交給exector以後,咱們先經過調用isDone()來檢查這個future是否已經完成執行。我十分肯定這會發生什麼,由於在返回那個整數以前callable會休眠一分鐘、

在調用get()方法時,當前線程會阻塞等待,直到callable在返回實際的結果123以前執行完成。如今future執行完畢,咱們能夠在控制檯看到以下的結果:

future done? false
future done? true
result: 123

Future與底層的executor service緊密的結合在一塊兒。記住,若是你關閉executor,全部的未停止的future都會拋出異常。

executor.shutdownNow();
future.get();

你可能注意到咱們此次建立executor的方式與上一個例子稍有不一樣。咱們使用newFixedThreadPool(1)來建立一個單線程線程池的 execuot service。
這等同於使用newSingleThreadExecutor不過使用第二種方式咱們能夠稍後經過簡單的傳入一個比1大的值來增長線程池的大小。

超時

任何future.get()調用都會阻塞,而後等待直到callable停止。在最糟糕的狀況下,一個callable持續運行——所以使你的程序將沒有響應。咱們能夠簡單的傳入一個時長來避免這種狀況。

ExecutorService executor = Executors.newFixedThreadPool(1);

    Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

    future.get(1, TimeUnit.SECONDS);

運行上面的代碼將會產生一個TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)

你可能已經猜到俄爲何會排除這個異常。咱們指定的最長等待時間爲1分鐘,而這個callable在返回結果以前實際須要兩分鐘。

invokeAll

Executors支持經過invokeAll()一次批量提交多個callable。這個方法結果一個callable的集合,而後返回一個future的列表。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

在這個例子中,咱們利用Java8中的函數流(stream)來處理invokeAll()調用返回的全部future。咱們首先將每個future映射到它的返回值,而後將每一個值打印到控制檯。若是你還不屬性stream,能夠閱讀個人Java8 Stream 教程

invokeAny

批量提交callable的另外一種方式就是invokeAny(),它的工做方式與invokeAll()稍有不一樣。在等待future對象的過程當中,這個方法將會阻塞直到第一個callable停止而後返回這一個callable的結果。

爲了測試這種行爲,咱們利用這個幫助方法來模擬不一樣執行時間的callable。這個方法返回一個callable,這個callable休眠指定 的時間直到返回給定的結果。

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

咱們利用這個方法建立一組callable,這些callable擁有不一樣的執行時間,從1分鐘到3分鐘。經過invokeAny()將這些callable提交給一個executor,返回最快的callable的字符串結果-在這個例子中爲任務2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

上面這個例子又使用了另外一種方式來建立executor——調用newWorkStealingPool()。這個工廠方法是Java8引入的,返回一個ForkJoinPool類型的 executor,它的工做方法與其餘常見的execuotr稍有不一樣。與使用一個固定大小的線程池不一樣,ForkJoinPools使用一個並行因子數來建立,默認值爲主機CPU的可用核心數。

ForkJoinPools 在Java7時引入,將會在這個系列後面的教程中詳細講解。讓咱們深刻了解一下 scheduled executors 來結束本次教程。

ScheduledExecutor

咱們已經學習瞭如何在一個 executor 中提交和運行一次任務。爲了持續的屢次執行常見的任務,咱們能夠利用調度線程池。

ScheduledExecutorService支持任務調度,持續執行或者延遲一段時間後執行。

下面的實例,調度一個任務在延遲3分鐘後執行:

ScheduledExecutorService executor =                 Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

調度一個任務將會產生一個專門的future類型——ScheduleFuture,它除了提供了Future的全部方法以外,他還提供了getDelay()方法來得到剩餘的延遲。在延遲消逝後,任務將會併發執行。

爲了調度任務持續的執行,executors 提供了兩個方法scheduleAtFixedRate()scheduleWithFixedDelay()。第一個方法用來以固定頻率來執行一個任務,好比,下面這個示例中,每分鐘一次:

ScheduledExecutorService executor =     Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

另外,這個方法還接收一個初始化延遲,用來指定這個任務首次被執行等待的時長。

請記住:scheduleAtFixedRate()並不考慮任務的實際用時。因此,若是你指定了一個period爲1分鐘而任務須要執行2分鐘,那麼線程池爲了性能會更快的執行。

在這種狀況下,你應該考慮使用scheduleWithFixedDelay()。這個方法的工做方式與上咱們上面描述的相似。不一樣之處在於等待時間 period 的應用是在一次任務的結束和下一個任務的開始之間。例如:

ScheduledExecutorService executor =         Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

這個例子調度了一個任務,並在一次執行的結束和下一次執行的開始之間設置了一個1分鐘的固定延遲。初始化延遲爲0,任務執行時間爲0。因此咱們分別在0s,3s,6s,9s等間隔處結束一次執行。如你所見,scheduleWithFixedDelay()在你不能預測調度任務的執行時長時是頗有用的。

這是併發系列教程的第一部分。我推薦你親手實踐一下上面的代碼示例。你能夠從 Github 上找到這篇文章中全部的代碼示例,因此歡迎你fork這個倉庫,並收藏它

我但願你會喜歡這篇文章。若是你有任何的問題均可以在下面評論或者經過 Twitter 向我反饋。

相關文章
相關標籤/搜索