本文主要研究一下lettuce的sentinel鏈接java
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/RedisClient.javagit
private <K, V> StatefulRedisSentinelConnection<K, V> connectSentinel(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) { assertNotNull(codec); checkValidRedisURI(redisURI); ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder(); connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions())); connectionBuilder.clientResources(clientResources); DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions); StatefulRedisSentinelConnectionImpl<K, V> connection = newStatefulRedisSentinelConnection(endpoint, codec, timeout); logger.debug("Trying to get a Redis Sentinel connection for one of: " + redisURI.getSentinels()); connectionBuilder.endpoint(endpoint).commandHandler(() -> new CommandHandler(clientOptions, clientResources, endpoint)) .connection(connection); connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI); if (clientOptions.isPingBeforeActivateConnection()) { connectionBuilder.enablePingBeforeConnect(); } if (redisURI.getSentinels().isEmpty() && (isNotEmpty(redisURI.getHost()) || !isEmpty(redisURI.getSocket()))) { channelType(connectionBuilder, redisURI); try { getConnection(initializeChannelAsync(connectionBuilder)); } catch (RuntimeException e) { connection.close(); throw e; } } else { boolean connected = false; boolean first = true; Exception causingException = null; validateUrisAreOfSameConnectionType(redisURI.getSentinels()); for (RedisURI uri : redisURI.getSentinels()) { if (first) { channelType(connectionBuilder, uri); first = false; } connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri)); if (logger.isDebugEnabled()) { SocketAddress socketAddress = SocketAddressResolver.resolve(uri, clientResources.dnsResolver()); logger.debug("Connecting to Redis Sentinel, address: " + socketAddress); } try { getConnection(initializeChannelAsync(connectionBuilder)); connected = true; break; } catch (Exception e) { logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + e.toString()); causingException = e; } } if (!connected) { connection.close(); throw new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels(), causingException); } } if (LettuceStrings.isNotEmpty(redisURI.getClientName())) { connection.setClientName(redisURI.getClientName()); } return connection; }
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/AbstractRedisClient.javagithub
/** * Connect and initialize a channel from {@link ConnectionBuilder}. * * @param connectionBuilder must not be {@literal null}. * @return the {@link ConnectionFuture} to synchronize the connection process. * @since 4.4 */ @SuppressWarnings("unchecked") protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync( ConnectionBuilder connectionBuilder) { SocketAddress redisAddress = connectionBuilder.socketAddress(); if (clientResources.eventExecutorGroup().isShuttingDown()) { throw new IllegalStateException("Cannot connect, Event executor group is terminated."); } logger.debug("Connecting to Redis at {}", redisAddress); CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>(); Bootstrap redisBootstrap = connectionBuilder.bootstrap(); RedisChannelInitializer initializer = connectionBuilder.build(); redisBootstrap.handler(initializer); clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap); CompletableFuture<Boolean> initFuture = initializer.channelInitialized(); ChannelFuture connectFuture = redisBootstrap.connect(redisAddress); connectFuture.addListener(future -> { if (!future.isSuccess()) { logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause()); connectionBuilder.endpoint().initialState(); channelReadyFuture.completeExceptionally(future.cause()); return; } initFuture.whenComplete((success, throwable) -> { if (throwable == null) { logger.debug("Connecting to Redis at {}: Success", redisAddress); RedisChannelHandler<?, ?> connection = connectionBuilder.connection(); connection.registerCloseables(closeableResources, connection); channelReadyFuture.complete(connectFuture.channel()); return; } logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable); connectionBuilder.endpoint().initialState(); Throwable failure; if (throwable instanceof RedisConnectionException) { failure = throwable; } else if (throwable instanceof TimeoutException) { failure = new RedisConnectionException("Could not initialize channel within " + connectionBuilder.getTimeout(), throwable); } else { failure = throwable; } channelReadyFuture.completeExceptionally(failure); CompletableFuture<Boolean> response = new CompletableFuture<>(); response.completeExceptionally(failure); }); }); return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder .connection())); }
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/RedisClient.javaredis
protected SocketAddress getSocketAddress(RedisURI redisURI) throws InterruptedException, TimeoutException, ExecutionException { SocketAddress redisAddress; if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) { logger.debug("Connecting to Redis using Sentinels {}, MasterId {}", redisURI.getSentinels(), redisURI.getSentinelMasterId()); redisAddress = lookupRedis(redisURI); if (redisAddress == null) { throw new RedisConnectionException("Cannot provide redisAddress using sentinel for masterId " + redisURI.getSentinelMasterId()); } } else { redisAddress = SocketAddressResolver.resolve(redisURI, clientResources.dnsResolver()); } return redisAddress; } private SocketAddress lookupRedis(RedisURI sentinelUri) throws InterruptedException, TimeoutException, ExecutionException { try (StatefulRedisSentinelConnection<String, String> connection = connectSentinel(sentinelUri)) { return connection.async().getMasterAddrByName(sentinelUri.getSentinelMasterId()) .get(timeout.toNanos(), TimeUnit.NANOSECONDS); } }