聊聊resilience4j的CircuitBreakerStateMachine

本文主要研究一下resilience4j的CircuitBreakerStateMachinejava

CircuitBreakerStateMachine

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.javagit

/**
 * A CircuitBreaker finite state machine.
 */
public final class CircuitBreakerStateMachine implements CircuitBreaker {

    private static final Logger LOG = LoggerFactory.getLogger(CircuitBreakerStateMachine.class);

    private final String name;
    private final AtomicReference<CircuitBreakerState> stateReference;
    private final CircuitBreakerConfig circuitBreakerConfig;
    private final CircuitBreakerEventProcessor eventProcessor;

    /**
     * Creates a circuitBreaker.
     *
     * @param name                 the name of the CircuitBreaker
     * @param circuitBreakerConfig The CircuitBreaker configuration.
     */
    public CircuitBreakerStateMachine(String name, CircuitBreakerConfig circuitBreakerConfig) {
        this.name = name;
        this.circuitBreakerConfig = circuitBreakerConfig;
        this.stateReference = new AtomicReference<>(new ClosedState(this));
        this.eventProcessor = new CircuitBreakerEventProcessor();
    }

    /**
     * Creates a circuitBreaker with default config.
     *
     * @param name the name of the CircuitBreaker
     */
    public CircuitBreakerStateMachine(String name) {
        this(name, CircuitBreakerConfig.ofDefaults());
    }

    /**
     * Creates a circuitBreaker.
     *
     * @param name                 the name of the CircuitBreaker
     * @param circuitBreakerConfig The CircuitBreaker configuration supplier.
     */
    public CircuitBreakerStateMachine(String name, Supplier<CircuitBreakerConfig> circuitBreakerConfig) {
        this(name, circuitBreakerConfig.get());
    }

    /**
     * Requests permission to call this backend.
     *
     * @return true, if the call is allowed.
     */
    @Override
    public boolean isCallPermitted() {
        boolean callPermitted = stateReference.get().isCallPermitted();
        if (!callPermitted) {
            publishCallNotPermittedEvent();
        }
        return callPermitted;
    }

    @Override
    public void onError(long durationInNanos, Throwable throwable) {
        if (circuitBreakerConfig.getRecordFailurePredicate().test(throwable)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("CircuitBreaker '%s' recorded a failure:", name), throwable);
            }
            publishCircuitErrorEvent(name, durationInNanos, throwable);
            stateReference.get().onError(throwable);
        } else {
            publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable);
        }
    }

    @Override
    public void onSuccess(long durationInNanos) {
        publishSuccessEvent(durationInNanos);
        stateReference.get().onSuccess();
    }



    /**
     * Get the state of this CircuitBreaker.
     *
     * @return the the state of this CircuitBreaker
     */
    @Override
    public State getState() {
        return this.stateReference.get().getState();
    }

    /**
     * Get the name of this CircuitBreaker.
     *
     * @return the the name of this CircuitBreaker
     */
    @Override
    public String getName() {
        return this.name;
    }


    /**
     * Get the config of this CircuitBreaker.
     *
     * @return the config of this CircuitBreaker
     */
    @Override
    public CircuitBreakerConfig getCircuitBreakerConfig() {
        return circuitBreakerConfig;
    }

    @Override
    public Metrics getMetrics() {
        return this.stateReference.get().getMetrics();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        return String.format("CircuitBreaker '%s'", this.name);
    }

    @Override
    public void reset() {
        CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> new ClosedState(this));
        if (previousState.getState() != CLOSED) {
            publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), CLOSED));
        }
        publishResetEvent();
    }

    private void stateTransition(State newState, Function<CircuitBreakerState, CircuitBreakerState> newStateGenerator) {
        CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
            if (currentState.getState() == newState) {
                return currentState;
            }
            return newStateGenerator.apply(currentState);
        });
        if (previousState.getState() != newState) {
            publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState));
        }
    }


    @Override
    public void transitionToDisabledState() {
        stateTransition(DISABLED, currentState -> new DisabledState(this));
    }

    @Override
    public void transitionToForcedOpenState() {
        stateTransition(FORCED_OPEN, currentState -> new ForcedOpenState(this));
    }

    @Override
    public void transitionToClosedState() {
        stateTransition(CLOSED, currentState -> new ClosedState(this, currentState.getMetrics()));
    }

    @Override
    public void transitionToOpenState() {
        stateTransition(OPEN, currentState -> new OpenState(this, currentState.getMetrics()));
    }

    @Override
    public void transitionToHalfOpenState() {
        stateTransition(HALF_OPEN, currentState -> new HalfOpenState(this));
    }
    //......
}
  • CircuitBreakerStateMachine實現了CircuitBreakerStateMachine接口
  • AtomicReference<CircuitBreakerState>用來記錄當前斷路器的狀態
  • 狀態轉換接口內部都調用了stateTransition方法,裏頭主要是更新AtomicReference<CircuitBreakerState>以及發佈事件
  • stateTransition以及reset方法裏頭都調用了StateTransition.transitionBetween來發布事件
  • CircuitBreakerEventProcessor用來處理事件

