Lettuce異步讀取過程分析

 經過走讀Lettuce異步讀取源碼,針對Lettuce鏈接創建過程進行源碼走讀java

整體展現一個Lettuce異步get時序node

 

經過時序圖能夠發現MasterSlaveChannelWriter主要提供一個負載分配的功能,並非真正的命令發送服務react

下面經過源碼分析實現過程redis

public static <K, V> StatefulRedisMasterSlaveConnection<K, V> connect(RedisClient redisClient, RedisCodec<K, V> codec,
            Iterable<RedisURI> redisURIs) {

        LettuceAssert.notNull(redisClient, "RedisClient must not be null");
        LettuceAssert.notNull(codec, "RedisCodec must not be null");
        LettuceAssert.notNull(redisURIs, "RedisURIs must not be null");

        List<RedisURI> uriList = LettuceLists.newList(redisURIs);
        LettuceAssert.isTrue(!uriList.isEmpty(), "RedisURIs must not be empty");

        if (isSentinel(uriList.get(0))) {
            return connectSentinel(redisClient, codec, uriList.get(0));
        } else {
            return connectStaticMasterSlave(redisClient, codec, uriList);
        }
    }

 

private static <K, V> StatefulRedisMasterSlaveConnection<K, V> connectSentinel(RedisClient redisClient,
            RedisCodec<K, V> codec, RedisURI redisURI) {
        //建立拓撲提供者爲哨兵拓撲
        TopologyProvider topologyProvider = new SentinelTopologyProvider(redisURI.getSentinelMasterId(), redisClient, redisURI);

        //建立哨兵拓撲刷新服務
        SentinelTopologyRefresh sentinelTopologyRefresh = new SentinelTopologyRefresh(redisClient,
                redisURI.getSentinelMasterId(), redisURI.getSentinels());

        //利用拓撲提供者和redisClient建立主備拓撲刷新服務
        MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
        
        //建立主備鏈接提供者
        MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
                redisURI, Collections.emptyMap());
        //使用主備拓撲刷新服務獲取全部節點將其設置到鏈接提供者中
        connectionProvider.setKnownNodes(refresh.getNodes(redisURI));
        
        //使用鏈接提供者建立主備通道寫入器
        MasterSlaveChannelWriter<K, V> channelWriter = new MasterSlaveChannelWriter<>(connectionProvider);
        
        //建立鏈接
        StatefulRedisMasterSlaveConnectionImpl<K, V> connection = new StatefulRedisMasterSlaveConnectionImpl<>(channelWriter,
                codec, redisURI.getTimeout());

        connection.setOptions(redisClient.getOptions());

        Runnable runnable = () -> {
            try {

                LOG.debug("Refreshing topology");
                List<RedisNodeDescription> nodes = refresh.getNodes(redisURI);

                if (nodes.isEmpty()) {
                    LOG.warn("Topology refresh returned no nodes from {}", redisURI);
                }

                LOG.debug("New topology: {}", nodes);
                connectionProvider.setKnownNodes(nodes);
            } catch (Exception e) {
                LOG.error("Error during background refresh", e);
            }
        };

        try {
            //向鏈接註冊可關閉服務
            connection.registerCloseables(new ArrayList<>(), sentinelTopologyRefresh);
            //綁定哨兵拓撲結構變化執行邏輯
            sentinelTopologyRefresh.bind(runnable);
        } catch (RuntimeException e) {

            connection.close();
            throw e;
        }

        return connection;
    }

  

 public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {

        super(writer, timeout);

        this.codec = codec;
        //建立異步步鏈接
        this.async = newRedisAsyncCommandsImpl();
        //建立同步鏈接
        this.sync = newRedisSyncCommandsImpl();
        //建立響應式鏈接
        this.reactive = newRedisReactiveCommandsImpl();
    }

  

 protected RedisAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() {
        //使用裝飾器模式對當前實例進行加強
        return new RedisAsyncCommandsImpl<>(this, codec);
    }

  

    public RedisAsyncCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
    }

  

    public AbstractRedisAsyncCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec) {
        this.connection = connection;
        this.codec = codec;
        this.commandBuilder = new RedisCommandBuilder<>(codec);
    }

  StatefulRedisConnectionImplbootstrap

  @Override
    public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
        //前置處理
        RedisCommand<K, V, T> toSend = preProcessCommand(command);

        try {
            //經過父類進行派發,父類中對writer爲當前類對構造方法對入參
            return super.dispatch(toSend);
        } finally {
            if (command.getType().name().equals(MULTI.name())) {
                multi = (multi == null ? new MultiOutput<>(codec) : multi);
            }
        }
    }

  

 protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {

        if (debugEnabled) {
            logger.debug("dispatching command {}", cmd);
        }
        //將發送命令對處理委派給channelWriter處理
        return channelWriter.write(cmd);
    }

  MasterSlaveChannelWriter緩存

  @Override
    public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

        LettuceAssert.notNull(command, "Command must not be null");

        if (closed) {
            throw new RedisException("Connection is closed");
        }
        //獲取命令意圖
        Intent intent = getIntent(command.getType());
        //根據讀寫意圖獲取鏈接
        StatefulRedisConnection<K, V> connection = (StatefulRedisConnection) masterSlaveConnectionProvider
                .getConnection(intent);
        //經過這個connection派發命令
        return connection.dispatch(command);
    }

  

