Lettuce建立鏈接過程源碼分析

   Lettuce是一個高級的Redis客戶端,下面經過對其建立鏈接過程的源碼進行走讀java

   下面看看RedisClient是如何建立單機模式的異步鏈接的, 首先從RedisClient中的connectAsync看起,在該方法中並無什麼特別的地方,在對RedisURI進行非空校驗後就直接調用了內部方法react

public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {

assertNotNull(redisURI);
return connectStandalone(codec, redisURI, redisURI.getTimeout());
}

  在內部方法中首先經過一個異步方式建立鏈接,在從ConnectionFuture中獲取鏈接redis

   /**
     * 獲取單機鏈接
     */
    private <K, V> StatefulRedisConnection<K, V> connectStandalone(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) {
        //單機異步鏈接
        ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStandaloneAsync(codec, redisURI, timeout);
        //獲取鏈接
        return getConnection(future);
    }

  那麼異步建立鏈接的過程又是什麼樣子的呢?下面就經過其代碼進行分析一下bootstrap

private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
            RedisURI redisURI, Duration timeout) {
        
        //編解碼器不能爲null
        assertNotNull(codec);
        //檢查URI是否有效
        checkValidRedisURI(redisURI);

        logger.debug("Trying to get a Redis connection for: " + redisURI);
        //建立DefaultEndpoint
        DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions);
        //建立connection,該connection是一個真正有效的connection其它的都是再此基礎上進行加強
        StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(endpoint, codec, timeout);
        //異步方式建立鏈接
        ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
                () -> new CommandHandler(clientOptions, clientResources, endpoint));
        //註冊監聽器,在結束時觸發
        future.whenComplete((channelHandler, throwable) -> {
            //若是異常不爲null則表示鏈接建立異常,則須要關閉鏈接
            if (throwable != null) {
                connection.close();
            }
        });
        //返回
        return future;
    }

  在newStatefulRedisConnection中只是建立了鏈接對象,此時還不是一個可用鏈接服務器

protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter,
            RedisCodec<K, V> codec, Duration timeout) {
        return new StatefulRedisConnectionImpl<>(channelWriter, codec, timeout);
    }

  能夠看到在建立 StatefulRedisConnectionImpl實例的時候其實是建立了多種方式鏈接,異步鏈接,同步鏈接響應式鏈接dom

/**
     * 初始化一個新的鏈接
     */
    public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {

        super(writer, timeout);

        this.codec = codec;
        //建立異步步鏈接
        this.async = newRedisAsyncCommandsImpl();
        //建立同步鏈接
        this.sync = newRedisSyncCommandsImpl();
        //建立響應式鏈接
        this.reactive = newRedisReactiveCommandsImpl();
    }

  其中異步鏈接的方法以下:異步

 protected RedisAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() {
        //使用裝飾器模式對當前實例進行加強
        return new RedisAsyncCommandsImpl<>(this, codec);
    }

  此時建立的鏈接對象還不是一個可用鏈接,關鍵邏輯仍是在connectionStatefulAsync中實現socket

private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
            DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
        //connetion構造器,在Lettuce中對於構造器模式運用不少
        ConnectionBuilder connectionBuilder;
        //根據是不是SSL選擇不一樣構造器
        if (redisURI.isSsl()) {
            SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
            sslConnectionBuilder.ssl(redisURI);
            connectionBuilder = sslConnectionBuilder;
        } else {
            connectionBuilder = ConnectionBuilder.connectionBuilder();
        }
        //設置connection
        connectionBuilder.connection(connection);
        //設置客戶端選項
        connectionBuilder.clientOptions(clientOptions);
        //設置客戶端資源
        connectionBuilder.clientResources(clientResources);
        //設置命令處理器以及endpoint
        connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
        //填充鏈接構造器,
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
        //設置頻道類型,同時根據頻道類型設置客戶端NIO線程組
        channelType(connectionBuilder, redisURI);
         //在鏈接生效前是否須要ping   
        if (clientOptions.isPingBeforeActivateConnection()) {
            if (hasPassword(redisURI)) {
                connectionBuilder.enableAuthPingBeforeConnect();
            } else {
                connectionBuilder.enablePingBeforeConnect();
            }
        }
        //建立異步通道
        ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

        //若是客戶端選項配置了pingBeforeActivateConnection同時有密碼
        if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {

            future = future.thenApplyAsync(channelHandler -> {

                connection.async().auth(new String(redisURI.getPassword()));

                return channelHandler;
            }, clientResources.eventExecutorGroup());
        }

        if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
            future.thenApply(channelHandler -> {
                connection.setClientName(redisURI.getClientName());
                return channelHandler;
            });

        }

        if (redisURI.getDatabase() != 0) {

            future = future.thenApplyAsync(channelHandler -> {

                connection.async().select(redisURI.getDatabase());

                return channelHandler;
            }, clientResources.eventExecutorGroup());
        }

        return future.thenApply(channelHandler -> (S) connection);
    }

  在connectionBuilder方法中建立了Netty的客戶端Bootstrapasync

