服務容錯保護斷路器Hystrix之二:Hystrix工做流程解析

1、總運行流程

當你發出請求後,hystrix是這麼運行的html

 

紅圈 :Hystrix 命令執行失敗,執行回退邏輯。也就是你們常常在文章中看到的「服務降級」。
綠圈 :四種狀況會觸發失敗回退邏輯( fallback )。
第一種 :short-circuit ,處理鏈路處於熔斷的回退邏輯,在 「3. #handleShortCircuitViaFallback()」 詳細解析。
第二種 :semaphore-rejection ,處理信號量得到失敗的回退邏輯,在 「4. #handleShortCircuitViaFallback()」 詳細解析。
第三種 :thread-pool-rejection ,處理線程池提交任務拒絕的回退邏輯,在 「5. #handleThreadPoolRejectionViaFallback()」 詳細解析。
第四種 :execution-timeout ,處理命令執行超時的回退邏輯,在 「6. #handleTimeoutViaFallback()」 詳細解析。
第五種 :execution-failure ,處理命令執行異常的回退邏輯,在 「7. #handleFailureViaFallback()」 詳細解析。
第六種 :bad-request ,TODO 【2014】【HystrixBadRequestException】,和 hystrix-javanica 子項目相關。java

另外,#handleXXXX() 方法,總體代碼比較相似,最終都是調用 #getFallbackOrThrowException() 方法,得到【回退邏輯 Observable】或者【異常 Observable】,在 「8. #getFallbackOrThrowException(…)」 詳細解析。

git

 

 

 詳細解釋個步驟github

1.建立  HystrixCommand or HystrixObservableCommand Object

  HystrixCommandweb

用於返回單一的響應
HystrixObservableCommand
用於返回多個可自定義的響應

命令模式,未來自客戶端的請求封裝成一個對象,從而讓你可使用不一樣的請求對客戶端進行參數化。它能夠被用於實現「行爲請求者"與」行爲實現者「的解耦,以便使二者能夠適應變化。
這一過程也包含了策略、資源的初始化,參看AbstractCommand的構造函數:
protected AbstractCommand(...) {
    // 初始化group,group主要是用來對不一樣的command key進行統一管理,好比統一監控、告警等
    this.commandGroup = initGroupKey(...);
    // 初始化command key,用來標識降級邏輯,能夠理解成command的id
    this.commandKey = initCommandKey(...);
    // 初始化自定義的降級策略
    this.properties = initCommandProperties(...);
    // 初始化線程池key,相同的線程池key將共用線程池
    this.threadPoolKey = initThreadPoolKey(...);
    // 初始化監控器
    this.metrics = initMetrics(...);
    // 初始化斷路器
    this.circuitBreaker = initCircuitBreaker(...);
    // 初始化線程池
    this.threadPool = initThreadPool(...);
 
    // Hystrix經過SPI實現了插件機制,容許用戶對事件通知、處理和策略進行自定義
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);
 
    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
 
    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;
 
    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}

 

其實構造函數中的不少初始化工做只會集中在建立第一個Command時來作,後續建立的Command對象主要是從靜態Map中取對應的實例來賦值,好比監控器、斷路器和線程池的初始化,由於相同的Command的command key和線程池key都是一致的,在HystrixCommandMetricsHystrixCircuitBreaker.FactoryHystrixThreadPool中會分別有以下靜態屬性:spring

private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();
 
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
 
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

可見全部Command對象均可以在這裏找到本身對應的資源實例編程

2. Execute the Command(命令執行)

對於HystrixCommand有4個執行方法
對於HystrixObservableCommand只有後兩個
//同步阻塞方法,其實就是調用了queue().get()
execute() — blocks, then returns the single response received from the dependency (or throws an exception in case of an error)
 
//異步非阻塞方法,直接返回Future,能夠先作本身的事情,作完再.get()
queue() — returns a Future with which you can obtain the single response from the dependency
 
//熱觀察,能夠被當即執行,若是訂閱了那麼會從新通知,其實就是調用了toObservable()並內置ReplaySubject,詳細能夠參考RxJava
observe() — subscribes to the Observable that represents the response(s) from the dependency and returns an Observable that replicates that source Observable
 
//冷觀察,返回一個Observable對象,當調用此接口,還須要本身加入訂閱者,才能接受到信息,詳細能夠參考RxJava
toObservable() — returns an Observable that, when you subscribe to it, will execute the Hystrix command and emit its responses
 
注:因爲Hystrix底層採用了RxJava框架開發,因此沒接觸過的可能會一臉懵逼,須要再去對RxJava有所瞭解。

工做流程的源碼說明:數組

