Lettuce命令延遲測量(CommandLatency)

    Lettuce使用了LatencyUtils進行命令延遲測量,LatencyUtils是一個延遲統計追蹤開發包,提供了不少有用的追蹤工具.LatencyStats的設計旨在經過簡單、嵌入式(drop-in)的延遲行爲記錄對象,對進程間延遲進行記錄和追蹤。LatencyStats的功能包括底層追蹤和暫停影響糾正、遺漏補償等。經過可插拔式的暫停監測器與區間估計(interval estimator)結合LatencyStats給出校訂後延遲統計直方圖。本文會持續更新(http://www.javashuo.com/article/p-ctjslbto-dx.html )html

   測試數據都有什麼java

下面看看命令延遲統計的結果,能夠發現統計了命令的個數,第一個響應的最小延遲,最大延遲以及百分位數的統計,還有響應完成的統計數據redis

{[local:any -> localhost/127.0.0.1:6379, commandType=GET]=[count=5, timeUnit=MICROSECONDS, 
firstResponse=[min=348, max=518, percentiles={50.0=462, 90.0=518, 95.0=518, 99.0=518, 99.9=518}], 
completion=[min=440, max=8978, percentiles={50.0=544, 90.0=8978, 95.0=8978, 99.0=8978, 99.9=8978}]], 
[local:any -> localhost/127.0.0.1:6379, commandType=SET]=[count=6, timeUnit=MICROSECONDS, 
firstResponse=[min=501, max=15925, percentiles={50.0=581, 90.0=15925, 95.0=15925, 99.0=15925, 99.9=15925}], 
completion=[min=540, max=19267, percentiles={50.0=622, 90.0=19267, 95.0=19267, 99.0=19267, 99.9=19267}]]}

  

 如何使用命令延遲測量promise

      在上文中說過,lettuce默認使用的是LatencyUtils做爲命令延遲收集器,若是沒有更好的選擇建議使用默認命令延遲收集器;是否是使用默認命令延遲收集器就什麼都不用作了呢?固然不是.下面經過源碼走讀方式確認一下咱們須要作什麼?下面是DefaultClientResources 關於命令延遲測量相關源碼ide

        //若是命令延遲收集器爲null
        if (builder.commandLatencyCollector == null) {
            //若是默認命令延遲收集器可用
            if (DefaultCommandLatencyCollector.isAvailable()) {
                //若是命令延遲收集器選項不爲null,則使用用戶自定義都命令延遲收集器選項設置
                if (builder.commandLatencyCollectorOptions != null) {
                    commandLatencyCollector = new DefaultCommandLatencyCollector(builder.commandLatencyCollectorOptions);
                } else {//若是沒有設置則使用默認數據
                    commandLatencyCollector = new DefaultCommandLatencyCollector(DefaultCommandLatencyCollectorOptions.create());
                }
            } else {//若是默認命令延遲收集器不可用則將命令延遲收集器選項設置爲不可用,並將收集器設置爲不可用收集器
                logger.debug("LatencyUtils/HdrUtils are not available, metrics are disabled");
                builder.commandLatencyCollectorOptions = DefaultCommandLatencyCollectorOptions.disabled();
                commandLatencyCollector = DefaultCommandLatencyCollector.disabled();
            }
           //將共享收集器設置爲false
            sharedCommandLatencyCollector = false;
        } else {//命令延遲收集器不爲null則使用用戶指定的命令延遲收集器,並將共享收集器設置爲true
            sharedCommandLatencyCollector = true;
            commandLatencyCollector = builder.commandLatencyCollector;
        }
        //命令延遲發射器選項
        commandLatencyPublisherOptions = builder.commandLatencyPublisherOptions;
        //若是命令延遲收集器可用同時命令延遲發射器選項不爲null
        if (commandLatencyCollector.isEnabled() && commandLatencyPublisherOptions != null) {
            metricEventPublisher = new DefaultCommandLatencyEventPublisher(eventExecutorGroup, commandLatencyPublisherOptions,
                    eventBus, commandLatencyCollector);
        } else {//若是命令延遲收集器不可用或命令發射選項爲null都將測量事件發射器設置爲null
            metricEventPublisher = null;
        }

 經過上文源碼能夠發現使用默認命令延遲測量只須要保證默認命令延遲收集器可用就能夠了.那麼如何是可用的呢?原來只要在POM中添加LatencyUtils的依賴就能夠了工具

 /**
     * 若是HdrUtils和LatencyUtils在classpath下是有效的就返回true
     */
    public static boolean isAvailable() {
        return LATENCY_UTILS_AVAILABLE && HDR_UTILS_AVAILABLE;
    }

  

        <dependency>
            <groupId>org.latencyutils</groupId>
            <artifactId>LatencyUtils</artifactId>
            <version>2.0.3</version>
        </dependency>

  延遲測量數據如何被髮送的測試

經過下面源碼能夠發現延遲測量數據是經過事件總線發送出去的.同時是按照一個固定的頻率發送命令延遲測量數據,這個頻率是用戶能夠配置,若是不配置則默認爲10分鐘ui

/**
 * 默認命令延遲事件發射器
 *
 */
public class DefaultCommandLatencyEventPublisher implements MetricEventPublisher {
    //事件處理線程池
    private final EventExecutorGroup eventExecutorGroup;
    //事件發射選項
    private final EventPublisherOptions options;
    //事件總線
    private final EventBus eventBus;
    //命令延遲收集器
    private final CommandLatencyCollector commandLatencyCollector;
    //發射器
    private final Runnable EMITTER = this::emitMetricsEvent;

    private volatile ScheduledFuture<?> scheduledFuture;

    public DefaultCommandLatencyEventPublisher(EventExecutorGroup eventExecutorGroup, EventPublisherOptions options,
            EventBus eventBus, CommandLatencyCollector commandLatencyCollector) {

        this.eventExecutorGroup = eventExecutorGroup;
        this.options = options;
        this.eventBus = eventBus;
        this.commandLatencyCollector = commandLatencyCollector;
        //事件發射間隔不爲0
        if (!options.eventEmitInterval().isZero()) {
            //固定間隔發送指標事件
            scheduledFuture = this.eventExecutorGroup.scheduleAtFixedRate(EMITTER, options.eventEmitInterval().toMillis(),
                    options.eventEmitInterval().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public boolean isEnabled() {
        //指標間隔不爲0
        return !options.eventEmitInterval().isZero() && scheduledFuture != null;
    }

    @Override
    public void shutdown() {

        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            scheduledFuture = null;
        }
    }

    @Override
    public void emitMetricsEvent() {

        if (!isEnabled() || !commandLatencyCollector.isEnabled()) {
            return;
        }
        //發送命令延遲測試事件
        eventBus.publish(new CommandLatencyEvent(commandLatencyCollector.retrieveMetrics()));
    }

}

  如何接收到延遲測量數據this

咱們已經知道延遲測量數據是經過事件總線發送出去的,如今只要訂閱事件總線的事件就能夠了spa

client.getResources().eventBus().get().filter(redisEvent -> redisEvent instanceof CommandLatencyEvent)
                .cast(CommandLatencyEvent.class).doOnNext(events::add).subscribe(System.out::println);

  Lettuce中是如何進行延遲測量的

   CommandHandler繼承了ChannelDuplexHandler 無論發送仍是接收都在CommandHandler中處理,因此延遲測量也是在CommandHandler中實現的.

咱們從write方法開始看起

 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

        if (debugEnabled) {
            logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
        }
        
        //若是msg實現了RedisCommand接口就表示發送單個命令
        if (msg instanceof RedisCommand) {
            writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
            return;
        }
        //若是實現了List接口就表示批量發送命令
        if (msg instanceof List) {

            List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;
            //若是集合長度爲1 仍是執行發送單個命令
            if (batch.size() == 1) {

                writeSingleCommand(ctx, batch.get(0), promise);
                return;
            }
            //批處理
            writeBatch(ctx, batch, promise);
            return;
        }

        if (msg instanceof Collection) {
            writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
        }
    }

  

    private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {

        if (!isWriteable(command)) {
            promise.trySuccess();
            return;
        }
        //入隊
        addToStack(command, promise);
        ctx.write(command, promise);
    }

  

private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {

        try {

            validateWrite(1);

            if (command.getOutput() == null) {
                // fire&forget commands are excluded from metrics
                complete(command);
            }

            RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);

            if (promise.isVoid()) {
                stack.add(redisCommand);
            } else {
                promise.addListener(AddToStack.newInstance(stack, redisCommand));
            }
        } catch (Exception e) {
            command.completeExceptionally(e);
            throw e;
        }
    }

  

 /**
     * 可能包裝爲延遲測量命令
     *
     */
    private RedisCommand<?, ?, ?> potentiallyWrapLatencyCommand(RedisCommand<?, ?, ?> command) {

        //若是延遲測量不可用則直接返回
        if (!latencyMetricsEnabled) {
            return command;
        }
        //若是當前命令就是延遲命令
        if (command instanceof WithLatency) {

            WithLatency withLatency = (WithLatency) command;
            //重置數據
            withLatency.firstResponse(-1);
            withLatency.sent(nanoTime());

            return command;
        }
        //建立延遲測量命令並設置初始化數據
        LatencyMeteredCommand<?, ?, ?> latencyMeteredCommand = new LatencyMeteredCommand<>(command);
        latencyMeteredCommand.firstResponse(-1);
        latencyMeteredCommand.sent(nanoTime());

        return latencyMeteredCommand;
    }

 此時命令已經包裝爲了一個延遲測量命令,同時記錄了命令的發送時間.在接收到響應到時候會記錄響應時間

    private boolean decode(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?, ?, ?> command) {
        //若是延遲測量可用且命令實現了WithLatency接口
        if (latencyMetricsEnabled && command instanceof WithLatency) {
            //類型強轉
            WithLatency withLatency = (WithLatency) command;
            //若是第一個響應時間不爲-1則設置當前時間(納秒)爲第一個響應時間
            if (withLatency.getFirstResponse() == -1) {
                withLatency.firstResponse(nanoTime());
            }
            //開始解碼,若是解碼失敗則返回,不記錄延遲測量數據
            if (!decode0(ctx, buffer, command)) {
                return false;
            }
            //記錄延遲數據
            recordLatency(withLatency, command.getType());

            return true;
        }

        return decode0(ctx, buffer, command);
    }

  

private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {
        //若是withLatency不爲null且命令延遲收集器可用同時channel和remote()不爲null
        if (withLatency != null && clientResources.commandLatencyCollector().isEnabled() && channel != null && remote() != null) {
            //第一個響應延遲等於第一個響應時間減去發送時間
            long firstResponseLatency = withLatency.getFirstResponse() - withLatency.getSent();
            //結束時間爲當前時間減去發送時間
            long completionLatency = nanoTime() - withLatency.getSent();
            //使用延遲收集器記錄數據
            clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), commandType,
                    firstResponseLatency, completionLatency);
        }
    }

  總的來講,第一個響應時間就是開始解碼的時間,完成時間就是完成解碼時間,若是疏漏也歡迎你們留言

相關文章
相關標籤/搜索