本文主要研究一下resilience4j的CircuitBreakerStateMachinejava
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.javagit
/** * A CircuitBreaker finite state machine. */ public final class CircuitBreakerStateMachine implements CircuitBreaker { private static final Logger LOG = LoggerFactory.getLogger(CircuitBreakerStateMachine.class); private final String name; private final AtomicReference<CircuitBreakerState> stateReference; private final CircuitBreakerConfig circuitBreakerConfig; private final CircuitBreakerEventProcessor eventProcessor; /** * Creates a circuitBreaker. * * @param name the name of the CircuitBreaker * @param circuitBreakerConfig The CircuitBreaker configuration. */ public CircuitBreakerStateMachine(String name, CircuitBreakerConfig circuitBreakerConfig) { this.name = name; this.circuitBreakerConfig = circuitBreakerConfig; this.stateReference = new AtomicReference<>(new ClosedState(this)); this.eventProcessor = new CircuitBreakerEventProcessor(); } /** * Creates a circuitBreaker with default config. * * @param name the name of the CircuitBreaker */ public CircuitBreakerStateMachine(String name) { this(name, CircuitBreakerConfig.ofDefaults()); } /** * Creates a circuitBreaker. * * @param name the name of the CircuitBreaker * @param circuitBreakerConfig The CircuitBreaker configuration supplier. */ public CircuitBreakerStateMachine(String name, Supplier<CircuitBreakerConfig> circuitBreakerConfig) { this(name, circuitBreakerConfig.get()); } /** * Requests permission to call this backend. * * @return true, if the call is allowed. */ @Override public boolean isCallPermitted() { boolean callPermitted = stateReference.get().isCallPermitted(); if (!callPermitted) { publishCallNotPermittedEvent(); } return callPermitted; } @Override public void onError(long durationInNanos, Throwable throwable) { if (circuitBreakerConfig.getRecordFailurePredicate().test(throwable)) { if (LOG.isDebugEnabled()) { LOG.debug(String.format("CircuitBreaker '%s' recorded a failure:", name), throwable); } publishCircuitErrorEvent(name, durationInNanos, throwable); stateReference.get().onError(throwable); } else { publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable); } } @Override public void onSuccess(long durationInNanos) { publishSuccessEvent(durationInNanos); stateReference.get().onSuccess(); } /** * Get the state of this CircuitBreaker. * * @return the the state of this CircuitBreaker */ @Override public State getState() { return this.stateReference.get().getState(); } /** * Get the name of this CircuitBreaker. * * @return the the name of this CircuitBreaker */ @Override public String getName() { return this.name; } /** * Get the config of this CircuitBreaker. * * @return the config of this CircuitBreaker */ @Override public CircuitBreakerConfig getCircuitBreakerConfig() { return circuitBreakerConfig; } @Override public Metrics getMetrics() { return this.stateReference.get().getMetrics(); } /** * {@inheritDoc} */ @Override public String toString() { return String.format("CircuitBreaker '%s'", this.name); } @Override public void reset() { CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> new ClosedState(this)); if (previousState.getState() != CLOSED) { publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), CLOSED)); } publishResetEvent(); } private void stateTransition(State newState, Function<CircuitBreakerState, CircuitBreakerState> newStateGenerator) { CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> { if (currentState.getState() == newState) { return currentState; } return newStateGenerator.apply(currentState); }); if (previousState.getState() != newState) { publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState)); } } @Override public void transitionToDisabledState() { stateTransition(DISABLED, currentState -> new DisabledState(this)); } @Override public void transitionToForcedOpenState() { stateTransition(FORCED_OPEN, currentState -> new ForcedOpenState(this)); } @Override public void transitionToClosedState() { stateTransition(CLOSED, currentState -> new ClosedState(this, currentState.getMetrics())); } @Override public void transitionToOpenState() { stateTransition(OPEN, currentState -> new OpenState(this, currentState.getMetrics())); } @Override public void transitionToHalfOpenState() { stateTransition(HALF_OPEN, currentState -> new HalfOpenState(this)); } //...... }
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerState.javagithub
/** * Abstract state of the CircuitBreaker state machine. */ abstract class CircuitBreakerState{ CircuitBreakerStateMachine stateMachine; CircuitBreakerState(CircuitBreakerStateMachine stateMachine) { this.stateMachine = stateMachine; } abstract boolean isCallPermitted(); abstract void onError(Throwable throwable); abstract void onSuccess(); abstract CircuitBreaker.State getState(); abstract CircuitBreakerMetrics getMetrics(); /** * Should the CircuitBreaker in this state publish events * @return a boolean signaling if the events should be published */ boolean shouldPublishEvents(CircuitBreakerEvent event){ return event.getEventType().forcePublish || getState().allowPublish; } }
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.javaapp
/** * State transitions of the CircuitBreaker state machine. */ enum StateTransition { CLOSED_TO_OPEN(State.CLOSED, State.OPEN), CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED), CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN), HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED), HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN), HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED), HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN), OPEN_TO_CLOSED(State.OPEN, State.CLOSED), OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN), OPEN_TO_DISABLED(State.OPEN, State.DISABLED), OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN), FORCED_OPEN_TO_CLOSED(State.FORCED_OPEN, State.CLOSED), FORCED_OPEN_TO_OPEN(State.FORCED_OPEN, State.OPEN), FORCED_OPEN_TO_DISABLED(State.FORCED_OPEN, State.DISABLED), FORCED_OPEN_TO_HALF_OPEN(State.FORCED_OPEN, State.HALF_OPEN), DISABLED_TO_CLOSED(State.DISABLED, State.CLOSED), DISABLED_TO_OPEN(State.DISABLED, State.OPEN), DISABLED_TO_FORCED_OPEN(State.DISABLED, State.FORCED_OPEN), DISABLED_TO_HALF_OPEN(State.DISABLED, State.HALF_OPEN); private final State fromState; private final State toState; private static final Map<Tuple2<State, State>, StateTransition> STATE_TRANSITION_MAP = Arrays .stream(StateTransition.values()) .collect(Collectors.toMap(v -> Tuple.of(v.fromState, v.toState), Function.identity())); private boolean matches(State fromState, State toState) { return this.fromState == fromState && this.toState == toState; } public static StateTransition transitionBetween(State fromState, State toState){ final StateTransition stateTransition = STATE_TRANSITION_MAP.get(Tuple.of(fromState, toState)); if(stateTransition == null) { throw new IllegalStateException( String.format("Illegal state transition from %s to %s", fromState.toString(), toState.toString())); } return stateTransition; } StateTransition(State fromState, State toState) { this.fromState = fromState; this.toState = toState; } public State getFromState() { return fromState; } public State getToState() { return toState; } @Override public String toString(){ return String.format("State transition from %s to %s", fromState, toState); } }
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.javaide
private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher { @Override public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) { registerConsumer(CircuitBreakerOnSuccessEvent.class, onSuccessEventConsumer); return this; } @Override public EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> onErrorEventConsumer) { registerConsumer(CircuitBreakerOnErrorEvent.class, onErrorEventConsumer); return this; } @Override public EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> onStateTransitionEventConsumer) { registerConsumer(CircuitBreakerOnStateTransitionEvent.class, onStateTransitionEventConsumer); return this; } @Override public EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> onResetEventConsumer) { registerConsumer(CircuitBreakerOnResetEvent.class, onResetEventConsumer); return this; } @Override public EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> onIgnoredErrorEventConsumer) { registerConsumer(CircuitBreakerOnIgnoredErrorEvent.class, onIgnoredErrorEventConsumer); return this; } @Override public EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> onCallNotPermittedEventConsumer) { registerConsumer(CircuitBreakerOnCallNotPermittedEvent.class, onCallNotPermittedEventConsumer); return this; } @Override public void consumeEvent(CircuitBreakerEvent event) { super.processEvent(event); } }
resilience4j-core-0.13.0-sources.jar!/io/github/resilience4j/core/EventProcessor.javaui
public class EventProcessor<T> implements EventPublisher<T> { protected volatile boolean consumerRegistered; private volatile EventConsumer<T> onEventConsumer; private ConcurrentMap<Class<? extends T>, EventConsumer<Object>> eventConsumers = new ConcurrentHashMap<>(); public boolean hasConsumers(){ return consumerRegistered; } @SuppressWarnings("unchecked") public <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){ consumerRegistered = true; eventConsumers.put(eventType, (EventConsumer<Object>) eventConsumer); } @SuppressWarnings("unchecked") public <E extends T> boolean processEvent(E event) { boolean consumed = false; if(onEventConsumer != null){ onEventConsumer.consumeEvent(event); consumed = true; } if(!eventConsumers.isEmpty()){ EventConsumer<T> eventConsumer = (EventConsumer<T>) eventConsumers.get(event.getClass()); if(eventConsumer != null){ eventConsumer.consumeEvent(event); consumed = true; } } return consumed; } @Override public void onEvent(EventConsumer<T> onEventConsumer) { consumerRegistered = true; this.onEventConsumer = onEventConsumer; } }
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.javathis
/** * An EventPublisher can be used to register event consumers. */ interface EventPublisher extends io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> { EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer); EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer); EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer); EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer); EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer); EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer); }
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/CircuitBreakerEvent.javadebug
/** * An event which is created by a CircuitBreaker. */ public interface CircuitBreakerEvent { /** * Returns the name of the CircuitBreaker which has created the event. * * @return the name of the CircuitBreaker which has created the event */ String getCircuitBreakerName(); /** * Returns the type of the CircuitBreaker event. * * @return the type of the CircuitBreaker event */ Type getEventType(); /** * Returns the creation time of CircuitBreaker event. * * @return the creation time of CircuitBreaker event */ ZonedDateTime getCreationTime(); /** * Event types which are created by a CircuitBreaker. */ enum Type { /** A CircuitBreakerEvent which informs that an error has been recorded */ ERROR(false), /** A CircuitBreakerEvent which informs that an error has been ignored */ IGNORED_ERROR(false), /** A CircuitBreakerEvent which informs that a success has been recorded */ SUCCESS(false), /** A CircuitBreakerEvent which informs that a call was not permitted because the CircuitBreaker state is OPEN */ NOT_PERMITTED(false), /** A CircuitBreakerEvent which informs the state of the CircuitBreaker has been changed */ STATE_TRANSITION(true), /** A CircuitBreakerEvent which informs the CircuitBreaker has been reset */ RESET(true), /** A CircuitBreakerEvent which informs the CircuitBreaker has been forced open */ FORCED_OPEN(false), /** A CircuitBreakerEvent which informs the CircuitBreaker has been disabled */ DISABLED(false); public final boolean forcePublish; Type(boolean forcePublish) { this.forcePublish = forcePublish; } } }
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/AbstractCircuitBreakerEvent.javacode
abstract class AbstractCircuitBreakerEvent implements CircuitBreakerEvent { private final String circuitBreakerName; private final ZonedDateTime creationTime; AbstractCircuitBreakerEvent(String circuitBreakerName) { this.circuitBreakerName = circuitBreakerName; this.creationTime = ZonedDateTime.now(); } @Override public String getCircuitBreakerName() { return circuitBreakerName; } @Override public ZonedDateTime getCreationTime() { return creationTime; } }
藉助子類的各自實現來化解狀態轉換的複雜邏輯
),同時發佈一些事件。