Redis高級客戶端Lettuce詳解

前提

Lettuce是一個RedisJava驅動包,初識她的時候是使用RedisTemplate的時候遇到點問題Debug到底層的一些源碼,發現spring-data-redis的驅動包在某個版本以後替換爲LettuceLettuce翻譯爲生菜,沒錯,就是吃的那種生菜,因此它的Logo長這樣:html

既然能被Spring生態所承認,Lettuce想必有過人之處,因而筆者花時間閱讀她的官方文檔,整理測試示例,寫下這篇文章。編寫本文時所使用的版本爲Lettuce 5.1.8.RELEASESpringBoot 2.1.8.RELEASEJDK [8,11]超長警告:這篇文章斷斷續續花了兩週完成,超過4萬字.....java

Lettuce簡介

Lettuce是一個高性能基於Java編寫的Redis驅動框架,底層集成了Project Reactor提供自然的反應式編程,通訊框架集成了Netty使用了非阻塞IO5.x版本以後融合了JDK1.8的異步編程特性,在保證高性能的同時提供了十分豐富易用的API5.1版本的新特性以下:node

  • 支持Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX
  • 支持經過Brave模塊跟蹤Redis命令執行。
  • 支持Redis Streams
  • 支持異步的主從鏈接。
  • 支持異步鏈接池。
  • 新增命令最多執行一次模式(禁止自動重連)。
  • 全局命令超時設置(對異步和反應式命令也有效)。
  • ......等等

注意一點Redis的版本至少須要2.6,固然越高越好,API的兼容性比較強大。react

只須要引入單個依賴就能夠開始愉快地使用Lettuceweb

  • Maven
<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.1.8.RELEASE</version>
</dependency>
  • Gradle
dependencies {
  compile 'io.lettuce:lettuce-core:5.1.8.RELEASE'
}

鏈接Redis

單機、哨兵、集羣模式下鏈接Redis須要一個統一的標準去表示鏈接的細節信息,在Lettuce中這個統一的標準是RedisURI。能夠經過三種方式構造一個RedisURI實例:redis

  • 定製的字符串URI語法:
RedisURI uri = RedisURI.create("redis://localhost/");
  • 使用建造器(RedisURI.Builder):
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
  • 直接經過構造函數實例化:
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

定製的鏈接URI語法

  • 單機(前綴爲redis://
格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:redis://mypassword@127.0.0.1:6379/0?timeout=10s
簡單:redis://localhost
  • 單機而且使用SSL(前綴爲rediss://) <== 注意後面多了個s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:rediss://mypassword@127.0.0.1:6379/0?timeout=10s
簡單:rediss://localhost
  • 單機Unix Domain Sockets模式(前綴爲redis-socket://
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]]
完整:redis-socket:///tmp/redis?timeout=10s&_database=0
  • 哨兵(前綴爲redis-sentinel://
格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
完整:redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster

超時時間單位:spring

  • d 天
  • h 小時
  • m 分鐘
  • s 秒鐘
  • ms 毫秒
  • us 微秒
  • ns 納秒

我的建議使用RedisURI提供的建造器,畢竟定製的URI雖然簡潔,可是比較容易出現人爲錯誤。鑑於筆者沒有SSLUnix Domain Socket的使用場景,下面不對這兩種鏈接方式進行列舉。shell

基本使用

Lettuce使用的時候依賴於四個主要組件:apache

  • RedisURI:鏈接信息。
  • RedisClientRedis客戶端,特殊地,集羣鏈接有一個定製的RedisClusterClient
  • ConnectionRedis鏈接,主要是StatefulConnection或者StatefulRedisConnection的子類,鏈接的類型主要由鏈接的具體方式(單機、哨兵、集羣、訂閱發佈等等)選定,比較重要。
  • RedisCommandsRedis命令API接口,基本上覆蓋了Redis發行版本的全部命令,提供了同步(sync)、異步(async)、反應式(reative)的調用方式,對於使用者而言,會常常跟RedisCommands系列接口打交道。

一個基本使用例子以下:編程

@Test
public void testSetGet() throws Exception {
    RedisURI redisUri = RedisURI.builder()                    // <1> 建立單機鏈接的鏈接信息
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);   // <2> 建立客戶端
    StatefulRedisConnection<String, String> connection = redisClient.connect();     // <3> 建立線程安全的鏈接
    RedisCommands<String, String> redisCommands = connection.sync();                // <4> 建立同步命令
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    String result = redisCommands.set("name", "throwable", setArgs);
    Assertions.assertThat(result).isEqualToIgnoringCase("OK");
    result = redisCommands.get("name");
    Assertions.assertThat(result).isEqualTo("throwable");
    // ... 其餘操做
    connection.close();   // <5> 關閉鏈接
    redisClient.shutdown();  // <6> 關閉客戶端
}

注意:

  • <5>:關閉鏈接通常在應用程序中止以前操做,一個應用程序中的一個Redis驅動實例不須要太多的鏈接(通常狀況下只須要一個鏈接實例就能夠,若是有多個鏈接的須要能夠考慮使用鏈接池,其實Redis目前處理命令的模塊是單線程,在客戶端多個鏈接多線程調用理論上沒有效果)。
  • <6>:關閉客戶端通常應用程序中止以前操做,若是條件容許的話,基於後開先閉原則,客戶端關閉應該在鏈接關閉以後操做。

API

Lettuce主要提供三種API

  • 同步(sync):RedisCommands
  • 異步(async):RedisAsyncCommands
  • 反應式(reactive):RedisReactiveCommands

先準備好一個單機Redis鏈接備用:

private static StatefulRedisConnection<String, String> CONNECTION;
private static RedisClient CLIENT;

@BeforeClass
public static void beforeClass() {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    CLIENT = RedisClient.create(redisUri);
    CONNECTION = CLIENT.connect();
}

@AfterClass
public static void afterClass() throws Exception {
    CONNECTION.close();
    CLIENT.shutdown();
}

Redis命令API的具體實現能夠直接從StatefulRedisConnection實例獲取,見其接口定義:

public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {

    boolean isMulti();

    RedisCommands<K, V> sync();

    RedisAsyncCommands<K, V> async();

    RedisReactiveCommands<K, V> reactive();
}

值得注意的是,在不指定編碼解碼器RedisCodec的前提下,RedisClient建立的StatefulRedisConnection實例通常是泛型實例StatefulRedisConnection<String,String>,也就是全部命令APIKEYVALUE都是String類型,這種使用方式能知足大部分的使用場景。固然,必要的時候能夠定製編碼解碼器RedisCodec<K,V>

同步API

先構建RedisCommands實例:

private static RedisCommands<String, String> COMMAND;

@BeforeClass
public static void beforeClass() {
    COMMAND = CONNECTION.sync();
}

基本使用:

@Test
public void testSyncPing() throws Exception {
   String pong = COMMAND.ping();
   Assertions.assertThat(pong).isEqualToIgnoringCase("PONG");
}


@Test
public void testSyncSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    COMMAND.set("name", "throwable", setArgs);
    String value = COMMAND.get("name");
    log.info("Get value: {}", value);
}

