當Parallel趕上了DI - Spring並行數據聚合最佳實踐

分析淘寶PDP

讓咱們先看個圖, Taobao的PDP(Product Detail Page)頁.java

打開Chrome Network面板, 讓咱們來看taobao是怎麼加載這個頁面數據的. 根據經驗, 通常是異步加載的, 要麼是XHR,要麼就是js(jsonp), 你應該很快能夠找到node

還能看到這個接口的性能git

神奇的是, taobao居然在一次請求中拉下了整個PDP頁的完整數據, 並且服務端處理耗時不到125msgithub

首先, 這麼作有什麼好處?spring

  • 先後端開發對接簡單
  • 在一次網絡鏈接中儘量多的傳輸數據(數據大小要不影響用戶體驗, 通常不會超過300kb), 減小創建鏈接的次數和請求頭浪費的流量.

而後, 這又是怎麼作到的呢?json

你可能會說緩存, 但你要知道, 這樣一個對電商極爲重要的頁面, 絕對涉及到了很是多的團隊, 好比:c#

  • 商品團隊
  • 賣家團隊
  • 評價團隊
  • 訂單團隊
  • 會員團隊
  • 優惠團隊
  • 問答團隊
  • 推薦團隊
  • 物流系統
  • etc/等等

即便每一個團隊的數據全都是緩存的, 你一個個去拿, 要在125ms內拿完也不容易. 並且做爲跟錢相關的頁面, 部分數據必須保證絕對實時有效, 能用緩存的地方很少. 怎麼辦, 若是是你, 你會怎麼作? 離線打標? 數據預熱? etc..後端

此時, 並行調用不失爲一種好辦法.緩存

分析一下這個頁面, 你會發現, 每個模塊除了屬於同一個商品(入參相同), 其實各個模塊的數據之間, 並無依賴性, 徹底能夠並行去獲取.網絡

並行就沒有問題了嗎?

並行獲取數據, 能夠提升咱們的接口性能. 但也會引入一些問題, 如:

  • 依賴的項可能不少, 怎麼使代碼簡潔清晰?
  • 依賴關係極可能是一個有向圖, 若是作到有向圖中的每一個節點均可以並行執行?
  • 異步處理後, 超時怎麼處理? 業務代碼拋出異常了怎麼處理?
  • 依賴關係若是有死循環怎麼辦?
  • 異步以後, ThreadLocal中的內容怎麼處理? 一些基於ThreadLocal實現的Context不work怎麼辦?
  • 事務被線程隔離了怎麼辦?
  • 如何監控每一次異步執行, 每一個節點的性能?

下面, 咱們來討論下如何簡單\易用\高效的並行獲取數據; 如何解決上述異步問題.

常見的並行方式

假如你如今須要用戶的基礎信息\博客列表\粉絲列表 3份數據. 哪麼你有哪些方式能夠並行獲取呢?

Java ThreadPool並行

最簡單原始的辦法, 直接使用Java提供了的線程池和Future機制.

public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    CountDownLatch countDownLatch = new CountDownLatch(3);
    Future<User> userFuture = executorService.submit(() -> {
        try{
            return userService.get(userId);
        }finally {
            countDownLatch.countDown();
        }
    });
    Future<List<Post>> postsFuture = executorService.submit(() -> {
        try{
            return postService.getPosts(userId);
        }finally {
            countDownLatch.countDown();
        }
    });
    Future<List<User>> followersFuture = executorService.submit(() -> {
        try{
            return followService.getFollowers(userId);
        }finally {
            countDownLatch.countDown();
        }
    });
    countDownLatch.await();
    User user = userFuture.get();
    user.setFollowers(followersFuture.get());
    user.setPosts(postsFuture.get());
    return user;
}

Spring的異步並行

咱們知道, Spring支持@Async註解, 能夠方便的實現異步, 而且支持獲取返回值. 參考: https://www.baeldung.com/spring-async#2-methods-with-return-type

@Async實現的原理實際是在Bean的代理類的方法中, 攔截方法調用, 向taskExecutor Bean中提交Callable任務. 原理跟本身用Java ThreadPool寫其實區別不大.

