本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》(馬俊昌著),由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買:京東自營連接 html
![]()
上節,咱們提到,在異步任務程序中,一種常見的場景是,主線程提交多個異步任務,而後但願有任務完成就處理結果,而且按任務完成順序逐個處理,對於這種場景,Java併發包提供了一個方便的方法,使用CompletionService,這是一個接口,它的實現類是ExecutorCompletionService,本節咱們就來探討它們。java
與77節介紹的ExecutorService同樣,CompletionService也能夠提交異步任務,它的不一樣是,它能夠按任務完成順序獲取結果,其具體定義爲:git
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
複製代碼
其submit方法與ExecutorService是同樣的,多了take和poll方法,它們都是獲取下一個完成任務的結果,take()會阻塞等待,poll()會當即返回,若是沒有已完成的任務,返回null,帶時間參數的poll方法會最多等待限定的時間。github
CompletionService的主要實現類是ExecutorCompletionService,它依賴於一個Executor完成實際的任務提交,而本身主要負責結果的排隊和處理,它的構造方法有兩個:編程
public ExecutorCompletionService(Executor executor) public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) 複製代碼
至少須要一個Executor參數,能夠提供一個BlockingQueue參數,用做完成任務的隊列,沒有提供的話,ExecutorCompletionService內部會建立一個LinkedBlockingQueue。swift
咱們在77節的invokeAll的示例中,演示了併發下載並分析URL的標題,那個例子中,是要等到全部任務都完成才處理結果的,這裏,咱們修改一下,一有任務完成就輸出其結果,代碼以下:微信
public class CompletionServiceDemo {
static class UrlTitleParser implements Callable<String> {
private String url;
public UrlTitleParser(String url) {
this.url = url;
}
@Override
public String call() throws Exception {
Document doc = Jsoup.connect(url).get();
Elements elements = doc.select("head title");
if (elements.size() > 0) {
return url + ": " + elements.get(0).text();
}
return null;
}
}
public static void parse(List<String> urls) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
CompletionService<String> completionService = new ExecutorCompletionService<>(
executor);
for (String url : urls) {
completionService.submit(new UrlTitleParser(url));
}
for (int i = 0; i < urls.size(); i++) {
Future<String> result = completionService.take();
try {
System.out.println(result.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} finally {
executor.shutdown();
}
}
public static void main(String[] args) throws InterruptedException {
List<String> urls = Arrays.asList(new String[] {
"http://www.cnblogs.com/swiftma/p/5396551.html",
"http://www.cnblogs.com/swiftma/p/5399315.html",
"http://www.cnblogs.com/swiftma/p/5405417.html",
"http://www.cnblogs.com/swiftma/p/5409424.html" });
parse(urls);
}
}
複製代碼
在parse方法中,首先建立了一個ExecutorService,而後纔是CompletionService,經過後者提交任務、按完成順序逐個處理結果,這樣,是否是很方便?併發
ExecutorCompletionService是怎麼讓結果有序處理的呢?其實,也很簡單,如前所述,它有一個額外的隊列,每一個任務完成以後,都會將表明結果的Future入隊。異步
那問題是,任務完成後,怎麼知道入隊呢?咱們具體來看下。ide
在77節咱們介紹過FutureTask,任務完成後,無論是正常完成、異常結束、仍是被取消,都會調用finishCompletion方法,而該方法會調用一個done方法,該方法代碼爲:
protected void done() { }
複製代碼
它的實現爲空,但它是一個protected方法,子類能夠重寫該方法。
在ExecutorCompletionService中,提交的任務類型不是通常的FutureTask,而是一個子類QueueingFuture,以下所示:
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
複製代碼
該子類重寫了done方法,在任務完成時將結果加入到完成隊列中,其代碼爲:
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
複製代碼
ExecutorCompletionService的take/poll方法就是從該隊列獲取結果,以下所示:
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
複製代碼
咱們在77節提到,AbstractExecutorService的invokeAny的實現,就利用了ExecutorCompletionService,它的基本思路是,提交任務後,經過take方法獲取結果,獲取到第一個有效結果後,取消全部其餘任務,不過,它的具體實現有一些優化,比較複雜。咱們看一個模擬的示例,從多個搜索引擎查詢一個關鍵詞,但只要任意一個的結果就能夠,模擬代碼以下:
public class InvokeAnyDemo {
static class SearchTask implements Callable<String> {
private String engine;
private String keyword;
public SearchTask(String engine, String keyword) {
this.engine = engine;
this.keyword = keyword;
}
@Override
public String call() throws Exception {
// 模擬從給定引擎搜索結果
Thread.sleep(engine.hashCode() % 1000);
return "<result for> " + keyword;
}
}
public static String search(List<String> engines, String keyword) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletionService<String> cs = new ExecutorCompletionService<>(executor);
List<Future<String>> futures = new ArrayList<Future<String>>(
engines.size());
String result = null;
try {
for (String engine : engines) {
futures.add(cs.submit(new SearchTask(engine, keyword)));
}
for (int i = 0; i < engines.size(); i++) {
try {
result = cs.take().get();
if (result != null) {
break;
}
} catch (ExecutionException ignore) {
// 出現異常,結果無效,繼續
}
}
} finally {
// 取消全部任務,對於已完成的任務,取消沒有什麼效果
for (Future<String> f : futures)
f.cancel(true);
executor.shutdown();
}
return result;
}
public static void main(String[] args) throws InterruptedException {
List<String> engines = Arrays.asList(new String[] { "www.baidu.com",
"www.sogou.com", "www.so.com", "www.google.com" });
System.out.println(search(engines, "老馬說編程"));
}
}
複製代碼
SearchTask模擬從指定搜索引擎查詢結果,search利用CompletionService/ExecutorService執行併發查詢,在獲得第一個有效結果後,取消其餘任務。
本節比較簡單,主要就是介紹了CompletionService的用法和原理,它經過一個額外的結果隊列,方便了對於多個異步任務結果的處理。
下一節,咱們來探討一種常見的需求 - 定時任務。
(與其餘章節同樣,本節全部代碼位於 github.com/swiftma/pro…)
未完待續,查看最新文章,敬請關注微信公衆號「老馬說編程」(掃描下方二維碼),從入門到高級,深刻淺出,老馬和你一塊兒探索Java編程及計算機技術的本質。用心原創,保留全部版權。