// Get value: throwable

同步API在全部命令調用以後會當即返回結果。若是熟悉Jedis的話,RedisCommands的用法其實和它相差不大。

異步API

先構建RedisAsyncCommands實例:

private static RedisAsyncCommands<String, String> ASYNC_COMMAND;

@BeforeClass
public static void beforeClass() {
    ASYNC_COMMAND = CONNECTION.async();
}

基本使用:

@Test
public void testAsyncPing() throws Exception {
    RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();
    log.info("Ping result:{}", redisFuture.get());
}
// Ping result:PONG

RedisAsyncCommands全部方法執行返回結果都是RedisFuture實例,而RedisFuture接口的定義以下:

public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {

    String getError();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
}

也就是,RedisFuture能夠無縫使用Future或者JDK1.8中引入的CompletableFuture提供的方法。舉個例子:

@Test
public void testAsyncSetAndGet1() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);
    // CompletableFuture#thenAccept()
    future.thenAccept(value -> log.info("Set命令返回:{}", value));
    // Future#get()
    future.get();
}
// Set命令返回:OK

@Test
public void testAsyncSetAndGet2() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    CompletableFuture<Void> result =
            (CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs)
                    .thenAcceptBoth(ASYNC_COMMAND.get("name"),
                            (s, g) -> {
                                log.info("Set命令返回:{}", s);
                                log.info("Get命令返回:{}", g);
                            });
    result.get();
}
// Set命令返回:OK
// Get命令返回:throwable

若是能熟練使用CompletableFuture和函數式編程技巧,能夠組合多個RedisFuture完成一些列複雜的操做。

反應式API

Lettuce引入的反應式編程框架是Project Reactor,若是沒有反應式編程經驗能夠先自行了解一下Project Reactor

構建RedisReactiveCommands實例:

private static RedisReactiveCommands<String, String> REACTIVE_COMMAND;

@BeforeClass
public static void beforeClass() {
    REACTIVE_COMMAND = CONNECTION.reactive();
}

根據Project ReactorRedisReactiveCommands的方法若是返回的結果只包含0或1個元素,那麼返回值類型是Mono,若是返回的結果包含0到N(N大於0)個元素,那麼返回值是Flux。舉個例子:

@Test
public void testReactivePing() throws Exception {
    Mono<String> ping = REACTIVE_COMMAND.ping();
    ping.subscribe(v -> log.info("Ping result:{}", v));
    Thread.sleep(1000);
}
// Ping result:PONG

@Test
public void testReactiveSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    REACTIVE_COMMAND.set("name", "throwable", setArgs).block();
    REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));
    Thread.sleep(1000);
}
// Get命令返回:throwable

@Test
public void testReactiveSet() throws Exception {
    REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block();
    Flux<String> flux = REACTIVE_COMMAND.smembers("food");
    flux.subscribe(log::info);
    REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block();
    Thread.sleep(1000);
}
// meat
// bread
// fish

