CircuitBreaker模式的Java實現

狀態轉換

clipboard.png

  • 閉->開

在設定的時間窗口內,失敗次數達到閾值時,由閉->開。

  • 開->半開

處於開的狀態時,對目標的調用直接做失敗返回;進入開狀態的時候啓動計時器,超過設定時間之後進入半開狀態。

  • 半開->開

進入半開狀態,會啓動一個計數器,記錄連續成功的調用次數,達到閾值則進入閉狀態。只要有一次失敗就進入開狀態,同時清零連續成功調用次數。進入開狀態的同時,啓動進入半開狀態的定時器。

  • 半開->閉

進入半開狀態,會啓動一個計數器,記錄連續成功的調用次數,達到閾值則進入閉狀態,同時清零連續成功調用次數。

實現要點

  • 切到開狀態啓動的定時器

這裏如果使用定時線程來做的話,開的線程多,管理比較麻煩,故這裏改爲維護一個切換到開狀態的時間:在每次方法調用時,如果判斷是開狀態,就判斷是否已經過了這個超時閾值,超過的話,進入半開狀態。

  • 半開狀態的計數器

目前半開狀態沒有使用時間窗口,僅僅使用連續成功次數來計算,一旦失敗,則將斷路器設置爲open狀態。如果連續成功次數達到閾值,則進入close狀態。每次進入half-open狀態時,連續成功的計數器清零。

主要代碼

斷路器狀態

/**
 * The three states a circuit breaker can be in.
 */
public enum CircuitBreakerState {

    /** Working normally: calls pass through to the target transparently. */
    CLOSED,

    /** Calls are intercepted and an exception is thrown instead of invoking the target. */
    OPEN,

    /**
     * Calls pass through again on a trial basis; if another tripping
     * exception is thrown, the breaker reverts to {@link #OPEN}.
     */
    HALF_OPEN
}

帶時間窗口的計數器

/**
 * Counter bound to a fixed time window.
 *
 * <p>The count restarts from zero whenever {@link #incrAndGet()} is called
 * after the current window has elapsed. The window is only rolled over by
 * {@link #incrAndGet()}; {@link #get()} and {@link #thresholdReached()} report
 * the raw count without checking the clock.
 */
public class LimitCounter {
    // Start of the current window, from System.currentTimeMillis().
    // Volatile because incrAndGet() reads it outside the lock before deciding
    // whether to roll the window over.
    private volatile long startTime;
    // Length of one counting window in milliseconds.
    private final long timeIntervalInMs;
    // Limit the count is compared against in thresholdReached().
    private final int maxLimit;
    private final AtomicInteger currentCount;

    /**
     * @param timeIntervalInMs length of the counting window in milliseconds
     * @param maxLimit         limit used by {@link #thresholdReached()}
     */
    public LimitCounter(long timeIntervalInMs, int maxLimit) {
        this.timeIntervalInMs = timeIntervalInMs;
        this.maxLimit = maxLimit;
        this.startTime = System.currentTimeMillis();
        this.currentCount = new AtomicInteger(0);
    }

    /**
     * Increments the counter, first starting a new window (count reset to
     * zero) if the current one has elapsed.
     *
     * @return the count after the increment
     */
    public int incrAndGet() {
        long currentTime = System.currentTimeMillis();
        if (windowElapsed(currentTime)) {
            synchronized (this) {
                // Re-check under the lock so only one thread rolls the window over.
                if (windowElapsed(currentTime)) {
                    startTime = currentTime;
                    currentCount.set(0);
                }
            }
        }
        return currentCount.incrementAndGet();
    }

    // Written as a subtraction so that a very large interval (for example
    // Long.MAX_VALUE) cannot overflow the way "startTime + interval" does.
    private boolean windowElapsed(long now) {
        return now - startTime > timeIntervalInMs;
    }

    /**
     * @return {@code true} when the current count is strictly greater than
     *         the configured limit
     */
    public boolean thresholdReached() {
        return currentCount.get() > maxLimit;
    }

    /**
     * @return the current count, without rolling the window over
     */
    public int get() {
        return currentCount.get();
    }

    /**
     * Sets the count back to zero; the window start time is left unchanged.
     */
    public void reset() {
        currentCount.set(0);
    }
}

主要配置

/**
 * Mutable settings for a circuit breaker. Instances are obtained through
 * {@link #newDefault()} and then adjusted with the setters.
 */
public class CircuitBreakerConfig {

    /** Failure-count threshold used while the breaker is closed. */
    private int failThreshold = 5;

