基於ForkJoin構建一個簡單易用的併發組件

基於ForkJoin構建一個簡單易用的併發組件

在實際的業務開發中,須要用到併發編程的知識,實際使用線程池來異步執行任務的場景並非特別多,並且通常真的遇到了須要併發使用的時候,可能更加常見的就是直接實現Runnable/Callable接口,丟到Thread中執行了;或者更高級一點,定義一個線程池,扔進去執行;本片博文,將從另外一個角度,藉助JDK提供的ForkJoin,來設計一個簡單易用的併發框架java

I. 背景

實際項目中,使用併發的一個case就是商品詳情頁的展現了,一個詳情頁的展現,除了基本的商品數據以外,還有銷量,地址,評價,推薦,店鋪信息,裝飾信息等,用一段僞代碼來描述拼裝整個詳情數據的過程git

// 獲取商品基本信息
ItemInfo itemInfo = itemService.getInfo(itemId);

// 獲取銷量
int sellCount = sellService.getSellCount(itemId);

// 獲取評價信息
RateInfo rateInfo = rateService.getRateInfo(itemId);


// 獲取店鋪信息
ShopInfo shopInfo = shopService.getShopInfo(shopId);


// 獲取裝飾信息
DecorateInfo decoreateInfo = decorateService.getDecorateInfo(itemId);

// 獲取推薦商品
RecommandInfo recommandInfo = recommandService.getRecommand(itemId);

若是是正常的執行過程,那麼就是上面的6個調用,串行的執行下來,假設每一個服務的rt是10ms,那麼光是這裏六個服務執行下來,耗時就>60ms了,github

但從業務角度出發,上面6個服務調用,彼此之間沒有什麼關聯,即一個服務的調用,並不依賴另外一個服務返回的結果,她們徹底能夠併發執行,這樣六個服務執行下來,耗時就是六個服務中耗時最久的一個了,可能也就10ms多一點了編程

兩個一對比,發現這種場景下,使用併發的優點很是明顯了,接下來的問題是,咱們但願以最簡單的方式,將上面的代碼改爲併發的併發

II. 設計與實現

以上面的case爲例,若是咱們採用線程池的方式,能夠怎麼實現呢?框架

1. 線程池方式

由於線程池方式不是重點,因此就簡單的演示如下,能夠怎麼實現,以及實現以後的效果如何異步

// 1. 建立線程池
ExecutorService alarmExecutorService = new ThreadPoolExecutor(3, 5, 60,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10), 
                new DefaultThreadFactory("service-pool"),
                new ThreadPoolExecutor.CallerRunsPolicy());


// 2. 將服務調用,封裝到線程任務中執行
Future<ItemInfo> itemFuture = alarmExecutorService.submit(new Callable<ItemInfo>() {
    @Override
    public ItemInfo call() throws Exception {
        return itemService.getInfo(itemId);
    }
});

// ... 其餘的服務依次類推


// 3. 獲取數據
ItemInfo = itemFutre.get(); // 阻塞,直到返回

上面這個實現能夠說是一個很是清晰明瞭的實現方式了,咱們接下來看一下,用Fork/Join框架能夠怎麼玩,又會有什麼好處async

2. ForkJoin方式

首先可能須要簡單的介紹下,這是個什麼東西,Fork/Join框架是Java7提供了的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架ide

簡單來講,就是講一個複雜的任務,拆分紅不少小任務,併發去執行的機制,任務與任務的執行,可能並不會獨佔線程,採用了一種名爲工做竊取的手段,詳情能夠參考學習

ForkJoin 學習使用筆記

藉助ForkJoin的方式,又能夠怎麼支持上面的場景呢?一個簡單的方案以下

// 1. 建立池
ForkJoinPool pool = new ForkJoinPool(10);


// 2. 建立任務並提交
ForkJoinTask<ItemInfo> future = joinPool.submit(new RecursiveTask<ItemInfo>() {
    public ItemInfo compute() {
        return itemService.getItemInfo(itemId);
    }
});


// 3. 獲取結果
future.join();

這樣一對比,二者之間並無什麼區別,並且也沒有用到傳說中的任務拆解

3. 進階

如何可以充分的利用ForkJoin的任務拆解的思想來解決問題呢?

將上面的實例,咱們稍微變通一下,將整個詳情頁的數據返回,看作是一個任務,對於內部的服務調用,根據不一樣的應用提供放,再進行任務劃分,假設能夠變成以下的層次結構

arch

