本文主要研究一下lettuce的指標監控。
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/event/metrics/DefaultCommandLatencyEventPublisher.java
public class DefaultCommandLatencyEventPublisher implements MetricEventPublisher { private final EventExecutorGroup eventExecutorGroup; private final EventPublisherOptions options; private final EventBus eventBus; private final CommandLatencyCollector commandLatencyCollector; private final Runnable EMITTER = this::emitMetricsEvent; private volatile ScheduledFuture<?> scheduledFuture; public DefaultCommandLatencyEventPublisher(EventExecutorGroup eventExecutorGroup, EventPublisherOptions options, EventBus eventBus, CommandLatencyCollector commandLatencyCollector) { this.eventExecutorGroup = eventExecutorGroup; this.options = options; this.eventBus = eventBus; this.commandLatencyCollector = commandLatencyCollector; if (!options.eventEmitInterval().isZero()) { scheduledFuture = this.eventExecutorGroup.scheduleAtFixedRate(EMITTER, options.eventEmitInterval().toMillis(), options.eventEmitInterval().toMillis(), TimeUnit.MILLISECONDS); } } @Override public boolean isEnabled() { return !options.eventEmitInterval().isZero() && scheduledFuture != null; } @Override public void shutdown() { if (scheduledFuture != null) { scheduledFuture.cancel(true); scheduledFuture = null; } } @Override public void emitMetricsEvent() { if (!isEnabled() || !commandLatencyCollector.isEnabled()) { return; } eventBus.publish(new CommandLatencyEvent(commandLatencyCollector.retrieveMetrics())); } }
spring-boot-autoconfigure-2.0.4.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/data/redis/LettuceConnectionConfiguration.java
@Configuration @ConditionalOnClass(RedisClient.class) class LettuceConnectionConfiguration extends RedisConnectionConfiguration { private final RedisProperties properties; private final List<LettuceClientConfigurationBuilderCustomizer> builderCustomizers; LettuceConnectionConfiguration(RedisProperties properties, ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider, ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider, ObjectProvider<List<LettuceClientConfigurationBuilderCustomizer>> builderCustomizers) { super(properties, sentinelConfigurationProvider, clusterConfigurationProvider); this.properties = properties; this.builderCustomizers = builderCustomizers .getIfAvailable(Collections::emptyList); } @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(ClientResources.class) public DefaultClientResources lettuceClientResources() { return DefaultClientResources.create(); } //...... }
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/event/DefaultEventPublisherOptions.java
public class DefaultEventPublisherOptions implements EventPublisherOptions { public static final long DEFAULT_EMIT_INTERVAL = 10; public static final TimeUnit DEFAULT_EMIT_INTERVAL_UNIT = TimeUnit.MINUTES; public static final Duration DEFAULT_EMIT_INTERVAL_DURATION = Duration.ofMinutes(DEFAULT_EMIT_INTERVAL); //...... }
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/metrics/CommandLatencyCollector.java
/**
 * A {@link MetricCollector} for command latencies. Its metrics type is a {@link Map} from {@link CommandLatencyId} to
 * {@link CommandMetrics}.
 */
public interface CommandLatencyCollector extends MetricCollector<Map<CommandLatencyId, CommandMetrics>> {