    /** Time window, in milliseconds, over which failures are counted while closed. */
    private int failCountWindowInMs = 60 * 1000;

    /** Time, in milliseconds, the breaker stays open before it may move to half-open. */
    private int open2HalfOpenTimeoutInMs = 5 * 1000;

    /** Number of successes required in half-open state. */
    private int consecutiveSuccThreshold = 5;

    // Not publicly constructible; use newDefault().
    private CircuitBreakerConfig() {
    }

    /**
     * @return a new configuration holding the default values
     */
    public static CircuitBreakerConfig newDefault() {
        return new CircuitBreakerConfig();
    }

    public int getFailThreshold() {
        return this.failThreshold;
    }

    public void setFailThreshold(int failThreshold) {
        this.failThreshold = failThreshold;
    }

    public int getFailCountWindowInMs() {
        return this.failCountWindowInMs;
    }

    public void setFailCountWindowInMs(int failCountWindowInMs) {
        this.failCountWindowInMs = failCountWindowInMs;
    }

    public int getOpen2HalfOpenTimeoutInMs() {
        return this.open2HalfOpenTimeoutInMs;
    }

    public void setOpen2HalfOpenTimeoutInMs(int open2HalfOpenTimeoutInMs) {
        this.open2HalfOpenTimeoutInMs = open2HalfOpenTimeoutInMs;
    }

    public int getConsecutiveSuccThreshold() {
        return this.consecutiveSuccThreshold;
    }

    public void setConsecutiveSuccThreshold(int consecutiveSuccThreshold) {
        this.consecutiveSuccThreshold = consecutiveSuccThreshold;
    }
}

斷路器

/**
 * State holder for one named circuit breaker: the current state, the time it
 * last opened, the windowed failure counter used while closed and the
 * consecutive-success counter used while half-open.
 *
 * <p>This class only stores state and answers threshold questions; deciding
 * when to call {@link #open()}, {@link #openHalf()} or {@link #close()} is
 * left to the caller.
 */
public class CircuitBreaker {

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

    private String name;

    private CircuitBreakerConfig config;

    private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED;

    // Time of the most recent transition into the open state (System.currentTimeMillis()).
    private volatile long lastOpenedTime;

    // Failures counted while closed.
    private LimitCounter failCount;

    // Consecutive successes counted while half-open; zeroed by openHalf().
    private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);

    /**
     * @param name   key identifying this breaker; used in log output
     * @param config supplies the failure window, failure threshold, open timeout
     *               and success threshold
     */
    public CircuitBreaker(String name, CircuitBreakerConfig config) {
        this.name = name;
        this.config = config;
        this.failCount = new LimitCounter(config.getFailCountWindowInMs(), config.getFailThreshold());
    }

    // ---- state queries ----

    public boolean isOpen() {
        return state == CircuitBreakerState.OPEN;
    }

    public boolean isHalfOpen() {
        return state == CircuitBreakerState.HALF_OPEN;
    }

    public boolean isClosed() {
        return state == CircuitBreakerState.CLOSED;
    }

    // ---- state transitions ----

    /**
     * closed -> open, or half-open -> open. Records the opening time before
     * publishing the new state.
     */
    public void open() {
        lastOpenedTime = System.currentTimeMillis();
        switchTo(CircuitBreakerState.OPEN, "circuit open,key:{}");
    }

    /**
     * open -> half-open. Zeroes the consecutive-success counter first.
     */
    public void openHalf() {
        consecutiveSuccCount.set(0);
        switchTo(CircuitBreakerState.HALF_OPEN, "circuit open-half,key:{}");
    }

    /**
     * half-open -> closed. Resets the failure counter first.
     */
    public void close() {
        failCount.reset();
        switchTo(CircuitBreakerState.CLOSED, "circuit close,key:{}");
    }

    // Publishes the new state and logs the transition with this breaker's name.
    private void switchTo(CircuitBreakerState next, String logFormat) {
        state = next;
        logger.debug(logFormat, name);
    }

    // ---- threshold checks ----

    /**
     * Whether the breaker has been open for longer than the configured
     * open-to-half-open timeout. Only meaningful while the state is open.
     *
     * @return {@code true} if the timeout has elapsed since the last opening
     */
    public boolean isOpen2HalfOpenTimeout() {
        long cutoff = System.currentTimeMillis() - config.getOpen2HalfOpenTimeoutInMs();
        return lastOpenedTime < cutoff;
    }

    /**
     * Whether the closed-state failure counter says the breaker should open.
     *
     * @return the result of the failure counter's threshold check
     */
    public boolean isCloseFailThresholdReached() {
        return failCount.thresholdReached();
    }

    /**
     * Whether enough consecutive successes were seen in half-open state to close.
     *
     * @return {@code true} if the success count is at least the configured threshold
     */
    public boolean isConsecutiveSuccessThresholdReached() {
        int required = config.getConsecutiveSuccThreshold();
        return consecutiveSuccCount.get() >= required;
    }

    // ---- counters and accessors ----

    /**
     * Adds one failure to the closed-state failure counter and logs the new count.
     */
    public void incrFailCount() {
        int updated = failCount.incrAndGet();
        logger.debug("incr fail count:{},key:{}", updated, name);
    }

    public AtomicInteger getConsecutiveSuccCount() {
        return consecutiveSuccCount;
    }

    public CircuitBreakerState getState() {
        return state;
    }
}

