讓咱們先看個圖, Taobao的PDP(Product Detail Page)頁.java
打開Chrome Network面板, 讓咱們來看taobao是怎麼加載這個頁面數據的. 根據經驗, 通常是異步加載的, 要麼是XHR,要麼就是js(jsonp), 你應該很快能夠找到node
還能看到這個接口的性能git
神奇的是, taobao居然在一次請求中拉下了整個PDP頁的完整數據, 並且服務端處理耗時不到125msgithub
首先, 這麼作有什麼好處?spring
而後, 這又是怎麼作到的呢?json
你可能會說緩存, 但你要知道, 這樣一個對電商極爲重要的頁面, 絕對涉及到了很是多的團隊, 好比:c#
即便每一個團隊的數據全都是緩存的, 你一個個去拿, 要在125ms內拿完也不容易. 並且做爲跟錢相關的頁面, 部分數據必須保證絕對實時有效, 能用緩存的地方很少. 怎麼辦, 若是是你, 你會怎麼作? 離線打標? 數據預熱? etc..後端
此時, 並行調用不失爲一種好辦法.緩存
分析一下這個頁面, 你會發現, 每個模塊除了屬於同一個商品(入參相同), 其實各個模塊的數據之間, 並無依賴性, 徹底能夠並行去獲取.網絡
並行獲取數據, 能夠提升咱們的接口性能. 但也會引入一些問題, 如:
下面, 咱們來討論下如何簡單\易用\高效的並行獲取數據; 如何解決上述異步問題.
假如你如今須要用戶的基礎信息\博客列表\粉絲列表 3份數據. 哪麼你有哪些方式能夠並行獲取呢?
最簡單原始的辦法, 直接使用Java提供了的線程池和Future機制.
public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(3); CountDownLatch countDownLatch = new CountDownLatch(3); Future<User> userFuture = executorService.submit(() -> { try{ return userService.get(userId); }finally { countDownLatch.countDown(); } }); Future<List<Post>> postsFuture = executorService.submit(() -> { try{ return postService.getPosts(userId); }finally { countDownLatch.countDown(); } }); Future<List<User>> followersFuture = executorService.submit(() -> { try{ return followService.getFollowers(userId); }finally { countDownLatch.countDown(); } }); countDownLatch.await(); User user = userFuture.get(); user.setFollowers(followersFuture.get()); user.setPosts(postsFuture.get()); return user; }
咱們知道, Spring支持@Async註解, 能夠方便的實現異步, 而且支持獲取返回值. 參考: https://www.baeldung.com/spring-async#2-methods-with-return-type
@Async實現的原理實際是在Bean的代理類的方法中, 攔截方法調用, 向taskExecutor Bean中提交Callable任務. 原理跟本身用Java ThreadPool寫其實區別不大.
那麼要用Spring Async實現上述功能. 首先須要修改下面3個方法的返回值, 而且修改返回值類型, 併爲方法添加 @Async註解
class UserServiceImpl implements UserService { @Async public Future<User> get(Long userId) { // ... something } } class PostServiceImpl implements PostService { @Async public Future<List<Post> getPosts(Long userId) { // ... something } } class FollowServiceImpl implements FollowService { @Async public Future<List<User> getFollowers(Long userId) { // ... something } }
並行獲取3份用戶數據而後聚合, 代碼以下:
public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException { Future<User> userFuture = userService.get(userId); Future<List<Post>> postsFuture = postService.getPosts(userId); Future<List<User>> followersFuture = followService.getFollowers(userId); User user = whileGet(userFuture); user.setFollowers(whileGet(followersFuture)); user.setPosts(whileGet(postsFuture)); return user; } private <T> T whileGet(Future<T> future) throws ExecutionException, InterruptedException { while(true) { if (future.isDone()) { break; } } return future.get(); }
這裏使用自旋去獲取異步數據. 固然你也能夠像前面那樣, 傳遞一個閉鎖(CountDownLatch)到Service中去, 而後讓主調線程在一個閉鎖上面等待.
上面2種方式的確能實現功能, 但首先, 它們都很不直觀, 並且沒有處理前面講到的異步問題, 一旦出現超時\異常\ThreadLocal, 代碼可能不會按照你預期的方式工做. 那有沒有更簡單方即可靠的方法呢?
試想這樣一種方式, 若是你須要的數據, 均可以經過方法入參自動並行獲取, 而後傳遞給你, 那是否是很方便? 就像這樣:
@Component public class UserAggregate { @DataProvider("userWithPosts") public User userWithPosts( @DataConsumer("user") User user, @DataConsumer("posts") List<Post> posts, @DataConsumer("followers") List<User> followers) { user.setPosts(posts); user.setFollowers(followers); return user; } }
這裏的@DataConsumer
聲明瞭你要異步獲取的數據id. @DataProvider
聲明瞭這個方法提供數據, 而且id爲userWithPosts.
或者你不想寫這樣一個Aggregate類, 你不須要複用, 你想直接建立一個"匿名Provider". 那麼你能夠直接在任何地方像下面這樣調用拿結果
User user = dataBeanAggregateQueryFacade.get( Collections.singletonMap("userId", 1L), new Function3<User, List<Post>,List<User>, User>() { @Override public User apply(@DataConsumer("user") User user, @DataConsumer("posts") List<Post> posts, @DataConsumer("followers") List<User> followers) { user.setPosts(posts); user.setFollowers(followers); return user; } }); Assert.notNull(user,"user not null"); Assert.notNull(user.getPosts(),"user posts not null");
這裏的Function3接收4個泛型參數, 最後一個User表示返回值類型, 前3個參數依次對應apply方法的3個入參類型. 項目預約義了Function2-Function5, 支持不超過5個參數, 若是你須要更多參數, 能夠編寫一個接口(FunctionInterface), 繼承MultipleArgumentsFunction接口便可.
很顯然
@DataConsumer
只會對應一個 @DataProvider
.@DataProvider
可能被多個 @DataConsumer
消費 .@DataProvider
經過多個 @DataConsumer
依賴上多個 @DataProvider
.你不用care底層如何實現. 只有在你有定製化的需求時, 纔去關心一些配置參數. 去擴展一些能力.
@DataProvider
和 @DataConsumer
註解. 分析記錄下依賴關係(有向非連通圖), 而且記錄好@DataProvider
和Spring Bean的映射關係.@DataConsumer
註解的入參, 將今後map中取值.@DataProvider
和 @DataConsumer
註解能夠支持一些參數, 用來控制超時時間\異常處理方式\是否冪等緩存等等.@DataProvider
註解支持 timeout
參數, 用來控制超時. 實現原理是經過閉鎖的超時等待方法.
java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)
對異常提供兩種處理方式: 吞沒或者向上層拋出.
@DataConsumer
註解支持exceptionProcessingMethod
參數, 用來表示這個Consumer想怎麼處理Provider拋出的異常.
固然, 也支持在全局維度配置. 全局配置的優先級低於(<)Consumer配置的優先級.
Spring Bean初始化, 由於Bean建立和Bean屬性賦值分了兩步走, 所以能夠用所謂的"早期引用"解決循環依賴的問題.
但若是你循環依賴的Bean, 依賴關係定義在構造函數入參上, 那麼是無法解決循環依賴的問題的.
同理, 咱們經過方法入參, 異步注入依賴數據, 在方法入參沒有變化的狀況下, 也是沒法結束死循環的. 所以必須禁止循環依賴.
那麼問題變爲了怎麼禁止循環依賴. 或者說, 怎麼檢測有向非聯通圖中的循環依賴, 兩個辦法:
這裏咱們用領接表+DFS染色搜索, 來實現環的檢查
private void checkCycle(Map<String,Set<String>> graphAdjMap) { Map<String,Integer> visitStatusMap = new HashMap<>(graphAdjMap.size() * 2); for (Map.Entry<String, Set<String>> item : graphAdjMap.entrySet()) { if (visitStatusMap.containsKey(item.getKey())) { continue; } dfs(graphAdjMap,visitStatusMap,item.getKey()); } } private void dfs(Map<String,Set<String>> graphAdjMap,Map<String,Integer> visitStatusMap, String node) { if (visitStatusMap.containsKey(node)) { if(visitStatusMap.get(node) == 1) { List<String> relatedNodes = new ArrayList<>(); for (Map.Entry<String,Integer> item : visitStatusMap.entrySet()) { if (item.getValue() == 1) { relatedNodes.add(item.getKey()); } } throw new IllegalStateException("There are loops in the dependency graph. Related nodes:" + StringUtils.join(relatedNodes)); } return ; } visitStatusMap.put(node,1); log.info("visited:{}", node); for (String relateNode : graphAdjMap.get(node)) { dfs(graphAdjMap,visitStatusMap,relateNode); } visitStatusMap.put(node,2); }
許多的框架都使用了ThreadLocal來實現Context來保存單次請求中的一些共享數據, Spring也不例外.
衆所周知, ThreadLocal實際是訪問Thread中一個特殊Map的入口. ThreadLocal只能訪問當前Thread的數據(副本), 若是跨越了線程, 是拿不到到其餘ThreadLocalMap的數據的.
如圖
這裏, 咱們先定義一個接口, 來描述這3個動做
public interface AsyncQueryTaskWrapper { /** * 任務提交以前執行. 此方法在提交任務的那個線程中執行 */ void beforeSubmit(); /** * 任務開始執行前執行. 此方法在異步線程中執行 * @param taskFrom 提交任務的那個線程 */ void beforeExecute(Thread taskFrom); /** * 任務執行結束後執行. 此方法在異步線程中執行 * 注意, 無論用戶的方法拋出何種異常, 此方法都會執行. * @param taskFrom 提交任務的那個線程 */ void afterExecute(Thread taskFrom); }
爲了讓咱們定義的3個動做起做用. 咱們須要重寫一下 java.util.concurrent.Callable#call方法.
public abstract class AsyncQueryTask<T> implements Callable<T> { Thread taskFromThread; AsyncQueryTaskWrapper asyncQueryTaskWrapper; public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) { this.taskFromThread = taskFromThread; this.asyncQueryTaskWrapper = asyncQueryTaskWrapper; } @Override public T call() throws Exception { try { if(asyncQueryTaskWrapper != null) { asyncQueryTaskWrapper.beforeExecute(taskFromThread); } return execute(); } finally { if (asyncQueryTaskWrapper != null) { asyncQueryTaskWrapper.afterExecute(taskFromThread); } } } /** * 提交任務時, 業務方實現這個替代方法 * * @return * @throws Exception */ public abstract T execute() throws Exception; }
接下來, 向線程池提交任務時, 再也不直接提交Callable匿名類實例, 而是提交AsyncQueryTask實例. 而且在提交前觸發 taskWrapper.beforeSubmit();
AsyncQueryTaskWrapper taskWrapper = new CustomAsyncQueryTaskWrapper(); // 任務提交前執行動做. taskWrapper.beforeSubmit(); Future<?> future = executorService.submit(new AsyncQueryTask<Object>(Thread.currentThread(),taskWrapper) { @Override public Object execute() throws Exception { try { // something to do } finally { stopDownLatch.countDown(); } } });
你只須要定義一個類, 實現這個接口, 並將這個類加到配置文件中去.
@Slf4j public class CustomAsyncQueryTaskWrapper implements AsyncQueryTaskWrapper { /** * "捆綁" 在任務實例中的數據 */ private Long tenantId; private User user; @Override public void beforeSubmit() { /* 提交任務前, 先從當前線程拷貝出ThreadLocal中的數據到任務中 */ log.info("asyncTask beforeSubmit. threadName: {}",Thread.currentThread().getName()); this.tenantId = RequestContext.getTenantId(); this.user = ExampleAppContext.getUser(); } @Override public void beforeExecute(Thread taskFrom) { /* 任務提交後, 執行前, 在異步線程中用數據恢復ThreadLocal(Context) */ log.info("asyncTask beforeExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName()); RequestContext.setTenantId(tenantId); ExampleAppContext.setLoggedUser(user); } @Override public void afterExecute(Thread taskFrom) { /* 任務執行完成後, 清理異步線程中的ThreadLocal(Context) */ log.info("asyncTask afterExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName()); RequestContext.removeTenantId(); ExampleAppContext.remove(); } }
添加配置使TaskWapper生效.
io.github.lvyahui8.spring.task-wrapper-class=io.github.lvyahui8.spring.example.wrapper.CustomAsyncQueryTaskWrapper
咱們先把一次查詢, 分爲如下幾個生命週期
轉換成接口以下
public interface AggregateQueryInterceptor { /** * 查詢正常提交, Context已經建立 * * @param aggregationContext 查詢上下文 * @return 返回爲true才繼續執行 */ boolean querySubmitted(AggregationContext aggregationContext) ; /** * 每一個Provider方法執行前, 將調用此方法. 存在併發調用 * * @param aggregationContext 查詢上下文 * @param provideDefinition 將被執行的Provider */ void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition); /** * 每一個Provider方法執行成功以後, 調用此方法. 存在併發調用 * * @param aggregationContext 查詢上下文 * @param provideDefinition 被執行的Provider * @param result 查詢結果 * @return 返回結果, 如不修改不, 請直接返回參數中的result */ Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result); /** * 每一個Provider執行時, 若是拋出異常, 將調用此方法. 存在併發調用 * * @param aggregationContext 查詢上下文 * @param provideDefinition 被執行的Provider * @param e Provider拋出的異常 */ void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e); /** * 一次查詢所有完成. * * @param aggregationContext 查詢上下文 */ void queryFinished(AggregationContext aggregationContext); }
在Spring應用啓動之初, 獲取全部實現了AggregateQueryInterceptor接口的Bean, 並按照Order註解排序, 做爲攔截器鏈.
至於攔截器如何執行. 很簡單, 在遞歸提交查詢任務時, 插入執行一些鉤子(hook)函數便可. 涉及到的代碼不少, 就不貼在這裏, 感興趣的能夠去github clone代碼查看.
你能夠實現一個攔截器, 在攔截器中輸出日誌, 監控節點執行狀態(耗時, 出入參), 以下:
@Component @Order(2) @Slf4j public class SampleAggregateQueryInterceptor implements AggregateQueryInterceptor { @Override public boolean querySubmitted(AggregationContext aggregationContext) { log.info("begin query. root:{}",aggregationContext.getRootProvideDefinition().getMethod().getName()); return true; } @Override public void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition) { log.info("query before. provider:{}",provideDefinition.getMethod().getName()); } @Override public Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result) { log.info("query after. provider:{},result:{}",provideDefinition.getMethod().getName(),result.toString()); return result; } @Override public void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e) { log.error(e.getMessage()); } @Override public void queryFinished(AggregationContext aggregationContext) { log.info("query finish. root: {}",aggregationContext.getRootProvideDefinition().getMethod().getName()); } }
最後, 再次貼一下項目地址: . spring-boot-data-aggregator
歡迎拍磚, 歡迎star, 歡迎使用