重試利器之Guava Retrying

重試利器之Guava Retrying


目錄java


重試的使用場景

在不少業務場景中,爲了排除系統中的各類不穩定因素,以及邏輯上的錯誤,並最大機率保證得到預期的結果,重試機制都是必不可少的。git

尤爲是調用遠程服務,在高併發場景下,極可能由於服務器響應延遲或者網絡緣由,形成咱們得不到想要的結果,或者根本得不到響應。這個時候,一個優雅的重試調用機制,可讓咱們更大機率保證獲得預期的響應。github

一般狀況下,咱們會經過定時任務進行重試。例如某次操做失敗,則記錄下來,當定時任務再次啓動,則將數據放到定時任務的方法中,從新跑一遍。最終直至獲得想要的結果爲止。web

不管是基於定時任務的重試機制,仍是咱們本身寫的簡單的重試器,缺點都是重試的機制太單一,並且實現起來不優雅。spring

如何優雅地設計重試實現

一個完備的重試實現,要很好地解決以下問題:apache

  1. 什麼條件下重試
  2. 什麼條件下中止
  3. 如何中止重試
  4. 中止重試等待多久
  5. 如何等待
  6. 請求時間限制
  7. 如何結束
  8. 如何監聽整個重試過程

而且,爲了更好地封裝性,重試的實現通常分爲兩步:springboot

  1. 使用工廠模式構造重試器
  2. 執行重試方法並獲得結果

一個完整的重試流程能夠簡單示意爲:bash

guava-retrying基礎用法

guava-retrying是基於谷歌的核心類庫guava的重試機制實現,能夠說是一個重試利器。服務器

下面就快速看一下它的用法。網絡

1.Maven配置

<!-- https://mvnrepository.com/artifact/com.github.rholder/guava-retrying -->
<dependency>
    <groupId>com.github.rholder</groupId>
    <artifactId>guava-retrying</artifactId>
    <version>2.0.0</version>
</dependency>
複製代碼

須要注意的是,此版本依賴的是27.0.1版本的guava。若是你項目中的guava低幾個版本沒問題,可是低太多就不兼容了。這個時候你須要升級你項目的guava版本,或者直接去掉你本身的guava依賴,使用guava-retrying傳遞過來的guava依賴。

2.實現Callable

Callable<Boolean> callable = new Callable<Boolean>() {
    public Boolean call() throws Exception {
        return true; // do something useful here
    }
};
複製代碼

Callable的call方法中是你本身實際的業務調用。

  1. 經過RetryerBuilder構造Retryer
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
        .retryIfResult(Predicates.<Boolean>isNull())
        .retryIfExceptionOfType(IOException.class)
        .retryIfRuntimeException()
        .withStopStrategy(StopStrategies.stopAfterAttempt(3))
        .build();
複製代碼
  1. 使用重試器執行你的業務
retryer.call(callable);
複製代碼

下面是完整的參考實現。

public Boolean test() throws Exception {
    //定義重試機制
    Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
            //retryIf 重試條件
            .retryIfException()
            .retryIfRuntimeException()
            .retryIfExceptionOfType(Exception.class)
            .retryIfException(Predicates.equalTo(new Exception()))
            .retryIfResult(Predicates.equalTo(false))

            //等待策略:每次請求間隔1s
            .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))

            //中止策略 : 嘗試請求6次
            .withStopStrategy(StopStrategies.stopAfterAttempt(6))

            //時間限制 : 某次請求不得超過2s , 相似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
            .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))

            .build();

    //定義請求實現
    Callable<Boolean> callable = new Callable<Boolean>() {
        int times = 1;

        @Override
        public Boolean call() throws Exception {
            log.info("call times={}", times);
            times++;

            if (times == 2) {
                throw new NullPointerException();
            } else if (times == 3) {
                throw new Exception();
            } else if (times == 4) {
                throw new RuntimeException();
            } else if (times == 5) {
                return false;
            } else {
                return true;
            }

        }
    };
    //利用重試器調用請求
   return  retryer.call(callable);
}
複製代碼

guava-retrying實現原理

guava-retrying的核心是Attempt類、Retryer類以及一些Strategy(策略)相關的類。

  1. Attempt

Attempt既是一次重試請求(call),也是請求的結果,並記錄了當前請求的次數、是否包含異常和請求的返回值。

/** * An attempt of a call, which resulted either in a result returned by the call, * or in a Throwable thrown by the call. * * @param <V> The type returned by the wrapped callable. * @author JB */
public interface Attempt<V> 複製代碼
  1. Retryer

Retryer經過RetryerBuilder這個工廠類進行構造。RetryerBuilder負責將定義的重試策略賦值到Retryer對象中。