工做流程圖中的第1,2步:HystrixCommand.java的execute()是入口,調用的是queue():
    public R execute() {
        try {
            return queue().get();    
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

在queue()中調用了toObservable()的方法,接着看源碼:緩存

3. Is the Response Cached?(結果是否被緩存)

判斷是否使用緩存:是否實現了getCacheKey() 的方法tomcat

若是使用緩存,再判斷若是請求緩存可用fromCache != null,而且對於該請求的響應也在緩存中,那麼命中的響應會以Observable直接返回。

工做流程的源碼說明:

工做流程圖中的第3步:AbstractCommand.java的toObservable()方法中的片斷:
//....
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                /* try from cache first */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
//.... 

              protected boolean isRequestCachingEnabled() {
                 return properties.requestCacheEnabled().get() && getCacheKey() != null;
              }

 
下圖關因而請求緩存的整個生命週期

4. Is the Circuit Open?(斷路器是否打開)

在命令結果沒有緩存命中的時候,Hystrix在執行命令前需檢查斷路器是否爲打開狀態:

  • 若是是打開的,那麼Hystrix不會執行命令,而是轉接到fallback處理邏輯(對應下面的第8步)
  • 若是斷路器是關閉的,那麼Hystrix調到第5步,檢查是否有可用資源來執行命令。

5. Is the Thread Pool/Queue/Semaphore Full?(線程池/請求隊列/信號量是否已經佔滿)

線程池或者信號量是否已經滿負荷,若是已經滿負荷那麼快速失敗

6. HystrixObservableCommand.construct() or HystrixCommand.run())

兩個斷路器的入口,若是是繼承HystrixObservableCommand,那麼就調用construct()函數,若是是繼承HystrixCommand,那麼就調用run()函數。

7. Calculate Circuit Health(計算斷路器的健康度)

Hystrix記錄了成功,失敗,拒絕,超時四種報告

這些報告用於決定哪些用於斷路,被斷路的點在恢復週期內沒法被後來的請求訪問到。

8. Get the Fallback

快速失敗會在如下幾個場景觸發

1.由construct() or run()拋出了一個異常

2.斷路器已經打開的時候

3.沒有空閒的線程池和隊列或者信號量

4.一次命令執行超時

 

能夠重寫快速失敗函數來自定義,

HystrixObservableCommand.resumeWithFallback()

HystrixCommand.getFallback()

9. 成功返回

總體的函數調用流程以下,其實這就是源碼的調用流程

 

 

源碼:

1、AbstractCommand 主要功能點

實現run、getFallback等方法,你就擁有了一個具備基本熔斷功能的類。從使用來看,全部的核心邏輯都由AbstractCommand(即HystrixCommand的父類,HystrixCommand只是對AbstractCommand進行了簡單包裝)抽象類串起來,從功能上來講,AbstractCommand必須將以下功能聯繫起來:

策略配置:Hystrix有兩種降級模型,即信號量(同步)模型和線程池(異步)模型,這兩種模型全部可定製的部分都體如今了HystrixCommandProperties和HystrixThreadPoolProperties兩個類中。然而仍是那句老話,Hystrix只提供了配置修改的入口,沒有將配置界面化,若是想在頁面上動態調整配置,還須要本身實現。

數據統計:Hystrix以命令模式的方式來控制業務邏輯以及熔斷邏輯的調用時機,因此說數據統計對它來講不算難事,但如何高效、精準的在內存中統計數據,還須要必定的技巧。

斷路器:斷路器能夠說是Hystrix內部最重要的狀態機,是它決定着每一個Command的執行過程。

監控露出:能經過某種可配置方式將統計數據展示在儀表盤上。

 二. Hystrix的斷路器設計

斷路器是Hystrix最核心的狀態機,只有瞭解它的變動條件,咱們才能準確掌握Hystrix的內部行爲。上面的內部流程圖中【斷路器狀態判斷】這個環節直接決定着此次請求(或者說這個Command對象)是嘗試去執行正常業務邏輯(即run())仍是走降級後的邏輯(即getFallback()),斷路器HystrixCircuitBreaker有三個狀態,

爲了能作到狀態能按照指定的順序來流轉,而且是線程安全的,斷路器的實現類HystrixCircuitBreakerImpl使用了AtomicReference:

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        enum Status {
            CLOSED, OPEN, HALF_OPEN;
        }

// 斷路器初始狀態確定是關閉狀態
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);

 

斷路器在狀態變化時,使用了AtomicReference#compareAndSet來確保當條件知足時,只有一筆請求能成功改變狀態。

那麼,什麼條件下斷路器會改變狀態?

1. CLOSED -> OPEN :

