在設定的時間窗口內失敗次數達到閾值,由閉->開。html
在處於開的狀態,對目標的調用作失敗返回,進入開的時候,啓動計時器,設定時間事後進入半開狀態。java
進入半開狀態,會啓動一個計數器,記錄連續成功的調用次數,超過閾值,進入閉狀態。有一次失敗則進入開狀態,同時清零連續成功調用次數。進入開的同時啓動進入半開狀態的定時器。git
進入半開狀態,會啓動一個計數器,記錄連續成功的調用次數,超過閾值,進入閉狀態,同時清零連續成功調用次數。github
這裏若是使用定時線程來作的話,開的線程多,管理比較麻煩,故這裏改成維護一個切換到開狀態的時間,在每次方法調用,判斷是開狀態時,判斷是否已通過了這個超時閾值,超過的話,進入半開狀態。spring
目前半開狀態沒有使用時間窗口,僅僅使用連續成功次數來計算,一旦失敗,則將斷路器設置爲open狀態。若是連續成功次數達到閾值,則進入close狀態。每次進入half-open的狀態時,連續成功的計數器清零。設計模式
public enum CircuitBreakerState { CLOSED, // working normally, calls are transparently passing through OPEN, // method calls are being intercepted and CircuitBreakerExceptions are being thrown instead HALF_OPEN // method calls are passing through; if another blacklisted exception is thrown, reverts back to OPEN }
/** * 帶時間窗口的限流計數器 */ public class LimitCounter { private long startTime; private long timeIntervalInMs; private int maxLimit; private AtomicInteger currentCount; public LimitCounter(long timeIntervalInMs, int maxLimit) { super(); this.timeIntervalInMs = timeIntervalInMs; this.maxLimit = maxLimit; startTime = System.currentTimeMillis(); currentCount = new AtomicInteger(0); } public int incrAndGet() { long currentTime = System.currentTimeMillis(); if ((startTime + timeIntervalInMs) < currentTime) { synchronized (this) { if ((startTime + timeIntervalInMs) < currentTime) { startTime = currentTime; currentCount.set(0); } } } return currentCount.incrementAndGet(); } public boolean thresholdReached(){ return currentCount.get() > maxLimit; } public int get(){ return currentCount.get(); } public /*synchronized*/ void reset(){ currentCount.set(0); } }
public class CircuitBreakerConfig { //closed狀態的失敗次數閾值 private int failThreshold = 5; //closed狀態的失敗計數的時間窗口 private int failCountWindowInMs = 60*1000; //處於open狀態下進入half-open的超時時間 private int open2HalfOpenTimeoutInMs = 5*1000; //half-open狀態下成功次數閾值 private int consecutiveSuccThreshold = 5; private CircuitBreakerConfig(){ } public static CircuitBreakerConfig newDefault(){ CircuitBreakerConfig config = new CircuitBreakerConfig(); return config; } public int getFailThreshold() { return failThreshold; } public void setFailThreshold(int failThreshold) { this.failThreshold = failThreshold; } public int getFailCountWindowInMs() { return failCountWindowInMs; } public void setFailCountWindowInMs(int failCountWindowInMs) { this.failCountWindowInMs = failCountWindowInMs; } public int getOpen2HalfOpenTimeoutInMs() { return open2HalfOpenTimeoutInMs; } public void setOpen2HalfOpenTimeoutInMs(int open2HalfOpenTimeoutInMs) { this.open2HalfOpenTimeoutInMs = open2HalfOpenTimeoutInMs; } public int getConsecutiveSuccThreshold() { return consecutiveSuccThreshold; } public void setConsecutiveSuccThreshold(int consecutiveSuccThreshold) { this.consecutiveSuccThreshold = consecutiveSuccThreshold; } }
public class CircuitBreaker { private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class); private String name; private CircuitBreakerConfig config; private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED; //最近進入open狀態的時間 private volatile long lastOpenedTime; //closed狀態下失敗次數 private LimitCounter failCount ; //half-open狀態的連續成功次數,失敗當即清零 private AtomicInteger consecutiveSuccCount = new AtomicInteger(0); //構造器 public CircuitBreaker(String name,CircuitBreakerConfig config) { this.config = config; this.name = name; failCount = new LimitCounter(config.getFailCountWindowInMs(),config.getFailThreshold()); } //狀態判斷 public boolean isOpen(){ return CircuitBreakerState.OPEN == state; } public boolean isHalfOpen(){ return CircuitBreakerState.HALF_OPEN == state; } public boolean isClosed(){ return CircuitBreakerState.CLOSED == state; } //狀態操做 /** * closed->open | halfopen -> open */ public void open(){ lastOpenedTime = System.currentTimeMillis(); state = CircuitBreakerState.OPEN; logger.debug("circuit open,key:{}",name); } /** * open -> halfopen */ public void openHalf(){ consecutiveSuccCount.set(0); state = CircuitBreakerState.HALF_OPEN; logger.debug("circuit open-half,key:{}",name); } /** * halfopen -> close */ public void close(){ failCount.reset(); state = CircuitBreakerState.CLOSED; logger.debug("circuit close,key:{}",name); } //閾值判斷 /** * 是否應該轉到half open * 前提是 open state * @return */ public boolean isOpen2HalfOpenTimeout(){ return System.currentTimeMillis() - config.getOpen2HalfOpenTimeoutInMs() > lastOpenedTime; } /** * 是否應該從close轉到open * @return */ public boolean isCloseFailThresholdReached(){ return failCount.thresholdReached(); } /** * half-open狀態下是否達到close的閾值 * @return */ public boolean isConsecutiveSuccessThresholdReached(){ return consecutiveSuccCount.get() >= config.getConsecutiveSuccThreshold(); } //getter public void incrFailCount() { int count = failCount.incrAndGet(); logger.debug("incr fail count:{},key:{}",count,name); } public AtomicInteger getConsecutiveSuccCount() { return consecutiveSuccCount; } public CircuitBreakerState getState() { return state; } }
//最近進入open狀態的時間 private volatile long lastOpenedTime; //closed狀態下失敗次數 private LimitCounter failCount ; //half-open狀態的連續成功次數,失敗當即清零 private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);
public class CircuitBreakerInvocationHandler implements InvocationHandler{ private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerInvocationHandler.class); private Object target; public CircuitBreakerInvocationHandler(Object target) { this.target = target; } //動態生成代理對象 public Object proxy(){ return Proxy.newProxyInstance(this.target.getClass().getClassLoader(), this.target.getClass().getInterfaces(), this); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { GuardByCircuitBreaker breakerAnno = method.getAnnotation(GuardByCircuitBreaker.class); if(breakerAnno == null){ return method.invoke(target,args); } Class<? extends Throwable>[] noTripExs = breakerAnno.noTripExceptions(); int timeout = breakerAnno.timeoutInMs(); int interval = breakerAnno.failCountWindowInMs(); int failThreshold = breakerAnno.failThreshold(); CircuitBreakerConfig cfg = CircuitBreakerConfig.newDefault(); if(interval != -1){ cfg.setFailCountWindowInMs(interval); } if(failThreshold != -1){ cfg.setFailThreshold(failThreshold); } String key = target.getClass().getSimpleName() + method.getName(); CircuitBreaker breaker = CircuitBreakerRegister.get(key); if(breaker == null){ breaker = new CircuitBreaker(key,cfg); CircuitBreakerRegister.putIfAbsent(key,breaker); } Object returnValue = null; logger.debug("breaker state:{},method:{}",breaker.getState(),method.toGenericString()); //breaker state if(breaker.isOpen()){ //判斷是否該進入half open狀態 if(breaker.isOpen2HalfOpenTimeout()){ //進入half open狀態 breaker.openHalf(); logger.debug("method:{} into half open",method.toGenericString()); returnValue = processHalfOpen(breaker,method,args,noTripExs); }else{ throw new CircuitBreakerOpenException(method.toGenericString()); } }else if(breaker.isClosed()){ try{ returnValue = method.invoke(target,args); // 這裏看狀況是否重置標誌 // breaker.close(); }catch (Throwable t){ if(isNoTripException(t,noTripExs)){ throw t; }else{ //增長計數 breaker.incrFailCount(); if(breaker.isCloseFailThresholdReached()){ //觸發閾值,打開 logger.debug("method:{} reached fail threshold, circuit breaker open",method.toGenericString()); breaker.open(); throw new CircuitBreakerOpenException(method.toGenericString()); }else{ throw t; } } } }else if(breaker.isHalfOpen()){ returnValue = processHalfOpen(breaker,method,args,noTripExs); } return returnValue; } private Object processHalfOpen(CircuitBreaker breaker,Method method, Object[] args,Class<? extends Throwable>[] noTripExs) throws Throwable { try{ Object returnValue = method.invoke(target,args); breaker.getConsecutiveSuccCount().incrementAndGet(); if(breaker.isConsecutiveSuccessThresholdReached()){ //調用成功則進入close狀態 breaker.close(); } return returnValue; }catch (Throwable t){ if(isNoTripException(t,noTripExs)){ breaker.getConsecutiveSuccCount().incrementAndGet(); if(breaker.isConsecutiveSuccessThresholdReached()){ breaker.close(); } throw t; }else{ breaker.open(); throw new CircuitBreakerOpenException(method.toGenericString(), t); } } } private boolean isNoTripException(Throwable t,Class<? extends Throwable>[] noTripExceptions){ if(noTripExceptions == null || noTripExceptions.length == 0){ return false; } for(Class<? extends Throwable> ex:noTripExceptions){ //是不是拋出異常t的父類 //t java.lang.reflect.InvocationTargetException if(ex.isAssignableFrom(t.getCause().getClass())){ return true; } } return false; } }
github工程 circuit-breaker