本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》(馬俊昌著),由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買:京東自營連接 html
Java併發包提供了一套框架,大大簡化了執行異步任務所需的開發,本節咱們就來初步探討這套框架。java
在以前的介紹中,線程Thread既表示要執行的任務,又表示執行的機制,而這套框架引入了一個"執行服務"的概念,它將"任務的提交"和"任務的執行"相分離,"執行服務"封裝了任務執行的細節,對於任務提交者而言,它能夠關注於任務自己,如提交任務、獲取結果、取消任務,而不須要關注任務執行的細節,如線程建立、任務調度、線程關閉等。git
以上描述可能比較抽象,接下來,咱們會一步步具體闡述。github
首先,咱們來看任務執行服務涉及的基本接口:編程
關於Runnable和Callable,咱們在前面幾節都已經瞭解了,都表示任務,Runnable沒有返回結果,而Callable有,Runnable不會拋出異常,而Callable會。swift
Executor表示最簡單的執行服務,其定義爲:微信
public interface Executor {
void execute(Runnable command);
}
複製代碼
就是能夠執行一個Runnable,沒有返回結果。接口沒有限定任務如何執行,多是建立一個新線程,多是複用線程池中的某個線程,也多是在調用者線程中執行。併發
ExecutorService擴展了Executor,定義了更多服務,基本方法有:框架
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//... 其餘方法
}
複製代碼
這三個submit都表示提交一個任務,返回值類型都是Future,返回後,只是表示任務已提交,不表明已執行,經過Future能夠查詢異步任務的狀態、獲取最終結果、取消任務等。咱們知道,對於Callable,任務最終有個返回值,而對於Runnable是沒有返回值的,第二個提交Runnable的方法能夠同時提供一個結果,在異步任務結束時返回,而對於第三個方法,異步任務的最終返回值爲null。dom
咱們來看Future接口的定義:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼
get用於返回異步任務最終的結果,若是任務還未執行完成,會阻塞等待,另外一個get方法能夠限定阻塞等待的時間,若是超時任務還未結束,會拋出TimeoutException。
cancel用於取消異步任務,若是任務已完成、或已經取消、或因爲某種緣由不能取消,cancel返回false,不然返回true。若是任務還未開始,則再也不運行。但若是任務已經在運行,則不必定能取消,參數mayInterruptIfRunning表示,若是任務正在執行,是否調用interrupt方法中斷線程,若是爲false就不會,若是爲true,就會嘗試中斷線程,但咱們從69節知道,中斷不必定能取消線程。
isDone和isCancelled用於查詢任務狀態。isCancelled表示任務是否被取消,只要cancel方法返回了true,隨後的isCancelled方法都會返回true,即便執行任務的線程還未真正結束。isDone表示任務是否結束,無論什麼緣由都算,多是任務正常結束、多是任務拋出了異常、也多是任務被取消。
咱們再來看下get方法,任務最終大概有三個結果:
若是調用get方法的線程被中斷了,get方法會拋出InterruptedException。
Future是一個重要的概念,是實現"任務的提交"與"任務的執行"相分離的關鍵,是其中的"紐帶",任務提交者和任務執行服務經過它隔離各自的關注點,同時進行協做。
說了這麼多接口,具體怎麼用呢?咱們看個簡單的例子:
public class BasicDemo {
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sleepSeconds = new Random().nextInt(1000);
Thread.sleep(sleepSeconds);
return sleepSeconds;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(new Task());
// 模擬執行其餘任務
Thread.sleep(100);
try {
System.out.println(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
複製代碼
咱們使用了工廠類Executors建立了一個任務執行服務,Executors有多個靜態方法,能夠用來建立ExecutorService,這裏使用的是:
public static ExecutorService newSingleThreadExecutor() 複製代碼
表示使用一個線程執行全部服務,後續咱們會詳細介紹Executors,注意與Executor相區別,後者是單數,是接口。
無論ExecutorService是如何建立的,對使用者而言,用法都同樣,例子提交了一個任務,提交後,能夠繼續執行其餘事情,隨後能夠經過Future獲取最終結果或處理任務執行的異常。
最後,咱們調用了ExecutorService的shutdown方法,它會關閉任務執行服務。
前面咱們只是介紹了ExecutorService的三個submit方法,其實它還有以下方法:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼
有兩個關閉方法,shutdown和shutdownNow,區別是,shutdown表示再也不接受新任務,但已提交的任務會繼續執行,即便任務還未開始執行,shutdownNow不只不接受新任務,已提交但還沒有執行的任務會被終止,對於正在執行的任務,通常會調用線程的interrupt方法嘗試中斷,不過,線程可能不響應中斷,shutdownNow會返回已提交但還沒有執行的任務列表。
shutdown和shutdownNow不會阻塞等待,它們返回後不表明全部任務都已結束,不過isShutdown方法會返回true。調用者能夠經過awaitTermination等待全部任務結束,它能夠限定等待的時間,若是超時前全部任務都結束了,即isTerminated方法返回true,則返回true,不然返回false。
ExecutorService有兩組批量提交任務的方法,invokeAll和invokeAny,它們都有兩個版本,其中一個限定等待時間。
invokeAll等待全部任務完成,返回的Future列表中,每一個Future的isDone方法都返回true,不過isDone爲true不表明任務就執行成功了,多是被取消了,invokeAll能夠指定等待時間,若是超時後有的任務沒完成,就會被取消。
而對於invokeAny,只要有一個任務在限時內成功返回了,它就會返回該任務的結果,其餘任務會被取消,若是沒有任務能在限時內成功返回,拋出TimeoutException,若是限時內全部任務都結束了,但都發生了異常,拋出ExecutionException。
咱們在64節介紹過使用jsoup下載和分析HTML,咱們使用它看一個invokeAll的例子,同時下載並分析兩個URL的標題,輸出標題內容,代碼爲:
public class InvokeAllDemo {
static class UrlTitleParser implements Callable<String> {
private String url;
public UrlTitleParser(String url) {
this.url = url;
}
@Override
public String call() throws Exception {
Document doc = Jsoup.connect(url).get();
Elements elements = doc.select("head title");
if (elements.size() > 0) {
return elements.get(0).text();
}
return null;
}
}
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
String url1 = "http://www.cnblogs.com/swiftma/p/5396551.html";
String url2 = "http://www.cnblogs.com/swiftma/p/5399315.html";
Collection<UrlTitleParser> tasks = Arrays.asList(new UrlTitleParser[] {
new UrlTitleParser(url1), new UrlTitleParser(url2) });
try {
List<Future<String>> results = executor.invokeAll(tasks, 10,
TimeUnit.SECONDS);
for (Future<String> result : results) {
try {
System.out.println(result.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
複製代碼
這裏,使用了Executors的另外一個工廠方法newFixedThreadPool建立了一個線程池,這樣使得多個任務能夠併發執行,關於線程池,咱們下節介紹。
其它代碼比較簡單,咱們就不解釋了。使用ExecutorService,編寫併發異步任務的代碼就像寫順序程序同樣,不用關心線程的建立和協調,只須要提交任務、處理結果就能夠了,大大簡化了開發工做。
瞭解了ExecutorService和Future的基本用法,咱們來看下它們的基本實現原理。
ExecutorService的主要實現類是ThreadPoolExecutor,它是基於線程池實現的,關於線程池咱們下節再介紹。ExecutorService有一個抽象實現類AbstractExecutorService,本節,咱們簡要分析其原理,並基於它實現一個簡單的ExecutorService,Future的主要實現類是FutureTask,咱們也會簡要探討其原理。
AbstractExecutorService提供了submit, invokeAll和invokeAny的默認實現,子類只須要實現以下方法:
public void shutdown() public List<Runnable> shutdownNow() public boolean isShutdown() public boolean isTerminated() public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException public void execute(Runnable command) 複製代碼
除了execute,其餘方法都與執行服務的生命週期管理有關,簡化起見,咱們忽略其實現,主要考慮execute。
submit/invokeAll/invokeAny最終都會調用execute,execute決定了到底如何執行任務,簡化起見,咱們爲每一個任務建立一個線程,一個完整的最簡單的ExecutorService實現類以下:
public class SimpleExecutorService extends AbstractExecutorService {
@Override
public void shutdown() {
}
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}
複製代碼
對於前面的例子,建立ExecutorService的代碼能夠替換爲:
ExecutorService executor = new SimpleExecutorService();
複製代碼
能夠實現相同的效果。
ExecutorService最基本的方法是submit,它是如何實現的呢?咱們來看AbstractExecutorService的代碼:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
複製代碼
它調用newTaskFor生成了一個RunnableFuture,RunnableFuture是一個接口,既擴展了Runnable,又擴展了Future,沒有定義新方法,做爲Runnable,它表示要執行的任務,傳遞給execute方法進行執行,做爲Future,它又表示任務執行的異步結果。這可能使人混淆,咱們來看具體代碼:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
複製代碼
就是建立了一個FutureTask對象,FutureTask實現了RunnableFuture接口。它是怎麼實現的呢?
它有一個成員變量表示待執行的任務,聲明爲:
private Callable<V> callable;
複製代碼
有個整數變量state表示狀態,聲明爲:
private volatile int state;
複製代碼
取值可能爲:
NEW = 0; //剛開始的狀態,或任務在運行
COMPLETING = 1; //臨時狀態,任務即將結束,在設置結果
NORMAL = 2; //任務正常執行完成
EXCEPTIONAL = 3; //任務執行拋出異常結束
CANCELLED = 4; //任務被取消
INTERRUPTING = 5; //任務在被中斷
INTERRUPTED = 6; //任務被中斷
複製代碼
有個變量表示最終的執行結果或異常,聲明爲:
private Object outcome;
複製代碼
有個變量表示運行任務的線程:
private volatile Thread runner;
複製代碼
還有個單向鏈表表示等待任務執行結果的線程:
private volatile WaitNode waiters;
複製代碼
FutureTask的構造方法會初始化callable和狀態,若是FutureTask接受的是一個Runnable對象,它會調用Executors.callable轉換爲Callable對象,以下所示:
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
複製代碼
任務執行服務會使用一個線程執行FutureTask的run方法,run()代碼爲:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
複製代碼
其基本邏輯是:
對於任務提交者,它經過get方法獲取結果,限時get方法的代碼爲:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
複製代碼
其基本邏輯是,若是任務還未執行完畢,就等待,最後調用report報告結果, report根據狀態返回結果或拋出異常,代碼爲:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
複製代碼
cancel方法的代碼爲:
public boolean cancel(boolean mayInterruptIfRunning) {
if (state != NEW)
return false;
if (mayInterruptIfRunning) {
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if (t != null)
t.interrupt();
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
finishCompletion();
return true;
}
複製代碼
其基本邏輯爲:
理解了FutureTask,咱們再來看AbstractExecutorService的其餘方法,invokeAll的基本邏輯很簡單,對每一個任務,建立一個FutureTask,並調用execute執行,而後等待全部任務結束。
invokeAny的實現稍微複雜些,它利用了ExecutorCompletionService,關於這個類及invokeAny的實現,咱們後續章節再介紹。
本節介紹了Java併發包中任務執行服務的基本概念和原理,該服務體現了併發異步開發中"關注點分離"的思想,使用者只須要經過ExecutorService提交任務,經過Future操做任務和結果便可,不須要關注線程建立和協調的細節。
本節主要介紹了AbstractExecutorService和FutureTask的基本原理,實現了一個最簡單的執行服務SimpleExecutorService,對每一個任務建立一個單獨的線程。實際中,最常用的執行服務是基於線程池實現的ThreadPoolExecutor,線程池是併發程序中一個很是重要的概念和技術,讓咱們下一節來探討。
(與其餘章節同樣,本節全部代碼位於 github.com/swiftma/pro…)
未完待續,查看最新文章,敬請關注微信公衆號「老馬說編程」(掃描下方二維碼),從入門到高級,深刻淺出,老馬和你一塊兒探索Java編程及計算機技術的本質。用心原創,保留全部版權。