在互聯網的高併發場景下,請求會很是多,可是數據庫鏈接池比較少,或者說須要減小CPU壓力,減小處理邏輯的,須要把單個查詢,用某些手段,改成批量查詢多個後返回。
如:支付寶中,查詢「我的信息」,用戶只會觸發一次請求,查詢本身的信息,可是多我的同時這樣作就會產生屢次數據庫鏈接。爲了減小鏈接,須要在JAVA服務端進行合併請求,把多個「我的信息」查詢接口,合併爲批量查詢多個「我的信息」接口,而後以我的信息在數據庫的id做爲Key返回給上游系統或者頁面URL等調用方。java
點贊再看,關注公衆號:【地藏思惟】給你們分享互聯網場景設計與架構設計方案
掘金:地藏Kelvin https://juejin.im/user/5d67da8d6fb9a06aff5e85f7
在上一章已經說了第一二種,鑑於有同窗沒有SpringCloud,因此使用第3種來作請求合併,並一塊兒分析請求合併的原理。git
建議先看第一章,第二章至關於爲HystrixCollapser的內部原理描述
高併發場景-請求合併(一)SpringCloud中Hystrix請求合併
本章節爲利用JDK原生包開發,因此沒有SpringCloud那麼多東西要配置,編寫代碼只有一個類。redis
只須要暴露單個查詢的接口,業務邏輯層裏作請求合併的邏輯。spring
@RestController public class UserController { @Autowired private UserBatchWithFutureServiceImpl userBatchWithFutureServiceImpl; @RequestMapping(method = RequestMethod.GET,value = "/userbyMergeWithFuture/{id}") public User userbyMergeWithFuture(@PathVariable Long id) throws InterruptedException, ExecutionException { User user = this.userBatchWithFutureServiceImpl.getUserById(id); return user; } }
@Component public class UserBatchWithFutureServiceImpl { /** 積攢請求的阻塞隊列 */ private LinkedBlockingDeque<UserQueryDto> requestQueue = new LinkedBlockingDeque<>(); public User getUserById(Long id) throws InterruptedException, ExecutionException { UserQueryDto userQueryDto = new UserQueryDto(); userQueryDto.setId(id); CompletableFuture<User> completedFuture = new CompletableFuture<>(); userQueryDto.setCompletedFuture(completedFuture); requestQueue.add(userQueryDto); User user = completedFuture.get(); return user; }
HystrixCollapser也是利用這種辦法來作異步通知的手段,讓請求接口主線程在得到真正結果前阻塞等待。sql
在相同的類下建立定時任務,利用@PostConstruct讓當前類的Bean構造完後執行該方法,生成一個5秒定時任務。
你們能夠設定定時的時間,我爲了比較方便測試,而用了5秒。數據庫
/** 線程池數量 */ private int threadNum = 1; /** 定時間隔時長 */ private long period = 5000; @PostConstruct public void init() { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum); // 每5秒執行一次 scheduledExecutorService.scheduleAtFixedRate(new UserBatchThread(), 0, createDeviceMergePeriod, TimeUnit.MILLISECONDS); }
HystrixCollapser的每隔n毫秒就會處理一次執行單個方法轉批量方法,也是經過這類來實現的。架構
建立內部類爲了定時任務執行此邏輯,而且爲了代碼整潔,不在建立線程池時編寫大方法塊的代碼。併發
在內部類裏面主要邏輯:app
public class UserBatchThread implements Runnable { @Override public void run() { List<UserQueryDto> requestQueueTmp = new ArrayList<>(); // 存放批量查詢的入參 List<Long> requestId = new ArrayList<>(); // 把出請求層放入的消息queue的元素取出來 int size = requestQueue.size(); for (int i = 0; i < size; i++) { UserQueryDto request = requestQueue.poll(); if (Objects.nonNull(request)) { requestQueueTmp.add(request); requestId.add(request.getId()); } } if (!requestId.isEmpty()) { try { List<User> response = getUserBatchById(requestId); Map<Long, User> collect = response.stream().collect( Collectors.toMap(detail -> detail.getId(), Function.identity(), (key1, key2) -> key2)); // 通知請求的線程 for (UserQueryDto request : requestQueueTmp) { request.getCompletedFuture().complete(collect.get(request.getId())); } } catch (Exception e) { // 通知請求的線程-異常 requestQueueTmp.forEach(request -> request.getCompletedFuture().obtrudeException(e)); } } } } public List<User> getUserBatchById(List<Long> ids) { System.out.println("進入批量處理方法" + ids); List<User> ps = new ArrayList<>(); for (Long id : ids) { User p = new User(); p.setId(id); p.setUsername("dizang" + id); ps.add(p); } return ps; }
請求接口中入隊列的元素,就會從這裏取出,HystrixCollasper也是利用這種poll方法原子性的獲取隊列裏面元素,不會被定時任務的屢次觸發而重複的獲取,只要知足有至少一個都會作批量查詢,因此HystrixCollasper合併請求時,即便n毫秒內只有一個請求,也會去處理。框架
到這裏相信你們都已經完成了合併請求了。此次沒有依賴框架,基於原生作法,利用隊列存查詢所需的入參,而後利用線程池定時地獲取隊列的入參,再批量處理,利用線程的Future作異步返回結果。這樣咱們就理解了SpringCloud的HystrixCollasper的內部流程了。但願可以幫助沒有框架的項目,或者公司技術棧不合適的狀況下的同窗。
都在我springcloud的demo裏面了,看provider-hystrix-request-merge這個工程下的內容,在UserBatchWithFutureServiceImpl類中。
https://gitee.com/kelvin-cai/spring-cloud-demo
個人公衆號 :地藏思惟
掘金:地藏Kelvin
簡書:地藏Kelvin
個人Gitee: 地藏Kelvin https://gitee.com/kelvin-cai