高併發場景-請求合併(二)揭祕HystrixCollapser-利用Queue和線程池異步實現

背景

在互聯網的高併發場景下,請求會很是多,可是數據庫鏈接池比較少,或者說須要減小CPU壓力,減小處理邏輯的,須要把單個查詢,用某些手段,改成批量查詢多個後返回。
如:支付寶中,查詢「我的信息」,用戶只會觸發一次請求,查詢本身的信息,可是多我的同時這樣作就會產生屢次數據庫鏈接。爲了減小鏈接,須要在JAVA服務端進行合併請求,把多個「我的信息」查詢接口,合併爲批量查詢多個「我的信息」接口,而後以我的信息在數據庫的id做爲Key返回給上游系統或者頁面URL等調用方。java

目的

  1. 減小訪問數據庫的次數
  2. 單位時間內的多個請求,合併爲一個請求。讓業務邏輯層把單個查詢的sql,改成批量查詢的sql。或者邏輯裏面須要調用redis,那批量邏輯裏面就能夠用redis的pipeline去實現。
  3. 本次須要使用JDK原生手段來實現請求合併,由於你們不必定會有Hystrix,因此用原生辦法實現,並解析HystrixCollapser裏面是如何實現的。
點贊再看,關注公衆號:【地藏思惟】給你們分享互聯網場景設計與架構設計方案
掘金:地藏Kelvin https://juejin.im/user/5d67da8d6fb9a06aff5e85f7

主要解決手段

  1. SpringCloud的Hystrix的自定義HystrixCollapse和HystrixCommand
  2. SpringCloud的Hystrix註解方式。
  3. 沒有服務治理框架時,利用JDK隊列、定時任務線程池處理。

在上一章已經說了第一二種,鑑於有同窗沒有SpringCloud,因此使用第3種來作請求合併,並一塊兒分析請求合併的原理。git

建議先看第一章,第二章至關於爲HystrixCollapser的內部原理描述
高併發場景-請求合併(一)SpringCloud中Hystrix請求合併

交互流程

開發

本章節爲利用JDK原生包開發,因此沒有SpringCloud那麼多東西要配置,編寫代碼只有一個類。redis

1. 建立請求層

只須要暴露單個查詢的接口,業務邏輯層裏作請求合併的邏輯。spring

@RestController
public class UserController {
    @Autowired
    private UserBatchWithFutureServiceImpl userBatchWithFutureServiceImpl;
   @RequestMapping(method = RequestMethod.GET,value = "/userbyMergeWithFuture/{id}")
    public User userbyMergeWithFuture(@PathVariable Long id) throws InterruptedException, ExecutionException {
        User user = this.userBatchWithFutureServiceImpl.getUserById(id);
        return user;
    }
}

2. 請求合併邏輯層

  1. 建立請求合併邏輯入口
  2. 建立阻塞隊列,用於累計多個請求參數
  3. 建立CompletableFuture類,爲了本條線程阻塞,等批量查詢處理完後,異步獲取當前id對應的User結果信息。
  4. 執行CompletableFuture.get方法等待異步結果通知。
@Component
public class UserBatchWithFutureServiceImpl {
    /** 積攢請求的阻塞隊列 */
    private LinkedBlockingDeque<UserQueryDto> requestQueue = new LinkedBlockingDeque<>();

    public User getUserById(Long id) throws InterruptedException, ExecutionException {

        UserQueryDto userQueryDto = new UserQueryDto();
        userQueryDto.setId(id);
        CompletableFuture<User> completedFuture = new CompletableFuture<>();
        userQueryDto.setCompletedFuture(completedFuture);

        requestQueue.add(userQueryDto);

        User user = completedFuture.get();
        return user;
    }

HystrixCollapser也是利用這種辦法來作異步通知的手段,讓請求接口主線程在得到真正結果前阻塞等待。sql

3. 定時任務

在相同的類下建立定時任務,利用@PostConstruct讓當前類的Bean構造完後執行該方法,生成一個5秒定時任務。
你們能夠設定定時的時間,我爲了比較方便測試,而用了5秒。數據庫

/** 線程池數量 */
    private int threadNum = 1;
    /** 定時間隔時長 */
    private long period = 5000;
    @PostConstruct
    public void init() {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum);
        // 每5秒執行一次
        scheduledExecutorService.scheduleAtFixedRate(new UserBatchThread(), 0, createDeviceMergePeriod,
                TimeUnit.MILLISECONDS);
    }