舉個更加複雜的例子,包含了事務、函數轉換等:

@Test
public void testReactiveFunctional() throws Exception {
    REACTIVE_COMMAND.multi().doOnSuccess(r -> {
        REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe();
        REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();
    }).flatMap(s -> REACTIVE_COMMAND.exec())
            .doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded()))
            .subscribe();
    Thread.sleep(1000);
}
// OK
// 2
// Discarded:false

這個方法開啓一個事務,先把counter設置爲1,再將counter自增1。

發佈和訂閱

非集羣模式下的發佈訂閱依賴於定製的鏈接StatefulRedisPubSubConnection,集羣模式下的發佈訂閱依賴於定製的鏈接StatefulRedisClusterPubSubConnection,二者分別來源於RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub()

  • 非集羣模式:
// 多是單機、普通主從、哨兵等非集羣模式的客戶端
RedisClient client = ...
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });

// 同步命令
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");

// 異步命令
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");

// 反應式命令
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();

reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
  • 集羣模式:
// 使用方式其實和非集羣模式基本一致
RedisClusterClient clusterClient = ...
StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
// ...

這裏用單機同步命令的模式舉一個Redis鍵空間通知(Redis Keyspace Notifications)的例子:

@Test
public void testSyncKeyspaceNotification() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            // 注意這裏只能是0號庫
            .withDatabase(0)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> redisConnection = redisClient.connect();
    RedisCommands<String, String> redisCommands = redisConnection.sync();
    // 只接收鍵過時的事件
    redisCommands.configSet("notify-keyspace-events", "Ex");
    StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
    connection.addListener(new RedisPubSubAdapter<>() {

        @Override
        public void psubscribed(String pattern, long count) {
            log.info("pattern:{},count:{}", pattern, count);
        }

        @Override
        public void message(String pattern, String channel, String message) {
            log.info("pattern:{},channel:{},message:{}", pattern, channel, message);
        }
    });
    RedisPubSubCommands<String, String> commands = connection.sync();
    commands.psubscribe("__keyevent@0__:expired");
    redisCommands.setex("name", 2, "throwable");
    Thread.sleep(10000);
    redisConnection.close();
    connection.close();
    redisClient.shutdown();
}
// pattern:__keyevent@0__:expired,count:1
// pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name

實際上,在實現RedisPubSubListener的時候能夠單獨抽離,儘可能不要設計成匿名內部類的形式。

事務和批量命令執行

事務相關的命令就是WATCHUNWATCHEXECMULTIDISCARD,在RedisCommands系列接口中有對應的方法。舉個例子:

// 同步模式
@Test
public void testSyncMulti() throws Exception {
    COMMAND.multi();
    COMMAND.setex("name-1", 2, "throwable");
    COMMAND.setex("name-2", 2, "doge");
    TransactionResult result = COMMAND.exec();
    int index = 0;
    for (Object r : result) {
        log.info("Result-{}:{}", index, r);
        index++;
    }
}
// Result-0:OK
// Result-1:OK

RedisPipeline也就是管道機制能夠理解爲把多個命令打包在一次請求發送到Redis服務端,而後Redis服務端把全部的響應結果打包好一次性返回,從而節省沒必要要的網絡資源(最主要是減小網絡請求次數)。Redis對於Pipeline機制如何實現並無明確的規定,也沒有提供特殊的命令支持Pipeline機制。Jedis中底層採用BIO(阻塞IO)通信,因此它的作法是客戶端緩存將要發送的命令,最後須要觸發而後同步發送一個巨大的命令列表包,再接收和解析一個巨大的響應列表包。PipelineLettuce中對使用者是透明的,因爲底層的通信框架是Netty,因此網絡通信層面的優化Lettuce不須要過多幹預,換言之能夠這樣理解:NettyLettuce從底層實現了RedisPipeline機制。可是,Lettuce的異步API也提供了手動Flush的方法:

@Test
public void testAsyncManualFlush() {
    // 取消自動flush
    ASYNC_COMMAND.setAutoFlushCommands(false);
    List<RedisFuture<?>> redisFutures = Lists.newArrayList();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        redisFutures.add(ASYNC_COMMAND.set(key, value));
        redisFutures.add(ASYNC_COMMAND.expire(key, 2));
    }
    long start = System.currentTimeMillis();
    ASYNC_COMMAND.flushCommands();
    boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));
    Assertions.assertThat(result).isTrue();
    log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start);
}
// Lettuce cost:1302 ms

上面只是從文檔看到的一些理論術語,可是現實是骨感的,對比了下JedisPipeline提供的方法,發現了JedisPipeline執行耗時比較低:

@Test
public void testJedisPipeline() throws Exception {
    Jedis jedis = new Jedis();
    Pipeline pipeline = jedis.pipelined();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        pipeline.set(key, value);
        pipeline.expire(key, 2);
    }
    long start = System.currentTimeMillis();
    pipeline.syncAndReturnAll();
    log.info("Jedis cost:{} ms", System.currentTimeMillis()  - start);
}
// Jedis cost:9 ms

