在實際的業務開發中,須要用到併發編程的知識,實際使用線程池來異步執行任務的場景並非特別多,並且通常真的遇到了須要併發使用的時候,可能更加常見的就是直接實現Runnable/Callable接口,丟到Thread中執行了;或者更高級一點,定義一個線程池,扔進去執行;本片博文,將從另外一個角度,藉助JDK提供的ForkJoin,來設計一個簡單易用的併發框架java
實際項目中,使用併發的一個case就是商品詳情頁的展現了,一個詳情頁的展現,除了基本的商品數據以外,還有銷量,地址,評價,推薦,店鋪信息,裝飾信息等,用一段僞代碼來描述拼裝整個詳情數據的過程git
// 獲取商品基本信息
ItemInfo itemInfo = itemService.getInfo(itemId);
// 獲取銷量
int sellCount = sellService.getSellCount(itemId);
// 獲取評價信息
RateInfo rateInfo = rateService.getRateInfo(itemId);
// 獲取店鋪信息
ShopInfo shopInfo = shopService.getShopInfo(shopId);
// 獲取裝飾信息
DecorateInfo decoreateInfo = decorateService.getDecorateInfo(itemId);
// 獲取推薦商品
RecommandInfo recommandInfo = recommandService.getRecommand(itemId);
複製代碼
若是是正常的執行過程,那麼就是上面的6個調用,串行的執行下來,假設每一個服務的rt是10ms,那麼光是這裏六個服務執行下來,耗時就>60ms了,github
但從業務角度出發,上面6個服務調用,彼此之間沒有什麼關聯,即一個服務的調用,並不依賴另外一個服務返回的結果,她們徹底能夠併發執行,這樣六個服務執行下來,耗時就是六個服務中耗時最久的一個了,可能也就10ms多一點了編程
兩個一對比,發現這種場景下,使用併發的優點很是明顯了,接下來的問題是,咱們但願以最簡單的方式,將上面的代碼改爲併發的bash
以上面的case爲例,若是咱們採用線程池的方式,能夠怎麼實現呢?併發
由於線程池方式不是重點,因此就簡單的演示如下,能夠怎麼實現,以及實現以後的效果如何框架
// 1. 建立線程池
ExecutorService alarmExecutorService = new ThreadPoolExecutor(3, 5, 60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10),
new DefaultThreadFactory("service-pool"),
new ThreadPoolExecutor.CallerRunsPolicy());
// 2. 將服務調用,封裝到線程任務中執行
Future<ItemInfo> itemFuture = alarmExecutorService.submit(new Callable<ItemInfo>() {
@Override
public ItemInfo call() throws Exception {
return itemService.getInfo(itemId);
}
});
// ... 其餘的服務依次類推
// 3. 獲取數據
ItemInfo = itemFutre.get(); // 阻塞,直到返回
複製代碼
上面這個實現能夠說是一個很是清晰明瞭的實現方式了,咱們接下來看一下,用Fork/Join框架能夠怎麼玩,又會有什麼好處異步
首先可能須要簡單的介紹下,這是個什麼東西,Fork/Join框架是Java7提供了的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架async
簡單來講,就是講一個複雜的任務,拆分紅不少小任務,併發去執行的機制,任務與任務的執行,可能並不會獨佔線程,採用了一種名爲工做竊取的手段,詳情能夠參考ide
藉助ForkJoin的方式,又能夠怎麼支持上面的場景呢?一個簡單的方案以下
// 1. 建立池
ForkJoinPool pool = new ForkJoinPool(10);
// 2. 建立任務並提交
ForkJoinTask<ItemInfo> future = joinPool.submit(new RecursiveTask<ItemInfo>() {
public ItemInfo compute() {
return itemService.getItemInfo(itemId);
}
});
// 3. 獲取結果
future.join();
複製代碼
這樣一對比,二者之間並無什麼區別,並且也沒有用到傳說中的任務拆解
如何可以充分的利用ForkJoin的任務拆解的思想來解決問題呢?
將上面的實例,咱們稍微變通一下,將整個詳情頁的數據返回,看作是一個任務,對於內部的服務調用,根據不一樣的應用提供放,再進行任務劃分,假設能夠變成以下的層次結構
從上圖能夠看出,前面的服務調用,還能夠繼續劃分,好比咱們常見的商品信息,就能夠區分爲基本商品信息,sku信息,庫存信息,而這三個又是能夠併發執行的,也就是說從,藉助forjoin的任務拆解,咱們徹底能夠作到更細粒度的併發場景
那麼如今的目標就是,如何實現上面這個任務拆分的場景需求,並且還但願對既有的代碼改動不太大,關鍵還在於寫出來後,得容易看懂+維護(這點其實很重要,筆者接觸過一個封裝得特別好,致使業務交接的維護成本太大,以及排查問題難度飆升的狀況)
首先是定義一個最基本的執行單元,也就是封裝具體的業務邏輯,也就是咱們常說的Task(最終的效果也就是一個一個的task進行執行任務)
由於考慮到任務的拆解的狀況,因此咱們須要一個特殊的task,這個task能夠是多個task的集合(也就是大任務,先稱爲bigTask)
而後就是使用時,全部的task都封裝在一個bigTask中,直接丟給forkJoinPool來執行(支持同步獲取結果的invoke調用方式和異步獲取結果的execute方式)
那麼,核心就在與如何設計這個BigTask了,以及在執行時,將bigTask拆解成更細粒度的bigTask或者task,並最終將全部的task執行結果合併起來並返回
基本task接口
/** * Created by yihui on 2018/4/8. */
public interface IDataLoader<T> {
/** * 具體的業務邏輯,放在這個方法裏面執行,將返回的結果,封裝到context內 * * @param context */
void load(T context);
}
複製代碼
一個抽象的實現類,繼承forkjoin的RecuriAction,這個就對應上咱們前面定義的基本Task了
public abstract class AbstractDataLoader<T> extends RecursiveAction implements IDataLoader {
// 這裏就是用來保存返回的結果,由業務防本身在實現的load()方法中寫入數據
protected T context;
public AbstractDataLoader(T context) {
this.context = context;
}
public void compute() {
load(context);
}
/** * 獲取執行後的結果,強制等待執行完畢 * @return */
public T getContext() {
this.join();
return context;
}
public void setContext(T context) {
this.context = context;
}
}
複製代碼
而後就是BigTask的實現了,也比較簡單,內部維持一個List
public class DefaultForkJoinDataLoader<T> extends AbstractDataLoader<T> {
/** * 待執行的任務列表 */
private List<AbstractDataLoader> taskList;
public DefaultForkJoinDataLoader(T context) {
super(context);
taskList = new ArrayList<>();
}
public DefaultForkJoinDataLoader<T> addTask(IDataLoader dataLoader) {
taskList.add(new AbstractDataLoader(this.context) {
@Override
public void load(Object context) {
dataLoader.load(context);
}
});
return this;
}
// 注意這裏,藉助fork對任務進行了拆解
@Override
public void load(Object context) {
this.taskList.forEach(ForkJoinTask::fork);
}
/** * 獲取執行後的結果 * @return */
public T getContext() {
this.taskList.forEach(ForkJoinTask::join);
return this.context;
}
}
複製代碼
接下來就是比較簡單的線程池的設計了,由於咱們須要提供同步獲取結果,和異步獲取結果的兩種姿式,因此對ForkJoinPool須要作個擴展
public class ExtendForkJoinPool extends ForkJoinPool {
public ExtendForkJoinPool() {
}
public ExtendForkJoinPool(int parallelism) {
super(parallelism);
}
public ExtendForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode) {
super(parallelism, factory, handler, asyncMode);
}
// 同步阻塞調用時,須要對每一個task執行join,確保執行完畢
public <T> T invoke(ForkJoinTask<T> task) {
if (task instanceof AbstractDataLoader) {
super.invoke(task);
return (T) ((AbstractDataLoader) task).getContext();
} else {
return super.invoke(task);
}
}
}
複製代碼
而後就是建立Pool的工廠類,沒什麼特別的了
public class ForkJoinPoolFactory {
private int parallelism;
private ExtendForkJoinPool forkJoinPool;
public ForkJoinPoolFactory() {
this(Runtime.getRuntime().availableProcessors() * 16);
}
public ForkJoinPoolFactory(int parallelism) {
this.parallelism = parallelism;
forkJoinPool = new ExtendForkJoinPool(parallelism);
}
public ExtendForkJoinPool getObject() {
return this.forkJoinPool;
}
public int getParallelism() {
return parallelism;
}
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
public void destroy() throws Exception {
this.forkJoinPool.shutdown();
}
}
複製代碼
到此,整個基本上算是完了,每一個類都很簡單,就那麼點東西,接下來就是須要看怎麼用了
先來一個簡單的case,演示下,應該怎麼用
@Data
static class Context {
public int addAns;
public int mulAns;
public String concatAns;
public Map<String, Object> ans = new ConcurrentHashMap<>();
}
@Test
public void testForkJoinFramework() {
ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject();
Context context = new Context();
DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context);
loader.addTask(new IDataLoader<Context>() {
@Override
public void load(Context context) {
context.addAns = 100;
System.out.println("add thread: " + Thread.currentThread());
}
});
loader.addTask(new IDataLoader<Context>() {
@Override
public void load(Context context) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
context.mulAns = 50;
System.out.println("mul thread: " + Thread.currentThread());
}
});
loader.addTask(new IDataLoader<Context>() {
@Override
public void load(Context context) {
context.concatAns = "hell world";
System.out.println("concat thread: " + Thread.currentThread());
}
});
DefaultForkJoinDataLoader<Context> subTask = new DefaultForkJoinDataLoader<>(context);
subTask.addTask(new IDataLoader<Context>() {
@Override
public void load(Context context) {
System.out.println("sub thread1: " + Thread.currentThread() + " | now: " + System.currentTimeMillis());
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis());
}
});
subTask.addTask(new IDataLoader<Context>() {
@Override
public void load(Context context) {
System.out.println("sub thread2: " + Thread.currentThread() + " | now: " + System.currentTimeMillis());
context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis());
}
});
loader.addTask(subTask);
long start = System.currentTimeMillis();
System.out.println("------- start: " + start);
// 提交任務,同步阻塞調用方式
forkJoinPool.invoke(loader);
System.out.println("------- end: " + (System.currentTimeMillis() - start));
// 輸出返回結果,要求3s後輸出,全部的結果都設置完畢
System.out.println("the ans: " + context);
}
複製代碼
使用起來就比較簡單了,簡單的四步驟便可:
new DefaultForkJoinDataLoader<>(context);
上面這個實現中,對於須要將Task進行再次拆分,會變得很是簡單,看下上面的輸出
------- start: 1523200221827
add thread: Thread[ForkJoinPool-1-worker-50,5,main]
concat thread: Thread[ForkJoinPool-1-worker-36,5,main]
sub thread2: Thread[ForkJoinPool-1-worker-29,5,main] | now: 1523200222000
sub thread1: Thread[ForkJoinPool-1-worker-36,5,main] | now: 1523200222000
mul thread: Thread[ForkJoinPool-1-worker-43,5,main]
------- end: 3176
the ans: ForJoinTest.Context(addAns=100, mulAns=50, concatAns=hell world, ans={ForkJoinPool-1-worker-36=1523200222204, ForkJoinPool-1-worker-29=1523200222000})
複製代碼
對於但願異步執行的狀況,也比較簡單了,僅僅是在提交任務的地方,稍微改動一下便可,而後在須要獲取數據的時候,經過loader來獲取結果便可
@Test
public void testForkJoinFramework2() {
ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject();
Context context = new Context();
DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context);
loader.addTask(new IDataLoader<Context>() {
@Override
public void load(Context context) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
context.addAns = 100;
System.out.println("add thread: " + Thread.currentThread());
}
});
loader.addTask(new IDataLoader<Context>() {
@Override
public void load(Context context) {
context.mulAns = 50;
System.out.println("mul thread: " + Thread.currentThread());
}
});
loader.addTask(new IDataLoader<Context>() {
@Override
public void load(Context context) {
context.concatAns = "hell world";
System.out.println("concat thread: " + Thread.currentThread());
}
});
long start = System.currentTimeMillis();
System.out.println("------- start: " + start);
// 若是暫時不關心返回結果,能夠採用execute方式,異步執行
forkJoinPool.execute(loader);
// .... 這裏能夠作其餘的事情 此時,不會阻塞,addAns不會被設置
System.out.println("context is: " + context);
System.out.println("------- then: " + (System.currentTimeMillis() - start));
loader.getContext(); // 主動調用這個,表示會等待全部任務執行完畢後,才繼續下去
System.out.println("context is: " + context);
System.out.println("------- end: " + (System.currentTimeMillis() - start));
}
複製代碼
相關源碼可在git上查看,主要在Quick-Alarm項目中
我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛
盡信書則不如,已上內容,純屬一家之言,因本人能力通常,見識有限,如發現bug或者有更好的建議,隨時歡迎批評指正