從上圖能夠看出,前面的服務調用,還能夠繼續劃分,好比咱們常見的商品信息,就能夠區分爲基本商品信息,sku信息,庫存信息,而這三個又是能夠併發執行的,也就是說從,藉助forjoin的任務拆解,咱們徹底能夠作到更細粒度的併發場景

那麼如今的目標就是,如何實現上面這個任務拆分的場景需求,並且還但願對既有的代碼改動不太大,關鍵還在於寫出來後,得容易看懂+維護(這點其實很重要,筆者接觸過一個封裝得特別好,致使業務交接的維護成本太大,以及排查問題難度飆升的狀況)

4. 實現

a. 設計思路

首先是定義一個最基本的執行單元,也就是封裝具體的業務邏輯,也就是咱們常說的Task(最終的效果也就是一個一個的task進行執行任務)

由於考慮到任務的拆解的狀況,因此咱們須要一個特殊的task,這個task能夠是多個task的集合(也就是大任務,先稱爲bigTask)

而後就是使用時,全部的task都封裝在一個bigTask中,直接丟給forkJoinPool來執行(支持同步獲取結果的invoke調用方式和異步獲取結果的execute方式)

那麼,核心就在與如何設計這個BigTask了,以及在執行時,將bigTask拆解成更細粒度的bigTask或者task,並最終將全部的task執行結果合併起來並返回

b. 實現

基本task接口

/**
 * Created by yihui on 2018/4/8.
 */
public interface IDataLoader<T> {


    /**
     * 具體的業務邏輯,放在這個方法裏面執行,將返回的結果,封裝到context內
     *
     * @param context
     */
    void load(T context);

}

一個抽象的實現類,繼承forkjoin的RecuriAction,這個就對應上咱們前面定義的基本Task了

public abstract class AbstractDataLoader<T> extends RecursiveAction implements IDataLoader {

    // 這裏就是用來保存返回的結果,由業務防本身在實現的load()方法中寫入數據
    protected T context;

    public AbstractDataLoader(T context) {
        this.context = context;
    }

    public void compute() {
        load(context);
    }


    /**
     * 獲取執行後的結果,強制等待執行完畢
     * @return
     */
    public T getContext() {
        this.join();
        return context;
    }

    public void setContext(T context) {
        this.context = context;
    }
}

而後就是BigTask的實現了,也比較簡單,內部維持一個List

public class DefaultForkJoinDataLoader<T> extends AbstractDataLoader<T> {
    /**
     * 待執行的任務列表
     */
    private List<AbstractDataLoader> taskList;


    public DefaultForkJoinDataLoader(T context) {
        super(context);
        taskList = new ArrayList<>();
    }


    public DefaultForkJoinDataLoader<T> addTask(IDataLoader dataLoader) {
        taskList.add(new AbstractDataLoader(this.context) {
            @Override
            public void load(Object context) {
                dataLoader.load(context);
            }
        });
        return this;
    }


    // 注意這裏,藉助fork對任務進行了拆解
    @Override
    public void load(Object context) {
        this.taskList.forEach(ForkJoinTask::fork);
    }


    /**
     * 獲取執行後的結果
     * @return
     */
    public T getContext() {
        this.taskList.forEach(ForkJoinTask::join);
        return this.context;
    }
}

接下來就是比較簡單的線程池的設計了,由於咱們須要提供同步獲取結果,和異步獲取結果的兩種姿式,因此對ForkJoinPool須要作個擴展

public class ExtendForkJoinPool extends ForkJoinPool {

    public ExtendForkJoinPool() {
    }

    public ExtendForkJoinPool(int parallelism) {
        super(parallelism);
    }

    public ExtendForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode) {
        super(parallelism, factory, handler, asyncMode);
    }


    // 同步阻塞調用時,須要對每一個task執行join,確保執行完畢
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task instanceof AbstractDataLoader) {
            super.invoke(task);
            return (T) ((AbstractDataLoader) task).getContext();
        } else {
            return super.invoke(task);
        }
    }
}

而後就是建立Pool的工廠類,沒什麼特別的了

public class ForkJoinPoolFactory {

    private int parallelism;

    private ExtendForkJoinPool forkJoinPool;

    public ForkJoinPoolFactory() {
        this(Runtime.getRuntime().availableProcessors() * 16);
    }

    public ForkJoinPoolFactory(int parallelism) {
        this.parallelism = parallelism;
        forkJoinPool = new ExtendForkJoinPool(parallelism);
    }

    public ExtendForkJoinPool getObject() {
        return this.forkJoinPool;
    }

    public int getParallelism() {
        return parallelism;
    }