我的猜想Lettuce可能底層並不是合併全部命令一次發送(甚至多是單條發送),具體可能須要抓包才能定位。依此來看,若是真的有大量執行Redis命令的場景,不妨可使用JedisPipeline

注意:由上面的測試推斷RedisTemplateexecutePipelined()方法是假的Pipeline執行方法,使用RedisTemplate的時候請務必注意這一點。

Lua腳本執行

Lettuce中執行RedisLua命令的同步接口以下:

public interface RedisScriptingCommands<K, V> {

    <T> T eval(String var1, ScriptOutputType var2, K... var3);

    <T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);

    <T> T evalsha(String var1, ScriptOutputType var2, K... var3);

    <T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);

    List<Boolean> scriptExists(String... var1);

    String scriptFlush();

    String scriptKill();

    String scriptLoad(V var1);

    String digest(V var1);
}

異步和反應式的接口方法定義差很少,不一樣的地方就是返回值類型,通常咱們經常使用的是eval()evalsha()scriptLoad()方法。舉個簡單的例子:

private static RedisCommands<String, String> COMMANDS;
private static String RAW_LUA = "local key = KEYS[1]\n" +
        "local value = ARGV[1]\n" +
        "local timeout = ARGV[2]\n" +
        "redis.call('SETEX', key, tonumber(timeout), value)\n" +
        "local result = redis.call('GET', key)\n" +
        "return result;";
private static AtomicReference<String> LUA_SHA = new AtomicReference<>();

@Test
public void testLua() throws Exception {
    LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));
    String[] keys = new String[]{"name"};
    String[] args = new String[]{"throwable", "5000"};
    String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);
    log.info("Get value:{}", result);
}
// Get value:throwable

高可用和分片

爲了Redis的高可用,通常會採用普通主從(Master/Replica,這裏筆者稱爲普通主從模式,也就是僅僅作了主從複製,故障須要手動切換)、哨兵和集羣。普通主從模式能夠獨立運行,也能夠配合哨兵運行,只是哨兵提供自動故障轉移和主節點提高功能。普通主從和哨兵均可以使用MasterSlave,經過入參包括RedisClient、編碼解碼器以及一個或者多個RedisURI獲取對應的Connection實例。

這裏注意一點MasterSlave中提供的方法若是隻要求傳入一個RedisURI實例,那麼Lettuce會進行拓撲發現機制,自動獲取Redis主從節點信息;若是要求傳入一個RedisURI集合,那麼對於普通主從模式來講全部節點信息是靜態的,不會進行發現和更新。

拓撲發現的規則以下:

  • 對於普通主從(Master/Replica)模式,不須要感知RedisURI指向從節點仍是主節點,只會進行一次性的拓撲查找全部節點信息,此後節點信息會保存在靜態緩存中,不會更新。
  • 對於哨兵模式,會訂閱全部哨兵實例並偵聽訂閱/發佈消息以觸發拓撲刷新機制,更新緩存的節點信息,也就是哨兵自然就是動態發現節點信息,不支持靜態配置。

拓撲發現機制的提供APITopologyProvider,須要瞭解其原理的能夠參考具體的實現。

對於集羣(Cluster)模式,Lettuce提供了一套獨立的API

另外,若是Lettuce鏈接面向的是非單個Redis節點,鏈接實例提供了數據讀取節點偏好ReadFrom)設置,可選值有:

  • MASTER:只從Master節點中讀取。
  • MASTER_PREFERRED:優先從Master節點中讀取。
  • SLAVE_PREFERRED:優先從Slavor節點中讀取。
  • SLAVE:只從Slavor節點中讀取。
  • NEAREST:使用最近一次鏈接的Redis實例讀取。

普通主從模式

假設如今有三個Redis服務造成樹狀主從關係以下:

  • 節點一:localhost:6379,角色爲Master。
  • 節點二:localhost:6380,角色爲Slavor,節點一的從節點。
  • 節點三:localhost:6381,角色爲Slavor,節點二的從節點。

首次動態節點發現主從模式的節點信息須要以下構建鏈接:

@Test
public void testDynamicReplica() throws Exception {
    // 這裏只須要配置一個節點的鏈接信息,不必定須要是主節點的信息,從節點也能夠
    RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisClient redisClient = RedisClient.create(uri);
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);
    // 只從從節點讀取數據
    connection.setReadFrom(ReadFrom.SLAVE);
    // 執行其餘Redis命令
    connection.close();
    redisClient.shutdown();
}

若是須要指定靜態的Redis主從節點鏈接屬性,那麼能夠這樣構建鏈接:

@Test
public void testStaticReplica() throws Exception {
    List<RedisURI> uris = new ArrayList<>();
    RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();
    RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();
    uris.add(uri1);
    uris.add(uri2);
    uris.add(uri3);
    RedisClient redisClient = RedisClient.create();
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient,
            new Utf8StringCodec(), uris);
    // 只從主節點讀取數據
    connection.setReadFrom(ReadFrom.MASTER);
    // 執行其餘Redis命令
    connection.close();
    redisClient.shutdown();
}

