經過走讀Lettuce異步讀取源碼,針對Lettuce鏈接創建過程進行源碼走讀java
整體展現一個Lettuce異步get時序node
經過時序圖能夠發現MasterSlaveChannelWriter主要提供一個負載分配的功能,並非真正的命令發送服務react
下面經過源碼分析實現過程redis
public static <K, V> StatefulRedisMasterSlaveConnection<K, V> connect(RedisClient redisClient, RedisCodec<K, V> codec, Iterable<RedisURI> redisURIs) { LettuceAssert.notNull(redisClient, "RedisClient must not be null"); LettuceAssert.notNull(codec, "RedisCodec must not be null"); LettuceAssert.notNull(redisURIs, "RedisURIs must not be null"); List<RedisURI> uriList = LettuceLists.newList(redisURIs); LettuceAssert.isTrue(!uriList.isEmpty(), "RedisURIs must not be empty"); if (isSentinel(uriList.get(0))) { return connectSentinel(redisClient, codec, uriList.get(0)); } else { return connectStaticMasterSlave(redisClient, codec, uriList); } }
private static <K, V> StatefulRedisMasterSlaveConnection<K, V> connectSentinel(RedisClient redisClient, RedisCodec<K, V> codec, RedisURI redisURI) { //建立拓撲提供者爲哨兵拓撲 TopologyProvider topologyProvider = new SentinelTopologyProvider(redisURI.getSentinelMasterId(), redisClient, redisURI); //建立哨兵拓撲刷新服務 SentinelTopologyRefresh sentinelTopologyRefresh = new SentinelTopologyRefresh(redisClient, redisURI.getSentinelMasterId(), redisURI.getSentinels()); //利用拓撲提供者和redisClient建立主備拓撲刷新服務 MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider); //建立主備鏈接提供者 MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec, redisURI, Collections.emptyMap()); //使用主備拓撲刷新服務獲取全部節點將其設置到鏈接提供者中 connectionProvider.setKnownNodes(refresh.getNodes(redisURI)); //使用鏈接提供者建立主備通道寫入器 MasterSlaveChannelWriter<K, V> channelWriter = new MasterSlaveChannelWriter<>(connectionProvider); //建立鏈接 StatefulRedisMasterSlaveConnectionImpl<K, V> connection = new StatefulRedisMasterSlaveConnectionImpl<>(channelWriter, codec, redisURI.getTimeout()); connection.setOptions(redisClient.getOptions()); Runnable runnable = () -> { try { LOG.debug("Refreshing topology"); List<RedisNodeDescription> nodes = refresh.getNodes(redisURI); if (nodes.isEmpty()) { LOG.warn("Topology refresh returned no nodes from {}", redisURI); } LOG.debug("New topology: {}", nodes); connectionProvider.setKnownNodes(nodes); } catch (Exception e) { LOG.error("Error during background refresh", e); } }; try { //向鏈接註冊可關閉服務 connection.registerCloseables(new ArrayList<>(), sentinelTopologyRefresh); //綁定哨兵拓撲結構變化執行邏輯 sentinelTopologyRefresh.bind(runnable); } catch (RuntimeException e) { connection.close(); throw e; } return connection; }
public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) { super(writer, timeout); this.codec = codec; //建立異步步鏈接 this.async = newRedisAsyncCommandsImpl(); //建立同步鏈接 this.sync = newRedisSyncCommandsImpl(); //建立響應式鏈接 this.reactive = newRedisReactiveCommandsImpl(); }
protected RedisAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() { //使用裝飾器模式對當前實例進行加強 return new RedisAsyncCommandsImpl<>(this, codec); }
public RedisAsyncCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec) { super(connection, codec); }
public AbstractRedisAsyncCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec) { this.connection = connection; this.codec = codec; this.commandBuilder = new RedisCommandBuilder<>(codec); }
StatefulRedisConnectionImplbootstrap
@Override public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) { //前置處理 RedisCommand<K, V, T> toSend = preProcessCommand(command); try { //經過父類進行派發,父類中對writer爲當前類對構造方法對入參 return super.dispatch(toSend); } finally { if (command.getType().name().equals(MULTI.name())) { multi = (multi == null ? new MultiOutput<>(codec) : multi); } } }
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) { if (debugEnabled) { logger.debug("dispatching command {}", cmd); } //將發送命令對處理委派給channelWriter處理 return channelWriter.write(cmd); }
MasterSlaveChannelWriter緩存
@Override public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) { LettuceAssert.notNull(command, "Command must not be null"); if (closed) { throw new RedisException("Connection is closed"); } //獲取命令意圖 Intent intent = getIntent(command.getType()); //根據讀寫意圖獲取鏈接 StatefulRedisConnection<K, V> connection = (StatefulRedisConnection) masterSlaveConnectionProvider .getConnection(intent); //經過這個connection派發命令 return connection.dispatch(command); }
//根據意圖獲取鏈接 public StatefulRedisConnection<K, V> getConnection(Intent intent) { if (debugEnabled) { logger.debug("getConnection(" + intent + ")"); } //若是readFrom不爲null且是READ if (readFrom != null && intent == Intent.READ) { //根據readFrom配置從已知節點中選擇可用節點描述 List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() { @Override public List<RedisNodeDescription> getNodes() { return knownNodes; } @Override public Iterator<RedisNodeDescription> iterator() { return knownNodes.iterator(); } }); //若是可選擇節點集合爲空則拋出異常 if (selection.isEmpty()) { throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s", knownNodes, readFrom)); } try { //遍歷全部可用節點 for (RedisNodeDescription redisNodeDescription : selection) { //獲取節點鏈接 StatefulRedisConnection<K, V> readerCandidate = getConnection(redisNodeDescription); //若是節點鏈接不是打開到鏈接則繼續查找下一個鏈接 if (!readerCandidate.isOpen()) { continue; } //返回可用鏈接 return readerCandidate; } //若是沒有找到可用鏈接,默認返回第一個 return getConnection(selection.get(0)); } catch (RuntimeException e) { throw new RedisException(e); } } //若是沒有配置readFrom或者不是READ 則返回master鏈接 return getConnection(getMaster()); }
protected StatefulRedisConnection<K, V> getConnection(RedisNodeDescription redisNodeDescription) { //若是沒有則建立新節點,並添加到緩存中 return connections.computeIfAbsent( new ConnectionKey(redisNodeDescription.getUri().getHost(), redisNodeDescription.getUri().getPort()), connectionFactory); }
建立實際的connectioapp
@Override public StatefulRedisConnection<K, V> apply(ConnectionKey key) { //構建URI RedisURI.Builder builder = RedisURI.Builder .redis(key.host, key.port) .withSsl(initialRedisUri.isSsl()) .withVerifyPeer(initialRedisUri.isVerifyPeer()) .withStartTls(initialRedisUri.isStartTls()); if (initialRedisUri.getPassword() != null && initialRedisUri.getPassword().length != 0) { builder.withPassword(initialRedisUri.getPassword()); } if (initialRedisUri.getClientName() != null) { builder.withClientName(initialRedisUri.getClientName()); } builder.withDatabase(initialRedisUri.getDatabase()); //建立鏈接 StatefulRedisConnection<K, V> connection = redisClient.connect(redisCodec, builder.build()); //設置是否自動提交 synchronized (stateLock) { connection.setAutoFlushCommands(autoFlushCommands); } return connection; }
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) { assertNotNull(redisURI); return connectStandalone(codec, redisURI, redisURI.getTimeout()); }
private <K, V> StatefulRedisConnection<K, V> connectStandalone(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) { //單機異步 ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStandaloneAsync(codec, redisURI, timeout); //獲取鏈接 return getConnection(future); }
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) { assertNotNull(codec); //檢查URI是否有效 checkValidRedisURI(redisURI); logger.debug("Trying to get a Redis connection for: " + redisURI); //建立DefaultEndpoint DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions); //建立connection StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(endpoint, codec, timeout); //建立鏈接 ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI, () -> new CommandHandler(clientOptions, clientResources, endpoint)); future.whenComplete((channelHandler, throwable) -> { if (throwable != null) { connection.close(); } }); return future; }
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) { //connetion構造器 ConnectionBuilder connectionBuilder; if (redisURI.isSsl()) { SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder(); sslConnectionBuilder.ssl(redisURI); connectionBuilder = sslConnectionBuilder; } else { connectionBuilder = ConnectionBuilder.connectionBuilder(); } //設置connection connectionBuilder.connection(connection); //設置客戶端選項 connectionBuilder.clientOptions(clientOptions); //設置客戶端資源 connectionBuilder.clientResources(clientResources); //設置命令處理器以及endpoint connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint); //填充鏈接構造器 connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI); //設置通道類型 channelType(connectionBuilder, redisURI); if (clientOptions.isPingBeforeActivateConnection()) { if (hasPassword(redisURI)) { connectionBuilder.enableAuthPingBeforeConnect(); } else { connectionBuilder.enablePingBeforeConnect(); } } //建立異步通道 ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder); //若是客戶端選項配置了pingBeforeActivateConnection同時有密碼 if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) { future = future.thenApplyAsync(channelHandler -> { connection.async().auth(new String(redisURI.getPassword())); return channelHandler; }, clientResources.eventExecutorGroup()); } if (LettuceStrings.isNotEmpty(redisURI.getClientName())) { future.thenApply(channelHandler -> { connection.setClientName(redisURI.getClientName()); return channelHandler; }); } if (redisURI.getDatabase() != 0) { future = future.thenApplyAsync(channelHandler -> { connection.async().select(redisURI.getDatabase()); return channelHandler; }, clientResources.eventExecutorGroup()); } return future.thenApply(channelHandler -> (S) connection); }
/** * 鏈接同時經過connectionBuilder初始化一個通道 */ @SuppressWarnings("unchecked") protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync( ConnectionBuilder connectionBuilder) { //獲取socketAddress SocketAddress redisAddress = connectionBuilder.socketAddress(); //若是線程池關閉則拋出異常 if (clientResources.eventExecutorGroup().isShuttingDown()) { throw new IllegalStateException("Cannot connect, Event executor group is terminated."); } logger.debug("Connecting to Redis at {}", redisAddress); CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>(); //獲取bootstrap Bootstrap redisBootstrap = connectionBuilder.bootstrap(); //建立redis通道初始化器 RedisChannelInitializer initializer = connectionBuilder.build(); //設置netty的處理器 redisBootstrap.handler(initializer); //netty自定設置處理 clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap); CompletableFuture<Boolean> initFuture = initializer.channelInitialized(); ChannelFuture connectFuture = redisBootstrap.connect(redisAddress); //增長監聽器 connectFuture.addListener(future -> { if (!future.isSuccess()) { logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause()); connectionBuilder.endpoint().initialState(); channelReadyFuture.completeExceptionally(future.cause()); return; } initFuture.whenComplete((success, throwable) -> { if (throwable == null) { logger.debug("Connecting to Redis at {}: Success", redisAddress); RedisChannelHandler<?, ?> connection = connectionBuilder.connection(); connection.registerCloseables(closeableResources, connection); channelReadyFuture.complete(connectFuture.channel()); return; } logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable); connectionBuilder.endpoint().initialState(); Throwable failure; if (throwable instanceof RedisConnectionException) { failure = throwable; } else if (throwable instanceof TimeoutException) { failure = new RedisConnectionException("Could not initialize channel within " + connectionBuilder.getTimeout(), throwable); } else { failure = throwable; } channelReadyFuture.completeExceptionally(failure); CompletableFuture<Boolean> response = new CompletableFuture<>(); response.completeExceptionally(failure); }); }); return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder .connection())); }
connectionBuilder.build()異步
public RedisChannelInitializer build() { return new PlainChannelInitializer(pingCommandSupplier, this::buildHandlers, clientResources, timeout); }
protected List<ChannelHandler> buildHandlers() { LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set"); LettuceAssert.assertState(connectionEvents != null, "ConnectionEvents must be set"); LettuceAssert.assertState(connection != null, "Connection must be set"); LettuceAssert.assertState(clientResources != null, "ClientResources must be set"); LettuceAssert.assertState(endpoint != null, "Endpoint must be set"); List<ChannelHandler> handlers = new ArrayList<>(); //設置clientOptions connection.setOptions(clientOptions); //添加channel監聽器 handlers.add(new ChannelGroupListener(channelGroup)); //添加命令編碼器 handlers.add(new CommandEncoder()); //添加commandHander handlers.add(commandHandlerSupplier.get()); //若是設置自動重連,則設置看門狗處理器 if (clientOptions.isAutoReconnect()) { handlers.add(createConnectionWatchdog()); } //設置connectionEvenTrigger handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus())); if (clientOptions.isAutoReconnect()) { handlers.add(createConnectionWatchdog()); } return handlers; }
看門狗處理器到做用就是在通道斷開是進行重連socket
protected ConnectionWatchdog createConnectionWatchdog() { //若是看門狗不爲null直接返回 if (connectionWatchdog != null) { return connectionWatchdog; } LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set for autoReconnect=true"); LettuceAssert.assertState(timer != null, "Timer must be set for autoReconnect=true"); LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true"); //建立鏈接看門狗 ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, timer, clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, connection); //向endpoint註冊看門狗 endpoint.registerConnectionWatchdog(watchdog); connectionWatchdog = watchdog; return watchdog; }
lass PlainChannelInitializer extends io.netty.channel.ChannelInitializer<Channel> implements RedisChannelInitializer { //不ping final static Supplier<AsyncCommand<?, ?, ?>> NO_PING = () -> null; //處理器提供器 private final Supplier<List<ChannelHandler>> handlers; //ping命令提供器 private final Supplier<AsyncCommand<?, ?, ?>> pingCommandSupplier; private final ClientResources clientResources; //超時時間 private final Duration timeout; private volatile CompletableFuture<Boolean> initializedFuture = new CompletableFuture<>(); PlainChannelInitializer(Supplier<AsyncCommand<?, ?, ?>> pingCommandSupplier, Supplier<List<ChannelHandler>> handlers, ClientResources clientResources, Duration timeout) { this.pingCommandSupplier = pingCommandSupplier; this.handlers = handlers; this.clientResources = clientResources; this.timeout = timeout; } @Override protected void initChannel(Channel channel) throws Exception { //若是pipeline中沒有配置channelActivator則須要添加channelActivator處理器 if (channel.pipeline().get("channelActivator") == null) { channel.pipeline().addLast("channelActivator", new RedisChannelInitializerImpl() { private AsyncCommand<?, ?, ?> pingCommand; @Override public CompletableFuture<Boolean> channelInitialized() { return initializedFuture; } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //若是通道斷開鏈接 clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx))); //若是初始化沒有完成則拋出異常 if (!initializedFuture.isDone()) { initializedFuture.completeExceptionally(new RedisConnectionException("Connection closed prematurely")); } initializedFuture = new CompletableFuture<>(); pingCommand = null; super.channelInactive(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof ConnectionEvents.Activated) { if (!initializedFuture.isDone()) { initializedFuture.complete(true); clientResources.eventBus().publish(new ConnectionActivatedEvent(local(ctx), remote(ctx))); } } super.userEventTriggered(ctx, evt); } @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { //經過事件總線發送鏈接事件 clientResources.eventBus().publish(new ConnectedEvent(local(ctx), remote(ctx))); //若是ping命令提供器不是NO_PING則發送執行ping if (pingCommandSupplier != NO_PING) { pingCommand = pingCommandSupplier.get(); pingBeforeActivate(pingCommand, initializedFuture, ctx, clientResources, timeout); } else { super.channelActive(ctx); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (!initializedFuture.isDone()) { initializedFuture.completeExceptionally(cause); } super.exceptionCaught(ctx, cause); } }); } //將hanler提供器提供的處理器添加到channel中 for (ChannelHandler handler : handlers.get()) { channel.pipeline().addLast(handler); } clientResources.nettyCustomizer().afterChannelInitialized(channel); } static void pingBeforeActivate(AsyncCommand<?, ?, ?> cmd, CompletableFuture<Boolean> initializedFuture, ChannelHandlerContext ctx, ClientResources clientResources, Duration timeout) throws Exception { ctx.fireUserEventTriggered(new PingBeforeActivate(cmd)); Runnable timeoutGuard = () -> { if (cmd.isDone() || initializedFuture.isDone()) { return; } initializedFuture.completeExceptionally(new RedisCommandTimeoutException(String.format( "Cannot initialize channel (PING before activate) within %s", timeout))); }; Timeout timeoutHandle = clientResources.timer().newTimeout(t -> { if (clientResources.eventExecutorGroup().isShuttingDown()) { timeoutGuard.run(); return; } clientResources.eventExecutorGroup().submit(timeoutGuard); }, timeout.toNanos(), TimeUnit.NANOSECONDS); cmd.whenComplete((o, throwable) -> { timeoutHandle.cancel(); if (throwable == null) { ctx.fireChannelActive(); initializedFuture.complete(true); } else { initializedFuture.completeExceptionally(throwable); } }); } @Override public CompletableFuture<Boolean> channelInitialized() { return initializedFuture; } }