那麼要用Spring Async實現上述功能. 首先須要修改下面3個方法的返回值, 而且修改返回值類型, 併爲方法添加 @Async註解

class UserServiceImpl implements UserService {
    @Async
    public Future<User> get(Long userId) {
        // ... something
    }
}
class PostServiceImpl implements PostService {
    @Async
    public Future<List<Post> getPosts(Long userId) {
        // ... something
    }
}
class FollowServiceImpl implements FollowService {
    @Async
    public Future<List<User> getFollowers(Long userId) {
        // ... something
    }
}

並行獲取3份用戶數據而後聚合, 代碼以下:

public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
    Future<User> userFuture = userService.get(userId);
    Future<List<Post>> postsFuture = postService.getPosts(userId);
    Future<List<User>> followersFuture = followService.getFollowers(userId);
    
    User user = whileGet(userFuture);
    user.setFollowers(whileGet(followersFuture));
    user.setPosts(whileGet(postsFuture));
    return user;
}

private <T> T whileGet(Future<T> future) throws ExecutionException, InterruptedException {
    while(true) {
        if (future.isDone()) {
            break;
        }
    }
    return future.get();
}

這裏使用自旋去獲取異步數據. 固然你也能夠像前面那樣, 傳遞一個閉鎖(CountDownLatch)到Service中去, 而後讓主調線程在一個閉鎖上面等待.

並行結合DI(依賴注入)

上面2種方式的確能實現功能, 但首先, 它們都很不直觀, 並且沒有處理前面講到的異步問題, 一旦出現超時\異常\ThreadLocal, 代碼可能不會按照你預期的方式工做. 那有沒有更簡單方即可靠的方法呢?

試想這樣一種方式, 若是你須要的數據, 均可以經過方法入參自動並行獲取, 而後傳遞給你, 那是否是很方便? 就像這樣:

@Component
public class UserAggregate {
    @DataProvider("userWithPosts")
    public User userWithPosts(
            @DataConsumer("user") User user,
            @DataConsumer("posts") List<Post> posts,
            @DataConsumer("followers") List<User> followers) {
        user.setPosts(posts);
        user.setFollowers(followers);
        return user;
    }
}

這裏的@DataConsumer聲明瞭你要異步獲取的數據id. @DataProvider聲明瞭這個方法提供數據, 而且id爲userWithPosts.

或者你不想寫這樣一個Aggregate類, 你不須要複用, 你想直接建立一個"匿名Provider". 那麼你能夠直接在任何地方像下面這樣調用拿結果

User user = dataBeanAggregateQueryFacade.get(
     Collections.singletonMap("userId", 1L), 
     new Function3<User, List<Post>,List<User>, User>() {
            @Override
            public User apply(@DataConsumer("user") User user, 
                              @DataConsumer("posts") List<Post> posts,
                              @DataConsumer("followers") List<User> followers) {
                user.setPosts(posts);
                user.setFollowers(followers);
                return user;
            }
     });
Assert.notNull(user,"user not null");
Assert.notNull(user.getPosts(),"user posts not null");

這裏的Function3接收4個泛型參數, 最後一個User表示返回值類型, 前3個參數依次對應apply方法的3個入參類型. 項目預約義了Function2-Function5, 支持不超過5個參數, 若是你須要更多參數, 能夠編寫一個接口(FunctionInterface), 繼承MultipleArgumentsFunction接口便可.

很顯然

  • 每個 @DataConsumer 只會對應一個 @DataProvider .
  • 一個 @DataProvider 可能被多個 @DataConsumer 消費 .
  • 一個 @DataProvider 經過多個 @DataConsumer 依賴上多個 @DataProvider.
**如今, 就有這樣一個項目, 實現了上述功能. 只須要在你的方法上, 添加一些註解. 就能夠迅速地讓你的調用樹轉爲並行.** **項目地址:** https://github.com/lvyahui8/spring-boot-data-aggregator