    public void setParallelism(int parallelism) {
        this.parallelism = parallelism;
    }


    public void destroy() throws Exception {
        this.forkJoinPool.shutdown();
    }

}

到此,整個基本上算是完了,每一個類都很簡單,就那麼點東西,接下來就是須要看怎麼用了

III. 測試驗證

先來一個簡單的case,演示下,應該怎麼用

@Data
static class Context {
    public int addAns;

    public int mulAns;

    public String concatAns;

    public Map<String, Object> ans = new ConcurrentHashMap<>();
}


@Test
public void testForkJoinFramework() {
    ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject();

    Context context = new Context();
    DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context);
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.addAns = 100;
            System.out.println("add thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            context.mulAns = 50;
            System.out.println("mul thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.concatAns = "hell world";
            System.out.println("concat thread: " + Thread.currentThread());
        }
    });


    DefaultForkJoinDataLoader<Context> subTask = new DefaultForkJoinDataLoader<>(context);
    subTask.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            System.out.println("sub thread1: " + Thread.currentThread() + " | now: " + System.currentTimeMillis());
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis());

        }
    });
    subTask.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            System.out.println("sub thread2: " + Thread.currentThread() + " | now: " + System.currentTimeMillis());
            context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis());
        }
    });

    loader.addTask(subTask);


    long start = System.currentTimeMillis();
    System.out.println("------- start: " + start);

    // 提交任務,同步阻塞調用方式
    forkJoinPool.invoke(loader);


    System.out.println("------- end: " + (System.currentTimeMillis() - start));

    // 輸出返回結果,要求3s後輸出,全部的結果都設置完畢
    System.out.println("the ans: " + context);
}

使用起來就比較簡單了,簡單的四步驟便可:

  • 建立Pool
  • 指定保存結果的容器類ContextHolder
  • 建立任務
    • 建立根任務 new DefaultForkJoinDataLoader<>(context);
    • 添加子任務
  • 提交

上面這個實現中,對於須要將Task進行再次拆分,會變得很是簡單,看下上面的輸出

------- start: 1523200221827
add thread: Thread[ForkJoinPool-1-worker-50,5,main]
concat thread: Thread[ForkJoinPool-1-worker-36,5,main]
sub thread2: Thread[ForkJoinPool-1-worker-29,5,main] | now: 1523200222000
sub thread1: Thread[ForkJoinPool-1-worker-36,5,main] | now: 1523200222000
mul thread: Thread[ForkJoinPool-1-worker-43,5,main]
------- end: 3176
the ans: ForJoinTest.Context(addAns=100, mulAns=50, concatAns=hell world, ans={ForkJoinPool-1-worker-36=1523200222204, ForkJoinPool-1-worker-29=1523200222000})
  • 首先是各個子任務執行的線程輸出能夠看出確實是不一樣線程執行的任務(併發)
  • 3s後,輸出結果,即invoke以後,會阻塞直到全部的任務執行完畢
  • subTask進行了任務拆解,兩個子任務的執行時間相同,可是一個sleep,另外一個則不受影響(子任務也是並行執行)

對於但願異步執行的狀況,也比較簡單了,僅僅是在提交任務的地方,稍微改動一下便可,而後在須要獲取數據的時候,經過loader來獲取結果便可

@Test
public void testForkJoinFramework2() {
    ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject();

    Context context = new Context();
    DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context);
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            context.addAns = 100;
            System.out.println("add thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.mulAns = 50;
            System.out.println("mul thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.concatAns = "hell world";
            System.out.println("concat thread: " + Thread.currentThread());
        }
    });


    long start = System.currentTimeMillis();
    System.out.println("------- start: " + start);

    // 若是暫時不關心返回結果,能夠採用execute方式,異步執行
    forkJoinPool.execute(loader);

    // .... 這裏能夠作其餘的事情 此時,不會阻塞,addAns不會被設置
    System.out.println("context is: " + context);
    System.out.println("------- then: " + (System.currentTimeMillis() - start));


    loader.getContext(); // 主動調用這個,表示會等待全部任務執行完畢後,才繼續下去
    System.out.println("context is: " + context);
    System.out.println("------- end: " + (System.currentTimeMillis() - start));
}

IV. 其餘

源碼

相關源碼可在git上查看,主要在Quick-Alarm項目中

我的博客: 一灰灰Blog

我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛

聲明

盡信書則不如,已上內容,純屬一家之言,因本人能力通常,見識有限,如發現bug或者有更好的建議,隨時歡迎批評指正

掃描關注

QrCode

相關文章
相關標籤/搜索