斷路器維護的變量

//time of the most recent transition into the open state
    private volatile long lastOpenedTime;

    //number of failures counted while in the closed state
    private LimitCounter failCount ;

    //number of consecutive successes in the half-open state (set back to zero in openHalf())
    private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);

基於jdk代理的攔截

/**
 * JDK dynamic-proxy {@link InvocationHandler} that guards methods annotated
 * with {@code GuardByCircuitBreaker} with a circuit breaker.
 *
 * <p>Methods without the annotation are invoked directly. For annotated
 * methods a breaker is looked up (or created) under the key
 * {@code <target simple class name><method name>} and the call is handled
 * according to the breaker's state:
 * <ul>
 *   <li>open: rejected with {@code CircuitBreakerOpenException}, unless the
 *       open timeout has elapsed, in which case the breaker moves to
 *       half-open and the call is tried;</li>
 *   <li>closed: invoked; a tripping failure increments the failure counter
 *       and opens the breaker once the threshold check succeeds;</li>
 *   <li>half-open: invoked; a tripping failure re-opens the breaker, and
 *       enough consecutive successes close it.</li>
 * </ul>
 *
 * <p>Exceptions thrown by the target are unwrapped from
 * {@code InvocationTargetException} before being rethrown, so callers of the
 * proxy see the target's own exception.
 */