在Retryer執行call方法的時候,會將這些重試策略一一使用。

下面就看一下Retryer的call方法的具體實現。

/** * Executes the given callable. If the rejection predicate * accepts the attempt, the stop strategy is used to decide if a new attempt * must be made. Then the wait strategy is used to decide how much time to sleep * and a new attempt is made. * * @param callable the callable task to be executed * @return the computed result of the given callable * @throws ExecutionException if the given callable throws an exception, and the * rejection predicate considers the attempt as successful. The original exception * is wrapped into an ExecutionException. * @throws RetryException if all the attempts failed before the stop strategy decided * to abort, or the thread was interrupted. Note that if the thread is interrupted, * this exception is thrown and the thread's interrupt status is set. */
   public V call(Callable<V> callable) throws ExecutionException, RetryException {
       long startTime = System.nanoTime();
       //說明: 根據attemptNumber進行循環——也就是重試多少次
       for (int attemptNumber = 1; ; attemptNumber++) {
           //說明:進入方法不等待,當即執行一次
           Attempt<V> attempt;
           try {
                //說明:執行callable中的具體業務
                //attemptTimeLimiter限制了每次嘗試等待的時常
               V result = attemptTimeLimiter.call(callable);
               //利用調用結果構造新的attempt
               attempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
           } catch (Throwable t) {
               attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
           }

           //說明:遍歷自定義的監聽器
           for (RetryListener listener : listeners) {
               listener.onRetry(attempt);
           }

           //說明:判斷是否知足重試條件,來決定是否繼續等待並進行重試
           if (!rejectionPredicate.apply(attempt)) {
               return attempt.get();
           }

           //說明:此時知足中止策略,由於尚未獲得想要的結果,所以拋出異常
           if (stopStrategy.shouldStop(attempt)) {
               throw new RetryException(attemptNumber, attempt);
           } else {
                //說明:執行默認的中止策略——線程休眠
               long sleepTime = waitStrategy.computeSleepTime(attempt);
               try {
                   //說明:也能夠執行定義的中止策略
                   blockStrategy.block(sleepTime);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new RetryException(attemptNumber, attempt);
               }
           }
       }
   }

複製代碼

Retryer執行過程以下。

guava-retrying高級用法

基於guava-retrying的實現原理,咱們能夠根據實際業務來肯定本身的重試策略。

下面以數據同步這種常規系統業務爲例,自定義重試策略。

以下實現基於Spring Boot 2.1.2.RELEASE版本。

並使用Lombok簡化Bean。

<dependency>
	 <groupId>org.projectlombok</groupId>
	 <artifactId>lombok</artifactId>
	 <optional>true</optional>
</dependency>
複製代碼

業務描述

當商品建立之後,須要另外設置商品的價格。因爲兩個操做是有兩我的進行的,所以會出現以下問題,即商品沒有建立,可是價格數據卻已經建好了。遇到這種狀況,價格數據須要等待商品正常建立之後,繼續完成同步。

咱們經過一個http請求進行商品的建立,同時經過一個定時器來修改商品的價格。

當商品不存在,或者商品的數量小於1的時候,商品的價格不能設置。須要等商品成功建立且數量大於0的時候,才能將商品的價格設置成功。

實現過程

  1. 自定義重試阻塞策略

默認的阻塞策略是線程休眠,這裏使用自旋鎖實現,不阻塞線程。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy;

import com.github.rholder.retry.BlockStrategy;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.time.LocalDateTime;

/** * 自旋鎖的實現, 不響應線程中斷 */
@Slf4j
@NoArgsConstructor
public class SpinBlockStrategy implements BlockStrategy {

    @Override
    public void block(long sleepTime) throws InterruptedException {

        LocalDateTime startTime = LocalDateTime.now();

        long start = System.currentTimeMillis();
        long end = start;
        log.info("[SpinBlockStrategy]...begin wait.");

        while (end - start <= sleepTime) {
            end = System.currentTimeMillis();
        }

        //使用Java8新增的Duration計算時間間隔
        Duration duration = Duration.between(startTime, LocalDateTime.now());

        log.info("[SpinBlockStrategy]...end wait.duration={}", duration.toMillis());

    }
}
複製代碼
  1. 自定義重試監聽器