哨兵模式

因爲Lettuce自身提供了哨兵的拓撲發現機制,因此只須要隨便配置一個哨兵節點的RedisURI實例便可:

@Test
public void testDynamicSentinel() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withPassword("你的密碼")
            .withSentinel("localhost", 26379)
            .withSentinelMasterId("哨兵Master的ID")
            .build();
    RedisClient redisClient = RedisClient.create();
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);
    // 只容許從從節點讀取數據
    connection.setReadFrom(ReadFrom.SLAVE);
    RedisCommands<String, String> command = connection.sync();
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    command.set("name", "throwable", setArgs);
    String value = command.get("name");
    log.info("Get value:{}", value);
}
// Get value:throwable

集羣模式

鑑於筆者對Redis集羣模式並不熟悉,Cluster模式下的API使用自己就有比較多的限制,因此這裏只簡單介紹一下怎麼用。先說幾個特性:

下面的API提供跨槽位(Slot)調用的功能

  • RedisAdvancedClusterCommands
  • RedisAdvancedClusterAsyncCommands
  • RedisAdvancedClusterReactiveCommands

靜態節點選擇功能:

  • masters:選擇全部主節點執行命令。
  • slaves:選擇全部從節點執行命令,其實就是隻讀模式。
  • all nodes:命令能夠在全部節點執行。

集羣拓撲視圖動態更新功能:

  • 手動更新,主動調用RedisClusterClient#reloadPartitions()
  • 後臺定時更新。
  • 自適應更新,基於鏈接斷開和MOVED/ASK命令重定向自動更新。

Redis集羣搭建詳細過程能夠參考官方文檔,假設已經搭建好集羣以下(192.168.56.200是筆者的虛擬機Host):

  • 192.168.56.200:7001 => 主節點,槽位0-5460。
  • 192.168.56.200:7002 => 主節點,槽位5461-10922。
  • 192.168.56.200:7003 => 主節點,槽位10923-16383。
  • 192.168.56.200:7004 => 7001的從節點。
  • 192.168.56.200:7005 => 7002的從節點。
  • 192.168.56.200:7006 => 7003的從節點。

簡單的集羣鏈接和使用方式以下:

@Test
public void testSyncCluster(){
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name",10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
}
// Get value:throwable

節點選擇:

@Test
public void testSyncNodeSelection() {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
//  commands.all();  // 全部節點
//  commands.masters();  // 主節點
    // 從節點只讀
    NodeSelection<String, String> replicas = commands.slaves();
    NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();
    // 這裏只是演示,通常應該禁用keys *命令
    Executions<List<String>> keys = nodeSelectionCommands.keys("*");
    keys.forEach(key -> log.info("key: {}", key));
    connection.close();
    redisClusterClient.shutdown();
}

定時更新集羣拓撲視圖(每隔十分鐘更新一次,這個時間自行考量,不能太頻繁):

@Test
public void testPeriodicClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions
            .builder()
            .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name", 10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

自適應更新集羣拓撲視圖:

@Test
public void testAdaptiveClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder()
            .enableAdaptiveRefreshTrigger(
                    ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
                    ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS
            )
            .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name", 10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

動態命令和自定義命令

自定義命令是Redis命令有限集,不過能夠更細粒度指定KEYARGV、命令類型、編碼解碼器和返回值類型,依賴於dispatch()方法:

// 自定義實現PING方法
@Test
public void testCustomPing() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommands<String, String> sync = connect.sync();
    RedisCodec<String, String> codec = StringCodec.UTF8;
    String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec));
    log.info("PING:{}", result);
    connect.close();
    redisClient.shutdown();
}
// PING:PONG

// 自定義實現Set方法
@Test
public void testCustomSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommands<String, String> sync = connect.sync();
    RedisCodec<String, String> codec = StringCodec.UTF8;
    sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec),
            new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));
    String result = sync.get("name");
    log.info("Get value:{}", result);
    connect.close();
    redisClient.shutdown();
}
// Get value:throwable

動態命令是基於Redis命令有限集,而且經過註解和動態代理完成一些複雜命令組合的實現。主要註解在io.lettuce.core.dynamic.annotation包路徑下。簡單舉個例子:

public interface CustomCommand extends Commands {

    // SET [key] [value]
    @Command("SET ?0 ?1")
    String setKey(String key, String value);

    // SET [key] [value]
    @Command("SET :key :value")
    String setKeyNamed(@Param("key") String key, @Param("value") String value);

    // MGET [key1] [key2]
    @Command("MGET ?0 ?1")
    List<String> mGet(String key1, String key2);
    /**
     * 方法名做爲命令
     */
    @CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)
    String mSet(String key1, String value1, String key2, String value2);
}