你不用care底層如何實現. 只有在你有定製化的需求時, 纔去關心一些配置參數. 去擴展一些能力.

實現原理

  1. 在Spring啓動之時, 掃描應用中的 @DataProvider@DataConsumer 註解. 分析記錄下依賴關係(有向非連通圖), 而且記錄好@DataProvider和Spring Bean的映射關係.
  2. 當進行查詢時, 從已經記錄好的依賴關係中拿出依賴樹, 使用線程池和閉鎖(CountLatchDown), 遞歸異步調用孩子節點對應的Bean方法, 拿到結果後做爲入參注入當前節點 (近似廣度優先, 但由於並行的緣由, 節點的訪問順序是不肯定的).
  3. 在發起遞歸調用前, 傳入進一個map, 用來存放查詢參數, 方法中沒有@DataConsumer註解的入參, 將今後map中取值.
  4. @DataProvider@DataConsumer 註解能夠支持一些參數, 用來控制超時時間\異常處理方式\是否冪等緩存等等.

怎麼解決並行/異步後引入的新問題

超時怎麼控制 ?

@DataProvider 註解支持 timeout 參數, 用來控制超時. 實現原理是經過閉鎖的超時等待方法.

java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)

異常怎麼處理 ?

對異常提供兩種處理方式: 吞沒或者向上層拋出.

@DataConsumer 註解支持exceptionProcessingMethod 參數, 用來表示這個Consumer想怎麼處理Provider拋出的異常.

固然, 也支持在全局維度配置. 全局配置的優先級低於(<)Consumer配置的優先級.

依賴關係有死循環怎麼辦 ?

Spring Bean初始化, 由於Bean建立和Bean屬性賦值分了兩步走, 所以能夠用所謂的"早期引用"解決循環依賴的問題.

但若是你循環依賴的Bean, 依賴關係定義在構造函數入參上, 那麼是無法解決循環依賴的問題的.

同理, 咱們經過方法入參, 異步注入依賴數據, 在方法入參沒有變化的狀況下, 也是沒法結束死循環的. 所以必須禁止循環依賴.

那麼問題變爲了怎麼禁止循環依賴. 或者說, 怎麼檢測有向非聯通圖中的循環依賴, 兩個辦法:

  • 帶染色的DFS遍歷: 節點入棧訪問前, 先標記節點狀態爲"訪問中", 以後遞歸訪問孩子節點, 遞歸完成後, 將節點標記爲"訪問完成". 若是在DFS遞歸過程當中, 再次訪問到"訪問中"的節點, 說明有環.
  • 拓撲排序: 把有向圖的節點排成一個序列, 不存在索引號較高的節點指向索引號較低的節點, 表示圖存在拓撲排序. 拓撲排序的實現方法是, 先刪除入度爲0的節點, 並將領接節點的入度 - 1, 直到全部節點都被刪除. 很顯然, 若是有向圖中有環, 那麼環裏節點的入度不可能爲0 , 那麼節點不可能刪完. 所以, 只要知足節點未刪完 && 不存在入度爲0的節點, 那麼必定有環.

這裏咱們用領接表+DFS染色搜索, 來實現環的檢查

private void checkCycle(Map<String,Set<String>> graphAdjMap) {
    Map<String,Integer> visitStatusMap = new HashMap<>(graphAdjMap.size() * 2);
    for (Map.Entry<String, Set<String>> item : graphAdjMap.entrySet()) {
        if (visitStatusMap.containsKey(item.getKey())) {
            continue;
        }
        dfs(graphAdjMap,visitStatusMap,item.getKey());
    }
}

private void dfs(Map<String,Set<String>> graphAdjMap,Map<String,Integer> visitStatusMap, String node) {
    if (visitStatusMap.containsKey(node)) {
        if(visitStatusMap.get(node) == 1) {
            List<String> relatedNodes = new ArrayList<>();
            for (Map.Entry<String,Integer> item : visitStatusMap.entrySet()) {
                if (item.getValue() == 1) {
                    relatedNodes.add(item.getKey());
                }
            }
            throw new IllegalStateException("There are loops in the dependency graph. Related nodes:" + StringUtils.join(relatedNodes));
        }
        return ;
    }
    visitStatusMap.put(node,1);
    log.info("visited:{}", node);
    for (String relateNode : graphAdjMap.get(node)) {
        dfs(graphAdjMap,visitStatusMap,relateNode);
    }
    visitStatusMap.put(node,2);
}