    /**
     * Record the command latency per {@code connectionPoint} and {@code commandType}.
     *
     * @param local the local address
     * @param remote the remote address
     * @param commandType the command type
     * @param firstResponseLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the first response
     * @param completionLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the command completion
     */
    void recordCommandLatency(SocketAddress local, SocketAddress remote, ProtocolKeyword commandType,
            long firstResponseLatency, long completionLatency);
}
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/protocol/CommandHandler.java
/** * A netty {@link ChannelHandler} responsible for writing redis commands and reading responses from the server. * * @author Will Glozer * @author Mark Paluch * @author Jongyeol Choi * @author Grzegorz Szpak */ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands { //...... /** * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf input = (ByteBuf) msg; if (!input.isReadable() || input.refCnt() == 0) { logger.warn("{} Input not readable {}, {}", logPrefix(), input.isReadable(), input.refCnt()); return; } if (debugEnabled) { logger.debug("{} Received: {} bytes, {} commands in the stack", logPrefix(), input.readableBytes(), stack.size()); } try { if (buffer.refCnt() < 1) { logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix()); return; } if (debugEnabled && ctx.channel() != channel) { logger.debug("{} Ignoring data for a non-registered channel {}", logPrefix(), ctx.channel()); return; } if (traceEnabled) { logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim()); } buffer.writeBytes(input); decode(ctx, buffer); } finally { input.release(); } } private boolean decode(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?, ?, ?> command) { if (latencyMetricsEnabled && command instanceof WithLatency) { WithLatency withLatency = (WithLatency) command; if (withLatency.getFirstResponse() == -1) { withLatency.firstResponse(nanoTime()); } if (!decode0(ctx, buffer, command)) { return false; } recordLatency(withLatency, command.getType()); return true; } return decode0(ctx, buffer, command); } private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) { if (withLatency != null && clientResources.commandLatencyCollector().isEnabled() && channel != null && remote() != null) { long 
firstResponseLatency = withLatency.getFirstResponse() - withLatency.getSent(); long completionLatency = nanoTime() - withLatency.getSent(); clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), commandType, firstResponseLatency, completionLatency); } } }
/**
 * Consumer that subscribes to an {@link EventBus} and logs every event it receives at INFO level.
 */
public class LettuceEventConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(LettuceEventConsumer.class);

    EventBus eventBus;

    /**
     * @param eventBus the bus whose events will be logged
     */
    public LettuceEventConsumer(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    /**
     * Subscribes the logging callback to the event bus. Annotated with {@code @PostConstruct}, so it is meant to run
     * after the instance has been constructed.
     */
    @PostConstruct
    public void init() {
        eventBus.get().subscribe(event -> LOGGER.info("event:{}", event));
    }
}
2018-09-11 16:32:57.361 INFO 6656 --- [xecutorLoop-1-3] com.example.config.LettuceEventConsumer : event:{[local:any -> /192.168.99.100:6379, commandType=GET]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=884, max=888, percentiles={50.0=888, 90.0=888, 95.0=888, 99.0=888, 99.9=888}], completion=[min=950, max=954, percentiles={50.0=954, 90.0=954, 95.0=954, 99.0=954, 99.9=954}]], [local:any -> /192.168.99.100:6379, commandType=INFO]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=1449, max=1458, percentiles={50.0=1458, 90.0=1458, 95.0=1458, 99.0=1458, 99.9=1458}], completion=[min=2457, max=2473, percentiles={50.0=2473, 90.0=2473, 95.0=2473, 99.0=2473, 99.9=2473}]], [local:any -> /192.168.99.100:6379, commandType=PUBLISH]=[count=40, timeUnit=MICROSECONDS, firstResponse=[min=708, max=17956, percentiles={50.0=1343, 90.0=2719, 95.0=3948, 99.0=17956, 99.9=17956}], completion=[min=733, max=17956, percentiles={50.0=1376, 90.0=2752, 95.0=3981, 99.0=17956, 99.9=17956}]], [local:any -> /192.168.99.100:6379, commandType=SET]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=909, max=913, percentiles={50.0=913, 90.0=913, 95.0=913, 99.0=913, 99.9=913}], completion=[min=995, max=999, percentiles={50.0=999, 90.0=999, 95.0=999, 99.0=999, 99.9=999}]], [local:any -> /192.168.99.100:6379, commandType=SUBSCRIBE]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=19267, max=19398, percentiles={50.0=19398, 90.0=19398, 95.0=19398, 99.0=19398, 99.9=19398}], completion=[min=41418, max=41680, percentiles={50.0=41680, 90.0=41680, 95.0=41680, 99.0=41680, 99.9=41680}]]}
lettuce通過內置eventBus,然後對其命令的執行發佈相應的延時事件,client端可以根據需求消費eventBus的數據來獲取lettuce的相關指標。可以說在指標監控場景,採用事件驅動的方式進行實現,顯得更爲靈活,把Event-Driven Architecture的思想發揮得淋漓盡致。