工做中,咱們常見的請求模型都是請求-應答
式,即一次請求中,服務給請求分配一個獨立的線程,一塊獨立的內存空間,全部的操做都是獨立的,包括資源和系統運算。咱們也知道,在請求中處理一次系統 I/O 的消耗是很是大的,若是有很是多的請求都進行同一類 I/O 操做,那麼是否能夠將這些 I/O 操做都合併到一塊兒,進行一次 I/O 操做,是否能夠大大下降下游資源服務器的負擔呢?css
最近我工做之餘的大部分時間都花在這個問題的探究上了,對比了幾個現有類庫,爲了解決一個小問題把 hystrix javanica 的代碼翻了一遍,也根據本身工做中遇到的業務需求實現了一個簡單的合併類,收穫仍是挺大的。可能這個需求有點偏門
,在網上搜索結果並很少,也沒有綜合一點的資料,索性本身總結分享一下,但願能幫到後來遇到這種問題的小夥伴。html
文章歡迎轉載,請尊重做者勞動成果,帶上原文連接:https://www.cnblogs.com/zhenbianshu/p/9382420.htmljava
開源的請求合併類庫(知名的)好像也只有 Netflix 公司開源的 Hystrix
了, hystrix 專一於保持 WEB 服務器在高併發環境下的系統穩定,咱們經常使用它的熔斷器(Circuit Breaker) 來實現服務的服務隔離和災時降級,有了它,可使整個系統不至於被某一個接口的高併發洪流沖塌,即便接口掛了也能夠將服務降級,返回一我的性化的響應。請求合併做爲一個保障下游服務穩定的利器,在 hystrix 內實現也並不意外。git
咱們在使用 hystrix 時,經常使用它的 javanica 模塊,以註解的方式編寫 hystrix 代碼,使代碼更簡潔並且對業務代碼侵入更低。因此在項目中咱們通常至少須要引用 hystrix-core
和 hystrix-javanica
兩個包。github
另外,hystrix 的實現都是經過 AOP,咱們要還要在項目 xml 裏顯式配置 HystrixAspect 的 bean 來啓用它。web
<aop:aspectj-autoproxy/> <bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" />
hystrix collapser 是 hystrix 內的請求合併器,它有自定義 BatchMethod 和 註解兩種實現方式,自定義 BatchMethod 網上有各類教程,實現起來很複雜,須要手寫大量代碼,而註解方式只須要添加兩行註解便可,但配置方式我在官方文檔上也沒找見,中文方面本文應該是獨一份兒了。spring
其實現須要注意的是:數據庫
@HystrixCollapser
註解,在定義好的合併方法上添加 @HystrixCommand
註解;java.util.List<SingleParam>
;java.util.concurrent.Future<SingleReturn>
, batch 方法返回 java.util.List<SingleReturn>
,且要保證返回的結果數量和傳入的參數數量一致。下面是一個簡單的示例:緩存
public class HystrixCollapserSample { @HystrixCollapser(batchMethod = "batch") public Future<Boolean> single(String input) { return null; // single方法不會被執行到 } public List<Boolean> batch(List<String> inputs) { return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList()); } }
爲了解決 hystrix collapser 的配置問題看了下 hystrix javanica 的源碼,這裏簡單總結一下 hystrix 請求合併器的具體實現,源碼的詳細解析在個人筆記:Hystrix collasper 源碼解析。安全
HystrixCollapser
註解後,spring 調用 methodsAnnotatedWithHystrixCommand
方法來執行 hystrix 代理;collapser
實例(在當前 scope 內檢測不到即建立);須要注意,因爲須要等待 timer 執行真正的請求操做,collapser 會致使全部的請求的 cost 都會增長約 timerInterval/2 ms;
hystrix collapser 的配置須要在 @HystrixCollapser
註解上使用,主要包括兩個部分,專有配置和 hystrixCommand 通用配置;
專有配置包括:
com.netflix.hystrix.HystrixCollapser.Scope
枚舉類,有 REQUEST, GLOBAL
兩種選項,在 scope 爲 REQUEST 時,hystrix 會爲每一個請求都建立一個 collapser, 此時你會發現 batch 方法執行時,傳入的請求數總爲1。並且 REQUEST 項仍是默認項,不明白這樣請求合併還有什麼意義;通用配置包括:
一個完整的配置以下:
@HystrixCollapser( batchMethod = "batch", collapserKey = "single", scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, collapserProperties = { @HystrixProperty(name = "maxRequestsInBatch", value = "100"), @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"), @HystrixProperty(name = "requestCache.enabled", value = "true") })
因爲業務需求,咱們並不太關心被合併請求的返回值,並且以爲 hystrix 保持那麼多的 Future 並無必要,因而本身實現了一個簡單的請求合併器,業務線程簡單地將請求放到一個容器裏,請求數累積到必定量或延遲了必定的時間,就取出容器內的數據統一發送給下游系統。
設計思想跟 hystrix 相似,合併器有一個字段做爲存儲請求的容器,且設置一個 timer 線程定時消費容器內的請求,業務線程將請求參數提交到合併 器的容器內。不一樣之處在於,業務線程將請求提交給容器後當即同步返回成功,沒必要管請求的消費結果,這樣便實現了時間維度上的合併觸發。
另外,我還添加了另一個維度的觸發條件,每次將請求參數添加到容器後都會檢驗一下容器內請求的數量,若是數量達到必定的閾值,將在業務線程內合併執行一次。
因爲有兩個維度會觸發合併,就不可避免會遇到線程安全問題。爲了保證容器內的請求不會被多個線程重複消費或都漏掉,我須要一個容器能知足如下條件:
java.util.concurrent 包內的 LinkedBlockingDeque
恰好符合要求,首先它實現了 BlockingDeque 接口,多線程環境下的存取操做是安全的;此外,它還提供 drainTo(Collection<? super E> c, int maxElements)
方法,能夠將容器內 maxElements 個元素安全地取出來,放到 Collection c 中。
如下是具體的代碼實現:
public class BatchCollapser<E> implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class); private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap(); private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1); private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>(); private Handler<List<E>, Boolean> cleaner; private long interval; private int threshHold; private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) { this.cleaner = cleaner; this.threshHold = threshHold; this.interval = interval; } @Override public void afterPropertiesSet() throws Exception { SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> { try { this.clean(); } catch (Exception e) { logger.error("clean container exception", e); } }, 0, interval, TimeUnit.MILLISECONDS); } public void submit(E event) { batchContainer.add(event); if (batchContainer.size() >= threshHold) { clean(); } } private void clean() { List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold); batchContainer.drainTo(transferList, 100); if (CollectionUtils.isEmpty(transferList)) { return; } try { cleaner.handle(transferList); } catch (Exception e) { logger.error("batch execute error, transferList:{}", transferList, e); } } public static <E> BatchCollapser getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) { Class jobClass = cleaner.getClass(); if (instance.get(jobClass) == null) { synchronized (BatchCollapser.class) { if (instance.get(jobClass) == null) { instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval)); } } } return instance.get(jobClass); } }
如下代碼內須要注意的點:
Handler
的實例,經過實例的 class 來對請求進行分組存儲。ScheduledExecutorService
定時啓動 Timer 線程。上面介紹的請求合併都是將多個請求一次發送,下游服務器處理時本質上仍是多個請求,最好的請求合併是在內存中進行,將請求結果簡單合併成一個發送給下游服務器。如咱們常常會遇到的需求:元素分值累加或數據統計,就能夠先在內存中將某一項的分值或數據累加起來,定時請求數據庫保存。
Guava 內就提供了這麼一種數據結構: ConcurrentHashMultiset
,它不一樣於普通的 set 結構存儲相同元素時直接覆蓋原有元素,而是給每一個元素保持一個計數 count, 插入重複時元素的 count 值加1。並且它在添加和刪除時並不加鎖也能保證線程安全,具體實現是經過一個 while(true) 循環嘗試操做,直到操做夠所須要的數量。
ConcurrentHashMultiset 這種排重計數的特性,很是適合數據統計這種元素在短期內重複率很高的場景,通過排重後的數量計算,能夠大大下降下游服務器的壓力,即便重複率不高,能用少許的內存空間換取系統可用性的提升,也是很划算的。
使用 ConcurrentHashMultiset 進行請求合併與使用普通容器在總體結構上並沒有太大差別,具體相似於:
if (ConcurrentHashMultiset.isEmpty()) { return; } List<Request> transferList = Lists.newArrayList(); ConcurrentHashMultiset.elementSet().forEach(request -> { int count = ConcurrentHashMultiset.count(request); if (count <= 0) { return; } transferList.add(count == 1 ? request : new Request(request.getIncrement() * count)); ConcurrentHashMultiset.remove(request, count); });
最後總結一下各個技術適用的場景:
另外,若是選擇本身來實現的話,徹底能夠將 BatchCollapser 和 ConcurrentHashMultiset 結合一下,在BatchCollapser 裏使用 ConcurrentHashMultiset 做爲容器,這樣就能夠結合二者的優點了。
關於本文有什麼問題能夠在下面留言交流,若是您以爲本文對您有幫助,能夠點擊下面的 推薦
支持一下我,博客一直在更新,歡迎 關注