Lettuce是一個高級的Redis客戶端,下面經過對其建立鏈接過程的源碼進行走讀java
下面看看RedisClient是如何建立單機模式的異步鏈接的, 首先從RedisClient中的connectAsync看起,在該方法中並無什麼特別的地方,在對RedisURI進行非空校驗後就直接調用了內部方法react
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {
assertNotNull(redisURI);
return connectStandalone(codec, redisURI, redisURI.getTimeout());
}
在內部方法中首先經過一個異步方式建立鏈接,在從ConnectionFuture中獲取鏈接redis
/** * 獲取單機鏈接 */ private <K, V> StatefulRedisConnection<K, V> connectStandalone(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) { //單機異步鏈接 ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStandaloneAsync(codec, redisURI, timeout); //獲取鏈接 return getConnection(future); }
那麼異步建立鏈接的過程又是什麼樣子的呢?下面就經過其代碼進行分析一下bootstrap
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) { //編解碼器不能爲null assertNotNull(codec); //檢查URI是否有效 checkValidRedisURI(redisURI); logger.debug("Trying to get a Redis connection for: " + redisURI); //建立DefaultEndpoint DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions); //建立connection,該connection是一個真正有效的connection其它的都是再此基礎上進行加強 StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(endpoint, codec, timeout); //異步方式建立鏈接 ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI, () -> new CommandHandler(clientOptions, clientResources, endpoint)); //註冊監聽器,在結束時觸發 future.whenComplete((channelHandler, throwable) -> { //若是異常不爲null則表示鏈接建立異常,則須要關閉鏈接 if (throwable != null) { connection.close(); } }); //返回 return future; }
在newStatefulRedisConnection中只是建立了鏈接對象,此時還不是一個可用鏈接服務器
protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter, RedisCodec<K, V> codec, Duration timeout) { return new StatefulRedisConnectionImpl<>(channelWriter, codec, timeout); }
能夠看到在建立 StatefulRedisConnectionImpl實例的時候其實是建立了多種方式鏈接,異步鏈接,同步鏈接響應式鏈接dom
/** * 初始化一個新的鏈接 */ public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) { super(writer, timeout); this.codec = codec; //建立異步步鏈接 this.async = newRedisAsyncCommandsImpl(); //建立同步鏈接 this.sync = newRedisSyncCommandsImpl(); //建立響應式鏈接 this.reactive = newRedisReactiveCommandsImpl(); }
其中異步鏈接的方法以下:異步
protected RedisAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() { //使用裝飾器模式對當前實例進行加強 return new RedisAsyncCommandsImpl<>(this, codec); }
此時建立的鏈接對象還不是一個可用鏈接,關鍵邏輯仍是在connectionStatefulAsync中實現socket
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) { //connetion構造器,在Lettuce中對於構造器模式運用不少 ConnectionBuilder connectionBuilder; //根據是不是SSL選擇不一樣構造器 if (redisURI.isSsl()) { SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder(); sslConnectionBuilder.ssl(redisURI); connectionBuilder = sslConnectionBuilder; } else { connectionBuilder = ConnectionBuilder.connectionBuilder(); } //設置connection connectionBuilder.connection(connection); //設置客戶端選項 connectionBuilder.clientOptions(clientOptions); //設置客戶端資源 connectionBuilder.clientResources(clientResources); //設置命令處理器以及endpoint connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint); //填充鏈接構造器, connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI); //設置頻道類型,同時根據頻道類型設置客戶端NIO線程組 channelType(connectionBuilder, redisURI); //在鏈接生效前是否須要ping if (clientOptions.isPingBeforeActivateConnection()) { if (hasPassword(redisURI)) { connectionBuilder.enableAuthPingBeforeConnect(); } else { connectionBuilder.enablePingBeforeConnect(); } } //建立異步通道 ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder); //若是客戶端選項配置了pingBeforeActivateConnection同時有密碼 if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) { future = future.thenApplyAsync(channelHandler -> { connection.async().auth(new String(redisURI.getPassword())); return channelHandler; }, clientResources.eventExecutorGroup()); } if (LettuceStrings.isNotEmpty(redisURI.getClientName())) { future.thenApply(channelHandler -> { connection.setClientName(redisURI.getClientName()); return channelHandler; }); } if (redisURI.getDatabase() != 0) { future = future.thenApplyAsync(channelHandler -> { connection.async().select(redisURI.getDatabase()); return channelHandler; }, clientResources.eventExecutorGroup()); } return future.thenApply(channelHandler -> (S) connection); }
在connectionBuilder方法中建立了Netty的客戶端Bootstrapasync
protected void connectionBuilder(Supplier<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder, RedisURI redisURI) { //建立Bootstrap netty啓動器 Bootstrap redisBootstrap = new Bootstrap(); //設置channel選項 redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024); redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024); redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR); //獲取套接字選項 SocketOptions socketOptions = getOptions().getSocketOptions(); //設置鏈接超時時間 redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(socketOptions.getConnectTimeout().toMillis())); //若是redisURI中沒有socket選擇參數則根據clientresouce設置 if (LettuceStrings.isEmpty(redisURI.getSocket())) { //是否保持長鏈接 redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive()); //是否要求TCP低延遲 redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay()); } //設置超時時間 connectionBuilder.timeout(redisURI.getTimeout()); //設置密碼 connectionBuilder.password(redisURI.getPassword()); //設置bootstrap connectionBuilder.bootstrap(redisBootstrap); connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer); connectionBuilder.socketAddressSupplier(socketAddressSupplier); }
在channelType方法中設置了EeventLoopGroupide
protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) { LettuceAssert.notNull(connectionPoint, "ConnectionPoint must not be null"); //設置客戶端線程組,EventLoopGroup用來處理全部頻道事件 connectionBuilder.bootstrap().group(getEventLoopGroup(connectionPoint)); if (connectionPoint.getSocket() != null) { NativeTransports.assertAvailable(); connectionBuilder.bootstrap().channel(NativeTransports.domainSocketChannelClass()); } else { connectionBuilder.bootstrap().channel(Transports.socketChannelClass()); } }
/** * 異步處理鏈接同時經過connectionBuilder初始化一個通道 */ @SuppressWarnings("unchecked") protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync( ConnectionBuilder connectionBuilder) { //獲取socketAddress SocketAddress redisAddress = connectionBuilder.socketAddress(); //若是線程池關閉則拋出異常 if (clientResources.eventExecutorGroup().isShuttingDown()) { throw new IllegalStateException("Cannot connect, Event executor group is terminated."); } logger.debug("Connecting to Redis at {}", redisAddress); //頻道準備就緒future CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>(); //獲取bootstrap Bootstrap redisBootstrap = connectionBuilder.bootstrap(); //建立redis通道初始化器 RedisChannelInitializer initializer = connectionBuilder.build(); //設置netty的處理器 redisBootstrap.handler(initializer); //netty自定設置處理 clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap); CompletableFuture<Boolean> initFuture = initializer.channelInitialized(); //鏈接Redis服務器,在該處纔是真正和服務器建立鏈接 ChannelFuture connectFuture = redisBootstrap.connect(redisAddress); //增長監聽器 connectFuture.addListener(future -> { //沒有成功 if (!future.isSuccess()) { logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause()); connectionBuilder.endpoint().initialState(); //經過準備就緒異步結果異常結束 channelReadyFuture.completeExceptionally(future.cause()); return; } //completableFuture特性,在future結束的時候執行 initFuture.whenComplete((success, throwable) -> { //若是throwable不爲null表示存在異常 if (throwable == null) { logger.debug("Connecting to Redis at {}: Success", redisAddress); //獲取RedisChannelHandler RedisChannelHandler<?, ?> connection = connectionBuilder.connection(); //註冊可關閉資源,在connection關閉的時候關閉可關閉資源 connection.registerCloseables(closeableResources, connection); //頻道準備就緒 channelReadyFuture.complete(connectFuture.channel()); return; } logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable); connectionBuilder.endpoint().initialState(); Throwable failure; if (throwable instanceof RedisConnectionException) { failure = throwable; } else if (throwable instanceof TimeoutException) { failure = new RedisConnectionException("Could not initialize channel within " + connectionBuilder.getTimeout(), throwable); } else { failure = throwable; } channelReadyFuture.completeExceptionally(failure); CompletableFuture<Boolean> response = new CompletableFuture<>(); response.completeExceptionally(failure); }); }); //針對connectionBuilder.connection()的結果進行裝飾,增長獲取remoteAddress功能 return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder .connection())); }
public RedisChannelInitializer build() { return new PlainChannelInitializer(pingCommandSupplier, this::buildHandlers, clientResources, timeout); }
在buildHandlers中建立了一些處理器,這些處理器都是有序的
protected List<ChannelHandler> buildHandlers() { LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set"); LettuceAssert.assertState(connectionEvents != null, "ConnectionEvents must be set"); LettuceAssert.assertState(connection != null, "Connection must be set"); LettuceAssert.assertState(clientResources != null, "ClientResources must be set"); LettuceAssert.assertState(endpoint != null, "Endpoint must be set"); List<ChannelHandler> handlers = new ArrayList<>(); //設置clientOptions connection.setOptions(clientOptions); //添加頻道監控,若是頻道有效則將頻道添加到頻道組中,若是頻道無效則從頻道組中刪除 handlers.add(new ChannelGroupListener(channelGroup)); //添加命令編碼器 handlers.add(new CommandEncoder()); //添加commandHander handlers.add(commandHandlerSupplier.get()); //若是設置自動重連,則設置看門狗處理器 if (clientOptions.isAutoReconnect()) { handlers.add(createConnectionWatchdog()); } //設置connectionEvenTrigger handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus())); if (clientOptions.isAutoReconnect()) { handlers.add(createConnectionWatchdog()); } return handlers; }
@Override protected void initChannel(Channel channel) throws Exception { //若是pipeline中沒有配置channelActivator則須要添加channelActivator處理器 if (channel.pipeline().get("channelActivator") == null) { channel.pipeline().addLast("channelActivator", new RedisChannelInitializerImpl() { private AsyncCommand<?, ?, ?> pingCommand; @Override public CompletableFuture<Boolean> channelInitialized() { return initializedFuture; } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //若是通道斷開鏈接 clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx))); //若是初始化沒有完成則拋出異常 if (!initializedFuture.isDone()) { initializedFuture.completeExceptionally(new RedisConnectionException("Connection closed prematurely")); } initializedFuture = new CompletableFuture<>(); pingCommand = null; super.channelInactive(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof ConnectionEvents.Activated) { if (!initializedFuture.isDone()) { initializedFuture.complete(true); clientResources.eventBus().publish(new ConnectionActivatedEvent(local(ctx), remote(ctx))); } } super.userEventTriggered(ctx, evt); } @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { //經過事件總線發送鏈接事件 clientResources.eventBus().publish(new ConnectedEvent(local(ctx), remote(ctx))); //若是ping命令提供器不是NO_PING則發送執行ping if (pingCommandSupplier != NO_PING) { pingCommand = pingCommandSupplier.get(); pingBeforeActivate(pingCommand, initializedFuture, ctx, clientResources, timeout); } else { super.channelActive(ctx); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (!initializedFuture.isDone()) { initializedFuture.completeExceptionally(cause); } super.exceptionCaught(ctx, cause); } }); } //將hanler提供器提供的的處理器添加到該頻道的管道中 for (ChannelHandler handler : handlers.get()) { channel.pipeline().addLast(handler); } //擴展點,用戶能夠對向pipline中添加自定義的channel clientResources.nettyCustomizer().afterChannelInitialized(channel); }