Guava RateLimiter限流源碼解析和實例應用

在開發高併發系統時有三把利器用來保護系統:緩存、降級和限流算法

  • 緩存 緩存的目的是提高系統訪問速度和增大系統處理容量
  • 降級 降級是當服務出現問題或者影響到核心流程時,須要暫時屏蔽掉,待高峯或者問題解決後再打開
  • 限流 限流的目的是經過對併發訪問/請求進行限速,或者對一個時間窗口內的請求進行限速來保護系統,一旦達到限制速率則能夠拒絕服務、排隊或等待、降級等處理

經常使用的限流算法

漏桶算法

漏桶算法思路很簡單,水(請求)先進入到漏桶裏,漏桶以必定的速度出水,當水流入速度過大會直接溢出,能夠看出漏桶算法能強行限制數據的傳輸速率。spring

 

令牌桶算法

對於不少應用場景來講,除了要求可以限制數據的平均傳輸速率外,還要求容許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更爲適合。如圖所示,令牌桶算法的原理是系統會以一個恆定的速度往桶裏放入令牌,而若是請求須要被處理,則須要先從桶裏獲取一個令牌,當桶裏沒有令牌可取時,則拒絕服務。express

 

RateLimiter使用以及源碼解析

Google開源工具包Guava提供了限流工具類RateLimiter,該類基於令牌桶算法實現流量限制,使用十分方便,並且十分高效。緩存

Guava有兩種限流模式,一種爲穩定模式(SmoothBursty:令牌生成速度恆定),一種爲漸進模式(SmoothWarmingUp:令牌生成速度緩慢提高直到維持在一個穩定值) 兩種模式實現思路相似,主要區別在等待時間的計算上,本篇重點介紹SmoothBursty安全

public static RateLimiter create(double permitsPerSecond) {
  /*
   * 默認的RateLimiter配置能夠保存最多一秒鐘的未使用許可證
   */
  return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}

RateLimiter是一個抽象類,SmoothBursty是其子類SmoothRateLimiter的子類,其兩個構造參數含義以下服務器

  • SleepingStopwatch:guava中的一個時鐘類實例,會經過這個來計算時間及令牌
  • maxBurstSeconds:官方解釋,在ReteLimiter未使用時,最多保存幾秒的令牌,默認是1
@VisibleForTesting
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
  //根據每秒向桶中放入令牌的數量來設置當前存儲令牌數
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}
public final void setRate(double permitsPerSecond) {
  //若是每秒向桶中放入令牌的數量(permitsPerSecond)大於0且爲數字,經過檢查,不然拋出參數異常
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  //對每一個線程進行互斥,創建互斥對象的鎖定
  synchronized (mutex()) {
    //由各項參數更新當前存儲令牌數
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}
public static void checkArgument(boolean expression, @Nullable Object errorMessage) {
  if (!expression) {
    throw new IllegalArgumentException(String.valueOf(errorMessage));
  }
}
private volatile Object mutexDoNotUseDirectly; //線程安全的互斥對象
private Object mutex() {
  Object mutex = mutexDoNotUseDirectly;
  if (mutex == null) {
    synchronized (this) {
      mutex = mutexDoNotUseDirectly;
      if (mutex == null) {
        mutexDoNotUseDirectly = mutex = new Object();
      }
    }
  }
  return mutex;
}

在SmoothBursty中併發

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
  //若當前時間晚於nextFreeTicketMicros,則計算該段時間內能夠生成多少令牌,將生成的令牌加入令牌桶中並更新數據 
  resync(nowMicros);
  //更新添加1個令牌的時間間隔(單位微妙)爲1000000微妙(1秒)除以每秒放入令牌桶中的數量
  double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
  this.stableIntervalMicros = stableIntervalMicros;
  //將令牌桶中能夠存儲令牌的時間參數加上更新當前能夠存儲的令牌數
  doSetRate(permitsPerSecond, stableIntervalMicros);
}
private long nextFreeTicketMicros = 0L; //下一次請求能夠獲取令牌的起始時間
double storedPermits; //當前存儲令牌數
double maxPermits; //最大存儲令牌數 = maxBurstSeconds * stableIntervalMicros
double stableIntervalMicros; //添加令牌時間間隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌數)
final double maxBurstSeconds; //在RateLimiter未使用時,最多存儲幾秒的令牌
private void resync(long nowMicros) {
  //若是當前時間大於下一次請求能夠獲取令牌的起始時間
  if (nowMicros > nextFreeTicketMicros) {
    //比較最大存儲令牌數和當前存儲的令牌數加上如今要增長的令牌數的大小,小的那個賦給當年存儲令牌數,即增長令牌數與當前令牌數之和不能大於最大令牌數
    storedPermits = min(maxPermits,
        storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
    //將當前時間賦給下一次請求能夠獲取的起始時間
    nextFreeTicketMicros = nowMicros;
  }
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  //將最大存儲令牌數存入臨時副本
  double oldMaxPermits = this.maxPermits;
  //更新最大存儲令牌數爲存放令牌的秒數乘以每秒向桶中放入的令牌數
  maxPermits = maxBurstSeconds * permitsPerSecond;
  //若是最大存儲令牌數的臨時副本爲正無窮大
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    //更新當前存儲令牌數爲最大存儲令牌數
    storedPermits = maxPermits;
  } else { //若是最大存儲令牌數的臨時副本不爲正無窮大
    //若是最大存儲令牌數的臨時副本爲0,則更新當前存儲令牌數爲0,不然
    //更新當前存儲令牌數爲當前存儲令牌數乘以最大存儲令牌數除以最大存儲令牌數的臨時副本數
    storedPermits = (oldMaxPermits == 0.0)
        ? 0.0 // initial state
        : storedPermits * maxPermits / oldMaxPermits;
  }
}