時間窗口內(默認10秒請求量大於請求量閾值(即circuitBreakerRequestVolumeThreshold,默認值是20),而且該時間窗口內錯誤率大於錯誤率閾值(即circuitBreakerErrorThresholdPercentage,默認值爲50,表示50%),那麼斷路器的狀態將由默認的CLOSED狀態變爲OPEN狀態。看代碼可能更直接

// 檢查是否超過了咱們設置的斷路器請求量閾值
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
    // 若是沒有超過統計窗口的請求量閾值,則不改變斷路器狀態,
    // 若是它是CLOSED狀態,那麼仍然是CLOSED.
    // 若是它是HALF-OPEN狀態,咱們須要等待請求被成功執行,
    // 若是它是OPEN狀態, 咱們須要等待睡眠窗口過去。
} else {
    if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        //若是沒有超過統計窗口的錯誤率閾值,則不改變斷路器狀態,,
        // 若是它是CLOSED狀態,那麼仍然是CLOSED.
        // 若是它是HALF-OPEN狀態,咱們須要等待請求被成功執行,
        // 若是它是OPEN狀態, 咱們須要等待【睡眠窗口】過去。
    } else {
        // 若是錯誤率過高,那麼將變爲OPEN狀態
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            // 由於斷路器處於打開狀態會有一個時間範圍,因此這裏記錄了變成OPEN的時間
            circuitOpened.set(System.currentTimeMillis());
        }
    }
}

 

這裏的錯誤率是個整數,即errorPercentage= (int) ((doubleerrorCount totalCount 100);,至於睡眠窗口,下面會提到。

2. OPEN ->HALF_OPEN: 

前面說過,當進入OPEN狀態後,會進入一段睡眠窗口,即只會OPEN一段時間,因此這個睡眠窗口過去,就會「自動」從OPEN狀態變成HALF_OPEN狀態,這種設計是爲了能作到彈性恢復,這種狀態的變動,並非由調度線程來作,而是由請求來觸發,每次請求都會進行以下檢查:

@Override
public boolean attemptExecution() {
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    // circuitOpened值等於1說明斷路器狀態爲CLOSED
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        if (isAfterSleepWindow()) {
            // 睡眠窗口過去後只有第一個請求能被執行
            // 若是執行成功,那麼狀態將會變成CLOSED
            // 若是執行失敗,狀態仍變成OPEN
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}
 
// 睡眠窗口是否過去
private boolean isAfterSleepWindow() {
    // 還記得上面CLOSED->OPEN時記錄的時間嗎?
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}

 

 

3. HALF_OPEN ->CLOSED :

變爲半開狀態後,會放第一筆請求去執行,並跟蹤它的執行結果,若是是成功,那麼將由HALF_OPEN狀態變成CLOSED狀態

@Override
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        // 已經進入了CLOSED階段,因此將OPEN的修改時間設置成-1
        circuitOpened.set(-1L);
    }
}

4. HALF_OPEN ->OPEN :

 變爲半開狀態時若是第一筆被放去執行的請求執行失敗(資源獲取失敗、異常、超時等),就會由HALP_OPEN狀態再變爲OPEN狀態

@Override
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        // This thread wins the race to re-open the circuit - it resets the start time for the sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}

三. 滑動窗口(滾動窗口)

上面提到的斷路器須要的時間窗口請求量和錯誤率這兩個統計數據,都是指固定時間長度內的統計數據,斷路器的目標,就是根據這些統計數據來預判並決定系統下一步的行爲,Hystrix經過滑動窗口來對數據進行「平滑」統計,默認狀況下,一個滑動窗口包含10個桶Bucket),每一個桶時間寬度是1秒,負責1秒的數據統計。滑動窗口包含的總時間以及其中的桶數量都是能夠配置的,來張官方的截圖認識下滑動窗口:

上圖的每一個小矩形表明一個桶,能夠看到,每一個桶都記錄着1秒內的四個指標數據:成功量、失敗量、超時量和拒絕量,這裏的拒絕量指的就是上面流程圖中【信號量/線程池資源檢查】中被拒絕的流量。10個桶合起來是一個完整的滑動窗口,因此計算一個滑動窗口的總數據須要將10個桶的數據加起來

 

咱們如今來具體看看滑動窗口和桶的設計,若是將滑動窗口設計成對一個長度爲10的整形數組的操做,第一個想到的應該是AtomicLongArray,AtomicLongArray中每一個位置的數據都能線程安全的操做,提供了譬如incrementAndGet、getAndSet、compareAndSet等經常使用方法。但因爲一個桶須要維護四個指標,若是用四個AtomicLongArray來實現,作法不夠高級,因而咱們想到了AtomicReferenceArray<Bucket>Bucket對象內部能夠用AtomicLong來維護着這四個指標。滑動窗口和桶的設計特別講究技巧,須要儘量作到性能、數據準確性兩方面的極致,咱們來看Hystrix是如何作到的。

 

桶的數據統計簡單來講能夠分爲兩類,一類是簡單自增計數器,好比請求量、錯誤量等,另外一類是併發最大值,好比一段時間內的最大併發量(或者說線程池的最大任務數),下面是桶類Bucket的定義:

