CompletionService的實現原理也是內部維護了一個阻塞隊列,當任務執行結束就把任務的執行結果加入到 阻塞隊列中,不一樣的是CompletionService是把任務執行結果的Future對象加入到阻塞隊列中java
將異步任務的結果保存到隊列中,主線程從隊列中取出這些結果數據執行。數據庫
場景: 向不一樣電商平臺詢價,並保存價格api
採用「ThreadPoolExecutor+Future」的方案:異步執行詢價而後再保存併發
// 建立線程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 異步向電商S1詢價 Future<Integer> f1 = executor.submit(()->getPriceByS1()); // 異步向電商S2詢價 Future<Integer> f2= executor.submit(()->getPriceByS2()); // 獲取電商S1報價並異步保存 executor.execute(()->save(f1.get())); // 獲取電商S2報價並異步保存 executor.execute(()->save(f2.get())
這個若是獲取電商S1報價的耗時很長,那麼即使獲取電商S2報價的耗時很短,也沒法讓保存S2報價的操做先執行,由於這個主線程都阻塞 在了f1.get()操做上。這點小瑕疵你該如何解決呢?異步
增長一個阻塞隊列,獲取到S一、S2的報價都進入阻塞隊列,而後在主線程中消費阻塞隊列,這樣就能保證先獲取到的報價先保存到數據庫了。Java併發工具庫中的CompletionService就能實現這個方案。工具
ExecutorCompletionService是CompletionService接口的實現類,這個實現類的構造方法有兩個:性能
//一,默認會使用無界的 LinkedBlockingQueue ExecutorCompletionService (Executor executor) //二,能夠自定義隊列 ExecutorCompletionService (Executor executor,BlockingQueue<Future<V>> completionQueue)
示例:線程
// 建立線程池 ExecutorService executor = Executors.newFixedThreadPool(10); // 建立CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); // 異步向電商S1詢價 cs.submit(()->getPriceByS1()); // 異步向電商S2詢價 cs.submit(()->getPriceByS2()); // 異步向電商S3詢價 cs.submit(()->getPriceByS3()); // 將詢價結果異步保存到數據庫 for (int i=0; i<3;i++) { Integer r = cs.take().get(); executor.execute(()->save(r)); }
提交相關的apicode
// 提交Callable任務 Future<V> submit(Callable<V> task); // 提交Runnable任務及結果引⽤ Future<V> submit(Runnable task, V result);
阻塞相關api對象
//從阻塞隊列中獲取並移除一個元素,若是阻塞隊列是空的,調用take()方法的線程會被阻塞 Future<V> take() throws InterruptedException; //從阻塞隊列中獲取並移除一個元素,若是阻塞隊列是空的返回null Future<V> poll(); /** *從阻塞隊列中獲取並移除一個元素,若是阻塞隊列是空等待了timeout unit時間,阻塞隊列仍是空的, *那麼該方法會返回null值。 */ Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
**** 碼字不易若是對你有幫助請給個關注****
**** 愛技術愛生活 QQ羣: 894109590****