HystrixCollapser的每隔n毫秒就會處理一次執行單個方法轉批量方法,也是經過這類來實現的。架構

4. 在UserBatchWithFutureServiceImpl 類下建立內部類

建立內部類爲了定時任務執行此邏輯,而且爲了代碼整潔,不在建立線程池時編寫大方法塊的代碼。併發

在內部類裏面主要邏輯:app

  1. 從存放請求接口參數的requestQueue 隊列中,獲取全部成員,並放入當此觸發任務邏輯的局部變量中
  2. 而且取出關鍵的請求參數id放入局部變量List中。
  3. 只要獲取出變量,則進行批量查詢
  4. 最後利用CompletedFuture異步通知並喚醒getUserById方法等待的線程。
public class UserBatchThread implements Runnable {

        @Override
        public void run() {
            List<UserQueryDto> requestQueueTmp = new ArrayList<>();
            // 存放批量查詢的入參
            List<Long> requestId = new ArrayList<>();

            // 把出請求層放入的消息queue的元素取出來
            int size = requestQueue.size();
            for (int i = 0; i < size; i++) {
                UserQueryDto request = requestQueue.poll();
                if (Objects.nonNull(request)) {
                    requestQueueTmp.add(request);
                    requestId.add(request.getId());
                }
            }

            if (!requestId.isEmpty()) {
                try {
                    List<User> response = getUserBatchById(requestId);
                    Map<Long, User> collect = response.stream().collect(
                            Collectors.toMap(detail -> detail.getId(), Function.identity(), (key1, key2) -> key2));
                    // 通知請求的線程
                    for (UserQueryDto request : requestQueueTmp) {
                        request.getCompletedFuture().complete(collect.get(request.getId()));
                    }

                } catch (Exception e) {
                    // 通知請求的線程-異常
                    requestQueueTmp.forEach(request -> request.getCompletedFuture().obtrudeException(e));
                }
            }
        }

    }

    public List<User> getUserBatchById(List<Long> ids) {
        System.out.println("進入批量處理方法" + ids);
        List<User> ps = new ArrayList<>();
        for (Long id : ids) {
            User p = new User();
            p.setId(id);
            p.setUsername("dizang" + id);
            ps.add(p);
        }
        return ps;
    }

請求接口中入隊列的元素,就會從這裏取出,HystrixCollasper也是利用這種poll方法原子性的獲取隊列裏面元素,不會被定時任務的屢次觸發而重複的獲取,只要知足有至少一個都會作批量查詢,因此HystrixCollasper合併請求時,即便n毫秒內只有一個請求,也會去處理。框架

測試驗證

  1. 同上一章同樣觸發Swagger-ui頁面
  2. 請求兩次不一樣的參數
  3. 結果以下圖中,console日誌已經輸出了兩次請求的入參

總結

到這裏相信你們都已經完成了合併請求了。此次沒有依賴框架,基於原生作法,利用隊列存查詢所需的入參,而後利用線程池定時地獲取隊列的入參,再批量處理,利用線程的Future作異步返回結果。這樣咱們就理解了SpringCloud的HystrixCollasper的內部流程了。但願可以幫助沒有框架的項目,或者公司技術棧不合適的狀況下的同窗。

本文Demo

都在我springcloud的demo裏面了,看provider-hystrix-request-merge這個工程下的內容,在UserBatchWithFutureServiceImpl類中。

https://gitee.com/kelvin-cai/spring-cloud-demo


歡迎關注公衆號,文章更快一步

個人公衆號 :地藏思惟

掘金:地藏Kelvin

簡書:地藏Kelvin

個人Gitee: 地藏Kelvin https://gitee.com/kelvin-cai

相關文章
相關標籤/搜索