class Bucket {
    // 標識是哪一秒的桶數據
    final long windowStart;
    // 若是是簡單自增統計數據,那麼將使用adderForCounterType
    final LongAdder[] adderForCounterType;
    // 若是是最大併發類的統計數據,那麼將使用updaterForCounterType
    final LongMaxUpdater[] updaterForCounterType;
 
    Bucket(long startTime) {
        this.windowStart = startTime;
 
        // 預分配內存,提升效率,不一樣事件對應不一樣的數組index
        adderForCounterType = new LongAdder[HystrixRollingNumberEvent.values().length];
        for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
            if (type.isCounter()) {
                adderForCounterType[type.ordinal()] = new LongAdder();
            }
        }
 
        // 預分配內存,提升效率,不一樣事件對應不一樣的數組index
        updaterForCounterType = new LongMaxUpdater[HystrixRollingNumberEvent.values().length];
        for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
            if (type.isMaxUpdater()) {
                updaterForCounterType[type.ordinal()] = new LongMaxUpdater();
                // initialize to 0 otherwise it is Long.MIN_VALUE
                updaterForCounterType[type.ordinal()].update(0);
            }
        }
    }
    //...略...
}

咱們能夠看到,並無用所謂的AtomicLong,爲了方便的管理各類事件(參見com.netflix.hystrix.HystrixEventType)的數據統計,Hystrix對不一樣的事件使用不一樣的數組index(即枚舉的順序),這樣對於某個桶(即某一秒)的指定類型的數據,總能從數組中找到對應的LongAdder(用於統計前面說的簡單自增)或LongMaxUpdater(用於統計前面說的最大併發值)對象來進行自增或更新操做。對於性能有要求的中間件或庫類都避不開要CPUCache優化的問題,好比cache line,以及cache line帶來的false sharing問題。Bucket的內部並無使用AtomicLong,而是使用了JDK8新提供的LongAdder,在高併發的單調自增場景,LongAdder提供了比AtomicLong更好的性能,至於LongAdder的設計思想,本文不展開,感興趣的朋友能夠去拜讀Doug Lea大神的代碼(有意思的是Hystrix沒有直接使用JDK中的LongAdder,而是copy過來改了改)。LongMaxUpdater也是相似的,它和LongAddr同樣都派生於Striped64,這裏再也不展開。

滑動窗口由多個桶組成,業界通常的作法是將數組作成環,Hystrix中也相似,多個桶是放在AtomicReferenceArray<Bucket>來維護的,爲了將其作成環,須要保存頭尾的引用,因而有了ListState類: 

class ListState {
    /*
     * 這裏的data之因此用AtomicReferenceArray而不是普通數組,是由於data須要
     * 在不一樣的ListState對象中跨線程來引用,須要可見性和併發性的保證。
     */
    private final AtomicReferenceArray<Bucket> data;
    private final int size;
    private final int tail;
    private final int head;
 
    private ListState(AtomicReferenceArray<Bucket> data, int head, int tail) {
        this.head = head;
        this.tail = tail;
        if (head == 0 && tail == 0) {
            size = 0;
        } else {
            this.size = (tail + dataLength - head) % dataLength;
        }
        this.data = data;
    }
    //...略...
}

  咱們能夠發現,真正的數據是data,而ListState只是一個時間段的數據快照而已,因此tail和head都是final,這樣作的好處是咱們不須要去爲head、tail的原子操做而苦惱,轉而變成對ListState的持有操做,因此滑動窗口看起來以下:

咱們能夠看到,因爲默認一個滑動窗口包含10個桶,因此AtomicReferenceArray<Bucket>的size得達到10+1=11才能「滑動/滾動」起來,在肯定的某一秒內,只有一個桶被更新,其餘的桶數據都沒有變化。既然經過ListState能夠拿到全部的數據,那麼咱們只須要持有最新的ListState對象便可,爲了能作到可見性和原子操做,因而有了環形桶類BucketCircularArray

class BucketCircularArray implements Iterable<Bucket> {
    // 持有最新的ListState
    private final AtomicReference<ListState> state;
     //...略...
}

注意到BucketCircularArray實現了迭代器接口,這是由於咱們輸出給斷路器的數據須要計算滑動窗口中的全部桶,因而你能夠看到真正的滑動窗口類HystrixRollingNumber有以下屬性和方法:

public class HystrixRollingNumber {
    // 環形桶數組
    final BucketCircularArray buckets;
 
    // 獲取該事件類型當前滑動窗口的統計值
    public long getRollingSum(HystrixRollingNumberEvent type) {
        Bucket lastBucket = getCurrentBucket();
        if (lastBucket == null)
            return 0;
    
        long sum = 0;
        // BucketCircularArray實現了迭代器接口環形桶數組
        for (Bucket b : buckets) {
            sum += b.getAdder(type).sum();
        }
        return sum;
    }
    //...略...
}