CircuitBreakerState

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerState.javagithub

/**
 * Abstract state of the CircuitBreaker state machine.
 */
abstract class CircuitBreakerState{

    CircuitBreakerStateMachine stateMachine;

    CircuitBreakerState(CircuitBreakerStateMachine stateMachine) {
        this.stateMachine = stateMachine;
    }

    abstract boolean isCallPermitted();

    abstract void onError(Throwable throwable);

    abstract void onSuccess();

    abstract CircuitBreaker.State getState();

    abstract CircuitBreakerMetrics getMetrics();

    /**
     * Should the CircuitBreaker in this state publish events
     * @return a boolean signaling if the events should be published
     */
    boolean shouldPublishEvents(CircuitBreakerEvent event){
        return event.getEventType().forcePublish || getState().allowPublish;
    }
}
  • CircuitBreakerState與CircuitBreakerStateMachine兩者相互保存各自的引用
  • CircuitBreakerState的子類有OpenState、HalfOpenState、ForcedOpenState、ClosedState、DisabledState
  • 每一個子類的onError、onSuccess方法本身判斷是否進行狀態轉移,若是要轉移會調用CircuitBreakerStateMachine的transitionTo開頭的方法

StateTransition.transitionBetween

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.javaapp

/**
     * State transitions of the CircuitBreaker state machine.
     */
    enum StateTransition {
        CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
        CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),
        CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),
        HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
        HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),
        HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),
        HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),
        OPEN_TO_CLOSED(State.OPEN, State.CLOSED),
        OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
        OPEN_TO_DISABLED(State.OPEN, State.DISABLED),
        OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),
        FORCED_OPEN_TO_CLOSED(State.FORCED_OPEN, State.CLOSED),
        FORCED_OPEN_TO_OPEN(State.FORCED_OPEN, State.OPEN),
        FORCED_OPEN_TO_DISABLED(State.FORCED_OPEN, State.DISABLED),
        FORCED_OPEN_TO_HALF_OPEN(State.FORCED_OPEN, State.HALF_OPEN),
        DISABLED_TO_CLOSED(State.DISABLED, State.CLOSED),
        DISABLED_TO_OPEN(State.DISABLED, State.OPEN),
        DISABLED_TO_FORCED_OPEN(State.DISABLED, State.FORCED_OPEN),
        DISABLED_TO_HALF_OPEN(State.DISABLED, State.HALF_OPEN);

        private final State fromState;

        private final State toState;

        private static final Map<Tuple2<State, State>, StateTransition> STATE_TRANSITION_MAP =
                Arrays
                        .stream(StateTransition.values())
                        .collect(Collectors.toMap(v -> Tuple.of(v.fromState, v.toState), Function.identity()));

        private boolean matches(State fromState, State toState) {
            return this.fromState == fromState && this.toState == toState;
        }

        public static StateTransition transitionBetween(State fromState, State toState){
            final StateTransition stateTransition = STATE_TRANSITION_MAP.get(Tuple.of(fromState, toState));
            if(stateTransition == null) {
                throw new IllegalStateException(
                        String.format("Illegal state transition from %s to %s", fromState.toString(), toState.toString()));
            }
            return stateTransition;
        }

        StateTransition(State fromState, State toState) {
            this.fromState = fromState;
            this.toState = toState;
        }

        public State getFromState() {
            return fromState;
        }

        public State getToState() {
            return toState;
        }

        @Override
        public String toString(){
            return String.format("State transition from %s to %s", fromState, toState);
        }
    }
  • StateTransition定義了一系列狀態轉換的路徑
  • StateTransition.transitionBetween方法會對這些狀態轉換進行校驗,不合法的拋出IllegalStateException