咱們再來看一下RateLimiter的tryAcquire方法app

public boolean tryAcquire(long timeout, TimeUnit unit) {
  //嘗試在timeout時間內獲取令牌,若是能夠則掛起(睡眠)等待相應時間並返回true,不然當即返回false 
  return tryAcquire(1, timeout, unit);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  //取等待時間的微妙數與0比較取大值賦給超時時間
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  //若是檢查時間>0,經過檢查,此處爲1
  checkPermits(permits);
  long microsToWait;
  //創建互斥對象加鎖互斥
  synchronized (mutex()) {
    //獲取當前時間
    long nowMicros = stopwatch.readMicros();
    //若是下一次請求能夠獲取令牌的起始時間減去等待時間大於當前時間
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false; //返回false
    } else { //若是下一次請求能夠獲取令牌的起始時間減去等待時間小於等於當前時間
      //獲取下一次請求能夠獲取令牌的起始時間減去當前時間的值與0之間的大值並刷新各參數(下一次請求能夠獲取令牌的起始時間、當前存儲令牌數)
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  //線程休眠microsToWait時間
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  //返回true
  return true;
}
private static int checkPermits(int permits) {
  checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
  return permits;
}
final Stopwatch stopwatch = Stopwatch.createStarted();
@Override
long readMicros() {
  return stopwatch.elapsed(MICROSECONDS);
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
  //返回下一次請求能夠獲取令牌的起始時間減去等待時間是否小於等於當前時間
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
  //獲取下一次請求能夠獲取令牌的起始時間並更新各參數(下一次請求能夠獲取令牌的起始時間、當前存儲令牌數long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  //返回下一次請求能夠獲取令牌的起始時間減去當前時間的值與0之間的大值
  return max(momentAvailable - nowMicros, 0);
}
@Override
void sleepMicrosUninterruptibly(long micros) {
  if (micros > 0) {
    Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
  }
}

在SmoothBursty中ide

@Override
final long queryEarliestAvailable(long nowMicros) {
  //返回下一次請求能夠獲取令牌的起始時間
  return nextFreeTicketMicros;
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  //若當前時間晚於nextFreeTicketMicros,則計算該段時間內能夠生成多少令牌,將生成的令牌加入令牌桶中並更新數據
  resync(nowMicros);
  //獲取下一次請求能夠獲取令牌的起始時間
  long returnValue = nextFreeTicketMicros;
  //在容許的請求數(這裏爲1)和當前存儲令牌數間取小值賦給容許消費的存儲令牌數(storedPermitsToSpend)
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  //將容許的請求數減去容許消費的存儲令牌數賦給容許刷新數(freshPermits)
  double freshPermits = requiredPermits - storedPermitsToSpend;
  //將容許刷新數乘以添加令牌時間間隔賦給等待微妙數(waitMicros)
  long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
      + (long) (freshPermits * stableIntervalMicros);
  //更新下一次請求能夠獲取令牌的起始時間爲下一次請求能夠獲取令牌的起始時間加上等待微妙數
  this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
  //更新當前存儲令牌數爲當前存儲令牌數減去容許消費的存儲令牌數
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0L;
}

在Uninterruptibles中spring-boot

public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
  //定義是否已中斷爲false
  boolean interrupted = false;
  try {
    //將下一次請求能夠獲取令牌的起始時間減去當前時間的值轉化爲納秒定義爲remainingNanos
    long remainingNanos = unit.toNanos(sleepFor);
    //將系統的納秒值加上該轉化值爲end
    long end = System.nanoTime() + remainingNanos;
    while (true) {
      try {
        //線程休眠remainingNanos時間
        NANOSECONDS.sleep(remainingNanos);
        return;
      } catch (InterruptedException e) {
        //若是發生中斷異常,將是否已中斷更新爲true
        interrupted = true;
        //更新remainingNanos爲end減去系統的納秒值,並進入下一輪循環
        remainingNanos = end - System.nanoTime();
      }
    }
  } finally {
    //若是發生中斷異常
    if (interrupted) {
      //當前線程中斷
      Thread.currentThread().interrupt();
    }
  }
}