//根據意圖獲取鏈接
    public StatefulRedisConnection<K, V> getConnection(Intent intent) {

        if (debugEnabled) {
            logger.debug("getConnection(" + intent + ")");
        }
        //若是readFrom不爲null且是READ
        if (readFrom != null && intent == Intent.READ) {
            //根據readFrom配置從已知節點中選擇可用節點描述
            List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() {
                @Override
                public List<RedisNodeDescription> getNodes() {
                    return knownNodes;
                }

                @Override
                public Iterator<RedisNodeDescription> iterator() {
                    return knownNodes.iterator();
                }
            });
            //若是可選擇節點集合爲空則拋出異常
            if (selection.isEmpty()) {
                throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s",
                        knownNodes, readFrom));
            }
            try {
                //遍歷全部可用節點
                for (RedisNodeDescription redisNodeDescription : selection) {
                    //獲取節點鏈接
                    StatefulRedisConnection<K, V> readerCandidate = getConnection(redisNodeDescription);
                    //若是節點鏈接不是打開到鏈接則繼續查找下一個鏈接
                    if (!readerCandidate.isOpen()) {
                        continue;
                    }
                    //返回可用鏈接
                    return readerCandidate;
                }
                //若是沒有找到可用鏈接,默認返回第一個
                return getConnection(selection.get(0));
            } catch (RuntimeException e) {
                throw new RedisException(e);
            }
        }
        //若是沒有配置readFrom或者不是READ 則返回master鏈接
        return getConnection(getMaster());
    }

  

 protected StatefulRedisConnection<K, V> getConnection(RedisNodeDescription redisNodeDescription) {
       //若是沒有則建立新節點,並添加到緩存中
        return connections.computeIfAbsent(
                new ConnectionKey(redisNodeDescription.getUri().getHost(), redisNodeDescription.getUri().getPort()),
                connectionFactory);
    }

  建立實際的connectioapp

  @Override
        public StatefulRedisConnection<K, V> apply(ConnectionKey key) {
            //構建URI
            RedisURI.Builder builder = RedisURI.Builder
                    .redis(key.host, key.port)
                    .withSsl(initialRedisUri.isSsl())
                    .withVerifyPeer(initialRedisUri.isVerifyPeer())
                    .withStartTls(initialRedisUri.isStartTls());

            if (initialRedisUri.getPassword() != null && initialRedisUri.getPassword().length != 0) {
                builder.withPassword(initialRedisUri.getPassword());
            }

            if (initialRedisUri.getClientName() != null) {
                builder.withClientName(initialRedisUri.getClientName());
            }
            builder.withDatabase(initialRedisUri.getDatabase());
            
            //建立鏈接
            StatefulRedisConnection<K, V> connection = redisClient.connect(redisCodec, builder.build());
            
            //設置是否自動提交
            synchronized (stateLock) {
                connection.setAutoFlushCommands(autoFlushCommands);
            }

            return connection;
        }

  

