[Java併發-16] CompletionService:批量執行異步任務

咱們思考下這個場景:從三個電商詢價,而後保存在本身的數據庫裏。經過以前所學,咱們可能這麼實現。數據庫

// 建立線程池
ExecutorService executor =
  Executors.newFixedThreadPool(3);
// 異步向電商 S1 詢價
Future<Integer> f1 = 
  executor.submit(
    ()->getPriceByS1());
// 異步向電商 S2 詢價
Future<Integer> f2 = 
  executor.submit(
    ()->getPriceByS2());
// 異步向電商 S3 詢價
Future<Integer> f3 = 
  executor.submit(
    ()->getPriceByS3());
    
// 獲取電商 S1 報價並保存
r=f1.get();
executor.execute(()->save(r));
  
// 獲取電商 S2 報價並保存
r=f2.get();
executor.execute(()->save(r));
  
// 獲取電商 S3 報價並保存  
r=f3.get();
executor.execute(()->save(r));

上面的這個方案自己沒有太大問題,可是有個地方的處理須要你注意,那就是若是獲取電商 S1 報價的耗時很長,那麼即使獲取電商 S2 報價的耗時很短,也沒法讓保存 S2 報價的操做先執行,由於這個主線程都阻塞在了 f1.get(),那咱們如何解決了?併發

咱們能夠增長一個阻塞隊列,獲取到 S一、S二、S3 的報價都進入阻塞隊列,而後在主線程中消費阻塞隊列,這樣就能保證先獲取到的報價先保存到數據庫了。下面的示例代碼展現瞭如何利用阻塞隊列實現先獲取到的報價先保存到數據庫。異步

// 建立阻塞隊列
BlockingQueue<Integer> bq =
  new LinkedBlockingQueue<>();
// 電商 S1 報價異步進入阻塞隊列  
executor.execute(()->
  bq.put(f1.get()));
// 電商 S2 報價異步進入阻塞隊列  
executor.execute(()->
  bq.put(f2.get()));
// 電商 S3 報價異步進入阻塞隊列  
executor.execute(()->
  bq.put(f3.get()));
// 異步保存全部報價  
for (int i=0; i<3; i++) {
  Integer r = bq.take();
  executor.execute(()->save(r));
}

利用 CompletionService 實現詢價系統

不過在實際項目中,並不建議你這樣作,由於 Java SDK 併發包裏已經提供了設計精良的 CompletionService。利用 CompletionService 能讓代碼更簡練。性能

CompletionService 的實現原理也是內部維護了一個阻塞隊列,當任務執行結束就把任務的執行結果加入到阻塞隊列中,不一樣的是 CompletionService 是把任務執行結果的 Future 對象加入到阻塞隊列中,而上面的示例代碼是把任務最終的執行結果放入了阻塞隊列中。線程

那到底該如何建立 CompletionService 呢?

CompletionService 接口的實現類是 ExecutorCompletionService,這個實現類的構造方法有兩個,分別是:設計

  1. ExecutorCompletionService(Executor executor)
  2. ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

這兩個構造方法都須要傳入一個線程池,若是不指定 completionQueue,那麼默認會使用無界的 LinkedBlockingQueue。任務執行結果的 Future 對象就是加入到 completionQueue 中。code

下面的示例代碼完整地展現瞭如何利用 CompletionService 來實現高性能的詢價系統。其中,咱們沒有指定 completionQueue,以後經過 CompletionService 接口提供的 submit() 方法提交了三個詢價操做,這三個詢價操做將會被 CompletionService 異步執行。最後,咱們經過 CompletionService 接口提供的 take() 方法獲取一個 Future 對象,調用 Future 對象的 get() 方法就能返回詢價操做的執行結果了。對象

// 建立線程池
ExecutorService executor = 
  Executors.newFixedThreadPool(3);
// 建立 CompletionService
CompletionService<Integer> cs = new 
  ExecutorCompletionService<>(executor);