CircuitBreakerEventProcessor

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.javaide

private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {
        @Override
        public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {
            registerConsumer(CircuitBreakerOnSuccessEvent.class, onSuccessEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> onErrorEventConsumer) {
            registerConsumer(CircuitBreakerOnErrorEvent.class, onErrorEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> onStateTransitionEventConsumer) {
            registerConsumer(CircuitBreakerOnStateTransitionEvent.class, onStateTransitionEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> onResetEventConsumer) {
            registerConsumer(CircuitBreakerOnResetEvent.class, onResetEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> onIgnoredErrorEventConsumer) {
            registerConsumer(CircuitBreakerOnIgnoredErrorEvent.class, onIgnoredErrorEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> onCallNotPermittedEventConsumer) {
            registerConsumer(CircuitBreakerOnCallNotPermittedEvent.class, onCallNotPermittedEventConsumer);
            return this;
        }

        @Override
        public void consumeEvent(CircuitBreakerEvent event) {
            super.processEvent(event);
        }
    }
  • CircuitBreakerEventProcessor繼承了EventProcessor,處理CircuitBreakerEvent事件
  • CircuitBreakerEventProcessor也實現了EventConsumer以及EventPublisher接口
  • onSuccess、onError、onStateTransition、onReset、onIgnoredError、onCallNotPermitted裏頭調用了registerConsumer方法
  • consumeEvent裏頭則是調用了processEvent方法

EventProcessor

resilience4j-core-0.13.0-sources.jar!/io/github/resilience4j/core/EventProcessor.javaui

public class EventProcessor<T> implements EventPublisher<T> {

    protected volatile boolean consumerRegistered;
    private volatile EventConsumer<T> onEventConsumer;
    private ConcurrentMap<Class<? extends T>, EventConsumer<Object>> eventConsumers = new ConcurrentHashMap<>();

    public boolean hasConsumers(){
        return consumerRegistered;
    }

    @SuppressWarnings("unchecked")
    public <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){
        consumerRegistered = true;
        eventConsumers.put(eventType, (EventConsumer<Object>) eventConsumer);
    }

    @SuppressWarnings("unchecked")
    public <E extends T> boolean processEvent(E event) {
        boolean consumed = false;
        if(onEventConsumer != null){
            onEventConsumer.consumeEvent(event);
            consumed = true;
        }
        if(!eventConsumers.isEmpty()){
            EventConsumer<T> eventConsumer = (EventConsumer<T>) eventConsumers.get(event.getClass());
            if(eventConsumer != null){
                eventConsumer.consumeEvent(event);
                consumed = true;
            }
        }
        return consumed;
    }

    @Override
    public void onEvent(EventConsumer<T> onEventConsumer) {
        consumerRegistered = true;
        this.onEventConsumer = onEventConsumer;
    }
}
  • registerConsumer方法主要是往eventConsumers設置事件類型的消費者
  • processEvent方法主要是查找相應的事件消費者去處理事件

EventPublisher

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.javathis

/**
     * An EventPublisher can be used to register event consumers.
     */
    interface EventPublisher extends io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {

        EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);

        EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);

        EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);

        EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);

        EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);

        EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);
        }
  • 這個EventPublisher接口繼承了io.github.resilience4j.core.EventPublisher,邏輯上有點混亂
  • 這些on方法分別處理了CircuitBreakerOnSuccessEvent、CircuitBreakerOnErrorEvent、CircuitBreakerOnStateTransitionEvent、CircuitBreakerOnResetEvent、CircuitBreakerOnIgnoredErrorEvent、CircuitBreakerOnCallNotPermittedEvent事件