public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {

        assertNotNull(redisURI);
        return connectStandalone(codec, redisURI, redisURI.getTimeout());
    }

  

 private <K, V> StatefulRedisConnection<K, V> connectStandalone(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) {
        //單機異步
        ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStandaloneAsync(codec, redisURI, timeout);
        //獲取鏈接
        return getConnection(future);
    }

  

    private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
            RedisURI redisURI, Duration timeout) {

        assertNotNull(codec);
        //檢查URI是否有效
        checkValidRedisURI(redisURI);

        logger.debug("Trying to get a Redis connection for: " + redisURI);
        //建立DefaultEndpoint
        DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions);
        //建立connection
        StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(endpoint, codec, timeout);
        //建立鏈接
        ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
                () -> new CommandHandler(clientOptions, clientResources, endpoint));

        future.whenComplete((channelHandler, throwable) -> {

            if (throwable != null) {
                connection.close();
            }
        });

        return future;
    }

  

   private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
            DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
        //connetion構造器
        ConnectionBuilder connectionBuilder;
        if (redisURI.isSsl()) {
            SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
            sslConnectionBuilder.ssl(redisURI);
            connectionBuilder = sslConnectionBuilder;
        } else {
            connectionBuilder = ConnectionBuilder.connectionBuilder();
        }
        //設置connection
        connectionBuilder.connection(connection);
        //設置客戶端選項
        connectionBuilder.clientOptions(clientOptions);
        //設置客戶端資源
        connectionBuilder.clientResources(clientResources);
        //設置命令處理器以及endpoint
        connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
        //填充鏈接構造器
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
        //設置通道類型
        channelType(connectionBuilder, redisURI);

        if (clientOptions.isPingBeforeActivateConnection()) {
            if (hasPassword(redisURI)) {
                connectionBuilder.enableAuthPingBeforeConnect();
            } else {
                connectionBuilder.enablePingBeforeConnect();
            }
        }
        //建立異步通道
        ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
        
        //若是客戶端選項配置了pingBeforeActivateConnection同時有密碼
        if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {

            future = future.thenApplyAsync(channelHandler -> {

                connection.async().auth(new String(redisURI.getPassword()));

                return channelHandler;
            }, clientResources.eventExecutorGroup());
        }

        if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
            future.thenApply(channelHandler -> {
                connection.setClientName(redisURI.getClientName());
                return channelHandler;
            });

        }

        if (redisURI.getDatabase() != 0) {

            future = future.thenApplyAsync(channelHandler -> {

                connection.async().select(redisURI.getDatabase());

                return channelHandler;
            }, clientResources.eventExecutorGroup());
        }

        return future.thenApply(channelHandler -> (S) connection);
    }

  