RetryListener能夠監控屢次重試過程,並可使用attempt作一些額外的事情。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RetryLogListener implements RetryListener {

    @Override
    public <V> void onRetry(Attempt<V> attempt) {

        // 第幾回重試,(注意:第一次重試實際上是第一次調用)
        log.info("retry time : [{}]", attempt.getAttemptNumber());

        // 距離第一次重試的延遲
        log.info("retry delay : [{}]", attempt.getDelaySinceFirstAttempt());

        // 重試結果: 是異常終止, 仍是正常返回
        log.info("hasException={}", attempt.hasException());
        log.info("hasResult={}", attempt.hasResult());

        // 是什麼緣由致使異常
        if (attempt.hasException()) {
            log.info("causeBy={}" , attempt.getExceptionCause().toString());
        } else {
            // 正常返回時的結果
            log.info("result={}" , attempt.getResult());
        }

        log.info("log listen over.");

    }
}
複製代碼
  1. 自定義Exception

有些異常須要重試,有些不須要。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception;

/** * 當拋出這個異常的時候,表示須要重試 */
public class NeedRetryException extends Exception {

    public NeedRetryException(String message) {
        super("NeedRetryException can retry."+message);
    }

}
複製代碼
  1. 實現具體重試業務與Callable接口

使用call方法調用本身的業務。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.math.BigDecimal;

/** * 商品model */
@Data
@AllArgsConstructor
public class Product {

    private Long id;

    private String name;

    private Integer count;

    private BigDecimal price;

}
複製代碼
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository;

import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import org.springframework.stereotype.Repository;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** * 商品DAO */
@Repository
public class ProductRepository {

    private static ConcurrentHashMap<Long,Product> products=new  ConcurrentHashMap();

    private static AtomicLong ids=new AtomicLong(0);

    public List<Product> findAll(){
        return new ArrayList<>(products.values());
    }

    public Product findById(Long id){
        return products.get(id);
    }

    public Product updatePrice(Long id, BigDecimal price){
        Product p=products.get(id);
        if (null==p){
            return p;
        }
        p.setPrice(price);
        return p;
    }

    public Product addProduct(Product product){
        Long id=ids.addAndGet(1);
        product.setId(id);
        products.put(id,product);
        return product;
    }

}
複製代碼
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/** * 業務方法實現 */
@Component
@Slf4j
public class ProductInformationHander implements Callable<Boolean> {

    @Autowired
    private ProductRepository pRepo;

    private static Map<Long, BigDecimal> prices = new HashMap<>();

    static {
        prices.put(1L, new BigDecimal(100));
        prices.put(2L, new BigDecimal(200));
        prices.put(3L, new BigDecimal(300));
        prices.put(4L, new BigDecimal(400));
        prices.put(8L, new BigDecimal(800));
        prices.put(9L, new BigDecimal(900));
    }

    @Override
    public Boolean call() throws Exception {

        log.info("sync price begin,prices size={}", prices.size());

        for (Long id : prices.keySet()) {
            Product product = pRepo.findById(id);

            if (null == product) {
                throw new NeedRetryException("can not find product by id=" + id);
            }
            if (null == product.getCount() || product.getCount() < 1) {
                throw new NeedRetryException("product count is less than 1, id=" + id);
            }

            Product updatedP = pRepo.updatePrice(id, prices.get(id));
            if (null == updatedP) {
                return false;
            }

            prices.remove(id);
        }

        log.info("sync price over,prices size={}", prices.size());

        return true;
    }

}
複製代碼
  1. 構造重試器Retryer

將上面的實現做爲參數,構造Retryer。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;

import com.github.rholder.retry.*;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener.RetryLogListener;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy.SpinBlockStrategy;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/** * 構造重試器 */
@Component
public class ProductRetryerBuilder {

    public Retryer build() {
        //定義重試機制
        Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()

                //retryIf 重試條件
                //.retryIfException()
                //.retryIfRuntimeException()
                //.retryIfExceptionOfType(Exception.class)
                //.retryIfException(Predicates.equalTo(new Exception()))
                //.retryIfResult(Predicates.equalTo(false))
                .retryIfExceptionOfType(NeedRetryException.class)

                //等待策略:每次請求間隔1s
                .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))

								//中止策略 : 嘗試請求3次
                .withStopStrategy(StopStrategies.stopAfterAttempt(3))

                //時間限制 : 某次請求不得超過2s , 相似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
                .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))

                //默認的阻塞策略:線程睡眠
                //.withBlockStrategy(BlockStrategies.threadSleepStrategy())
                //自定義阻塞策略:自旋鎖
                .withBlockStrategy(new SpinBlockStrategy())

                //自定義重試監聽器
                .withRetryListener(new RetryLogListener())

                .build();

        return retryer;

    }
}
複製代碼
  1. 與定時任務結合執行Retryer

定時任務只須要跑一次,可是實際上實現了全部的重試策略。這樣大大簡化了定時器的設計。

首先使用@EnableScheduling聲明項目支持定時器註解。

