在實際的業務開發中,須要用到併發編程的知識,實際使用線程池來異步執行任務的場景並非特別多,並且通常真的遇到了須要併發使用的時候,可能更加常見的就是直接實現Runnable/Callable接口,丟到Thread中執行了;或者更高級一點,定義一個線程池,扔進去執行;本片博文,將從另外一個角度,藉助JDK提供的ForkJoin,來設計一個簡單易用的併發框架java
實際項目中,使用併發的一個case就是商品詳情頁的展現了,一個詳情頁的展現,除了基本的商品數據以外,還有銷量,地址,評價,推薦,店鋪信息,裝飾信息等,用一段僞代碼來描述拼裝整個詳情數據的過程git
// 獲取商品基本信息 ItemInfo itemInfo = itemService.getInfo(itemId); // 獲取銷量 int sellCount = sellService.getSellCount(itemId); // 獲取評價信息 RateInfo rateInfo = rateService.getRateInfo(itemId); // 獲取店鋪信息 ShopInfo shopInfo = shopService.getShopInfo(shopId); // 獲取裝飾信息 DecorateInfo decoreateInfo = decorateService.getDecorateInfo(itemId); // 獲取推薦商品 RecommandInfo recommandInfo = recommandService.getRecommand(itemId);
若是是正常的執行過程,那麼就是上面的6個調用,串行的執行下來,假設每一個服務的rt是10ms,那麼光是這裏六個服務執行下來,耗時就>60ms了,github
但從業務角度出發,上面6個服務調用,彼此之間沒有什麼關聯,即一個服務的調用,並不依賴另外一個服務返回的結果,她們徹底能夠併發執行,這樣六個服務執行下來,耗時就是六個服務中耗時最久的一個了,可能也就10ms多一點了編程
兩個一對比,發現這種場景下,使用併發的優點很是明顯了,接下來的問題是,咱們但願以最簡單的方式,將上面的代碼改爲併發的併發
以上面的case爲例,若是咱們採用線程池的方式,能夠怎麼實現呢?框架
由於線程池方式不是重點,因此就簡單的演示如下,能夠怎麼實現,以及實現以後的效果如何異步
// 1. 建立線程池 ExecutorService alarmExecutorService = new ThreadPoolExecutor(3, 5, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), new DefaultThreadFactory("service-pool"), new ThreadPoolExecutor.CallerRunsPolicy()); // 2. 將服務調用,封裝到線程任務中執行 Future<ItemInfo> itemFuture = alarmExecutorService.submit(new Callable<ItemInfo>() { @Override public ItemInfo call() throws Exception { return itemService.getInfo(itemId); } }); // ... 其餘的服務依次類推 // 3. 獲取數據 ItemInfo = itemFutre.get(); // 阻塞,直到返回
上面這個實現能夠說是一個很是清晰明瞭的實現方式了,咱們接下來看一下,用Fork/Join框架能夠怎麼玩,又會有什麼好處async
首先可能須要簡單的介紹下,這是個什麼東西,Fork/Join框架是Java7提供了的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架ide
簡單來講,就是講一個複雜的任務,拆分紅不少小任務,併發去執行的機制,任務與任務的執行,可能並不會獨佔線程,採用了一種名爲工做竊取的手段,詳情能夠參考學習
藉助ForkJoin的方式,又能夠怎麼支持上面的場景呢?一個簡單的方案以下
// 1. 建立池 ForkJoinPool pool = new ForkJoinPool(10); // 2. 建立任務並提交 ForkJoinTask<ItemInfo> future = joinPool.submit(new RecursiveTask<ItemInfo>() { public ItemInfo compute() { return itemService.getItemInfo(itemId); } }); // 3. 獲取結果 future.join();
這樣一對比,二者之間並無什麼區別,並且也沒有用到傳說中的任務拆解
如何可以充分的利用ForkJoin的任務拆解的思想來解決問題呢?
將上面的實例,咱們稍微變通一下,將整個詳情頁的數據返回,看作是一個任務,對於內部的服務調用,根據不一樣的應用提供放,再進行任務劃分,假設能夠變成以下的層次結構
從上圖能夠看出,前面的服務調用,還能夠繼續劃分,好比咱們常見的商品信息,就能夠區分爲基本商品信息,sku信息,庫存信息,而這三個又是能夠併發執行的,也就是說從,藉助forjoin的任務拆解,咱們徹底能夠作到更細粒度的併發場景
那麼如今的目標就是,如何實現上面這個任務拆分的場景需求,並且還但願對既有的代碼改動不太大,關鍵還在於寫出來後,得容易看懂+維護(這點其實很重要,筆者接觸過一個封裝得特別好,致使業務交接的維護成本太大,以及排查問題難度飆升的狀況)
首先是定義一個最基本的執行單元,也就是封裝具體的業務邏輯,也就是咱們常說的Task(最終的效果也就是一個一個的task進行執行任務)
由於考慮到任務的拆解的狀況,因此咱們須要一個特殊的task,這個task能夠是多個task的集合(也就是大任務,先稱爲bigTask)
而後就是使用時,全部的task都封裝在一個bigTask中,直接丟給forkJoinPool來執行(支持同步獲取結果的invoke調用方式和異步獲取結果的execute方式)
那麼,核心就在與如何設計這個BigTask了,以及在執行時,將bigTask拆解成更細粒度的bigTask或者task,並最終將全部的task執行結果合併起來並返回
基本task接口
/** * Created by yihui on 2018/4/8. */ public interface IDataLoader<T> { /** * 具體的業務邏輯,放在這個方法裏面執行,將返回的結果,封裝到context內 * * @param context */ void load(T context); }
一個抽象的實現類,繼承forkjoin的RecuriAction,這個就對應上咱們前面定義的基本Task了
public abstract class AbstractDataLoader<T> extends RecursiveAction implements IDataLoader { // 這裏就是用來保存返回的結果,由業務防本身在實現的load()方法中寫入數據 protected T context; public AbstractDataLoader(T context) { this.context = context; } public void compute() { load(context); } /** * 獲取執行後的結果,強制等待執行完畢 * @return */ public T getContext() { this.join(); return context; } public void setContext(T context) { this.context = context; } }
而後就是BigTask的實現了,也比較簡單,內部維持一個List
public class DefaultForkJoinDataLoader<T> extends AbstractDataLoader<T> { /** * 待執行的任務列表 */ private List<AbstractDataLoader> taskList; public DefaultForkJoinDataLoader(T context) { super(context); taskList = new ArrayList<>(); } public DefaultForkJoinDataLoader<T> addTask(IDataLoader dataLoader) { taskList.add(new AbstractDataLoader(this.context) { @Override public void load(Object context) { dataLoader.load(context); } }); return this; } // 注意這裏,藉助fork對任務進行了拆解 @Override public void load(Object context) { this.taskList.forEach(ForkJoinTask::fork); } /** * 獲取執行後的結果 * @return */ public T getContext() { this.taskList.forEach(ForkJoinTask::join); return this.context; } }
接下來就是比較簡單的線程池的設計了,由於咱們須要提供同步獲取結果,和異步獲取結果的兩種姿式,因此對ForkJoinPool須要作個擴展
public class ExtendForkJoinPool extends ForkJoinPool { public ExtendForkJoinPool() { } public ExtendForkJoinPool(int parallelism) { super(parallelism); } public ExtendForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode) { super(parallelism, factory, handler, asyncMode); } // 同步阻塞調用時,須要對每一個task執行join,確保執行完畢 public <T> T invoke(ForkJoinTask<T> task) { if (task instanceof AbstractDataLoader) { super.invoke(task); return (T) ((AbstractDataLoader) task).getContext(); } else { return super.invoke(task); } } }
而後就是建立Pool的工廠類,沒什麼特別的了
public class ForkJoinPoolFactory { private int parallelism; private ExtendForkJoinPool forkJoinPool; public ForkJoinPoolFactory() { this(Runtime.getRuntime().availableProcessors() * 16); } public ForkJoinPoolFactory(int parallelism) { this.parallelism = parallelism; forkJoinPool = new ExtendForkJoinPool(parallelism); } public ExtendForkJoinPool getObject() { return this.forkJoinPool; } public int getParallelism() { return parallelism; } public void setParallelism(int parallelism) { this.parallelism = parallelism; } public void destroy() throws Exception { this.forkJoinPool.shutdown(); } }
到此,整個基本上算是完了,每一個類都很簡單,就那麼點東西,接下來就是須要看怎麼用了
先來一個簡單的case,演示下,應該怎麼用
@Data static class Context { public int addAns; public int mulAns; public String concatAns; public Map<String, Object> ans = new ConcurrentHashMap<>(); } @Test public void testForkJoinFramework() { ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject(); Context context = new Context(); DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context); loader.addTask(new IDataLoader<Context>() { @Override public void load(Context context) { context.addAns = 100; System.out.println("add thread: " + Thread.currentThread()); } }); loader.addTask(new IDataLoader<Context>() { @Override public void load(Context context) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } context.mulAns = 50; System.out.println("mul thread: " + Thread.currentThread()); } }); loader.addTask(new IDataLoader<Context>() { @Override public void load(Context context) { context.concatAns = "hell world"; System.out.println("concat thread: " + Thread.currentThread()); } }); DefaultForkJoinDataLoader<Context> subTask = new DefaultForkJoinDataLoader<>(context); subTask.addTask(new IDataLoader<Context>() { @Override public void load(Context context) { System.out.println("sub thread1: " + Thread.currentThread() + " | now: " + System.currentTimeMillis()); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis()); } }); subTask.addTask(new IDataLoader<Context>() { @Override public void load(Context context) { System.out.println("sub thread2: " + Thread.currentThread() + " | now: " + System.currentTimeMillis()); context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis()); } }); loader.addTask(subTask); long start = System.currentTimeMillis(); System.out.println("------- start: " + start); // 提交任務,同步阻塞調用方式 forkJoinPool.invoke(loader); System.out.println("------- end: " + (System.currentTimeMillis() - start)); // 輸出返回結果,要求3s後輸出,全部的結果都設置完畢 System.out.println("the ans: " + context); }
使用起來就比較簡單了,簡單的四步驟便可:
new DefaultForkJoinDataLoader<>(context);
上面這個實現中,對於須要將Task進行再次拆分,會變得很是簡單,看下上面的輸出
------- start: 1523200221827 add thread: Thread[ForkJoinPool-1-worker-50,5,main] concat thread: Thread[ForkJoinPool-1-worker-36,5,main] sub thread2: Thread[ForkJoinPool-1-worker-29,5,main] | now: 1523200222000 sub thread1: Thread[ForkJoinPool-1-worker-36,5,main] | now: 1523200222000 mul thread: Thread[ForkJoinPool-1-worker-43,5,main] ------- end: 3176 the ans: ForJoinTest.Context(addAns=100, mulAns=50, concatAns=hell world, ans={ForkJoinPool-1-worker-36=1523200222204, ForkJoinPool-1-worker-29=1523200222000})
對於但願異步執行的狀況,也比較簡單了,僅僅是在提交任務的地方,稍微改動一下便可,而後在須要獲取數據的時候,經過loader來獲取結果便可
@Test public void testForkJoinFramework2() { ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject(); Context context = new Context(); DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context); loader.addTask(new IDataLoader<Context>() { @Override public void load(Context context) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } context.addAns = 100; System.out.println("add thread: " + Thread.currentThread()); } }); loader.addTask(new IDataLoader<Context>() { @Override public void load(Context context) { context.mulAns = 50; System.out.println("mul thread: " + Thread.currentThread()); } }); loader.addTask(new IDataLoader<Context>() { @Override public void load(Context context) { context.concatAns = "hell world"; System.out.println("concat thread: " + Thread.currentThread()); } }); long start = System.currentTimeMillis(); System.out.println("------- start: " + start); // 若是暫時不關心返回結果,能夠採用execute方式,異步執行 forkJoinPool.execute(loader); // .... 這裏能夠作其餘的事情 此時,不會阻塞,addAns不會被設置 System.out.println("context is: " + context); System.out.println("------- then: " + (System.currentTimeMillis() - start)); loader.getContext(); // 主動調用這個,表示會等待全部任務執行完畢後,才繼續下去 System.out.println("context is: " + context); System.out.println("------- end: " + (System.currentTimeMillis() - start)); }
相關源碼可在git上查看,主要在Quick-Alarm項目中
我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛
盡信書則不如,已上內容,純屬一家之言,因本人能力通常,見識有限,如發現bug或者有更好的建議,隨時歡迎批評指正