ThreadLocal怎麼處理?

許多的框架都使用了ThreadLocal來實現Context來保存單次請求中的一些共享數據, Spring也不例外.

衆所周知, ThreadLocal實際是訪問Thread中一個特殊Map的入口. ThreadLocal只能訪問當前Thread的數據(副本), 若是跨越了線程, 是拿不到到其餘ThreadLocalMap的數據的.

解決方法

如圖

  1. 在當前線程提交異步任務前, 將當前線程ThreadLocal執行的數據"捆綁"到任務實例中
  2. 當任務開始執行時, 從任務實例中取出數據, 恢復到當前異步線程的ThreadLocal中
  3. 當任務結束後, 清理當前異步線程的ThreadLocal.

這裏, 咱們先定義一個接口, 來描述這3個動做

public interface AsyncQueryTaskWrapper {
    /**
     * 任務提交以前執行. 此方法在提交任務的那個線程中執行
     */
    void beforeSubmit();

    /**
     * 任務開始執行前執行. 此方法在異步線程中執行
     * @param taskFrom 提交任務的那個線程
     */
    void beforeExecute(Thread taskFrom);

    /**
     * 任務執行結束後執行. 此方法在異步線程中執行
     * 注意, 無論用戶的方法拋出何種異常, 此方法都會執行.
     * @param taskFrom 提交任務的那個線程
     */
    void afterExecute(Thread taskFrom);
}

爲了讓咱們定義的3個動做起做用. 咱們須要重寫一下 java.util.concurrent.Callable#call方法.

public abstract class AsyncQueryTask<T> implements Callable<T> {
    Thread      taskFromThread;
    AsyncQueryTaskWrapper asyncQueryTaskWrapper;

    public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) {
        this.taskFromThread = taskFromThread;
        this.asyncQueryTaskWrapper = asyncQueryTaskWrapper;
    }

    @Override
    public T call() throws Exception {
        try {
            if(asyncQueryTaskWrapper != null) {
                asyncQueryTaskWrapper.beforeExecute(taskFromThread);
            }
            return execute();
        } finally {
            if (asyncQueryTaskWrapper != null) {
                asyncQueryTaskWrapper.afterExecute(taskFromThread);
            }
        }
    }

    /**
     * 提交任務時, 業務方實現這個替代方法
     *
     * @return
     * @throws Exception
     */
    public abstract T  execute() throws Exception;
}

接下來, 向線程池提交任務時, 再也不直接提交Callable匿名類實例, 而是提交AsyncQueryTask實例. 而且在提交前觸發 taskWrapper.beforeSubmit();

AsyncQueryTaskWrapper taskWrapper = new CustomAsyncQueryTaskWrapper();
// 任務提交前執行動做.
taskWrapper.beforeSubmit();
Future<?> future = executorService.submit(new AsyncQueryTask<Object>(Thread.currentThread(),taskWrapper) {
    @Override
    public Object execute() throws Exception {
        try {
            // something to do
        } finally {
            stopDownLatch.countDown();
        }
    }
});
你要作什麼?

你只須要定義一個類, 實現這個接口, 並將這個類加到配置文件中去.

@Slf4j
public class CustomAsyncQueryTaskWrapper implements AsyncQueryTaskWrapper {
    /**
     * "捆綁" 在任務實例中的數據
     */
    private Long tenantId;
    private User user;

    @Override
    public void beforeSubmit() {
        /* 提交任務前, 先從當前線程拷貝出ThreadLocal中的數據到任務中 */
        log.info("asyncTask beforeSubmit. threadName: {}",Thread.currentThread().getName());
        this.tenantId = RequestContext.getTenantId();
        this.user = ExampleAppContext.getUser();
    }

