聊聊rsocket load balancer的Ewma

本文主要研究一下rsocket load balancer的Ewmajava

Moving Average

SMA

SMA(Simple Moving Average),即簡單移動平均,其公式以下:git

SMAt = (Pt + Pt-1 + Pt-2 + Pt-3 + ... + Pt-n+1) / n
這裏的Pt到爲Pt-n+1爲最近的n個數據

WMA

WMA(Weighted Moving Average),即加權移動平均,其公式以下:github

WMAt = (Pt * Wt) + (Pt-1 * Wt-1) + ... + (Pt-n+1 * Wt-n+1)
WMA會給最近的n個數據加上權重,其中這些權重加起來和爲1,通常是較近的數據權重比較大

EMA或EWMA

EMA(Exponentially Moving Average)指數移動平均或EWMA(Exponentially Weighted Moving Average)指數加權移動平均,其公式以下:算法

EMAt = (Pt * S) + (1- S) * EMAt-1
它有一個S參數爲平滑指數,通常是取2/(N+1)

Ewma

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.javasocket

public class Ewma {
  private final long tau;
  private volatile long stamp;
  private volatile double ewma;

  public Ewma(long halfLife, TimeUnit unit, double initialValue) {
    this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
    stamp = 0L;
    ewma = initialValue;
  }

  public synchronized void insert(double x) {
    long now = Clock.now();
    double elapsed = Math.max(0, now - stamp);
    stamp = now;

    double w = Math.exp(-elapsed / tau);
    ewma = w * ewma + (1.0 - w) * x;
  }

  public synchronized void reset(double value) {
    stamp = 0L;
    ewma = value;
  }

  public double value() {
    return ewma;
  }

  @Override
  public String toString() {
    return "Ewma(value=" + ewma + ", age=" + (Clock.now() - stamp) + ")";
  }
}
  • Ewma的構造器須要指定halfLife、timeunit、initialValue(ewma初始值)參數;ewma = w ewma + (1.0 - w) x,其中x爲當前值,w爲權重
  • 權重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed爲距離上次計算的時間長度;tau(希臘字母)爲EWMA的時間常量
  • 這裏的tau = halfLife / Math.log(2)根據timeunit轉換後的值;其中halfLife參數,表明speed of convergence,即收斂的速度

RSocketSupplier

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.javaide

public class RSocketSupplier implements Availability, Supplier<Mono<RSocket>>, Closeable {

  private static final double EPSILON = 1e-4;

  private Supplier<Mono<RSocket>> rSocketSupplier;

  private final MonoProcessor<Void> onClose;

  private final long tau;
  private long stamp;
  private final Ewma errorPercentage;

  public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) {
    this.rSocketSupplier = rSocketSupplier;
    this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
    this.stamp = Clock.now();
    this.errorPercentage = new Ewma(halfLife, unit, 1.0);
    this.onClose = MonoProcessor.create();
  }

  public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) {
    this(rSocketSupplier, 5, TimeUnit.SECONDS);
  }

  @Override
  public double availability() {
    double e = errorPercentage.value();
    if (Clock.now() - stamp > tau) {
      // If the window is expired artificially increase the availability
      double a = Math.min(1.0, e + 0.5);
      errorPercentage.reset(a);
    }
    if (e < EPSILON) {
      e = 0.0;
    } else if (1.0 - EPSILON < e) {
      e = 1.0;
    }

    return e;
  }

  private synchronized void updateErrorPercentage(double value) {
    errorPercentage.insert(value);
    stamp = Clock.now();
  }

  @Override
  public Mono<RSocket> get() {
    return rSocketSupplier
        .get()
        .doOnNext(o -> updateErrorPercentage(1.0))
        .doOnError(t -> updateErrorPercentage(0.0))
        .map(AvailabilityAwareRSocketProxy::new);
  }

  @Override
  public void dispose() {
    onClose.onComplete();
  }

  @Override
  public boolean isDisposed() {
    return onClose.isDisposed();
  }

  @Override
  public Mono<Void> onClose() {
    return onClose;
  }

  private class AvailabilityAwareRSocketProxy extends RSocketProxy {
    public AvailabilityAwareRSocketProxy(RSocket source) {
      super(source);

      onClose.doFinally(signalType -> source.dispose()).subscribe();
    }

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
      return source
          .fireAndForget(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(v -> updateErrorPercentage(1.0));
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
      return source
          .requestResponse(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(p -> updateErrorPercentage(1.0));
    }

    @Override
    public Flux<Payload> requestStream(Payload payload) {
      return source
          .requestStream(payload)
          .doOnError(th -> errorPercentage.insert(0.0))
          .doOnComplete(() -> updateErrorPercentage(1.0));
    }

    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
      return source
          .requestChannel(payloads)
          .doOnError(th -> errorPercentage.insert(0.0))
          .doOnComplete(() -> updateErrorPercentage(1.0));
    }

    @Override
    public Mono<Void> metadataPush(Payload payload) {
      return source
          .metadataPush(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(v -> updateErrorPercentage(1.0));
    }

    @Override
    public double availability() {
      // If the window is expired set success and failure to zero and return
      // the child availability
      if (Clock.now() - stamp > tau) {
        updateErrorPercentage(1.0);
      }
      return source.availability() * errorPercentage.value();
    }
  }
}
  • RSocketSupplier實現了Availability、Supplier、Closeable接口,其中它定義了errorPercentage變量,其類型爲Ewma;若是沒有指定halfLife值,則RSocketSupplier默認halfLife爲5秒,ewma的初始值爲1.0
  • RSocketSupplier定義了一個常量EPSILON = 1e-4,其availability方法會先計算availability,而後在距離上次計算時間stamp超過tau值時會重置errorPercentage;以後當availability小於EPSILON時返回0,當availability + EPSILON大於1時返回1.0
  • updateErrorPercentage方法用於往ewma插入新值,同時更新stamp;get方法的doOnNext方法updateErrorPercentage值爲1.0,doOnError方法updateErrorPercentage值爲0.0;map會將RSocket轉換爲AvailabilityAwareRSocketProxy;AvailabilityAwareRSocketProxy對目標RSocket進行代理,對相關方法的doOnError及doOnSuccess都織入errorPercentage的統計

小結

  • Moving Average有好幾種算法,包括SMA(Simple Moving Average)、WMA(Weighted Moving Average)、EMA(Exponentially Moving Average)或EWMA(Exponentially Weighted Moving Average)
  • Ewma的構造器須要指定halfLife、timeunit、initialValue(ewma初始值)參數;ewma = w ewma + (1.0 - w) x,其中x爲當前值,w爲權重;權重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed爲距離上次計算的時間長度;tau(希臘字母)爲EWMA的時間常量;這裏的tau = halfLife / Math.log(2)根據timeunit轉換後的值;其中halfLife參數,表明speed of convergence,即收斂的速度
  • rsocket load balancer使用了Ewma了統計服務的availability;其中RSocketSupplier實現了Availability、Supplier、Closeable接口,其中它定義了errorPercentage變量,其類型爲Ewma;若是沒有指定halfLife值,則RSocketSupplier默認halfLife爲5秒,ewma的初始值爲1.0;RSocketSupplier的get方法會將RSocket轉換爲AvailabilityAwareRSocketProxy,而AvailabilityAwareRSocketProxy則會對目標RSocket進行代理,對相關方法的doOnError及doOnSuccess都織入errorPercentage的統計

doc

相關文章
相關標籤/搜索