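One day, the product manager suddenly tracks you down while you are slacking off.
PM: "We need to add an aggregated search feature. When a user searches for a product on our site, we query sites A, B, and C for that information separately, then return the combined results to the user."
You: "Oh, so I just write a crawler that scrapes data from those 3 sites, right?"
PM: "Bah, crawling is illegal. This is called data analysis. So, can you do it?"
You: "Sure."
PM: "Great, it goes live tomorrow."
You: "..."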
You finish the development quickly. The code is as follows:
```java
/*
 * blog.coder4j.cn
 * Copyright (C) 2016-2020 All Rights Reserved.
 */
package cn.coder4j.study.example.thread;

import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Lists;

import java.util.List;

/**
 * @author buhao
 * @version TestCompletionService.java, v 0.1 2020-02-18 19:03 buhao
 */
public class TestCompletionService {

    public static void main(String[] args) {
        // Search keyword
        String queryName = "java";
        // Call the query method and measure how long it takes
        long startTime = System.currentTimeMillis();
        List<String> result = queryInfoCode1(queryName);
        System.out.println("Elapsed: " + (System.currentTimeMillis() - startTime));
        System.out.println(result);
    }

    /**
     * Aggregated query, code 1: query the three sites one after another.
     *
     * @param queryName the keyword to search for
     * @return the results from sites A, B and C
     */
    private static List<String> queryInfoCode1(String queryName) {
        List<String> resultList = Lists.newArrayList();
        String webA = searchWebA(queryName);
        resultList.add(webA);
        String webB = searchWebB(queryName);
        resultList.add(webB);
        String webC = searchWebC(queryName);
        resultList.add(webC);
        return resultList;
    }

    /**
     * Query site A (simulated 5 s response time).
     *
     * @param name the keyword to search for
     * @return the result from site A
     */
    public static String searchWebA(String name) {
        ThreadUtil.sleep(5000);
        return "webA";
    }

    /**
     * Query site B (simulated 3 s response time).
     *
     * @param name the keyword to search for
     * @return the result from site B
     */
    public static String searchWebB(String name) {
        ThreadUtil.sleep(3000);
        return "webB";
    }

    /**
     * Query site C (simulated 0.5 s response time).
     *
     * @param name the keyword to search for
     * @return the result from site C
     */
    public static String searchWebC(String name) {
        ThreadUtil.sleep(500);
        return "webC";
    }
}
```
You run the code, and the result is:
```
Elapsed: 8512
[webA, webB, webC]
```
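Whoa, why does a single request take more than 8 seconds? If this goes live, the PM will kill me.
After some debugging, you find that the problem lies with the sites being requested: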
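```java
/**
 * Query site A (simulated 5 s response time).
 *
 * @param name the keyword to search for
 * @return the result from site A
 */
public static String searchWebA(String name) {
    ThreadUtil.sleep(5000);
    return "webA";
}

/**
 * Query site B (simulated 3 s response time).
 *
 * @param name the keyword to search for
 * @return the result from site B
 */
public static String searchWebB(String name) {
    ThreadUtil.sleep(3000);
    return "webB";
}

/**
 * Query site C (simulated 0.5 s response time).
 *
 * @param name the keyword to search for
 * @return the result from site C
 */
public static String searchWebC(String name) {
    ThreadUtil.sleep(500);
    return "webC";
}
```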
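Sites A and B have been neglected for years with nobody maintaining them, so their interfaces respond very slowly: the average response time is 5 seconds for A and 3 seconds for B (simulated here with sleep). Site C performs reasonably well, with an average response time of 0.5 seconds. Because the calls run one after another, our program's execution time is site A's response time + site B's response time + site C's response time, i.e. 5 + 3 + 0.5 = 8.5 seconds, which matches the measured 8512 ms.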
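To make the "sum of the three response times" concrete, here is a minimal JDK-only sketch of the serial call pattern. It is a model under stated assumptions, not the article's code: `fakeSearch` is a hypothetical stand-in for `searchWebA/B/C` that uses `Thread.sleep` instead of hutool's `ThreadUtil.sleep`, and the latencies are scaled down to 50, 30 and 5 ms so it runs quickly.

```java
import java.util.ArrayList;
import java.util.List;

public class SerialTiming {

    // Hypothetical stand-in for searchWebA/B/C: sleep to simulate latency, then return the site name.
    static String fakeSearch(String name, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return name;
    }

    // Same shape as queryInfoCode1: each call starts only after the previous one returns.
    static List<String> querySerially() {
        List<String> results = new ArrayList<>();
        results.add(fakeSearch("webA", 50)); // stands in for 5 s
        results.add(fakeSearch("webB", 30)); // stands in for 3 s
        results.add(fakeSearch("webC", 5));  // stands in for 0.5 s
        return results;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = querySerially();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Expect roughly 50 + 30 + 5 = 85 ms, never much less.
        System.out.println("elapsed: " + elapsedMs + " ms " + results);
    }
}
```

Because nothing overlaps, the measured time is bounded below by the sum of the sleeps, just like the 8.5 s in the article.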
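Fine, now we know the problem: the sites we request are too slow. So how do we fix it? We can hardly phone them and ask them to optimize their sites so that we can scrape them. The books teach us to look for the problem in ourselves first, so let's see what can be optimized in our own code.
Analyzing the code, you notice that it is entirely serial: request site A, wait until it finishes, then request site B, wait until it finishes, then request site C. Then the first rule of efficiency comes to mind: increase parallelism. Why request the sites one by one instead of requesting A, B, and C at the same time? Java multithreading makes this easy. The code is as follows: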
```java
/*
 * blog.coder4j.cn
 * Copyright (C) 2016-2020 All Rights Reserved.
 */
package cn.coder4j.study.example.thread;

import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author buhao
 * @version TestCompletionService.java, v 0.1 2020-02-18 19:03 buhao
 */
public class TestCompletionService {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Search keyword
        String queryName = "java";
        // Call the query method and measure how long it takes
        long startTime = System.currentTimeMillis();
        List<String> result = queryInfoCode2(queryName);
        System.out.println("Elapsed: " + (System.currentTimeMillis() - startTime));
        System.out.println(result);
    }

    /**
     * Aggregated query, code 1: query the three sites one after another.
     *
     * @param queryName the keyword to search for
     * @return the results from sites A, B and C
     */
    private static List<String> queryInfoCode1(String queryName) {
        List<String> resultList = Lists.newArrayList();
        String webA = searchWebA(queryName);
        resultList.add(webA);
        String webB = searchWebB(queryName);
        resultList.add(webB);
        String webC = searchWebC(queryName);
        resultList.add(webC);
        return resultList;
    }

    /**
     * Aggregated query, code 2: query the three sites in parallel.
     *
     * @param queryName the keyword to search for
     * @return the results from sites A, B and C
     */
    private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
        List<String> resultList = Lists.newArrayList();
        // Create a thread pool with 3 threads
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Submit the tasks and keep their Futures
            Future<String> webAFuture = pool.submit(() -> searchWebA(queryName));
            Future<String> webBFuture = pool.submit(() -> searchWebB(queryName));
            Future<String> webCFuture = pool.submit(() -> searchWebC(queryName));
            // Collect the results; get() blocks until that task has finished
            resultList.add(webAFuture.get());
            resultList.add(webBFuture.get());
            resultList.add(webCFuture.get());
        } finally {
            // Shut down the thread pool
            pool.shutdown();
        }
        return resultList;
    }

    /**
     * Query site A (simulated 5 s response time).
     *
     * @param name the keyword to search for
     * @return the result from site A
     */
    public static String searchWebA(String name) {
        ThreadUtil.sleep(5000);
        return "webA";
    }

    /**
     * Query site B (simulated 3 s response time).
     *
     * @param name the keyword to search for
     * @return the result from site B
     */
    public static String searchWebB(String name) {
        ThreadUtil.sleep(3000);
        return "webB";
    }

    /**
     * Query site C (simulated 0.5 s response time).
     *
     * @param name the keyword to search for
     * @return the result from site C
     */
    public static String searchWebC(String name) {
        ThreadUtil.sleep(500);
        return "webC";
    }
}
```
The key code is:
```java
/**
 * Aggregated query, code 2: query the three sites in parallel.
 *
 * @param queryName the keyword to search for
 * @return the results from sites A, B and C
 */
private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
    List<String> resultList = Lists.newArrayList();
    // Create a thread pool with 3 threads
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
        // Submit the tasks and keep their Futures
        Future<String> webAFuture = pool.submit(() -> searchWebA(queryName));
        Future<String> webBFuture = pool.submit(() -> searchWebB(queryName));
        Future<String> webCFuture = pool.submit(() -> searchWebC(queryName));
        // Collect the results; get() blocks until that task has finished
        resultList.add(webAFuture.get());
        resultList.add(webBFuture.get());
        resultList.add(webCFuture.get());
    } finally {
        // Shut down the thread pool
        pool.shutdown();
    }
    return resultList;
}
```
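Not a single line of the code that requests the sites has changed. What changed is the place where we call the request methods: the serial code became multithreaded. And not plain multithreading either: because the main thread needs the results computed by the worker threads, we use Future. (For background, see the earlier article 【Concurrency Matters】Three Ways to Create Threads.)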
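One detail worth seeing in isolation: `Future.get()` blocks until that particular task finishes, so collecting results in submission order means the fast site C waits behind A and B even though it finished first. The sketch below is a JDK-only approximation of `queryInfoCode2`, assuming a hypothetical `fakeSearch` helper and scaled-down latencies of 200, 120 and 20 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureOrder {

    // Hypothetical stand-in for searchWebA/B/C.
    static String fakeSearch(String name, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    // Same shape as queryInfoCode2: submit all three, then get() in submission order.
    static List<String> queryInParallel() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            Future<String> a = pool.submit(() -> fakeSearch("webA", 200));
            Future<String> b = pool.submit(() -> fakeSearch("webB", 120));
            Future<String> c = pool.submit(() -> fakeSearch("webC", 20));
            List<String> results = new ArrayList<>();
            // a.get() blocks about 200 ms; webC is already done but cannot be used yet.
            results.add(a.get());
            results.add(b.get());
            results.add(c.get());
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        List<String> results = queryInParallel();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("elapsed: " + elapsedMs + " ms " + results);
    }
}
```

The elapsed time should land near the 200 ms of the slowest task rather than the 340 ms sum, but the list order always follows submission order, not completion order.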
Let's run the code and see the effect. The result is:
```
Elapsed: 5058
[webA, webB, webC]
```
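Well, a clear improvement: down from over 8 seconds to just over 5 seconds. But that is still long, unacceptably long. As a programmer with standards, you keep optimizing. Let's analyze it. At first the code was serial: request A, then B, then C, so the total request time was the sum of the three request times.
Then we parallelized the serial requests, so that the requests to A, B, and C are all in flight at the same time.
Because the requests run in parallel, something like the "wooden bucket" effect applies: a single extreme decides the outcome, and here the total time is set by the most time-consuming request, namely the 5-second request to site A.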
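The two timing models each fit in one line: the serial total is the sum of the latencies, and the ideal parallel total is their maximum. A tiny sketch with the article's numbers (the method names are illustrative, and thread start-up overhead is ignored):

```java
import java.util.Arrays;

public class LatencyMath {

    // Serial: each request waits for the previous one, so latencies add up.
    static long serialTotal(long... latenciesMs) {
        return Arrays.stream(latenciesMs).sum();
    }

    // Ideal parallel: all requests overlap, so the slowest one decides the total.
    static long parallelTotal(long... latenciesMs) {
        return Arrays.stream(latenciesMs).max().orElse(0);
    }

    public static void main(String[] args) {
        // Sites A, B, C from the article: 5 s, 3 s, 0.5 s
        System.out.println("serial: " + serialTotal(5000, 3000, 500)
                + " ms, parallel: " + parallelTotal(5000, 3000, 500) + " ms");
        // prints "serial: 8500 ms, parallel: 5000 ms"
    }
}
```

These ideal values line up with the measured 8512 ms and 5058 ms; the extra few milliseconds are scheduling and thread overhead.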
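At this point, given that we cannot speed up the requests to sites A and B, there is very little left to optimize. But there are always more solutions than problems. We really cannot compress the total request time any further, but we can make the user experience better. This calls for two technologies: WebSocket and CompletionService. WebSocket can be loosely understood as a server push technology: instead of the client actively sending a request, the server actively pushes messages to it. (WebSocket is not the focus of this article and is only mentioned in passing; for implementation details, see the earlier article 【websocket】Four Ways to Integrate WebSocket with Spring Boot.) Let's go straight to the code: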
```java
/*
 * blog.coder4j.cn
 * Copyright (C) 2016-2020 All Rights Reserved.
 */
package cn.coder4j.study.example.thread;

import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author buhao
 * @version TestCompletionService.java, v 0.1 2020-02-18 19:03 buhao
 */
public class TestCompletionService {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Search keyword
        String queryName = "java";
        // Call the query method and measure how long it takes
        long startTime = System.currentTimeMillis();
        queryInfoCode3(queryName);
        System.out.println("Elapsed: " + (System.currentTimeMillis() - startTime));
    }

    /**
     * Aggregated query, code 1: query the three sites one after another.
     *
     * @param queryName the keyword to search for
     * @return the results from sites A, B and C
     */
    private static List<String> queryInfoCode1(String queryName) {
        List<String> resultList = Lists.newArrayList();
        String webA = searchWebA(queryName);
        resultList.add(webA);
        String webB = searchWebB(queryName);
        resultList.add(webB);
        String webC = searchWebC(queryName);
        resultList.add(webC);
        return resultList;
    }

    /**
     * Aggregated query, code 2: query the three sites in parallel.
     *
     * @param queryName the keyword to search for
     * @return the results from sites A, B and C
     */
    private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
        List<String> resultList = Lists.newArrayList();
        // Create a thread pool with 3 threads
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Submit the tasks and keep their Futures
            Future<String> webAFuture = pool.submit(() -> searchWebA(queryName));
            Future<String> webBFuture = pool.submit(() -> searchWebB(queryName));
            Future<String> webCFuture = pool.submit(() -> searchWebC(queryName));
            // Collect the results; get() blocks until that task has finished
            resultList.add(webAFuture.get());
            resultList.add(webBFuture.get());
            resultList.add(webCFuture.get());
        } finally {
            // Shut down the thread pool
            pool.shutdown();
        }
        return resultList;
    }

    /**
     * Aggregated query, code 3: handle each result as soon as its task completes.
     *
     * @param queryName the keyword to search for
     */
    private static void queryInfoCode3(String queryName) throws ExecutionException, InterruptedException {
        // Start time
        long startTime = System.currentTimeMillis();
        // Thread pool that actually runs the tasks
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Create the CompletionService on top of the pool
            ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(pool);
            // Submit the three tasks
            executorCompletionService.submit(() -> searchWebA(queryName));
            executorCompletionService.submit(() -> searchWebB(queryName));
            executorCompletionService.submit(() -> searchWebC(queryName));
            for (int i = 0; i < 3; i++) {
                // take() blocks until the next task completes, in completion order
                Future<String> take = executorCompletionService.take();
                System.out.println("Got result -> " + take.get());
                System.out.println("Pushed to client via ws, total elapsed: " + (System.currentTimeMillis() - startTime));
            }
        } finally {
            // Shut down the pool; its non-daemon threads would otherwise keep the JVM alive
            pool.shutdown();
        }
    }

    /**
     * Query site A (simulated 5 s response time).
     *
     * @param name the keyword to search for
     * @return the result from site A
     */
    public static String searchWebA(String name) {
        ThreadUtil.sleep(5000);
        return "webA";
    }

    /**
     * Query site B (simulated 3 s response time).
     *
     * @param name the keyword to search for
     * @return the result from site B
     */
    public static String searchWebB(String name) {
        ThreadUtil.sleep(3000);
        return "webB";
    }

    /**
     * Query site C (simulated 0.5 s response time).
     *
     * @param name the keyword to search for
     * @return the result from site C
     */
    public static String searchWebC(String name) {
        ThreadUtil.sleep(500);
        return "webC";
    }
}
```
The core code is:
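```java
/**
 * Aggregated query, code 3: handle each result as soon as its task completes.
 *
 * @param queryName the keyword to search for
 */
private static void queryInfoCode3(String queryName) throws ExecutionException, InterruptedException {
    // Start time
    long startTime = System.currentTimeMillis();
    // Thread pool that actually runs the tasks
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
        // Create the CompletionService on top of the pool
        ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(pool);
        // Submit the three tasks
        executorCompletionService.submit(() -> searchWebA(queryName));
        executorCompletionService.submit(() -> searchWebB(queryName));
        executorCompletionService.submit(() -> searchWebC(queryName));
        for (int i = 0; i < 3; i++) {
            // take() blocks until the next task completes, in completion order
            Future<String> take = executorCompletionService.take();
            System.out.println("Got result -> " + take.get());
            System.out.println("Pushed to client via ws, total elapsed: " + (System.currentTimeMillis() - startTime));
        }
    } finally {
        // Shut down the pool; its non-daemon threads would otherwise keep the JVM alive
        pool.shutdown();
    }
}
```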
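First, the output:
```
Got result -> webC
Pushed to client via ws, total elapsed: 561
Got result -> webB
Pushed to client via ws, total elapsed: 3055
Got result -> webA
Pushed to client via ws, total elapsed: 5060
Elapsed: 5060
```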
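Let's analyze the output. The total time is still just over 5 seconds, but instead of waiting for everything to finish before pushing to the client, we push each result as soon as it is ready. There is also a pattern: the fastest request is pushed first, then the second fastest, and the slowest last. In other words, the results are pushed in the order in which the tasks complete. For the user, this means that after clicking the button, site C's data appears after about 0.5 seconds, site B's data is added roughly 2.5 seconds later, and site A's data is added roughly 2 seconds after that. That is a much better experience than staring at a blank screen for 5 seconds and then getting all the data at once.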
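The push-as-soon-as-ready pattern can be separated from WebSocket entirely. In this sketch the hypothetical `pushToClient` callback stands in for a real WebSocket send (no WebSocket library is used or implied), `fakeSearch` is a hypothetical helper, and the latencies are scaled down to 200, 120 and 20 ms. The callback receives results in completion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class PushInCompletionOrder {

    // Hypothetical stand-in for searchWebA/B/C.
    static String fakeSearch(String name, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    // pushToClient is a hypothetical placeholder for "send this result over the WebSocket".
    static void queryAndPush(Consumer<String> pushToClient) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            cs.submit(() -> fakeSearch("webA", 200));
            cs.submit(() -> fakeSearch("webB", 120));
            cs.submit(() -> fakeSearch("webC", 20));
            for (int i = 0; i < 3; i++) {
                // take() hands back whichever task finished next, so the fastest is pushed first.
                pushToClient.accept(cs.take().get());
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> pushed = new ArrayList<>();
        queryAndPush(pushed::add);
        System.out.println(pushed);
    }
}
```

Passing the push step in as a callback keeps the concurrency logic independent of how results actually reach the browser.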
Pretty magical, isn't it? The hero behind this is CompletionService. Its source code is:
```java
package java.util.concurrent;

/**
 * A service that decouples the production of new asynchronous tasks
 * from the consumption of the results of completed tasks. Producers
 * {@code submit} tasks for execution. Consumers {@code take}
 * completed tasks and process their results in the order they
 * complete. A {@code CompletionService} can for example be used to
 * manage asynchronous I/O, in which tasks that perform reads are
 * submitted in one part of a program or system, and then acted upon
 * in a different part of the program when the reads complete,
 * possibly in a different order than they were requested.
 *
 * <p>Typically, a {@code CompletionService} relies on a separate
 * {@link Executor} to actually execute the tasks, in which case the
 * {@code CompletionService} only manages an internal completion
 * queue. The {@link ExecutorCompletionService} class provides an
 * implementation of this approach.
 *
 * <p>Memory consistency effects: Actions in a thread prior to
 * submitting a task to a {@code CompletionService}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions taken by that task, which in turn <i>happen-before</i>
 * actions following a successful return from the corresponding {@code take()}.
 */
public interface CompletionService<V> {
    /**
     * Submits a value-returning task for execution and returns a Future
     * representing the pending results of the task. Upon completion,
     * this task may be taken or polled.
     *
     * @param task the task to submit
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    Future<V> submit(Callable<V> task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. Upon completion, this task may be
     * taken or polled.
     *
     * @param task the task to submit
     * @param result the result to return upon successful completion
     * @return a Future representing pending completion of the task,
     *         and whose {@code get()} method will return the given
     *         result value upon completion
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    Future<V> submit(Runnable task, V result);

    /**
     * Retrieves and removes the Future representing the next
     * completed task, waiting if none are yet present.
     *
     * @return the Future representing the next completed task
     * @throws InterruptedException if interrupted while waiting
     */
    Future<V> take() throws InterruptedException;

    /**
     * Retrieves and removes the Future representing the next
     * completed task, or {@code null} if none are present.
     *
     * @return the Future representing the next completed task, or
     *         {@code null} if none are present
     */
    Future<V> poll();

    /**
     * Retrieves and removes the Future representing the next
     * completed task, waiting if necessary up to the specified wait
     * time if none are yet present.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the Future representing the next completed task or
     *         {@code null} if the specified waiting time elapses
     *         before one is present
     * @throws InterruptedException if interrupted while waiting
     */
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
```
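As you can see, CompletionService declares the following methods:
submit(Callable<V> task): submits a Callable, i.e. a task whose result can be retrieved.
submit(Runnable task, V result): submits a Runnable together with a result object. It is similar to the method above, but Runnable.run() returns void and yields no result, so the extra result argument supplies the value that the returned Future's get() returns on successful completion.
take(): retrieves and removes the Future of the next completed task; this call blocks if no task has completed yet.
poll(): retrieves and removes the Future of the next completed task without blocking; it returns null if no task has completed yet.
poll(long timeout, TimeUnit unit): same as poll(), but waits up to the given timeout for a task to complete before returning null.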
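To observe each method's blocking behaviour deterministically, the sketch below holds a task back with a `CountDownLatch` instead of relying on sleep timing. Only standard `java.util.concurrent` APIs are used; the class name, the `demo` method and the returned log strings are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CompletionServiceMethods {

    // Returns what each call observed, in order.
    static List<String> demo() throws InterruptedException, ExecutionException {
        List<String> observed = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            // submit(Callable): the value returned by call() becomes the result.
            cs.submit(() -> {
                gate.await(); // hold the task until the gate opens
                return "from callable";
            });
            // poll(): non-blocking, null because nothing has completed yet.
            observed.add(String.valueOf(cs.poll()));
            // poll(timeout, unit): waits up to 50 ms, then gives up with null.
            observed.add(String.valueOf(cs.poll(50, TimeUnit.MILLISECONDS)));
            gate.countDown();
            // take(): blocks until the next completed task is available.
            observed.add(cs.take().get());
            // submit(Runnable, result): run() returns void, so get() yields the supplied result.
            AtomicBoolean ran = new AtomicBoolean(false);
            cs.submit(() -> ran.set(true), "from runnable");
            observed.add(cs.take().get() + ", ran=" + ran.get());
            return observed;
        } finally {
            gate.countDown(); // never leave the task blocked, even on failure
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

The latch keeps the first two `poll` calls reliably empty, which makes the difference between non-blocking `poll()`, timed `poll(timeout, unit)` and blocking `take()` easy to see.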
Also, CompletionService is an interface and cannot be used directly. You normally use its implementation ExecutorCompletionService, as in the demo above.
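You may be curious about how ExecutorCompletionService works. The principle is simple: it maintains an internal blocking queue (a LinkedBlockingQueue by default), and each submitted task is wrapped in an internal FutureTask subclass whose done() callback adds the task to that queue as soon as it finishes. Tasks that finish first therefore enter the queue first, so poll or take always return the earliest-finished task that has not been retrieved yet.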
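A simplified sketch of that mechanism, assuming nothing beyond the JDK: `MiniCompletionService` is a hypothetical name, and this is a stripped-down illustration rather than the real `ExecutorCompletionService` source (which also supports custom queues, `submit(Runnable, V)` and timed `poll`). The key idea is a `FutureTask` subclass whose `done()` hook enqueues the task when it finishes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical minimal completion service, for illustration only.
public class MiniCompletionService<V> {

    private final Executor executor;
    // Completed tasks land here in the order they finish.
    private final BlockingQueue<Future<V>> completionQueue = new LinkedBlockingQueue<>();

    public MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    // FutureTask calls done() once the task completes (normally, exceptionally or by cancellation).
    private class QueueingTask extends FutureTask<V> {
        QueueingTask(Callable<V> callable) {
            super(callable);
        }

        @Override
        protected void done() {
            completionQueue.add(this);
        }
    }

    public Future<V> submit(Callable<V> task) {
        QueueingTask future = new QueueingTask(task);
        executor.execute(future);
        return future;
    }

    // Blocks until some task has completed.
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    // Returns null immediately if nothing has completed yet.
    public Future<V> poll() {
        return completionQueue.poll();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            MiniCompletionService<String> cs = new MiniCompletionService<>(pool);
            cs.submit(() -> { Thread.sleep(200); return "webA"; });
            cs.submit(() -> { Thread.sleep(120); return "webB"; });
            cs.submit(() -> { Thread.sleep(20); return "webC"; });
            for (int i = 0; i < 3; i++) {
                System.out.println(cs.take().get());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

The ordering guarantee comes entirely from the queue: whichever task calls `done()` first is the first one `take()` returns.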
Due to space limits, not all of the code can be posted here. If you run into problems, check the source code on GitHub.
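Feel free to follow my personal WeChat official account KIWI的碎碎念. After following, reply "福利" (perks) to receive a large collection of learning materials for free!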