CompletionService批量執行異步任務

CompletionService的實現原理也是內部維護了一個阻塞隊列,當任務執行結束就把任務的執行結果加入到 阻塞隊列中,不一樣的是CompletionService是把任務執行結果的Future對象加入到阻塞隊列中java

CompletionService能作什麼?

將異步任務的結果保存到隊列中,主線程從隊列中取出這些結果數據執行。數據庫

場景: 向不一樣電商平臺詢價,並保存價格api

採用「ThreadPoolExecutor+Future」的方案:異步執行詢價而後再保存併發

//	建立線程池 
ExecutorService	executor = Executors.newFixedThreadPool(3); 
//	異步向電商S1詢價 
Future<Integer>	f1 = executor.submit(()->getPriceByS1()); 
//	異步向電商S2詢價 
Future<Integer>	f2=	executor.submit(()->getPriceByS2()); 			
//	獲取電商S1報價並異步保存 
executor.execute(()->save(f1.get()));		
//	獲取電商S2報價並異步保存 
executor.execute(()->save(f2.get())

這個若是獲取電商S1報價的耗時很長,那麼即使獲取電商S2報價的耗時很短,也沒法讓保存S2報價的操做先執行,由於這個主線程都阻塞 在了f1.get()操做上。這點小瑕疵你該如何解決呢?異步

增長一個阻塞隊列,獲取到S一、S2的報價都進入阻塞隊列,而後在主線程中消費阻塞隊列,這樣就能保證先獲取到的報價先保存到數據庫了。Java併發工具庫中的CompletionService就能實現這個方案。工具

建立CompletionService

ExecutorCompletionService是CompletionService接口的實現類,這個實現類的構造方法有兩個:性能

//一,默認會使用無界的 LinkedBlockingQueue
ExecutorCompletionService (Executor	executor)
//二,能夠自定義隊列    
ExecutorCompletionService (Executor	executor,BlockingQueue<Future<V>> completionQueue)

示例:線程

//	建立線程池 
ExecutorService	executor =	Executors.newFixedThreadPool(10); 
//	建立CompletionService 
CompletionService<Integer> cs =	new	ExecutorCompletionService<>(executor); 
//	異步向電商S1詢價 
cs.submit(()->getPriceByS1()); 
//	異步向電商S2詢價 
cs.submit(()->getPriceByS2()); 
//	異步向電商S3詢價 
cs.submit(()->getPriceByS3()); 
//	將詢價結果異步保存到數據庫 
for	(int i=0; i<3;i++)	{		
    Integer r =	cs.take().get();
    executor.execute(()->save(r)); 
}

CompletionService API

提交相關的apicode

//	提交Callable任務
Future<V> submit(Callable<V> task); 
//	提交Runnable任務及結果引⽤
Future<V> submit(Runnable task, V result);

阻塞相關api對象

//從阻塞隊列中獲取並移除一個元素,若是阻塞隊列是空的,調用take()方法的線程會被阻塞
Future<V>	take() throws InterruptedException; 
//從阻塞隊列中獲取並移除一個元素,若是阻塞隊列是空的返回null
Future<V>	poll(); 
/**
 *從阻塞隊列中獲取並移除一個元素,若是阻塞隊列是空等待了timeout unit時間,阻塞隊列仍是空的,
 *那麼該方法會返回null值。
 */
Future<V>	poll(long timeout, TimeUnit	unit) throws InterruptedException;

總結

  • 當須要批量提交異步任務的時候建議你使用CompletionService。
  • CompletionService將線程池Executor和阻塞隊列BlockingQueue的功能融合在了一塊兒,可以讓批量異步任務的管理更簡單。
  • CompletionService可以讓異步任務的執行結果有序化,先執行完的先進入阻塞隊列,利用這個特性,你能夠輕鬆實現後續處理的有序性,避免無謂的等待,同時還能夠快速實現諸如Forking Cluster這樣的需求。
  • CompletionService的實現類ExecutorCompletionService,須要你本身建立線程池,雖看上去有些囉嗦,但 好處是你可讓多個ExecutorCompletionService的線程池隔離,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險。

**** 碼字不易若是對你有幫助請給個關注****

**** 愛技術愛生活 QQ羣: 894109590****

相關文章
相關標籤/搜索