@Test
public void testCustomDynamicSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommandFactory commandFactory = new RedisCommandFactory(connect);
    CustomCommand commands = commandFactory.getCommands(CustomCommand.class);
    commands.setKey("name", "throwable");
    commands.setKeyNamed("throwable", "doge");
    log.info("MGET ===> " + commands.mGet("name", "throwable"));
    commands.mSet("key1", "value1","key2", "value2");
    log.info("MGET ===> " + commands.mGet("key1", "key2"));
    connect.close();
    redisClient.shutdown();
}
// MGET ===> [throwable, doge]
// MGET ===> [value1, value2]

高階特性

Lettuce有不少高階使用特性,這裏只列舉我的認爲經常使用的兩點:

  • 配置客戶端資源。
  • 使用鏈接池。

更多其餘特性能夠自行參看官方文檔。

配置客戶端資源

客戶端資源的設置與Lettuce的性能、併發和事件處理相關。線程池或者線程組相關配置佔據客戶端資源配置的大部分(EventLoopGroupsEventExecutorGroup),這些線程池或者線程組是鏈接程序的基礎組件。通常狀況下,客戶端資源應該在多個Redis客戶端之間共享,而且在再也不使用的時候須要自行關閉。筆者認爲,客戶端資源是面向Netty的。注意除非特別熟悉或者花長時間去測試調整下面提到的參數,不然在沒有經驗的前提下憑直覺修改默認值,有可能會踩坑

客戶端資源接口是ClientResources,實現類是DefaultClientResources

構建DefaultClientResources實例:

// 默認
ClientResources resources = DefaultClientResources.create();

// 建造器
ClientResources resources = DefaultClientResources.builder()
                        .ioThreadPoolSize(4)
                        .computationThreadPoolSize(4)
                        .build()

使用:

ClientResources resources = DefaultClientResources.create();
// 非集羣
RedisClient client = RedisClient.create(resources, uri);
// 集羣
RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris);
// ......
client.shutdown();
clusterClient.shutdown();
// 關閉資源
resources.shutdown();

客戶端資源基本配置:

屬性 描述 默認值
ioThreadPoolSize I/O線程數 Runtime.getRuntime().availableProcessors()
computationThreadPoolSize 任務線程數 Runtime.getRuntime().availableProcessors()

客戶端資源高級配置:

屬性 描述 默認值
eventLoopGroupProvider EventLoopGroup提供商 -
eventExecutorGroupProvider EventExecutorGroup提供商 -
eventBus 事件總線 DefaultEventBus
commandLatencyCollectorOptions 命令延時收集器配置 DefaultCommandLatencyCollectorOptions
commandLatencyCollector 命令延時收集器 DefaultCommandLatencyCollector
commandLatencyPublisherOptions 命令延時發佈器配置 DefaultEventPublisherOptions
dnsResolver DNS處理器 JDK或者Netty提供
reconnectDelay 重連延時配置 Delay.exponential()
nettyCustomizer Netty自定義配置器 -
tracing 軌跡記錄器 -

非集羣客戶端RedisClient的屬性配置:

Redis非集羣客戶端RedisClient自己提供了配置屬性方法:

RedisClient client = RedisClient.create(uri);
client.setOptions(ClientOptions.builder()
                       .autoReconnect(false)
                       .pingBeforeActivateConnection(true)
                       .build());

非集羣客戶端的配置屬性列表:

屬性 描述 默認值
pingBeforeActivateConnection 鏈接激活以前是否執行PING命令 false
autoReconnect 是否自動重連 true
cancelCommandsOnReconnectFailure 重連失敗是否拒絕命令執行 false
suspendReconnectOnProtocolFailure 底層協議失敗是否掛起重連操做 false
requestQueueSize 請求隊列容量 2147483647(Integer#MAX_VALUE)
disconnectedBehavior 失去鏈接時候的行爲 DEFAULT
sslOptions SSL配置 -
socketOptions Socket配置 10 seconds Connection-Timeout, no keep-alive, no TCP noDelay
timeoutOptions 超時配置 -
publishOnScheduler 發佈反應式信號數據的調度器 使用I/O線程

集羣客戶端屬性配置:

Redis集羣客戶端RedisClusterClient自己提供了配置屬性方法:

RedisClusterClient client = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES))
                .enableAllAdaptiveRefreshTriggers()
                .build();

client.setOptions(ClusterClientOptions.builder()
                       .topologyRefreshOptions(topologyRefreshOptions)
                       .build());

集羣客戶端的配置屬性列表:

屬性 描述 默認值
enablePeriodicRefresh 是否容許週期性更新集羣拓撲視圖 false
refreshPeriod 更新集羣拓撲視圖週期 60秒
enableAdaptiveRefreshTrigger 設置自適應更新集羣拓撲視圖觸發器RefreshTrigger -
adaptiveRefreshTriggersTimeout 自適應更新集羣拓撲視圖觸發器超時設置 30秒
refreshTriggersReconnectAttempts 自適應更新集羣拓撲視圖觸發重連次數 5
dynamicRefreshSources 是否容許動態刷新拓撲資源 true
closeStaleConnections 是否容許關閉陳舊的鏈接 true
maxRedirects 集羣重定向次數上限 5
validateClusterNodeMembership 是否校驗集羣節點的成員關係 true