/**
     *  鏈接同時經過connectionBuilder初始化一個通道
     */
    @SuppressWarnings("unchecked")
    protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
            ConnectionBuilder connectionBuilder) {
        //獲取socketAddress
        SocketAddress redisAddress = connectionBuilder.socketAddress();

        //若是線程池關閉則拋出異常
        if (clientResources.eventExecutorGroup().isShuttingDown()) {
            throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
        }

        logger.debug("Connecting to Redis at {}", redisAddress);

        CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

        //獲取bootstrap
        Bootstrap redisBootstrap = connectionBuilder.bootstrap();
        //建立redis通道初始化器
        RedisChannelInitializer initializer = connectionBuilder.build();
        //設置netty的處理器
        redisBootstrap.handler(initializer);
        //netty自定設置處理
        clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
        CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
        ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
        //增長監聽器
        connectFuture.addListener(future -> {
                
            if (!future.isSuccess()) {

                logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
                connectionBuilder.endpoint().initialState();
                channelReadyFuture.completeExceptionally(future.cause());
                return;
            }

            initFuture.whenComplete((success, throwable) -> {

                if (throwable == null) {
                    logger.debug("Connecting to Redis at {}: Success", redisAddress);
                    RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                    connection.registerCloseables(closeableResources, connection);
                    channelReadyFuture.complete(connectFuture.channel());
                    return;
                }

                logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
                connectionBuilder.endpoint().initialState();
                Throwable failure;

                if (throwable instanceof RedisConnectionException) {
                    failure = throwable;
                } else if (throwable instanceof TimeoutException) {
                    failure = new RedisConnectionException("Could not initialize channel within "
                            + connectionBuilder.getTimeout(), throwable);
                } else {
                    failure = throwable;
                }
                channelReadyFuture.completeExceptionally(failure);

                CompletableFuture<Boolean> response = new CompletableFuture<>();
                response.completeExceptionally(failure);

            });
        });

        return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
                .connection()));
    }

  connectionBuilder.build()異步

 public RedisChannelInitializer build() {
        return new PlainChannelInitializer(pingCommandSupplier, this::buildHandlers, clientResources, timeout);
    }

  

protected List<ChannelHandler> buildHandlers() {

        LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set");
        LettuceAssert.assertState(connectionEvents != null, "ConnectionEvents must be set");
        LettuceAssert.assertState(connection != null, "Connection must be set");
        LettuceAssert.assertState(clientResources != null, "ClientResources must be set");
        LettuceAssert.assertState(endpoint != null, "Endpoint must be set");

        List<ChannelHandler> handlers = new ArrayList<>();
        //設置clientOptions
        connection.setOptions(clientOptions);
        //添加channel監聽器
        handlers.add(new ChannelGroupListener(channelGroup));
        //添加命令編碼器
        handlers.add(new CommandEncoder());
        //添加commandHander
        handlers.add(commandHandlerSupplier.get());
        //若是設置自動重連,則設置看門狗處理器
        if (clientOptions.isAutoReconnect()) {
            handlers.add(createConnectionWatchdog());
        }
        //設置connectionEvenTrigger
        handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));

        if (clientOptions.isAutoReconnect()) {
            handlers.add(createConnectionWatchdog());
        }

        return handlers;
    }

  看門狗處理器到做用就是在通道斷開是進行重連socket

 protected ConnectionWatchdog createConnectionWatchdog() {
         //若是看門狗不爲null直接返回
        if (connectionWatchdog != null) {
            return connectionWatchdog;
        }

        LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set for autoReconnect=true");
        LettuceAssert.assertState(timer != null, "Timer must be set for autoReconnect=true");
        LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true");
        //建立鏈接看門狗
        ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, timer,
                clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, connection);
        //向endpoint註冊看門狗
        endpoint.registerConnectionWatchdog(watchdog);

        connectionWatchdog = watchdog;
        return watchdog;
    }

  

