產品:『咱們要加一個聚合搜索功能,當用戶在咱們網站查詢一件商品時,咱們分別從 A、B、C 三個網站上查詢這個信息,而後再把獲得的結果返回給用戶』
你:『哦,就是寫個爬蟲,從 3 個網站上抓取數據是吧?』
/* * * * * * * * blog.coder4j.cn * * * Copyright (C) B0A6-B0B0 All Rights Reserved. * * * */ package cn.coder4j.study.example.thread; import cn.hutool.core.thread.ThreadUtil; import com.google.common.collect.Lists; import java.util.List; /** * @author buhao * @version TestCompletionService.java, v 0.A B0B0-0B-A8 A9:0C buhao */ public class TestCompletionService { public static void main(String[] args) { // 查詢信息 String queryName = "java"; // 調用查詢接口 long startTime = System.currentTimeMillis(); List<String> result = queryInfoCode1(queryName); System.out.println("耗時: " + (System.currentTimeMillis() - startTime)); System.out.println(result); } /** * 聚合查詢信息 code 1 * * @param queryName * @return */ private static List<String> queryInfoCode1(String queryName) { List<String> resultList = Lists.newArrayList(); String webA = searchWebA(queryName); resultList.add(webA); String webB = searchWebB(queryName); resultList.add(webB); String webC = searchWebC(queryName); resultList.add(webC); return resultList; } /** * 查詢網站 A * * @param name * @return */ public static String searchWebA(String name) { ThreadUtil.sleep(5000); return "webA"; } /** * 查詢網站B * * @param name * @return */ public static String searchWebB(String name) { ThreadUtil.sleep(3000); return "webB"; } /** * 查詢網站C * * @param name * @return */ public static String searchWebC(String name) { ThreadUtil.sleep(500); return "webC"; } }
耗時: 8512 [webA, webB, webC]
debug 了一下代碼,發現問題出在了請求的網站上:github
網站 A、網站 B 由於年久失修,沒人維護,接口響應很慢,平均響應時間一個是 5秒,一個是 3秒(這裏使用 sleep 模擬)。網站 C 性能還能夠,平均響應時間 0.5 秒。 而咱們程序的執行時間就是 網站A 響應時間 + 網站 B 響應時間 + 網站 C 響應時間。web
一分析代碼發現,咱們的代碼全是串行化, A 網站請求完,再請求 B 網站,B 網站請求完再請求 C 網站。忽然想到提升效率的第一要義,提升代碼的並行率。爲何要一個一個串行請求,而不是 A、B、C 三個網站一塊兒請求呢,Java 的多線程很輕鬆就能夠實現,代碼以下:segmentfault
/* * * * * * * * blog.coder4j.cn * * * Copyright (C) B0A6-B0B0 All Rights Reserved. * * * */ package cn.coder4j.study.example.thread; import cn.hutool.core.thread.ThreadUtil; import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author buhao * @version TestCompletionService.java, v 0.A B0B0-0B-A8 A9:0C buhao */ public class TestCompletionService { public static void main(String[] args) throws ExecutionException, InterruptedException { // 查詢信息 String queryName = "java"; // 調用查詢接口 long startTime = System.currentTimeMillis(); List<String> result = queryInfoCode2(queryName); System.out.println("耗時: " + (System.currentTimeMillis() - startTime)); System.out.println(result); } /** * 聚合查詢信息 code 1 * * @param queryName * @return */ private static List<String> queryInfoCode1(String queryName) { List<String> resultList = Lists.newArrayList(); String webA = searchWebA(queryName); resultList.add(webA); String webB = searchWebB(queryName); resultList.add(webB); String webC = searchWebC(queryName); resultList.add(webC); return resultList; } /** * 聚合查詢信息 code 2 * * @param queryName * @return */ private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException { List<String> resultList = Lists.newArrayList(); // 建立3個線程的線程池 ExecutorService pool = Executors.newFixedThreadPool(3); try { // 建立任務的 feature Future<String> webAFuture = pool.submit(() -> searchWebA(queryName)); Future<String> webBFuture = pool.submit(() -> searchWebB(queryName)); Future<String> webCFuture = pool.submit(() -> searchWebC(queryName)); // 獲得任務結果 resultList.add(webAFuture.get()); resultList.add(webBFuture.get()); resultList.add(webCFuture.get()); } finally { // 關閉線程池 pool.shutdown(); } return resultList; } /** * 查詢網站 A * * @param name * @return */ public static String searchWebA(String name) { ThreadUtil.sleep(5000); return "webA"; } /** * 查詢網站B * * @param name * @return */ public static String searchWebB(String name) { ThreadUtil.sleep(3000); return "webB"; } /** * 查詢網站C * * @param name * @return */ public static String searchWebC(String name) { ThreadUtil.sleep(500); return "webC"; } }
請求網站的代碼其實一行沒變,變的是咱們調用請求方法的地方,把以前串行的代碼,變成了多線程的形式,並且還不是普通的多線程的形式,由於咱們要在主線程得到線程的結果,因此還要使用 Future 的形式。(這裏能夠參考以前的文章【併發那些事】建立線程的三種方式)。多線程
耗時: 5058 [webA, webB, webC]
嗯,效果明顯,從 8 秒多降低到了 5 秒多,可是仍是很長,無法接受的長。作爲一個有追求的程序員,還要去優化。咱們分析一下,剛開始代碼是串行的,流程以下,總請求時間是三次請求的總時長。
由於是並行化,相似木桶效應,決定最長時間的因素,是你請求中最耗時的的那個操做,這裏是時間爲 5 秒的請求 A 網站操做。
其實分析到這裏,在不能優化 AB 網站的請求時間的前提下,已經很難優化了。可是方法總比困難多,咱們的確沒辦法再去壓縮總請求時間,可是可讓用戶體驗更好一點,這裏須要引入兩個技術一個是 Websocket,一個是 CompletionService。其中websocket 能夠簡單的理解成服務端推送技術,就是不須要客戶端主動請求,而是經過服務端主動推送消息(ws 在本文中不是重點,會一筆帶過,具體實現能夠參考前文【websocket】spring boot 集成 websocket 的四種方式),下面咱們直接上代碼
/* * * * * * * * blog.coder4j.cn * * * Copyright (C) B0A6-B0B0 All Rights Reserved. * * * */ package cn.coder4j.study.example.thread; import cn.hutool.core.thread.ThreadUtil; import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author buhao * @version TestCompletionService.java, v 0.A B0B0-0B-A8 A9:0C buhao */ public class TestCompletionService { public static void main(String[] args) throws ExecutionException, InterruptedException { // 查詢信息 String queryName = "java"; // 調用查詢接口 long startTime = System.currentTimeMillis(); queryInfoCode3(queryName); System.out.println("耗時: " + (System.currentTimeMillis() - startTime)); } /** * 聚合查詢信息 code 1 * * @param queryName * @return */ private static List<String> queryInfoCode1(String queryName) { List<String> resultList = Lists.newArrayList(); String webA = searchWebA(queryName); resultList.add(webA); String webB = searchWebB(queryName); resultList.add(webB); String webC = searchWebC(queryName); resultList.add(webC); return resultList; } /** * 聚合查詢信息 code 2 * * @param queryName * @return */ private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException { List<String> resultList = Lists.newArrayList(); // 建立3個線程的線程池 ExecutorService pool = Executors.newFixedThreadPool(3); try { // 建立任務的 feature Future<String> webAFuture = pool.submit(() -> searchWebA(queryName)); Future<String> webBFuture = pool.submit(() -> searchWebB(queryName)); Future<String> webCFuture = pool.submit(() -> searchWebC(queryName)); // 獲得任務結果 resultList.add(webAFuture.get()); resultList.add(webBFuture.get()); resultList.add(webCFuture.get()); } finally { // 關閉線程池 pool.shutdown(); } return resultList; } /** * 聚合查詢信息 code 3 * * @param queryName * @return */ private static void queryInfoCode3(String queryName) throws ExecutionException, InterruptedException { // 開始時間 long startTime = System.currentTimeMillis(); // 建立 CompletionService ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(Executors.newFixedThreadPool(3)); // 建立任務的 feature executorCompletionService.submit(() -> searchWebA(queryName)); executorCompletionService.submit(() -> searchWebB(queryName)); executorCompletionService.submit(() -> searchWebC(queryName)); for (int i = 0; i < 3; i++) { Future take = executorCompletionService.take(); System.out.println("得到請求結果 -> " + take.get()); System.out.println("經過 ws 推送給客戶端,總共耗時" + (System.currentTimeMillis() - startTime)); } } /** * 查詢網站 A * * @param name * @return */ public static String searchWebA(String name) { ThreadUtil.sleep(5000); return "webA"; } /** * 查詢網站B * * @param name * @return */ public static String searchWebB(String name) { ThreadUtil.sleep(3000); return "webB"; } /** * 查詢網站C * * @param name * @return */ public static String searchWebC(String name) { ThreadUtil.sleep(500); return "webC"; } }
得到請求結果 -> webC 經過 ws 推送給客戶端,總共耗時561 得到請求結果 -> webB 經過 ws 推送給客戶端,總共耗時3055 得到請求結果 -> webA 經過 ws 推送給客戶端,總共耗時5060 耗時: 5060
咱們來分析一下執行結果,首先總耗時時間仍是 5 秒多沒變,可是咱們不是等所有執行完再推送給客戶端,而是執行完一個就推送一個,而且發現了一個規律,最早推送的是請求最快的,而後是第二快的,最後推最慢的那一個。也就是說推送結果是有序的。給用戶的體驗就是點擊按鈕後,1秒內會展現網站 C 的數據,而後過了2秒又在原有基礎上又添加導示了網站 B 數據,又過了2秒,又增長展現了網站 A數據。 這種體驗要比用戶一直白屏 5 秒,而後一下返回全部數據要好的多。
是否是很神奇,這背後的功臣就是 CompletionService,他的源碼以下:
package java.util.concurrent; /** * A service that decouples the production of new asynchronous tasks * from the consumption of the results of completed tasks. Producers * {@code submit} tasks for execution. Consumers {@code take} * completed tasks and process their results in the order they * complete. A {@code CompletionService} can for example be used to * manage asynchronous I/O, in which tasks that perform reads are * submitted in one part of a program or system, and then acted upon * in a different part of the program when the reads complete, * possibly in a different order than they were requested. * * <p>Typically, a {@code CompletionService} relies on a separate * {@link Executor} to actually execute the tasks, in which case the * {@code CompletionService} only manages an internal completion * queue. The {@link ExecutorCompletionService} class provides an * implementation of this approach. * * <p>Memory consistency effects: Actions in a thread prior to * submitting a task to a {@code CompletionService} * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> * actions taken by that task, which in turn <i>happen-before</i> * actions following a successful return from the corresponding {@code take()}. */ public interface CompletionService<V> { /** * Submits a value-returning task for execution and returns a Future * representing the pending results of the task. Upon completion, * this task may be taken or polled. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ Future<V> submit(Callable<V> task); /** * Submits a Runnable task for execution and returns a Future * representing that task. Upon completion, this task may be * taken or polled. * * @param task the task to submit * @param result the result to return upon successful completion * @return a Future representing pending completion of the task, * and whose {@code get()} method will return the given * result value upon completion * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ Future<V> submit(Runnable task, V result); /** * Retrieves and removes the Future representing the next * completed task, waiting if none are yet present. * * @return the Future representing the next completed task * @throws InterruptedException if interrupted while waiting */ Future<V> take() throws InterruptedException; /** * Retrieves and removes the Future representing the next * completed task, or {@code null} if none are present. * * @return the Future representing the next completed task, or * {@code null} if none are present */ Future<V> poll(); /** * Retrieves and removes the Future representing the next * completed task, waiting if necessary up to the specified wait * time if none are yet present. * * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the * {@code timeout} parameter * @return the Future representing the next completed task or * {@code null} if the specified waiting time elapses * before one is present * @throws InterruptedException if interrupted while waiting */ Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
能夠看到 CompletionService 方法,分別以下:
submit 用於提交一個 Callable 對象,用於提交一個能夠得到結果的線程任務
submit 用於提交一個 Runnable 對象及 result 對象,相似於上面的 submit,可是 runnable 的返回值 void 沒法得到線程的結果,因此添加了 result 用於作爲參數的橋樑
take 用於取出最新的線程執行結果,注意這裏是阻塞的
take 用於取出最新的線程執行結果,是非阻塞的,若是沒有結果就返回 null
另外,CompletionService 是接口,沒法直接使用,一般使用他的實現類 ExecutorCompletionService,具體使用方法如上面的 demo。
可能看到這裏會很好奇 ExecutorCompletionService 實現原理,其實原理很簡單,他在內部維護了一個阻塞隊列,提交的任務,先執行完的先進入隊列,因此你經過 poll 或 take 得到的確定是最早執行完的任務結果。
