重試的使用場景 在不少業務場景中,爲了排除系統中的各類不穩定因素,以及邏輯上的錯誤,並最大機率保證得到預期的結果,重試機制都是必不可少的。 尤爲是調用遠程服務,在高併發場景下,極可能由於服務器響應延遲或者網絡緣由,形成咱們得不到想要的結果,或者根本得不到響應。這個時候,一個優雅的重試調用機制,可讓咱們更大機率保證獲得預期的響應。 sequenceDiagram Client->>Server:{"msg":"hello,server"} Note right of Server: busying ...... Client->>Server:{"msg":"hello,server"}java
Server-->>Client:{"Exception":"500"} Note right of Server: busying ...... loop ServerBlock Server -->> Server: Too busy to deal with so many requests... end Client->>Server:{"msg":"hello,server"} activate Server Server-->>Client:{"msg":"hello,client"} deactivate Server
一般狀況下,咱們會經過<strong>定時任務</strong>進行重試。例如某次操做失敗,則記錄下來,當定時任務再次啓動,則將數據放到定時任務的方法中,從新跑一遍。最終直至獲得想要的結果爲止。 不管是基於定時任務的重試機制,仍是咱們本身寫的簡單的重試器,缺點都是重試的機制太單一,並且實現起來不優雅。 如何優雅地設計重試實現 一個完備的重試實現,要很好地解決以下問題:git
什麼條件下重試 什麼條件下中止 如何中止重試 中止重試等待多久 如何等待 請求時間限制 如何結束 如何監聽整個重試過程github
而且,爲了更好地封裝性,重試的實現通常分爲兩步:web
使用工廠模式構造重試器 執行重試方法並獲得結果spring
一個完整的重試流程能夠簡單示意爲: graph LR A((Start)) -->|build| B(Retryer) B --> C{need call?} C -->|continue| D[call] D --> Z[call count++] Z --> C C -->|finished| E[result] E --> F((success)) E --> G((failed ))apache
guava-retrying基礎用法 guava-retrying是基於谷歌的核心類庫guava的重試機制實現,能夠說是一個重試利器。 下面就快速看一下它的用法。 1.Maven配置 <!-- https://mvnrepository.com/artifact/com.github.rholder/guava-retrying --> <dependency> <groupId>com.github.rholder</groupId> <artifactId>guava-retrying</artifactId> <version>2.0.0</version> </dependency>springboot
須要注意的是,此版本依賴的是27.0.1版本的guava。若是你項目中的guava低幾個版本沒問題,可是低太多就不兼容了。這個時候你須要升級你項目的guava版本,或者直接去掉你本身的guava依賴,使用guava-retrying傳遞過來的guava依賴。 2.實現Callable Callable<Boolean> callable = new Callable<Boolean>() { public Boolean call() throws Exception { return true; // do something useful here } };服務器
Callable的call方法中是你本身實際的業務調用。網絡
經過RetryerBuilder構造Retryer併發
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder() .retryIfResult(Predicates.<Boolean>isNull()) .retryIfExceptionOfType(IOException.class) .retryIfRuntimeException() .withStopStrategy(StopStrategies.stopAfterAttempt(3)) .build();
使用重試器執行你的業務
retryer.call(callable);
下面是完整的參考實現。 public Boolean test() throws Exception { //定義重試機制 Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder() //retryIf 重試條件 .retryIfException() .retryIfRuntimeException() .retryIfExceptionOfType(Exception.class) .retryIfException(Predicates.equalTo(new Exception())) .retryIfResult(Predicates.equalTo(false))
//等待策略:每次請求間隔1s .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) //中止策略 : 嘗試請求6次 .withStopStrategy(StopStrategies.stopAfterAttempt(6)) //時間限制 : 某次請求不得超過2s , 相似: TimeLimiter timeLimiter = new SimpleTimeLimiter(); .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS)) .build(); //定義請求實現 Callable<Boolean> callable = new Callable<Boolean>() { int times = 1; @Override public Boolean call() throws Exception { log.info("call times={}", times); times++; if (times == 2) { throw new NullPointerException(); } else if (times == 3) { throw new Exception(); } else if (times == 4) { throw new RuntimeException(); } else if (times == 5) { return false; } else { return true; } } }; //利用重試器調用請求
return retryer.call(callable); }
guava-retrying實現原理 guava-retrying的核心是Attempt類、Retryer類以及一些Strategy(策略)相關的類。
Attempt
Attempt既是一次重試請求(call),也是請求的結果,並記錄了當前請求的次數、是否包含異常和請求的返回值。 /**
An attempt of a call, which resulted either in a result returned by the call,
or in a Throwable thrown by the call.
@param <V> The type returned by the wrapped callable.
@author JB */ public interface Attempt<V>
Retryer
Retryer經過RetryerBuilder這個工廠類進行構造。RetryerBuilder負責將定義的重試策略賦值到Retryer對象中。 在Retryer執行call方法的時候,會將這些重試策略一一使用。 下面就看一下Retryer的call方法的具體實現。 /** * Executes the given callable. If the rejection predicate * accepts the attempt, the stop strategy is used to decide if a new attempt * must be made. Then the wait strategy is used to decide how much time to sleep * and a new attempt is made. * * @param callable the callable task to be executed * @return the computed result of the given callable * @throws ExecutionException if the given callable throws an exception, and the * rejection predicate considers the attempt as successful. The original exception * is wrapped into an ExecutionException. * @throws RetryException if all the attempts failed before the stop strategy decided * to abort, or the thread was interrupted. Note that if the thread is interrupted, * this exception is thrown and the thread's interrupt status is set. */ public V call(Callable<V> callable) throws ExecutionException, RetryException { long startTime = System.nanoTime(); //說明: 根據attemptNumber進行循環——也就是重試多少次 for (int attemptNumber = 1; ; attemptNumber++) { //說明:進入方法不等待,當即執行一次 Attempt<V> attempt; try { //說明:執行callable中的具體業務 //attemptTimeLimiter限制了每次嘗試等待的時常 V result = attemptTimeLimiter.call(callable); //利用調用結果構造新的attempt attempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } catch (Throwable t) { attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); }
//說明:遍歷自定義的監聽器 for (RetryListener listener : listeners) { listener.onRetry(attempt); } //說明:判斷是否知足重試條件,來決定是否繼續等待並進行重試 if (!rejectionPredicate.apply(attempt)) { return attempt.get(); } //說明:此時知足中止策略,由於尚未獲得想要的結果,所以拋出異常 if (stopStrategy.shouldStop(attempt)) { throw new RetryException(attemptNumber, attempt); } else { //說明:執行默認的中止策略——線程休眠 long sleepTime = waitStrategy.computeSleepTime(attempt); try { //說明:也能夠執行定義的中止策略 blockStrategy.block(sleepTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RetryException(attemptNumber, attempt); } } }
}
Retryer執行過程以下。 graph TB
sq[Retryer] --> ci((call)) subgraph Retrying rb>RetryerBuilder]-- build retryer<br/>with strategies --> ro di{Retryer:<br/>using callable whith <br/>strategies execute call...} -.-> ro(<br>.retryIf...<br>.withWaitStrategy<br>.withStopStrategy<br>.withAttemptTimeLimiter<br>.withBlockStrategy<br>.withRetryListene) di==>ro2(Attempt: get the result) end classDef green fill:#9f6,stroke:#333,stroke-width:2px; classDef orange fill:#f96,stroke:#333,stroke-width:4px; class sq,e green class di orange
guava-retrying高級用法 基於guava-retrying的實現原理,咱們能夠根據實際業務來肯定本身的重試策略。 下面以數據同步這種常規系統業務爲例,自定義重試策略。 以下實現基於Spring Boot 2.1.2.RELEASE版本。 並使用Lombok簡化Bean。 <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
業務描述 當商品建立之後,須要另外設置商品的價格。因爲兩個操做是有兩我的進行的,所以會出現以下問題,即商品沒有建立,可是價格數據卻已經建好了。遇到這種狀況,價格數據須要等待商品正常建立之後,繼續完成同步。 咱們經過一個http請求進行商品的建立,同時經過一個定時器來修改商品的價格。 當商品不存在,或者商品的數量小於1的時候,商品的價格不能設置。須要等商品成功建立且數量大於0的時候,才能將商品的價格設置成功。 實現過程
自定義重試阻塞策略
默認的阻塞策略是線程休眠,這裏使用自旋鎖實現,不阻塞線程。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy;
import com.github.rholder.retry.BlockStrategy; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j;
import java.time.Duration; import java.time.LocalDateTime;
/**
自旋鎖的實現, 不響應線程中斷 */ @Slf4j @NoArgsConstructor public class SpinBlockStrategy implements BlockStrategy {
@Override public void block(long sleepTime) throws InterruptedException {
LocalDateTime startTime = LocalDateTime.now(); long start = System.currentTimeMillis(); long end = start; log.info("[SpinBlockStrategy]...begin wait."); while (end - start <= sleepTime) { end = System.currentTimeMillis(); } //使用Java8新增的Duration計算時間間隔 Duration duration = Duration.between(startTime, LocalDateTime.now()); log.info("[SpinBlockStrategy]...end wait.duration={}", duration.toMillis());
} }
自定義重試監聽器
RetryListener能夠監控屢次重試過程,並可使用attempt作一些額外的事情。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener;
import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryListener; import lombok.extern.slf4j.Slf4j;
@Slf4j public class RetryLogListener implements RetryListener {
@Override public <V> void onRetry(Attempt<V> attempt) { // 第幾回重試,(注意:第一次重試實際上是第一次調用) log.info("retry time : [{}]", attempt.getAttemptNumber()); // 距離第一次重試的延遲 log.info("retry delay : [{}]", attempt.getDelaySinceFirstAttempt()); // 重試結果: 是異常終止, 仍是正常返回 log.info("hasException={}", attempt.hasException()); log.info("hasResult={}", attempt.hasResult()); // 是什麼緣由致使異常 if (attempt.hasException()) { log.info("causeBy={}" , attempt.getExceptionCause().toString()); } else { // 正常返回時的結果 log.info("result={}" , attempt.getResult()); } log.info("log listen over."); }
}
自定義Exception
有些異常須要重試,有些不須要。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception;
/**
當拋出這個異常的時候,表示須要重試 */ public class NeedRetryException extends Exception {
public NeedRetryException(String message) { super("NeedRetryException can retry."+message); }
}
實現具體重試業務與Callable接口
使用call方法調用本身的業務。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model;
import lombok.AllArgsConstructor; import lombok.Data;
import java.math.BigDecimal;
/**
商品model */ @Data @AllArgsConstructor public class Product {
private Long id;
private String name;
private Integer count;
private BigDecimal price;
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product; import org.springframework.stereotype.Repository;
import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong;
/**
商品DAO */ @Repository public class ProductRepository {
private static ConcurrentHashMap<Long,Product> products=new ConcurrentHashMap();
private static AtomicLong ids=new AtomicLong(0);
public List<Product> findAll(){ return new ArrayList<>(products.values()); }
public Product findById(Long id){ return products.get(id); }
public Product updatePrice(Long id, BigDecimal price){ Product p=products.get(id); if (null==p){ return p; } p.setPrice(price); return p; }
public Product addProduct(Product product){ Long id=ids.addAndGet(1); product.setId(id); products.put(id,product); return product; }
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import lombok.extern.slf4j.Slf4j; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository.ProductRepository; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.math.BigDecimal; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable;
/**
業務方法實現 */ @Component @Slf4j public class ProductInformationHander implements Callable<Boolean> {
@Autowired private ProductRepository pRepo;
private static Map<Long, BigDecimal> prices = new HashMap<>();
static { prices.put(1L, new BigDecimal(100)); prices.put(2L, new BigDecimal(200)); prices.put(3L, new BigDecimal(300)); prices.put(4L, new BigDecimal(400)); prices.put(8L, new BigDecimal(800)); prices.put(9L, new BigDecimal(900)); }
@Override public Boolean call() throws Exception {
log.info("sync price begin,prices size={}", prices.size()); for (Long id : prices.keySet()) { Product product = pRepo.findById(id); if (null == product) { throw new NeedRetryException("can not find product by id=" + id); } if (null == product.getCount() || product.getCount() < 1) { throw new NeedRetryException("product count is less than 1, id=" + id); } Product updatedP = pRepo.updatePrice(id, prices.get(id)); if (null == updatedP) { return false; } prices.remove(id); } log.info("sync price over,prices size={}", prices.size()); return true;
}
}
構造重試器Retryer
將上面的實現做爲參數,構造Retryer。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import com.github.rholder.retry.*; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener.RetryLogListener; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy.SpinBlockStrategy; import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
構造重試器 */ @Component public class ProductRetryerBuilder {
public Retryer build() { //定義重試機制 Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
//retryIf 重試條件 //.retryIfException() //.retryIfRuntimeException() //.retryIfExceptionOfType(Exception.class) //.retryIfException(Predicates.equalTo(new Exception())) //.retryIfResult(Predicates.equalTo(false)) .retryIfExceptionOfType(NeedRetryException.class) //等待策略:每次請求間隔1s .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) //中止策略 : 嘗試請求3次 .withStopStrategy(StopStrategies.stopAfterAttempt(3)) //時間限制 : 某次請求不得超過2s , 相似: TimeLimiter timeLimiter = new SimpleTimeLimiter(); .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS)) //默認的阻塞策略:線程睡眠 //.withBlockStrategy(BlockStrategies.threadSleepStrategy()) //自定義阻塞策略:自旋鎖 .withBlockStrategy(new SpinBlockStrategy()) //自定義重試監聽器 .withRetryListener(new RetryLogListener()) .build(); return retryer;
} }
與定時任務結合執行Retryer
定時任務只須要跑一次,可是實際上實現了全部的重試策略。這樣大大簡化了定時器的設計。 首先使用@EnableScheduling聲明項目支持定時器註解。 @SpringBootApplication @EnableScheduling public class DemoRetryerApplication { public static void main(String[] args) { SpringApplication.run(DemoRetryerApplication.class, args); } }
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.task;
import com.github.rholder.retry.Retryer; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductInformationHander; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductRetryerBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;
/**
商品信息定時器 */ @Component public class ProductScheduledTasks {
@Autowired private ProductRetryerBuilder builder;
@Autowired private ProductInformationHander hander;
/**
}
執行結果:因爲並無商品,所以重試之後,拋出異常。 2019-二月-28 14:37:52.667 INFO [scheduling-1] n.i.t.f.s.i.d.r.g.l.RetryLogListener - log listen over. 2019-二月-28 14:37:52.672 ERROR [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler - Unexpected error occurred in scheduled task. com.github.rholder.retry.RetryException: Retrying failed to complete successfully after 3 attempts. at com.github.rholder.retry.Retryer.call(Retryer.java:174)
你也能夠增長一些商品數據,看一下重試成功的效果。 完整示例代碼在這裏。 使用中遇到的問題 Guava版本衝突 因爲項目中依賴的guava版本太低,啓動項目時出現了以下異常。 java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService; at org.apache.curator.framework.listen.ListenerContainer.addListener(ListenerContainer.java:41) at com.bzn.curator.ZkOperator.getZkClient(ZkOperator.java:207) at com.bzn.curator.ZkOperator.checkExists(ZkOperator.java:346) at com.bzn.curator.watcher.AbstractWatcher.initListen(AbstractWatcher.java:87) at com.bzn.web.listener.NebulaSystemInitListener.initZkWatcher(NebulaSystemInitListener.java:84) at com.bzn.web.listener.NebulaSystemInitListener.contextInitialized(NebulaSystemInitListener.java:33) at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4939) at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5434) at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559) at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
所以,要排除項目中低版本的guava依賴。 <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </exclusion>
同時,因爲Guava在新版本中移除了sameThreadExecutor方法,但目前項目中的ZK須要此方法,所以須要手動設置合適的guava版本。 果真,在19.0版本中MoreExecutors的此方法依然存在,只是標註爲過時了。 @Deprecated @GwtIncompatible("TODO") public static ListeningExecutorService sameThreadExecutor() { return new DirectExecutorService(); }
聲明依賴的guava版本改成19.0便可。 <!-- https://mvnrepository.com/artifact/com.google.guava/guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>19.0</version> </dependency>
動態調節重試策略 在實際使用過程當中,有時常常須要調整重試的次數、等待的時間等重試策略,所以,將重試策略的配置參數化保存,能夠動態調節。 例如在秒殺、雙十一購物節等時期增長等待的時間與重試次數,以保證錯峯請求。在平時,能夠適當減小等待時間和重試次數。 對於系統關鍵性業務,若是屢次重試步成功,能夠經過RetryListener進行監控與報警。 關於 『動態調節重試策略 』下面提供一個參考實現。 import com.github.rholder.retry.Attempt; import com.github.rholder.retry.WaitStrategy;
/**
自定義等待策略:根據重試次數動態調節等待時間,第一次請求間隔1s,第二次間隔10s,第三次及之後都是20s。
在建立Retryer的時候經過withWaitStrategy將該等待策略生效便可。
RetryerBuilder.<Boolean>newBuilder()
.withWaitStrategy(new AlipayWaitStrategy())
相似的效果也能夠經過自定義 BlockStrategy 來實現,你能夠寫一下試試。
*/ public class AlipayWaitStrategy implements WaitStrategy {
@Override public long computeSleepTime(Attempt failedAttempt) { long number = failedAttempt.getAttemptNumber(); if (number==1){ return 1*1000; } if (number==2){ return 10*1000; } return 20*1000; }
}