lass PlainChannelInitializer extends io.netty.channel.ChannelInitializer<Channel> implements RedisChannelInitializer {

    //不ping
    final static Supplier<AsyncCommand<?, ?, ?>> NO_PING = () -> null;
    //處理器提供器
    private final Supplier<List<ChannelHandler>> handlers;
    //ping命令提供器
    private final Supplier<AsyncCommand<?, ?, ?>> pingCommandSupplier;
    private final ClientResources clientResources;
    //超時時間
    private final Duration timeout;

    private volatile CompletableFuture<Boolean> initializedFuture = new CompletableFuture<>();

    PlainChannelInitializer(Supplier<AsyncCommand<?, ?, ?>> pingCommandSupplier, Supplier<List<ChannelHandler>> handlers,
            ClientResources clientResources, Duration timeout) {
        this.pingCommandSupplier = pingCommandSupplier;
        this.handlers = handlers;
        this.clientResources = clientResources;
        this.timeout = timeout;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {

        //若是pipeline中沒有配置channelActivator則須要添加channelActivator處理器
        if (channel.pipeline().get("channelActivator") == null) {

            channel.pipeline().addLast("channelActivator", new RedisChannelInitializerImpl() {

                private AsyncCommand<?, ?, ?> pingCommand;

                @Override
                public CompletableFuture<Boolean> channelInitialized() {
                    return initializedFuture;
                }

                @Override
                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    //若是通道斷開鏈接
                    clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx)));
                    //若是初始化沒有完成則拋出異常
                    if (!initializedFuture.isDone()) {
                        initializedFuture.completeExceptionally(new RedisConnectionException("Connection closed prematurely"));
                    }

                    initializedFuture = new CompletableFuture<>();
                    pingCommand = null;
                    super.channelInactive(ctx);
                }

                @Override
                public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

                    if (evt instanceof ConnectionEvents.Activated) {
                        if (!initializedFuture.isDone()) {
                            initializedFuture.complete(true);
                            clientResources.eventBus().publish(new ConnectionActivatedEvent(local(ctx), remote(ctx)));
                        }
                    }
                    super.userEventTriggered(ctx, evt);
                }

                @Override
                public void channelActive(final ChannelHandlerContext ctx) throws Exception {
                    //經過事件總線發送鏈接事件
                    clientResources.eventBus().publish(new ConnectedEvent(local(ctx), remote(ctx)));
                    //若是ping命令提供器不是NO_PING則發送執行ping
                    if (pingCommandSupplier != NO_PING) {
                        pingCommand = pingCommandSupplier.get();
                        pingBeforeActivate(pingCommand, initializedFuture, ctx, clientResources, timeout);
                    } else {
                        super.channelActive(ctx);
                    }
                }

                @Override
                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                    if (!initializedFuture.isDone()) {
                        initializedFuture.completeExceptionally(cause);
                    }
                    super.exceptionCaught(ctx, cause);
                }
            });
        }
        //將hanler提供器提供的處理器添加到channel中
        for (ChannelHandler handler : handlers.get()) {
            channel.pipeline().addLast(handler);
        }

        clientResources.nettyCustomizer().afterChannelInitialized(channel);
    }

    static void pingBeforeActivate(AsyncCommand<?, ?, ?> cmd, CompletableFuture<Boolean> initializedFuture,
            ChannelHandlerContext ctx, ClientResources clientResources, Duration timeout) throws Exception {

        ctx.fireUserEventTriggered(new PingBeforeActivate(cmd));

        Runnable timeoutGuard = () -> {

            if (cmd.isDone() || initializedFuture.isDone()) {
                return;
            }

            initializedFuture.completeExceptionally(new RedisCommandTimeoutException(String.format(
                    "Cannot initialize channel (PING before activate) within %s", timeout)));
        };

        Timeout timeoutHandle = clientResources.timer().newTimeout(t -> {

            if (clientResources.eventExecutorGroup().isShuttingDown()) {
                timeoutGuard.run();
                return;
            }

            clientResources.eventExecutorGroup().submit(timeoutGuard);
        }, timeout.toNanos(), TimeUnit.NANOSECONDS);

        cmd.whenComplete((o, throwable) -> {

            timeoutHandle.cancel();

            if (throwable == null) {
                ctx.fireChannelActive();
                initializedFuture.complete(true);
            } else {
                initializedFuture.completeExceptionally(throwable);
            }
        });

    }

    @Override
    public CompletableFuture<Boolean> channelInitialized() {
        return initializedFuture;
    }

}
相關文章
相關標籤/搜索