聊聊resilience4j的Retry

本文主要研究一下resilience4j的Retryjava

Retry

resilience4j-retry-0.13.0-sources.jar!/io/github/resilience4j/retry/Retry.javagit

/**
 * A Retry instance is thread-safe can be used to decorate multiple requests.
 * A Retry.
 */
public interface Retry {

    /**
     * Returns the ID of this Retry.
     *
     * @return the ID of this Retry
     */
    String getName();

    /**
     * Creates a retry Context.
     *
     * @return the retry Context
     */
    Retry.Context context();

    /**
     * Returns the RetryConfig of this Retry.
     *
     * @return the RetryConfig of this Retry
     */
    RetryConfig getRetryConfig();

    /**
     * Returns an EventPublisher can be used to register event consumers.
     *
     * @return an EventPublisher
     */
    EventPublisher getEventPublisher();

    /**
     * Creates a Retry with a custom Retry configuration.
     *
     * @param name the ID of the Retry
     * @param retryConfig a custom Retry configuration
     *
     * @return a Retry with a custom Retry configuration.
     */
    static Retry of(String name, RetryConfig retryConfig){
        return new RetryImpl(name, retryConfig);
    }

    /**
     * Creates a Retry with a custom Retry configuration.
     *
     * @param name the ID of the Retry
     * @param retryConfigSupplier a supplier of a custom Retry configuration
     *
     * @return a Retry with a custom Retry configuration.
     */
    static Retry of(String name, Supplier<RetryConfig> retryConfigSupplier){
        return new RetryImpl(name, retryConfigSupplier.get());
    }

    /**
     * Creates a retryable supplier.
     *
     * @param retry the retry context
     * @param supplier the original function
     * @param <T> the type of results supplied by this supplier
     *
     * @return a retryable function
     */
    static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier){
        return () -> {
            Retry.Context context = retry.context();
            do try {
                T result = supplier.get();
                context.onSuccess();
                return result;
            } catch (RuntimeException runtimeException) {
                context.onRuntimeError(runtimeException);
            } while (true);
        };
    }

    /**
     * Creates a retryable callable.
     *
     * @param retry the retry context
     * @param supplier the original function
     * @param <T> the type of results supplied by this supplier
     *
     * @return a retryable function
     */
    static <T> Callable<T> decorateCallable(Retry retry, Callable<T> supplier){
        return () -> {
            Retry.Context context = retry.context();
            do try {
                T result = supplier.call();
                context.onSuccess();
                return result;
            } catch (RuntimeException runtimeException) {
                context.onRuntimeError(runtimeException);
            } while (true);
        };
    }

    /**
     * Creates a retryable runnable.
     *
     * @param retry the retry context
     * @param runnable the original runnable
     *
     * @return a retryable runnable
     */
    static Runnable decorateRunnable(Retry retry, Runnable runnable){
        return () -> {
            Retry.Context context = retry.context();
            do try {
                runnable.run();
                context.onSuccess();
                break;
            } catch (RuntimeException runtimeException) {
                context.onRuntimeError(runtimeException);
            } while (true);
        };
    }
    //......
}
  • 這個類定義了一些工廠方法,最後new的是RetryImpl
  • 還定義了decorate開頭的方法,包裝retry的邏輯
  • retry邏輯是包裝在一個循環裏頭,先執行業務代碼,若是成功調用context.onSuccess(),跳出循環,若是失敗捕獲RuntimeException,而後調用context.onRuntimeError

RetryConfig

resilience4j-retry-0.13.0-sources.jar!/io/github/resilience4j/retry/RetryConfig.javagithub

public class RetryConfig {

    public static final int DEFAULT_MAX_ATTEMPTS = 3;
    public static final long DEFAULT_WAIT_DURATION = 500;
    public static final IntervalFunction DEFAULT_INTERVAL_FUNCTION = (numOfAttempts) -> DEFAULT_WAIT_DURATION;
    public static final Predicate<Throwable> DEFAULT_RECORD_FAILURE_PREDICATE = (throwable) -> true;

    private int maxAttempts = DEFAULT_MAX_ATTEMPTS;

    private IntervalFunction intervalFunction = DEFAULT_INTERVAL_FUNCTION;
    // The default exception predicate retries all exceptions.
    private Predicate<Throwable> exceptionPredicate = DEFAULT_RECORD_FAILURE_PREDICATE;

    //......
}
  • 該配置主要有3個參數,一個是maxAttempts,一個是exceptionPredicate,一個是intervalFunction

Retry.Context