// 異步向電商 S1 詢價
cs.submit(()->getPriceByS1());
// 異步向電商 S2 詢價
cs.submit(()->getPriceByS2());
// 異步向電商 S3 詢價
cs.submit(()->getPriceByS3());
// 將詢價結果異步保存到數據庫
for (int i=0; i<3; i++) {
  Integer r = cs.take().get();
  executor.execute(()->save(r));
}

CompletionService 接口說明

下面咱們詳細地介紹一下 CompletionService 接口提供的方法,CompletionService 接口提供的方法有 5 個,這 5 個方法的方法簽名以下所示。接口

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() 
  throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) 
  throws InterruptedException;

CompletionService 後3 個方法,都是和阻塞隊列相關的,take()、poll() 都是從阻塞隊列中獲取並移除一個元素;它們的區別在於若是阻塞隊列是空的,那麼調用 take() 方法的線程會被阻塞,而 poll() 方法會返回 null 值。隊列

利用 CompletionService 實現 Dubbo 中的 Forking Cluster

Dubbo 中有一種叫作Forking 的集羣模式,這種集羣模式下,支持並行地調用多個查詢服務,只要有一個成功返回結果,整個服務就能夠返回了。例如你須要提供一個地址轉座標的服務,爲了保證該服務的高可用和性能,你能夠並行地調用 3 個地圖服務商的 API,而後只要有 1 個正確返回告終果 r,那麼地址轉座標這個服務就能夠直接返回 r 了。這種集羣模式能夠容忍 2 個地圖服務商服務異常,但缺點是消耗的資源偏多。

geocoder(addr) {
  // 並行執行如下 3 個查詢服務, 
  r1=geocoderByS1(addr);
  r2=geocoderByS2(addr);
  r3=geocoderByS3(addr);
  // 只要 r1,r2,r3 有一個返回
  // 則返回
  return r1|r2|r3;
}

利用 CompletionService 能夠快速實現 Forking 這種集羣模式,好比下面的示例代碼就展現了具體是如何實現的。首先咱們建立了一個線程池 executor 、一個 CompletionService 對象 cs 和一個Future<Integer>類型的列表 futures,每次經過調用 CompletionService 的 submit() 方法提交一個異步任務,會返回一個 Future 對象,咱們把這些 Future 對象保存在列表 futures 中。經過調用cs.take().get(),咱們可以拿到最快返回的任務執行結果,只要咱們拿到一個正確返回的結果,就能夠取消全部任務而且返回最終結果了。

// 建立線程池
ExecutorService executor =
  Executors.newFixedThreadPool(3);
// 建立 CompletionService
CompletionService<Integer> cs =
  new ExecutorCompletionService<>(executor);
// 用於保存 Future 對象
List<Future<Integer>> futures =
  new ArrayList<>(3);
// 提交異步任務,並保存 future 到 futures 
futures.add(
  cs.submit(()->geocoderByS1()));
futures.add(
  cs.submit(()->geocoderByS2()));
futures.add(
  cs.submit(()->geocoderByS3()));
// 獲取最快返回的任務執行結果
Integer r = 0;
try {
  // 只要有一個成功返回,則 break
  for (int i = 0; i < 3; ++i) {
    r = cs.take().get();
    // 簡單地經過判空來檢查是否成功返回
    if (r != null) {
      break;
    }
  }
} finally {
  // 取消全部任務
  for(Future<Integer> f : futures)
    f.cancel(true);
}
// 返回結果
return r;

總結

當須要批量提交異步任務的時候建議你使用 CompletionService。CompletionService 將線程池 Executor 和阻塞隊列 BlockingQueue 的功能融合在了一塊兒,可以讓批量異步任務的管理更簡單。除此以外,CompletionService 可以讓異步任務的執行結果有序化,先執行完的先進入阻塞隊列,利用這個特性,你能夠輕鬆實現後續處理的有序性,避免無謂的等待,同時還能夠快速實現諸如 Forking Cluster 這樣的需求。

相關文章
相關標籤/搜索