CircuitBreakerEvent

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/CircuitBreakerEvent.javadebug

/**
 * An event which is created by a CircuitBreaker.
 */
public interface CircuitBreakerEvent {

    /**
     * Returns the name of the CircuitBreaker which has created the event.
     *
     * @return the name of the CircuitBreaker which has created the event
     */
    String getCircuitBreakerName();

    /**
     * Returns the type of the CircuitBreaker event.
     *
     * @return the type of the CircuitBreaker event
     */
    Type getEventType();

    /**
     * Returns the creation time of CircuitBreaker event.
     *
     * @return the creation time of CircuitBreaker event
     */
    ZonedDateTime getCreationTime();

    /**
     * Event types which are created by a CircuitBreaker.
     */
    enum Type {
        /** A CircuitBreakerEvent which informs that an error has been recorded */
        ERROR(false),
        /** A CircuitBreakerEvent which informs that an error has been ignored */
        IGNORED_ERROR(false),
        /** A CircuitBreakerEvent which informs that a success has been recorded */
        SUCCESS(false),
        /** A CircuitBreakerEvent which informs that a call was not permitted because the CircuitBreaker state is OPEN */
        NOT_PERMITTED(false),
        /** A CircuitBreakerEvent which informs the state of the CircuitBreaker has been changed */
        STATE_TRANSITION(true),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been reset */
        RESET(true),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been forced open */
        FORCED_OPEN(false),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been disabled */
        DISABLED(false);

        public final boolean forcePublish;

        Type(boolean forcePublish) {
            this.forcePublish = forcePublish;
        }
    }
}
  • CircuitBreakerEvent接口定義了事件的Type枚舉
  • 規範了getCircuitBreakerName、getEventType、getCreationTime方法

AbstractCircuitBreakerEvent

resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/AbstractCircuitBreakerEvent.javacode

abstract class AbstractCircuitBreakerEvent implements CircuitBreakerEvent {

    private final String circuitBreakerName;
    private final ZonedDateTime creationTime;

    AbstractCircuitBreakerEvent(String circuitBreakerName) {
        this.circuitBreakerName = circuitBreakerName;
        this.creationTime = ZonedDateTime.now();
    }

    @Override
    public String getCircuitBreakerName() {
        return circuitBreakerName;
    }

    @Override
    public ZonedDateTime getCreationTime() {
        return creationTime;
    }
}
  • 定義了circuitBreakerName、creationTime屬性
  • 重寫了getCircuitBreakerName、getCreationTime方法
  • ircuitBreakerOnSuccessEvent、CircuitBreakerOnErrorEvent、CircuitBreakerOnStateTransitionEvent、CircuitBreakerOnResetEvent、CircuitBreakerOnIgnoredErrorEvent、CircuitBreakerOnCallNotPermittedEvent都是從AbstractCircuitBreakerEvent繼承而來,個別的自定義了本身的屬性,主要是重寫getEventType以及toString方法

小結

  • CircuitBreakerStateMachine裏頭維護了一個AtomicReference<CircuitBreakerState>引用,對應的onError及onSuccess方法都委託給改引用對應的狀態的onError以及onSuccess方法
  • 子類的onError以及onSuccess方法方法則自行判斷是否須要進行狀態切換以及切換到什麼狀態,本身調用CircuitBreakerStateMachine的transitionTo開頭的方法,來改變AtomicReference<CircuitBreakerState>的值(藉助子類的各自實現來化解狀態轉換的複雜邏輯),同時發佈一些事件。

doc

相關文章
相關標籤/搜索