public class CircuitBreakerInvocationHandler implements InvocationHandler{

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerInvocationHandler.class);

    private Object target;

    /**
     * @param target the real object that proxied calls are delegated to
     */
    public CircuitBreakerInvocationHandler(Object target) {
        this.target = target;
    }

    /**
     * Creates a dynamic proxy implementing all interfaces of the target's
     * class, with this handler intercepting every call.
     *
     * @return the proxy instance
     */
    public Object proxy(){
        Class<?> targetClass = this.target.getClass();
        return Proxy.newProxyInstance(targetClass.getClassLoader(), targetClass.getInterfaces(), this);
    }

    /**
     * Dispatches one proxied call through the circuit breaker.
     *
     * @param proxy  the proxy instance the call was made on (unused)
     * @param method the invoked interface method
     * @param args   the call arguments, as passed by the proxy
     * @return the target method's return value
     * @throws Throwable the target's own exception, or
     *         {@code CircuitBreakerOpenException} when the call is rejected or
     *         the failure trips the breaker
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        GuardByCircuitBreaker breakerAnno = method.getAnnotation(GuardByCircuitBreaker.class);
        if(breakerAnno == null){
            try{
                return method.invoke(target,args);
            }catch (java.lang.reflect.InvocationTargetException e){
                // Rethrow what the target threw, not the reflection wrapper.
                throw unwrap(e);
            }
        }
        Class<? extends Throwable>[] noTripExs = breakerAnno.noTripExceptions();
        CircuitBreaker breaker = resolveBreaker(method,breakerAnno);

        // Read the state once: re-reading it per branch lets a concurrent
        // transition skip every branch, so the target is never invoked.
        CircuitBreakerState state = breaker.getState();
        logger.debug("breaker state:{},method:{}",state,method.toGenericString());

        if(state == CircuitBreakerState.OPEN){
            if(!breaker.isOpen2HalfOpenTimeout()){
                throw new CircuitBreakerOpenException(method.toGenericString());
            }
            // Open timeout elapsed: move to half-open and let this call through as a trial.
            breaker.openHalf();
            logger.debug("method:{} into half open",method.toGenericString());
            return processHalfOpen(breaker,method,args,noTripExs);
        }
        if(state == CircuitBreakerState.CLOSED){
            return processClosed(breaker,method,args,noTripExs);
        }
        return processHalfOpen(breaker,method,args,noTripExs);
    }

    // Looks up the breaker for this method, creating and registering one from the annotation settings if absent.
    private CircuitBreaker resolveBreaker(Method method,GuardByCircuitBreaker breakerAnno){
        String key = target.getClass().getSimpleName() + method.getName();
        CircuitBreaker breaker = CircuitBreakerRegister.get(key);
        if(breaker != null){
            return breaker;
        }

        // NOTE(review): the annotation's timeoutInMs() was read but never used in the
        // original code; its intended meaning is not visible here, so it is not applied.
        CircuitBreakerConfig cfg = CircuitBreakerConfig.newDefault();
        int interval = breakerAnno.failCountWindowInMs();
        if(interval != -1){
            cfg.setFailCountWindowInMs(interval);
        }
        int failThreshold = breakerAnno.failThreshold();
        if(failThreshold != -1){
            cfg.setFailThreshold(failThreshold);
        }

        CircuitBreaker created = new CircuitBreaker(key,cfg);
        CircuitBreakerRegister.putIfAbsent(key,created);
        // Re-read so that, if another thread registered a breaker first, all
        // callers share that instance (presumably putIfAbsent keeps the
        // existing entry -- the register is not shown here).
        CircuitBreaker registered = CircuitBreakerRegister.get(key);
        return registered != null ? registered : created;
    }

    // Invokes the target while closed; counts tripping failures and opens the breaker once the threshold check passes.
    private Object processClosed(CircuitBreaker breaker,Method method, Object[] args,Class<? extends Throwable>[] noTripExs) throws Throwable {
        try{
            return method.invoke(target,args);
        }catch (Throwable t){
            Throwable cause = unwrap(t);
            if(isNoTripException(t,noTripExs)){
                throw cause;
            }
            breaker.incrFailCount();
            if(!breaker.isCloseFailThresholdReached()){
                throw cause;
            }
            logger.debug("method:{} reached fail threshold, circuit breaker open",method.toGenericString());
            breaker.open();
            // Keep the triggering failure as the cause, as the half-open path does.
            throw new CircuitBreakerOpenException(method.toGenericString(), cause);
        }
    }

    // Invokes the target while half-open; successes count toward closing, a tripping failure re-opens the breaker.
    private Object processHalfOpen(CircuitBreaker breaker,Method method, Object[] args,Class<? extends Throwable>[] noTripExs) throws Throwable {
        try{
            Object returnValue = method.invoke(target,args);
            recordHalfOpenSuccess(breaker);
            return returnValue;
        }catch (Throwable t){
            Throwable cause = unwrap(t);
            if(isNoTripException(t,noTripExs)){
                // A non-tripping exception is counted like a successful call.
                recordHalfOpenSuccess(breaker);
                throw cause;
            }
            breaker.open();
            throw new CircuitBreakerOpenException(method.toGenericString(), cause);
        }
    }

    // Counts one half-open success and closes the breaker when the consecutive-success threshold is reached.
    private void recordHalfOpenSuccess(CircuitBreaker breaker){
        breaker.getConsecutiveSuccCount().incrementAndGet();
        if(breaker.isConsecutiveSuccessThresholdReached()){
            breaker.close();
        }
    }

    // Returns the exception thrown by the target when t is a reflection wrapper with a cause; otherwise t itself.
    private static Throwable unwrap(Throwable t){
        if(t instanceof java.lang.reflect.InvocationTargetException && t.getCause() != null){
            return t.getCause();
        }
        return t;
    }

    /**
     * Tells whether a failure is one of the configured non-tripping exception types.
     *
     * @param t                the throwable caught around {@code Method.invoke}
     * @param noTripExceptions exception types that must not trip the breaker; may be null or empty
     * @return {@code true} if the underlying exception is an instance of any listed type
     */
    private boolean isNoTripException(Throwable t,Class<? extends Throwable>[] noTripExceptions){
        if(noTripExceptions == null || noTripExceptions.length == 0){
            return false;
        }
        // Compare against the unwrapped exception; t.getCause() may be null
        // when the failure did not come from the target itself.
        Class<?> thrownType = unwrap(t).getClass();
        for(Class<? extends Throwable> ex:noTripExceptions){
            if(ex.isAssignableFrom(thrownType)){
                return true;
            }
        }
        return false;
    }
}
github工程 circuit-breaker

參考

相關文章
相關標籤/搜索