源碼分析就是這些了,如今咱們來看一下Guava RateLimiter的應用,在APO中攔截Controller,並進行限流

在pom中添加

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>18.0</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

標籤

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LxRateLimit {
    /**
     *
     * @return
     */
    String value() default "";

    /**
     * 每秒向桶中放入令牌的數量   默認最大即不作限流
     * @return
     */
    double perSecond() default Double.MAX_VALUE;

    /**
     * 獲取令牌的等待時間  默認0
     * @return
     */
    int timeOut() default 0;

    /**
     * 超時時間單位
     * @return
     */
    TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS;
}

AOP類

@Slf4j
@Aspect
@Component
public class LxRateLimitAspect {
    private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);

    /**
     * 帶有指定註解切入
     */
    @ResponseBody
    @Around(value = "@annotation(com.guanjian.annotation.LxRateLimit)")
    public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable {
        log.info("攔截到了{}方法...", pjp.getSignature().getName());
        Signature signature = pjp.getSignature();
        MethodSignature methodSignature = (MethodSignature)signature;
        //獲取目標方法
        Method targetMethod = methodSignature.getMethod();
        if (targetMethod.isAnnotationPresent(LxRateLimit.class)) {
            //獲取目標方法的@LxRateLimit註解
            LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class);
            rateLimiter.setRate(lxRateLimit.perSecond());
            if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit()))
                return "服務器繁忙,請稍後再試!";
        }
        return pjp.proceed();
    }
}

Controller

@RestController
public class AnnotationTestController {
    @GetMapping("/testannotation")
    @LxRateLimit(perSecond = 2000.0, timeOut = 500) //此處限速爲2000qps
    public String testAnnotation() {
        return "get token success";
    }
}

咱們先在Controller中將@LxRateLimit(perSecond = 2000.0, timeOut = 500)註釋掉

運行Jmeter進行壓測

咱們啓用500線程壓測

壓測結果

吞吐量爲7867.8qps,此時是不限速的

如今咱們恢復Controller中的@LxRateLimit(perSecond = 2000.0, timeOut = 500)

吞吐量爲2067.7qps

系統日誌能夠看到大量的攔截

2019-05-26 21:24:33.370  INFO 11092 --- [o-8080-exec-176] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.370  INFO 11092 --- [io-8080-exec-27] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.374  INFO 11092 --- [o-8080-exec-128] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.374  INFO 11092 --- [o-8080-exec-191] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.374  INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.377  INFO 11092 --- [io-8080-exec-36] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.379  INFO 11092 --- [o-8080-exec-123] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.379  INFO 11092 --- [io-8080-exec-61] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.380  INFO 11092 --- [io-8080-exec-19] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.382  INFO 11092 --- [io-8080-exec-77] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法... 2019-05-26 21:24:33.384  INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...

相關文章
相關標籤/搜索