斷路器就是經過監控來從HystrixRollingNumber的getRollingSum方法來獲取統計值的

到這裏斷路器和滑動窗口的核心部分已經分析完了,固然裏面還有很多細節沒有提到,感興趣的朋友能夠去看一下源碼。Hystrix中經過RxJava來實現了事件的發佈和訂閱,因此若是想深刻了解Hystrix,須要熟悉RxJava,而RxJava在服務端的應用沒有像客戶端那麼廣,一個緣由是場景的限制,還一個緣由是大多數開發者認爲RxJava設計的過於複雜,加上響應式編程模型,有必定的入門門檻。

4、線程池隔離

     不一樣的業務線之間選擇用線程池隔離,下降互相影響的機率。設置隔離策略爲線程池隔離:
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD));
在Hystrix內部,是根據 properties.executionIsolationStrategy().get()這個字段判斷隔離級別。如在 getRunObservableDecoratedForMetricsAndErrorHandling這個方法中會先判斷是否是線程池隔離,若是是就獲取線程池,若是不是則進行信號量隔離的操做。
     若是是線程池隔離,還須要設置線程池的相關參數如:線程池名字andThreadPoolKey , coreSize(核心線程池大小) , KeepAliveTimeMinutes(線程存存活時間),MaxQueueSize(最大隊列長度),QueueSizeRejectionThreshold(拒絕執行的閥值)等等。
. andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getCoreSize())
                                    .withKeepAliveTimeMinutes(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getKeepAliveSeconds())
                                    .withMaxQueueSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getMaxQueueSize())
                                    .withQueueSizeRejectionThreshold(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getQueueSizeRejectionThreshold()))
threadPoolKey 也是線程池的名字的前綴,默認前綴是 hystrix 。在Hystrix中,核心線程數和最大線程數是一致的,減小線程臨時建立和銷燬帶來的性能開銷。線程池的默認參數都在HystrixThreadPoolProperties中,重點講解一下參數queueSizeRejectionThreshold 和maxQueueSize 。queueSizeRejectionThreshold默認值是5,容許在隊列中的等待的任務數量。maxQueueSize默認值是-1,隊列大小。若是是Fast Fail 應用,建議使用默認值。線程池飽滿後直接拒絕後續的任務,再也不進行等待。代碼以下HystrixThreadPool類中:
        @Override
        public boolean isQueueSpaceAvailable() {
            if (queueSize <= 0) {
                // we don't have a queue so we won't look for space but instead
                // let the thread-pool reject or not
                return true;
            } else {
                return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
            }
        }
線程池一旦建立完成,相關參數就不會更改,存放在靜態的ConcurrentHashMap中,key是對應的commandKey 。而queueSizeRejectionThreshold是每一個命令都是設置的。
     
     線程池的相關參數都保存在HystrixThreadPool這個類文件中,線程池的建立方法getThreadPool則在HystrixConcurrencyStrategy類文件中。從getThreadPool方法能夠看出線程池的名字就是hystrix-threadPoolKey-threadNumber.
@Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
     
     在HystrixThreadPool實現類的構造方法中,併發HystrixConcurrencyStrategy實例是經過HystrixPlugins獲取的,因此能夠經過HystrixPlugins設置自定義插件。具體的HystrixPlugins如何使用,會在後面章節中講解。
 
線程池的建立     
     前面說了,在Hystrix內部大部分類都是單實例,一樣ThreadPool也不例外,也是單實例。而且相同commandKey的依賴還必須是使用同一個線程池。這就須要把ThreadPool保存在一個靜態的map中,key是commandKey,同時要保證線程安全,Hytstrix使用了ConcurrentHashMap。關於爲何不適用HashTable保證線程安全問題的疑問請自行Google。線程池的建立在HystrixThreadPool這個類文件中的內部類Factory中的getInstance方法。
 
/* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
     String key = threadPoolKey.name();

            // this should find it for all but the first time
            HystrixThreadPool previouslyCached = threadPools.get(key);
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize
            synchronized (HystrixThreadPool.class) {
                if (!threadPools.containsKey(key)) {
                    threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                }
            }
            return threadPools.get(key);
     
線程池的使用
     HystrixCommand類的execute()內部調用了queue() ,queue又調用了父類AbstractCommand的toObservable方法,toObservable方法處理了是否可緩存問題後,交給了getRunObservableDecoratedForMetricsAndErrorHandling方法,這個方法設置了一系列的executionHook以後,交給了getExecutionObservableWithLifecycle,這個方法經過getExecutionObservable()獲取了執行器。getExecutionObservable()是個抽象方法,具體實現放在了子類:HystrixCommand和HystrixObservableCommand類中。下面是HystrixCommand類中的getExecutionObservable方法實現:
final protected Observable<R> getExecutionObservable() {
        return Observable.create(new OnSubscribe<R>() {

            @Override
            public void call(Subscriber<? super R> s) {
                try {
                    s.onNext(run());
                    s.onCompleted();
                } catch (Throwable e) {
                    s.onError(e);
                }
            }

        });
    }
在這個Call方法中執行了具體的業務邏輯run() ;

 

 

 

2、模塊詳解

2.一、建立請求命令

2.1.一、有4種方式

一、同步阻塞方法,其實就是調用了queue().get()
二、異步非阻塞方法,直接返回Future,能夠先作本身的事情,作完再.get()
三、熱觀察,能夠被當即執行,若是訂閱了那麼會從新通知,其實就是調用了toObservable()並內置ReplaySubject,詳細能夠參考RxJava
四、冷觀察,返回一個Observable對象,當調用此接口,還須要本身加入訂閱者,才能接受到信息,詳細能夠參考RxJava

2.1.二、原生模式

基於hystrix的原生接口,也就是繼承HystrixCommand或者HystirxObservableCommand。

在《服務容錯保護斷路器Hystrix之一:入門介紹》中的示例基礎上修改以下,

同步方式/異步方式:

package com.dxz.ribbon;

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class ComputeCommand extends HystrixCommand<String> {
    RestTemplate restTemplate;
    String a;
    String b;
    
    protected ComputeCommand(RestTemplate restTemplate, String a, String b) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.restTemplate = restTemplate;
        this.a = a;
        this.b = b;
    }
    
    @Override
    protected String run() throws Exception {
        return restTemplate.getForEntity("http://compute-service/add?a="+a +"&b="+b+"&sn=1", String.class).getBody();
    }
     /**
     * 快速失敗後調用函數
     * @return
     */
    @Override
    protected String getFallback(){
        return "404 :)";
    }
}

觀察方式:

package com.dxz.ribbon;

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;

import rx.Observable;
import rx.Subscriber;

public class ComputeObservableCommand extends HystrixObservableCommand<String> {
    RestTemplate restTemplate;
    String a;
    String b;
    
    protected ComputeObservableCommand(RestTemplate restTemplate, String a, String b) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.restTemplate = restTemplate;
        this.a = a;
        this.b = b;
    }
    
    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> observer) {
                if(!observer.isUnsubscribed()) {
                    String result = restTemplate.getForEntity("http://compute-service/add?a="+a +"&b="+b+"&sn=1", String.class).getBody();
                    observer.onNext(result);
                    observer.onCompleted();
                }
            }
            
        });
    }

}

調用方法:

package com.dxz.ribbon;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import rx.functions.Action1;

@RestController
public class ConsumerController2 {

    @Autowired
    RestTemplate restTemplate;

    @RequestMapping(value = "/add2", method = RequestMethod.GET)
    public String add(@RequestParam String m) throws InterruptedException, ExecutionException {
        if("s".equals(m)) {
            String result = new ComputeCommand(restTemplate, "1", "2").execute();
            System.out.println("result:="+result);
            return result;    
        } else if("a".equals(m)) {
            Future<String> result = new ComputeCommand(restTemplate, "1", "2").queue();
            System.out.println("result:="+result.get());
            return result.get();    
        } else {
            ComputeObservableCommand command1 = new ComputeObservableCommand(restTemplate, "1","2");
            rx.Observable<String> result = command1.observe();  
            result.subscribe(new Action1<String>() {  
                @Override  
                public void call(String s) {  
                    System.out.println("Command called. Result is:" + s);  
                }
            });  
            return null;
        }
        
    }

}

結果:

 2.1.三、註解模式

在《服務容錯保護斷路器Hystrix之一:入門介紹》已經展現過。

 

2.二、定義服務降級

有些狀況不去實現降級邏輯,以下所示。
執行寫操做的命令:當Hystrix命令是用來執行寫操做而不是返回一些信息的時候,一般狀況下這類操做的返回類型時void或是爲空的Observable,實現服務降級的意義不是很大。當寫入操做失敗的時候,咱們一般只須要通知調用者便可。
執行批處理或離線計算的命令:當Hystrix命令是用來執行批處理程序生成一份報告或是進行任何類型的離線計算時,那麼一般這些操做只須要將錯誤傳播給調用者,而後讓調用者稍後重試而不是發送給調用者一個靜默的降級處理響應。

 

2.三、工做流程圖

 

2.四、開關條件

關於斷路器打開

·時間窗口內請求次數(限流)

若是在10s內,超過某個閾值的請求量,纔會考慮斷路(小於這個次數不會被斷路)

配置是circuitBreaker.requestVolumeThreshold

默認10s 20次

·失敗率

默認失敗率超過50%就會被斷路