protected void connectionBuilder(Supplier<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
            RedisURI redisURI) {
       //建立Bootstrap netty啓動器
        Bootstrap redisBootstrap = new Bootstrap();
        //設置channel選項
        redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
        redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);

        redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
        //獲取套接字選項
        SocketOptions socketOptions = getOptions().getSocketOptions();
        //設置鏈接超時時間
        redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
                Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));
        //若是redisURI中沒有socket選擇參數則根據clientresouce設置
        if (LettuceStrings.isEmpty(redisURI.getSocket())) {
            //是否保持長鏈接
            redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
            //是否要求TCP低延遲
            redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
        }
        //設置超時時間
        connectionBuilder.timeout(redisURI.getTimeout());
        //設置密碼
        connectionBuilder.password(redisURI.getPassword());
        //設置bootstrap
        connectionBuilder.bootstrap(redisBootstrap);
        connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
        connectionBuilder.socketAddressSupplier(socketAddressSupplier);
    }

  在channelType方法中設置了EeventLoopGroupide

 protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) {

        LettuceAssert.notNull(connectionPoint, "ConnectionPoint must not be null");
        //設置客戶端線程組,EventLoopGroup用來處理全部頻道事件
        connectionBuilder.bootstrap().group(getEventLoopGroup(connectionPoint));

        if (connectionPoint.getSocket() != null) {
            NativeTransports.assertAvailable();
            connectionBuilder.bootstrap().channel(NativeTransports.domainSocketChannelClass());
        } else {
            connectionBuilder.bootstrap().channel(Transports.socketChannelClass());
        }
    }

 

 /**
     *  異步處理鏈接同時經過connectionBuilder初始化一個通道
     */
    @SuppressWarnings("unchecked")
    protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
            ConnectionBuilder connectionBuilder) {
        //獲取socketAddress
        SocketAddress redisAddress = connectionBuilder.socketAddress();

        //若是線程池關閉則拋出異常
        if (clientResources.eventExecutorGroup().isShuttingDown()) {
            throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
        }

        logger.debug("Connecting to Redis at {}", redisAddress);
        //頻道準備就緒future
        CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

        //獲取bootstrap
        Bootstrap redisBootstrap = connectionBuilder.bootstrap();
        //建立redis通道初始化器
        RedisChannelInitializer initializer = connectionBuilder.build();
        //設置netty的處理器
        redisBootstrap.handler(initializer);
        //netty自定設置處理
        clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
        CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
        //鏈接Redis服務器,在該處纔是真正和服務器建立鏈接
        ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
        //增長監聽器
        connectFuture.addListener(future -> {
            //沒有成功
            if (!future.isSuccess()) {

                logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
                connectionBuilder.endpoint().initialState();
                //經過準備就緒異步結果異常結束
                channelReadyFuture.completeExceptionally(future.cause());
                return;
            }
            //completableFuture特性,在future結束的時候執行
            initFuture.whenComplete((success, throwable) -> {
                //若是throwable不爲null表示存在異常
                if (throwable == null) {
                    logger.debug("Connecting to Redis at {}: Success", redisAddress);
                    //獲取RedisChannelHandler
                    RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                    //註冊可關閉資源,在connection關閉的時候關閉可關閉資源
                    connection.registerCloseables(closeableResources, connection);
                    //頻道準備就緒
                    channelReadyFuture.complete(connectFuture.channel());
                    return;
                }

                logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
                connectionBuilder.endpoint().initialState();
                Throwable failure;

                if (throwable instanceof RedisConnectionException) {
                    failure = throwable;
                } else if (throwable instanceof TimeoutException) {
                    failure = new RedisConnectionException("Could not initialize channel within "
                            + connectionBuilder.getTimeout(), throwable);
                } else {
                    failure = throwable;
                }
                channelReadyFuture.completeExceptionally(failure);

                CompletableFuture<Boolean> response = new CompletableFuture<>();
                response.completeExceptionally(failure);

            });
        });
        //針對connectionBuilder.connection()的結果進行裝飾,增長獲取remoteAddress功能
        return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
                .connection()));
    }

 