    @Override
    public void beforeExecute(Thread taskFrom) {
        /* 任務提交後, 執行前, 在異步線程中用數據恢復ThreadLocal(Context) */
        log.info("asyncTask beforeExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
        RequestContext.setTenantId(tenantId);
        ExampleAppContext.setLoggedUser(user);
    }

    @Override
    public void afterExecute(Thread taskFrom) {
        /* 任務執行完成後, 清理異步線程中的ThreadLocal(Context) */
        log.info("asyncTask afterExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
        RequestContext.removeTenantId();
        ExampleAppContext.remove();
    }
}

添加配置使TaskWapper生效.

io.github.lvyahui8.spring.task-wrapper-class=io.github.lvyahui8.spring.example.wrapper.CustomAsyncQueryTaskWrapper

怎麼監控每一次的異步調用?

解決辦法

咱們先把一次查詢, 分爲如下幾個生命週期

  • 查詢任務初次提交 (querySubmitted)
  • 某一個Provider節點開始執行前 (queryBefore)
  • 某一個Provider節點執行完成後 (queryAfter)
  • 查詢所有完成 (queryFinished)
  • 查詢異常 (exceptionHandle)

轉換成接口以下

public interface AggregateQueryInterceptor {
    /**
     * 查詢正常提交, Context已經建立
     *
     * @param aggregationContext 查詢上下文
     * @return 返回爲true才繼續執行
     */
    boolean querySubmitted(AggregationContext aggregationContext) ;

    /**
     * 每一個Provider方法執行前, 將調用此方法. 存在併發調用
     *
     * @param aggregationContext 查詢上下文
     * @param provideDefinition 將被執行的Provider
     */
    void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition);

    /**
     * 每一個Provider方法執行成功以後, 調用此方法. 存在併發調用
     *
     * @param aggregationContext 查詢上下文
     * @param provideDefinition 被執行的Provider
     * @param result 查詢結果
     * @return 返回結果, 如不修改不, 請直接返回參數中的result
     */
    Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result);

    /**
     * 每一個Provider執行時, 若是拋出異常, 將調用此方法. 存在併發調用
     *
     * @param aggregationContext  查詢上下文
     * @param provideDefinition 被執行的Provider
     * @param e Provider拋出的異常
     */
    void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e);

    /**
     * 一次查詢所有完成.
     *
     * @param aggregationContext 查詢上下文
     */
    void queryFinished(AggregationContext aggregationContext);
}

在Spring應用啓動之初, 獲取全部實現了AggregateQueryInterceptor接口的Bean, 並按照Order註解排序, 做爲攔截器鏈.

至於攔截器如何執行. 很簡單, 在遞歸提交查詢任務時, 插入執行一些鉤子(hook)函數便可. 涉及到的代碼不少, 就不貼在這裏, 感興趣的能夠去github clone代碼查看.

你要作什麼?

你能夠實現一個攔截器, 在攔截器中輸出日誌, 監控節點執行狀態(耗時, 出入參), 以下:

@Component
@Order(2)
@Slf4j
public class SampleAggregateQueryInterceptor implements AggregateQueryInterceptor {
    @Override
    public boolean querySubmitted(AggregationContext aggregationContext) {
        log.info("begin query. root:{}",aggregationContext.getRootProvideDefinition().getMethod().getName());
        return true;
    }

    @Override
    public void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition) {
        log.info("query before. provider:{}",provideDefinition.getMethod().getName());
    }

    @Override
    public Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result) {
        log.info("query after. provider:{},result:{}",provideDefinition.getMethod().getName(),result.toString());
        return result;
    }

    @Override
    public void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e) {
        log.error(e.getMessage());
    }

    @Override
    public void queryFinished(AggregationContext aggregationContext) {
        log.info("query finish. root: {}",aggregationContext.getRootProvideDefinition().getMethod().getName());
    }
}

項目地址

最後, 再次貼一下項目地址: . spring-boot-data-aggregator

歡迎拍磚, 歡迎star, 歡迎使用

相關文章
相關標籤/搜索