@SpringBootApplication
@EnableScheduling
public class DemoRetryerApplication {
	public static void main(String[] args) {
		SpringApplication.run(DemoRetryerApplication.class, args);
	}
}
複製代碼
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.task;

import com.github.rholder.retry.Retryer;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductInformationHander;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductRetryerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/** * 商品信息定時器 */
@Component
public class ProductScheduledTasks {

    @Autowired
    private ProductRetryerBuilder builder;

    @Autowired
    private ProductInformationHander hander;

    /** * 同步商品價格定時任務 * @Scheduled(fixedDelay = 30000) :上一次執行完畢時間點以後30秒再執行 */
    @Scheduled(fixedDelay = 30*1000)
    public void syncPrice() throws Exception{
        Retryer retryer=builder.build();
        retryer.call(hander);
    }

}
複製代碼

執行結果:因爲並無商品,所以重試之後,拋出異常。

2019-二月-28 14:37:52.667 INFO  [scheduling-1] n.i.t.f.s.i.d.r.g.l.RetryLogListener - log listen over.
2019-二月-28 14:37:52.672 ERROR [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler - Unexpected error occurred in scheduled task.
com.github.rholder.retry.RetryException: Retrying failed to complete successfully after 3 attempts.
	at com.github.rholder.retry.Retryer.call(Retryer.java:174)
複製代碼

你也能夠增長一些商品數據,看一下重試成功的效果。

完整示例代碼在這裏

使用中遇到的問題

Guava版本衝突

因爲項目中依賴的guava版本太低,啓動項目時出現了以下異常。

java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService;
 at org.apache.curator.framework.listen.ListenerContainer.addListener(ListenerContainer.java:41)
 at com.bzn.curator.ZkOperator.getZkClient(ZkOperator.java:207)
 at com.bzn.curator.ZkOperator.checkExists(ZkOperator.java:346)
 at com.bzn.curator.watcher.AbstractWatcher.initListen(AbstractWatcher.java:87)
 at com.bzn.web.listener.NebulaSystemInitListener.initZkWatcher(NebulaSystemInitListener.java:84)
 at com.bzn.web.listener.NebulaSystemInitListener.contextInitialized(NebulaSystemInitListener.java:33)
 at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4939)
 at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5434)
 at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
 at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559)
 at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
複製代碼

所以,要排除項目中低版本的guava依賴。

<exclusion>
 <groupId>com.google.guava</groupId>
 <artifactId>guava</artifactId>
</exclusion>
複製代碼

同時,因爲Guava在新版本中移除了sameThreadExecutor方法,但目前項目中的ZK須要此方法,所以須要手動設置合適的guava版本。

果真,在19.0版本中MoreExecutors的此方法依然存在,只是標註爲過時了。

@Deprecated
  @GwtIncompatible("TODO")
  public static ListeningExecutorService sameThreadExecutor() {
    return new DirectExecutorService();
  }
複製代碼

聲明依賴的guava版本改成19.0便可。

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
 <groupId>com.google.guava</groupId>
 <artifactId>guava</artifactId>
 <version>19.0</version>
</dependency>
複製代碼

動態調節重試策略

在實際使用過程當中,有時常常須要調整重試的次數、等待的時間等重試策略,所以,將重試策略的配置參數化保存,能夠動態調節。

例如在秒殺、雙十一購物節等時期增長等待的時間與重試次數,以保證錯峯請求。在平時,能夠適當減小等待時間和重試次數。

對於系統關鍵性業務,若是屢次重試步成功,能夠經過RetryListener進行監控與報警。

關於『動態調節重試策略 』下面提供一個參考實現:

import com.github.rholder.retry.Attempt; 
import com.github.rholder.retry.WaitStrategy; 
 
/** * 自定義等待策略:根據重試次數動態調節等待時間,第一次請求間隔1s,第二次間隔10s,第三次及之後都是20s。 * * * 在建立Retryer的時候經過withWaitStrategy將該等待策略生效便可。 * * RetryerBuilder.<Boolean>newBuilder() * .withWaitStrategy(new AlipayWaitStrategy()) * * 相似的效果也能夠經過自定義 BlockStrategy 來實現,你能夠寫一下試試。 * */ 
public class AlipayWaitStrategy implements WaitStrategy { 
 
    @Override 
    public long computeSleepTime(Attempt failedAttempt) { 
        long number = failedAttempt.getAttemptNumber(); 
        if (number==1){ 
            return 1*1000; 
        } 
        if (number==2){ 
            return 10*1000; 
        } 
        return 20*1000; 
    } 
 
}
複製代碼

Wechat-westcall
相關文章
相關標籤/搜索