使用鏈接池

引入鏈接池依賴commons-pool2

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.7.0</version>
</dependency

基本使用以下:

@Test
public void testUseConnectionPool() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    GenericObjectPool<StatefulRedisConnection<String, String>> pool
            = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);
    try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
        RedisCommands<String, String> command = connection.sync();
        SetArgs setArgs = SetArgs.Builder.nx().ex(5);
        command.set("name", "throwable", setArgs);
        String n = command.get("name");
        log.info("Get value:{}", n);
    }
    pool.close();
    redisClient.shutdown();
}

其中,同步鏈接的池化支持須要用ConnectionPoolSupport,異步鏈接的池化支持須要用AsyncConnectionPoolSupportLettuce5.1以後才支持)。

幾個常見的漸進式刪除例子

漸進式刪除Hash中的域-屬性:

@Test
public void testDelBigHashKey() throws Exception {
    // SCAN參數
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP遊標
    ScanCursor cursor = ScanCursor.INITIAL;
    // 目標KEY
    String key = "BIG_HASH_KEY";
    prepareHashTestData(key);
    log.info("開始漸進式刪除Hash的元素...");
    int counter = 0;
    do {
        MapScanCursor<String, String> result = COMMAND.hscan(key, cursor, scanArgs);
        // 重置TEMP遊標
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        Collection<String> fields = result.getMap().values();
        if (!fields.isEmpty()) {
            COMMAND.hdel(key, fields.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("漸進式刪除Hash的元素完畢,迭代次數:{} ...", counter);
}

private void prepareHashTestData(String key) throws Exception {
    COMMAND.hset(key, "1", "1");
    COMMAND.hset(key, "2", "2");
    COMMAND.hset(key, "3", "3");
    COMMAND.hset(key, "4", "4");
    COMMAND.hset(key, "5", "5");
}

漸進式刪除集合中的元素:

@Test
public void testDelBigSetKey() throws Exception {
    String key = "BIG_SET_KEY";
    prepareSetTestData(key);
    // SCAN參數
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP遊標
    ScanCursor cursor = ScanCursor.INITIAL;
    log.info("開始漸進式刪除Set的元素...");
    int counter = 0;
    do {
        ValueScanCursor<String> result = COMMAND.sscan(key, cursor, scanArgs);
        // 重置TEMP遊標
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<String> values = result.getValues();
        if (!values.isEmpty()) {
            COMMAND.srem(key, values.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("漸進式刪除Set的元素完畢,迭代次數:{} ...", counter);
}

private void prepareSetTestData(String key) throws Exception {
    COMMAND.sadd(key, "1", "2", "3", "4", "5");
}

漸進式刪除有序集合中的元素:

@Test
public void testDelBigZSetKey() throws Exception {
    // SCAN參數
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP遊標
    ScanCursor cursor = ScanCursor.INITIAL;
    // 目標KEY
    String key = "BIG_ZSET_KEY";
    prepareZSetTestData(key);
    log.info("開始漸進式刪除ZSet的元素...");
    int counter = 0;
    do {
        ScoredValueScanCursor<String> result = COMMAND.zscan(key, cursor, scanArgs);
        // 重置TEMP遊標
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<ScoredValue<String>> scoredValues = result.getValues();
        if (!scoredValues.isEmpty()) {
            COMMAND.zrem(key, scoredValues.stream().map(ScoredValue<String>::getValue).toArray(String[]::new));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("漸進式刪除ZSet的元素完畢,迭代次數:{} ...", counter);
}

private void prepareZSetTestData(String key) throws Exception {
    COMMAND.zadd(key, 0, "1");
    COMMAND.zadd(key, 0, "2");
    COMMAND.zadd(key, 0, "3");
    COMMAND.zadd(key, 0, "4");
    COMMAND.zadd(key, 0, "5");
}

在SpringBoot中使用Lettuce

我的認爲,spring-data-redis中的API封裝並非很優秀,用起來比較重,不夠靈活,這裏結合前面的例子和代碼,在SpringBoot腳手架項目中配置和整合Lettuce。先引入依賴:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.8.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
            <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

通常狀況下,每一個應用應該使用單個Redis客戶端實例和單個鏈接實例,這裏設計一個腳手架,適配單機、普通主從、哨兵和集羣四種使用場景。對於客戶端資源,採用默認的實現便可。對於Redis的鏈接屬性,比較主要的有HostPortPassword,其餘能夠暫時忽略。基於約定大於配置的原則,先定製一系列屬性配置類(其實有些配置是能夠徹底共用,可是考慮到要清晰描述類之間的關係,這裏拆分多個配置屬性類和多個配置方法):

@Data
@ConfigurationProperties(prefix = "lettuce")
public class LettuceProperties {

    private LettuceSingleProperties single;
    private LettuceReplicaProperties replica;
    private LettuceSentinelProperties sentinel;
    private LettuceClusterProperties cluster;

}

@Data
public class LettuceSingleProperties {

    private String host;
    private Integer port;
    private String password;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceReplicaProperties extends LettuceSingleProperties {

}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceSentinelProperties extends LettuceSingleProperties {

    private String masterId;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceClusterProperties extends LettuceSingleProperties {

}

配置類以下,主要使用@ConditionalOnProperty作隔離,通常狀況下,不多有人會在一個應用使用一種以上的Redis鏈接場景:

@RequiredArgsConstructor
@Configuration
@ConditionalOnClass(name = "io.lettuce.core.RedisURI")
@EnableConfigurationProperties(value = LettuceProperties.class)
public class LettuceAutoConfiguration {

    private final LettuceProperties lettuceProperties;

    @Bean(destroyMethod = "shutdown")
    public ClientResources clientResources() {
        return DefaultClientResources.create();
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.single.host")
    public RedisURI singleRedisUri() {
        LettuceSingleProperties singleProperties = lettuceProperties.getSingle();
        return RedisURI.builder()
                .withHost(singleProperties.getHost())
                .withPort(singleProperties.getPort())
                .withPassword(singleProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public RedisClient singleRedisClient(ClientResources clientResources, @Qualifier("singleRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public StatefulRedisConnection<String, String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) {
        return singleRedisClient.connect();
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public RedisURI replicaRedisUri() {
        LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica();
        return RedisURI.builder()
                .withHost(replicaProperties.getHost())
                .withPort(replicaProperties.getPort())
                .withPassword(replicaProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public RedisClient replicaRedisClient(ClientResources clientResources, @Qualifier("replicaRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public StatefulRedisMasterSlaveConnection<String, String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient,
                                                                                     @Qualifier("replicaRedisUri") RedisURI redisUri) {
        return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri);
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public RedisURI sentinelRedisUri() {
        LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel();
        return RedisURI.builder()
                .withPassword(sentinelProperties.getPassword())
                .withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort())
                .withSentinelMasterId(sentinelProperties.getMasterId())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public RedisClient sentinelRedisClient(ClientResources clientResources, @Qualifier("sentinelRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public StatefulRedisMasterSlaveConnection<String, String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient,
                                                                                      @Qualifier("sentinelRedisUri") RedisURI redisUri) {
        return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri);
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public RedisURI clusterRedisUri() {
        LettuceClusterProperties clusterProperties = lettuceProperties.getCluster();
        return RedisURI.builder()
                .withHost(clusterProperties.getHost())
                .withPort(clusterProperties.getPort())
                .withPassword(clusterProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public RedisClusterClient redisClusterClient(ClientResources clientResources, @Qualifier("clusterRedisUri") RedisURI redisUri) {
        return RedisClusterClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.cluster")
    public StatefulRedisClusterConnection<String, String> clusterConnection(RedisClusterClient clusterClient) {
        return clusterClient.connect();
    }
}

最後爲了讓IDE識別咱們的配置,能夠添加IDE親緣性,/META-INF文件夾下新增一個文件spring-configuration-metadata.json,內容以下:

{
  "properties": [
    {
      "name": "lettuce.single",
      "type": "club.throwable.spring.lettuce.LettuceSingleProperties",
      "description": "單機配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.replica",
      "type": "club.throwable.spring.lettuce.LettuceReplicaProperties",
      "description": "主從配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.sentinel",
      "type": "club.throwable.spring.lettuce.LettuceSentinelProperties",
      "description": "哨兵配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.single",
      "type": "club.throwable.spring.lettuce.LettuceClusterProperties",
      "description": "集羣配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    }
  ]
}

若是想IDE親緣性作得更好,能夠添加/META-INF/additional-spring-configuration-metadata.json進行更多細節定義。簡單使用以下:

@Slf4j
@Component
public class RedisCommandLineRunner implements CommandLineRunner {

    @Autowired
    @Qualifier("singleRedisConnection")
    private StatefulRedisConnection<String, String> connection;

    @Override
    public void run(String... args) throws Exception {
        RedisCommands<String, String> redisCommands = connection.sync();
        redisCommands.setex("name", 5, "throwable");
        log.info("Get value:{}", redisCommands.get("name"));
    }
}
// Get value:throwable

小結

本文算是基於Lettuce的官方文檔,對它的使用進行全方位的分析,包括主要功能、配置都作了一些示例,限於篇幅部分特性和配置細節沒有分析。Lettuce已經被spring-data-redis接納做爲官方的Redis客戶端驅動,因此值得信賴,它的一些API設計確實比較合理,擴展性高的同時靈活性也高。我的建議,基於Lettuce包自行添加配置到SpringBoot應用用起來會駕輕就熟,畢竟RedisTemplate實在太笨重,並且還屏蔽了Lettuce一些高級特性和靈活的API

參考資料:

連接

(本文完 c-14-d e-a-20190928 最近事太多...)

技術公衆號(《Throwable文摘》),不按期推送筆者原創技術文章(毫不抄襲或者轉載):

相關文章
相關標籤/搜索