resilience4j-retry-0.13.0-sources.jar!/io/github/resilience4j/retry/Retry.javaapp

interface Context {

        /**
         *  Records a successful call.
         */
        void onSuccess();

        /**
         * Handles a checked exception
         *
         * @param exception the exception to handle
         * @throws Throwable the exception
         */
        void onError(Exception exception) throws Throwable;

        /**
         * Handles a runtime exception
         *
         * @param runtimeException the exception to handle
         */
        void onRuntimeError(RuntimeException runtimeException);
    }
  • 這個接口定義了onSuccess、onError、onRuntimeError

RetryImpl.ContextImpl

resilience4j-retry-0.13.0-sources.jar!/io/github/resilience4j/retry/internal/RetryImpl.javathis

public final class ContextImpl implements Retry.Context {

        private final AtomicInteger numOfAttempts = new AtomicInteger(0);
        private final AtomicReference<Exception> lastException = new AtomicReference<>();
        private final AtomicReference<RuntimeException> lastRuntimeException = new AtomicReference<>();

        private ContextImpl() {
        }

        public void onSuccess() {
            int currentNumOfAttempts = numOfAttempts.get();
            if(currentNumOfAttempts > 0){
                succeededAfterRetryCounter.increment();
                Throwable throwable = Option.of(lastException.get()).getOrElse(lastRuntimeException.get());
                publishRetryEvent(() -> new RetryOnSuccessEvent(getName(), currentNumOfAttempts, throwable));
            }else{
                succeededWithoutRetryCounter.increment();
            }
        }

        public void onError(Exception exception) throws Throwable{
            if(exceptionPredicate.test(exception)){
                lastException.set(exception);
                throwOrSleepAfterException();
            }else{
                failedWithoutRetryCounter.increment();
                publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), exception));
                throw exception;
            }
        }

        public void onRuntimeError(RuntimeException runtimeException){
            if(exceptionPredicate.test(runtimeException)){
                lastRuntimeException.set(runtimeException);
                throwOrSleepAfterRuntimeException();
            }else{
                failedWithoutRetryCounter.increment();
                publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), runtimeException));
                throw runtimeException;
            }
        }

        private void throwOrSleepAfterException() throws Exception {
            int currentNumOfAttempts = numOfAttempts.incrementAndGet();
            Exception throwable = lastException.get();
            if(currentNumOfAttempts >= maxAttempts){
                failedAfterRetryCounter.increment();
                publishRetryEvent(() -> new RetryOnErrorEvent(getName(), currentNumOfAttempts, throwable));
                throw throwable;
            }else{
                waitIntervalAfterFailure(currentNumOfAttempts, throwable);
            }
        }

        private void throwOrSleepAfterRuntimeException(){
            int currentNumOfAttempts = numOfAttempts.incrementAndGet();
            RuntimeException throwable = lastRuntimeException.get();
            if(currentNumOfAttempts >= maxAttempts){
                failedAfterRetryCounter.increment();
                publishRetryEvent(() -> new RetryOnErrorEvent(getName(), currentNumOfAttempts, throwable));
                throw throwable;
            }else{
                waitIntervalAfterFailure(currentNumOfAttempts, throwable);
            }
        }

        private void waitIntervalAfterFailure(int currentNumOfAttempts, Throwable throwable) {
            // wait interval until the next attempt should start
            long interval = intervalFunction.apply(numOfAttempts.get());
            publishRetryEvent(()-> new RetryOnRetryEvent(getName(), currentNumOfAttempts, throwable, interval));
            Try.run(() -> sleepFunction.accept(interval))
                    .getOrElseThrow(ex -> lastRuntimeException.get());
        }

    }
  • throwOrSleepAfterRuntimeException方法會根據重試次數判斷,若是超出則拋lastRuntimeException,若是不超出則調用waitIntervalAfterFailure
  • waitIntervalAfterFailure則經過sleepFunction來進行延時

小結

  • resilience4j的Retry沿用了該組件的一向風格,經過decorate方法來織入重試的邏輯
  • 重試的邏輯就是一個while true循環,先執行業務方法,若是成功則調用Retry.Context的onSuccess方法,而後跳出循環,若是失敗的話,只捕獲RuntimeException,而後調用Retry.Context的onRuntimeError
  • onRuntimeError會先判斷該異常是否須要重試,若是不須要則直接拋出原有異常,須要重試的話,則numOfAttempts.incrementAndGet(),若是超出限制則拋出異常,沒有超出限制則根據配置的重試間隔進行sleep,而後onRuntimeError返回繼續下一循環重試。

doc

相關文章
相關標籤/搜索