項目中通常會請求第三方的接口,也會對外提供接口,多是RPC,也多是HTTP等方式。在對外提供接口時,有必要提供相應的批量接口,好的批量實現可以提高性能。前端
高併發場景中,調用批量接口相比調用非批量接口有更大的性能優點。但有時候,請求更多的是單個接口,不可以直接調用批量接口,若是這個接口是高頻接口,對其作請求合併就頗有必要了。好比電影網站的獲取電影詳情接口,APP的一次請求是單個接口調用,用戶量少的時候請求也很少,徹底沒問題;但同一時刻每每有大量用戶訪問電影詳情,是個高併發的高頻接口,若是都是單次查詢,後臺就不必定能hold住了。爲了優化這個接口,後臺能夠將相同的請求進行合併,而後調用批量的查詢接口。以下圖所示java
合併請求前,咱們通常是調用服務層的單次建立方法。看起來都比較簡單,且易於理解。spring
以建立設備接口爲例。緩存
@Reference(check = false) private DeviceService deviceService; /** * 註冊設備 * * @param productKey 產品key * @param deviceName 設備名 * @return 設備ID */ public R<Long> registDevice(String productKey, String deviceName) { log.debug("開始註冊: {}, {}", productKey, deviceName); DeviceRequestDto deviceCreateQuery = new DeviceRequestDto() .setProductKey(productKey) .setName(deviceName); Long deviceId = deviceService.createDevice(deviceCreateQuery); return deviceId != null ? R.ok(deviceId) : R.error(DEVICE_CREATE_ERROR); }
請求合併的好處前面有提到,那不能每次寫接口就作請求合併吧?咱們要明白,技術無好壞,要在特定的業務場景下衡量利弊,採用與否須要深思熟慮。合併請求會令代碼變得複雜,也會增長必定的接口延遲,其中還可能存在各類未知的風險。多線程
合併請求是針對高併發場景的一種手段,咱們實現請求合併以前,要結合業務場景思考一番,是否值得承受的合併帶來的訪問延遲?用戶體驗是否會打折扣?自身的技術是否足夠hold住請求合併帶來的未知風險?併發
思路:收到前端的請求時,先存起來,隔段時間批量請求第三方服務批量接口,而後分別通知存起來的請求,而且響應前端。框架
仍是針對上述設備註冊接口,咱們對其進行改造,來實現一個簡單的請求合併。dom
首先,咱們須要有可以批量調用的接口。在對外提供接口時,也很是有必要提供相應的批量接口,且內部實現應該是優化過的。高併發
此處咱們在服務層模擬了一個批量建立設備的接口, 以下:性能
/** * 批量建立設備接口 * * @param deviceRequestDtoList 入參信息 * @return 建立結果 */ R<List<DeviceCreateResp>> batchCreateDevice(List<DeviceCreateQuery> deviceList);
@Data public class DeviceCreateQuery implements Serializable { /** * 產品標識 */ private String productKey; /** * 設備名稱 */ private String name; /** * 請求源,一次批量請求保證惟一 */ private String requestSource; }
@Data public class DeviceCreateResp implements Serializable { /** * 設備ID */ private Long deviceId; /** * 請求源,一次批量請求保證惟一 */ private String requestSource; }
private LinkedBlockingDeque<DeviceCreateRequest> deviceCreateQueue = new LinkedBlockingDeque<>();
@Data static class DeviceCreateRequest { /** 產品key */ private String productKey; /** 設備名 */ private String deviceName; /** 請求源,需保證惟一 */ private String requestSource; /** CompletableFuture接口 */ private CompletableFuture<Long> completedFuture; }
public R<Long> registDevice(String productKey, String deviceName) { log.debug("開始註冊: {}, {}", productKey, deviceName); // 緩存請求 ====== start CompletableFuture<Long> completedFuture = new CompletableFuture<>(); DeviceCreateRequest deviceCreateRequest = new DeviceCreateRequest(); deviceCreateRequest.setProductKey(productKey); deviceCreateRequest.setDeviceName(deviceName); deviceCreateRequest.setRequestSource(UUID.randomUUID().toString()); deviceCreateRequest.setCompletedFuture(completedFuture); deviceCreateQueue.add(deviceCreateRequest); // 緩存請求 ====== end Long deviceId = null; try { deviceId = completedFuture.get(); } catch (Exception e) { log.error("設備註冊失敗", e); } return deviceId != null ? R.ok(deviceId) : R.error(DEVICE_CREATE_ERROR); }
此處使用了spring,在init方法中利用定時任務線程池批量分發請求。同時使用了newScheduledThreadPool
,其中線程池大小和定時間隔時長須要根據業務量作權衡
/** 積攢請求的阻塞隊列 */ private LinkedBlockingDeque<DeviceCreateRequest> deviceCreateQueue = new LinkedBlockingDeque<>(); /** 線程池數量 */ @Value("${iot.register.merge.device.request.num:100}") private int createDeviceMergeNum; /** 定時間隔時長 */ @Value("${iot.register.merge.device.request.period:30}") private long createDeviceMergePeriod; @Reference(check = false) private DeviceService deviceService; @PostConstruct public void init() { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum); scheduledExecutorService.scheduleAtFixedRate(() -> { // 把出queue的請求存儲一次 List<DeviceCreateRequest> questBak = new ArrayList<>(); // 批量建立設備的入參 List<DeviceCreateQuery> deviceCreateQueryList = new ArrayList<>(); int size = deviceCreateQueue.size(); for (int i = 0; i < size; i++) { DeviceCreateRequest deviceCreateRequest = deviceCreateQueue.poll(); if (Objects.nonNull(deviceCreateRequest)) { questBak.add(deviceCreateRequest); deviceCreateQueryList.add(buildDeviceCreateQuery(deviceCreateRequest)); } } if (!deviceCreateQueryList.isEmpty()) { try { List<DeviceCreateResp> response = deviceService.batchCreateDevice(deviceCreateQueryList); Map<String, Long> collect = response.stream() .collect(Collectors.toMap( DeviceCreateResp::getRequestSource, DeviceCreateResp::getDeviceId )); // 通知請求的線程 for (DeviceCreateRequest deviceCreateRequest : questBak) { deviceCreateRequest.getCompletedFuture().complete(collect.get(deviceCreateRequest.getRequestSource())); } } catch (Throwable throwable) { log.error("批量註冊設備異常", throwable); // 通知請求的線程-異常 questBak.forEach(deviceCreateRequest -> deviceCreateRequest.getCompletedFuture().obtrudeException(throwable)); } } }, 0, createDeviceMergePeriod, TimeUnit.MILLISECONDS); }
請求合併是解決高併發場景下某些問題的一種思路,本文只作了一個簡單的實現,算是對這塊知識的一次實踐吧。用到了BlockingDeque
、CompletableFuture
接口,涉及Java多線程相關的知識,實現方式比較野蠻。業界有不少優秀的開源框架作請求合併,好比hystrix
,須要花時間好好學習哈哈。