配置是circuitBreaker.errorThresholdPercentage

 

關於斷路器關閉

·從新嘗試

在必定時間以後,從新嘗試請求來決定是否繼續打開或者選擇關閉斷路器

配置是circuitBreaker.sleepWindowInMilliseconds

默認5000ms

 

2.五、關於隔離

bulkhead pattern模式(艙壁模式)

Htstrix使用了bulkhead pattern模式,典型的例子就是線程隔離。

簡單解釋一下bulkhead pattern模式。通常狀況咱們都用一個線程池來管理全部線程,容易形成一個問題,粒度太粗,沒法對線程進行分類管理,會致使局部問題影響全局。bulkhead pattern模式在於,採用多個線程池來管理線程,這樣使得1個線程池資源出現問題時不會形成另外一個線程池資源問題。儘可能使問題最小化。

如圖所示,採用了bulkhead pattern模式的效果

 

 

說完原理說實現,如何針對不一樣依賴採用不一樣的線程池管理呢

Hystrix給了咱們三種key來用於隔離:

·CommandKey,針對相同的接口通常CommandKey值相同,目的是把HystrixCommand,HystrixCircuitBreaker,HytrixCommandMerics以及其餘相關對象關聯在一塊兒,造成一個原子組。採用原生接口的話,默認值爲類名;採用註解形式的話,默認值爲方法名。

·CommandGroupKey,對CommandKey分組,用於真正的隔離。相同CommandGroupKey會使用同一個線程池或者信號量。通常狀況相同業務功能會使用相同的CommandGroupKey。

·ThreadPoolKey,若是說CommandGroupKey只是邏輯隔離,那麼ThreadPoolKey就是物理隔離,當沒有設置ThreadPoolKey的時候,線程池或者信號量的劃分按照CommandGroupKey,當設置了ThreadPoolKey,那麼線程池和信號量的劃分就按照ThreadPoolKey來處理,相同ThreadPoolKey採用同一個線程池或者信號量。

 

Coding

原生模式

能夠經過HystrixCommand.Setter來自定義配置
HystrixCommandGroupKey.Factory.asKey(""))
HystrixCommandKey.Factory.asKey("")
HystrixThreadPoolKey.Factory.asKey("")

註解模式

能夠直接在方法名上添加

@HystrixCommand(groupKey = "", commandKey = "", threadPoolKey = "")

2.六、關於請求緩存

工做流程圖

 

優點

·複用性

  這裏的複用性指的是代碼複用性

·一致性

  也就是常說的冪等性,無論請求幾回,獲得的結果應該都是同樣的

·減小重複工做

  因爲請求緩存是在HystrixCommand的construct()或run()運行以前運行,全部能夠有效減小線程的使用

適用場景

請求緩存的優點顯而易見,可是也不是銀彈。

在讀少寫多的場景就顯得不太合適,對於讀的請求,須要add緩存。對於增刪改的請求,須要把緩存remove。在增長系統資源開銷的同時,又很雞肋。

因此通常適合讀多寫少的場景。彷佛全部緩存機制都有這個侷限性吧。

Coding

原生模式

繼承HystrixCommand後,重寫getCacheKey()方法,該方法默認返回的是null,也就是不使用請求緩存功能。相同key的請求會使用相同的緩存。
package com.dxz.ribbon;

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class ComputeCommandCache extends HystrixCommand<String> {
    RestTemplate restTemplate;
    String a;
    String b;
    
    protected ComputeCommandCache(RestTemplate restTemplate, String a, String b) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.restTemplate = restTemplate;
        this.a = a;
        this.b = b;
    }
    
    @Override
    protected String run() throws Exception {
        return restTemplate.getForEntity("http://compute-service/add?a="+a +"&b="+b+"&sn=1", String.class).getBody();
    }
    
 @Override protected String getCacheKey() {
        System.out.println("調用getCacheKey");//打印一下何時會觸發
        return a + b;
    }
    
     /**
     * 快速失敗後調用函數
     * @return
     */
    @Override
    protected String getFallback(){
        return "404 :)";
    }
}

調用類,若是不加HystrixRequestContext.initializeContext();//初始化請求上下文,會報錯以下:

報錯了:java.util.concurrent.ExecutionException: Observable onError
Caused by: java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext?

 

package com.dxz.ribbon;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

import rx.functions.Action1;

@RestController
public class ConsumerControllerCache {

    @Autowired
    RestTemplate restTemplate;