public RedisChannelInitializer build() {
        return new PlainChannelInitializer(pingCommandSupplier, this::buildHandlers, clientResources, timeout);
    }

  在buildHandlers中建立了一些處理器,這些處理器都是有序的

  1. 命令編碼器,用戶將命令編碼爲Redis通訊協議規定的格式
  2. CammandHanler  lettuce核心功能
  3. ConnectionWatchDog 用於自動重連
  4. ConnectionEventTriger 用於發佈connection事件
 protected List<ChannelHandler> buildHandlers() {

        LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set");
        LettuceAssert.assertState(connectionEvents != null, "ConnectionEvents must be set");
        LettuceAssert.assertState(connection != null, "Connection must be set");
        LettuceAssert.assertState(clientResources != null, "ClientResources must be set");
        LettuceAssert.assertState(endpoint != null, "Endpoint must be set");

        List<ChannelHandler> handlers = new ArrayList<>();
        //設置clientOptions
        connection.setOptions(clientOptions);
        //添加頻道監控,若是頻道有效則將頻道添加到頻道組中,若是頻道無效則從頻道組中刪除
        handlers.add(new ChannelGroupListener(channelGroup));
        //添加命令編碼器
        handlers.add(new CommandEncoder());
        //添加commandHander
        handlers.add(commandHandlerSupplier.get());
        //若是設置自動重連,則設置看門狗處理器
        if (clientOptions.isAutoReconnect()) {
            handlers.add(createConnectionWatchdog());
        }
        //設置connectionEvenTrigger
        handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));

        if (clientOptions.isAutoReconnect()) {
            handlers.add(createConnectionWatchdog());
        }

        return handlers;
    }

  

@Override
    protected void initChannel(Channel channel) throws Exception {

        //若是pipeline中沒有配置channelActivator則須要添加channelActivator處理器
        if (channel.pipeline().get("channelActivator") == null) {

            channel.pipeline().addLast("channelActivator", new RedisChannelInitializerImpl() {

                private AsyncCommand<?, ?, ?> pingCommand;

                @Override
                public CompletableFuture<Boolean> channelInitialized() {
                    return initializedFuture;
                }

                @Override
                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    //若是通道斷開鏈接
                    clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx)));
                    //若是初始化沒有完成則拋出異常
                    if (!initializedFuture.isDone()) {
                        initializedFuture.completeExceptionally(new RedisConnectionException("Connection closed prematurely"));
                    }

                    initializedFuture = new CompletableFuture<>();
                    pingCommand = null;
                    super.channelInactive(ctx);
                }

                @Override
                public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

                    if (evt instanceof ConnectionEvents.Activated) {
                        if (!initializedFuture.isDone()) {
                            initializedFuture.complete(true);
                            clientResources.eventBus().publish(new ConnectionActivatedEvent(local(ctx), remote(ctx)));
                        }
                    }
                    super.userEventTriggered(ctx, evt);
                }

                @Override
                public void channelActive(final ChannelHandlerContext ctx) throws Exception {
                    //經過事件總線發送鏈接事件
                    clientResources.eventBus().publish(new ConnectedEvent(local(ctx), remote(ctx)));
                    //若是ping命令提供器不是NO_PING則發送執行ping
                    if (pingCommandSupplier != NO_PING) {
                        pingCommand = pingCommandSupplier.get();
                        pingBeforeActivate(pingCommand, initializedFuture, ctx, clientResources, timeout);
                    } else {
                        super.channelActive(ctx);
                    }
                }

                @Override
                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                    if (!initializedFuture.isDone()) {
                        initializedFuture.completeExceptionally(cause);
                    }
                    super.exceptionCaught(ctx, cause);
                }
            });
        }
        //將hanler提供器提供的的處理器添加到該頻道的管道中
        for (ChannelHandler handler : handlers.get()) {
            channel.pipeline().addLast(handler);
        }
        //擴展點,用戶能夠對向pipline中添加自定義的channel
        clientResources.nettyCustomizer().afterChannelInitialized(channel);
    }
相關文章
相關標籤/搜索