    @RequestMapping(value = "/add3", method = RequestMethod.GET)
    public String add(@RequestParam String m) throws InterruptedException, ExecutionException {
        HystrixRequestContext.initializeContext();//初始化請求上下文
        if("s".equals(m)) {
            String result = new ComputeCommandCache(restTemplate, "1", "2").execute();
            System.out.println("result:="+result);
            return result;    
        } else if("a".equals(m)) {
            Future<String> result = new ComputeCommandCache(restTemplate, "1", "2").queue();
            System.out.println("result:="+result.get());
            return result.get();    
        } else {
            ComputeObservableCommand command1 = new ComputeObservableCommand(restTemplate, "1","2");
            rx.Observable<String> result = command1.observe();  
            result.subscribe(new Action1<String>() {  
                @Override  
                public void call(String s) {  
                    System.out.println("Command called. Result is:" + s);  
                }
            });  
            return null;
        }
        
    }

}

註解模式

在方法名上增長,並添加與cacheKeyMethod字符串相同的方法。二者共用入參。
複製代碼
@CacheResult(cacheKeyMethod = "getCacheKey")
public String post2AnotherService(String seed){
}
public String getCacheKey(String seed){

    return seed;

}
複製代碼

 

初始化HystrixRequestContext

還有關鍵的一步,在調用HystrixCommand以前初始化HystrixRequestContext,其實就是建立一個ThreadLocal的副本,共享請求緩存就是經過ThreadLocal來實現的。
HystrixRequestContext context=HystrixRequestContext.initializeContext();
操做完成後context.shutdown();
通常狀況能夠在過濾器中控制是初始化和關閉整個生命週期
複製代碼
//啓動HystrixRequestContext
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
    chain.doFilter(req, res);
} finally {
    //關閉HystrixRequestContext
    context.shutdown();
}
複製代碼

 

2.七、關於請求合併(Requst Collapsing)

工做流程圖

 

上半部分是模擬請求,下半部分是該請求的依賴設置,時間窗口默認是10ms,在這個時間窗口內,全部對於該接口的請求都會被加入隊列,而後進行批處理。這樣的好處在於,若是短期內對於某個接口有大量請求,那麼能夠只處理一次就完成全部響應。

 

優點

全局線程合併

在tomcat容器中,全部請求共用一個進程,也就是一個JVM容器,在併發場景下會派生出許多線程,collapsing能夠合併整個JVM中的請求線程,這樣能夠解決不一樣使用者同時請求的大量併發問題。

 

局部線程合併

能夠合併單個tomcat請求線程,好比在10ms內有10個請求被同一線程處理(這不是像往常同樣請求->處理,而是請求->加入請求隊列,全部能夠快速收集請求),那這些請求能夠被合併。

對象建模和代碼複雜度

在實際場景下,調用接口取數據的複雜度每每高於數據的複雜度,通俗來講就是取數據能夠變幻無窮的取,而數據就那麼幾個接口。

collapsing能夠幫助你更好的實現你的業務,好比屢次請求合併結果後再廣播出去。

 

適用場景

·併發量大接口

當併發量小,一個時間窗口內只有幾個或沒有請求,那麼就白白浪費了請求合併的資源。

·請求耗時接口

時間窗口是固定的,假如一個請求實際耗時10ms,加上固定的時間窗口,最大延遲達到20ms,延遲被提升了100%。若一個請求實際耗時有1s,那麼時間窗口的延遲就能夠被忽略不計。

 

Coding

原生模式

複製代碼
/**
 * 批量返回值類型
 * 返回值類型
 * 請求參數類型
 */
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    private static Logger logger = LoggerFactory.getLogger(CommandCollapserGetValueForKey.class);

    private final Integer key;

    public CommandCollapserGetValueForKey(Integer key) {
        this.key = key;
    }

    /**
     *獲取請求參數
     */
    public Integer getRequestArgument() {
        return key;
    }

    /**
     *合併請求產生批量命令的具體實現
     */
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        return new BatchCommand(requests);
    }

    /**
     *批量命令結果返回後的處理,須要實現將批量結果拆分並傳遞給合併前的各原子請求命令的邏輯中
     */
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        int count = 0;
        //請求響應一一對應
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private static Logger logger = LoggerFactory.getLogger(CommandCollapserGetValueForKey.BatchCommand.class);

        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
                super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            this.requests = requests;
        }

        @Override
        protected List<String> run() {
            ArrayList<String> response = new ArrayList<String>();
            // 處理每一個請求,返回結果
            for (CollapsedRequest<String, Integer> request : requests) {
                logger.info("request.getArgument()={}",request.getArgument());
                // artificial response for each argument received in the batch
                response.add("ValueForKey: " + request.getArgument());
            }
            return response;
        }
    }
}
複製代碼
調用的時候只須要new CommandCollapserGetValueForKey(1).queue()
在同一個時間窗口內,批處理的函數調用順序爲
getRequestArgument()->createCommand()->mapResponseToRequests()

//官方配置文檔

https://github.com/Netflix/Hystrix/wiki/Configuration#circuitBreaker.sleepWindowInMilliseconds

相關文章
相關標籤/搜索