Lettuce
是一個Redis
的Java
驅動包,初識她的時候是使用RedisTemplate
的時候遇到點問題Debug
到底層的一些源碼,發現spring-data-redis
的驅動包在某個版本以後替換爲Lettuce
。Lettuce
翻譯爲生菜,沒錯,就是吃的那種生菜,因此它的Logo
長這樣:html
既然能被Spring
生態所承認,Lettuce
想必有過人之處,因而筆者花時間閱讀她的官方文檔,整理測試示例,寫下這篇文章。編寫本文時所使用的版本爲Lettuce 5.1.8.RELEASE
,SpringBoot 2.1.8.RELEASE
,JDK [8,11]
。超長警告:這篇文章斷斷續續花了兩週完成,超過4萬字.....java
Lettuce
是一個高性能基於Java
編寫的Redis
驅動框架,底層集成了Project Reactor
提供自然的反應式編程,通訊框架集成了Netty
使用了非阻塞IO
,5.x
版本以後融合了JDK1.8
的異步編程特性,在保證高性能的同時提供了十分豐富易用的API
,5.1
版本的新特性以下:node
Redis
的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX
。Brave
模塊跟蹤Redis
命令執行。Redis Streams
。注意一點:Redis
的版本至少須要2.6
,固然越高越好,API
的兼容性比較強大。react
只須要引入單個依賴就能夠開始愉快地使用Lettuce
:web
<dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>5.1.8.RELEASE</version> </dependency>
dependencies { compile 'io.lettuce:lettuce-core:5.1.8.RELEASE' }
單機、哨兵、集羣模式下鏈接Redis
須要一個統一的標準去表示鏈接的細節信息,在Lettuce
中這個統一的標準是RedisURI
。能夠經過三種方式構造一個RedisURI
實例:redis
URI
語法:RedisURI uri = RedisURI.create("redis://localhost/");
RedisURI.Builder
):RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);
redis://
)格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]] 完整:redis://mypassword@127.0.0.1:6379/0?timeout=10s 簡單:redis://localhost
SSL
(前綴爲rediss://
) <== 注意後面多了個s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]] 完整:rediss://mypassword@127.0.0.1:6379/0?timeout=10s 簡單:rediss://localhost
Unix Domain Sockets
模式(前綴爲redis-socket://
)格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]] 完整:redis-socket:///tmp/redis?timeout=10s&_database=0
redis-sentinel://
)格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId 完整:redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster
超時時間單位:spring
我的建議使用RedisURI
提供的建造器,畢竟定製的URI
雖然簡潔,可是比較容易出現人爲錯誤。鑑於筆者沒有SSL
和Unix Domain Socket
的使用場景,下面不對這兩種鏈接方式進行列舉。shell
Lettuce
使用的時候依賴於四個主要組件:apache
RedisURI
:鏈接信息。RedisClient
:Redis
客戶端,特殊地,集羣鏈接有一個定製的RedisClusterClient
。Connection
:Redis
鏈接,主要是StatefulConnection
或者StatefulRedisConnection
的子類,鏈接的類型主要由鏈接的具體方式(單機、哨兵、集羣、訂閱發佈等等)選定,比較重要。RedisCommands
:Redis
命令API
接口,基本上覆蓋了Redis
發行版本的全部命令,提供了同步(sync
)、異步(async
)、反應式(reative
)的調用方式,對於使用者而言,會常常跟RedisCommands
系列接口打交道。一個基本使用例子以下:編程
@Test public void testSetGet() throws Exception { RedisURI redisUri = RedisURI.builder() // <1> 建立單機鏈接的鏈接信息 .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); // <2> 建立客戶端 StatefulRedisConnection<String, String> connection = redisClient.connect(); // <3> 建立線程安全的鏈接 RedisCommands<String, String> redisCommands = connection.sync(); // <4> 建立同步命令 SetArgs setArgs = SetArgs.Builder.nx().ex(5); String result = redisCommands.set("name", "throwable", setArgs); Assertions.assertThat(result).isEqualToIgnoringCase("OK"); result = redisCommands.get("name"); Assertions.assertThat(result).isEqualTo("throwable"); // ... 其餘操做 connection.close(); // <5> 關閉鏈接 redisClient.shutdown(); // <6> 關閉客戶端 }
注意:
Redis
驅動實例不須要太多的鏈接(通常狀況下只須要一個鏈接實例就能夠,若是有多個鏈接的須要能夠考慮使用鏈接池,其實Redis
目前處理命令的模塊是單線程,在客戶端多個鏈接多線程調用理論上沒有效果)。Lettuce
主要提供三種API
:
sync
):RedisCommands
。async
):RedisAsyncCommands
。reactive
):RedisReactiveCommands
。先準備好一個單機Redis
鏈接備用:
private static StatefulRedisConnection<String, String> CONNECTION; private static RedisClient CLIENT; @BeforeClass public static void beforeClass() { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); CLIENT = RedisClient.create(redisUri); CONNECTION = CLIENT.connect(); } @AfterClass public static void afterClass() throws Exception { CONNECTION.close(); CLIENT.shutdown(); }
Redis
命令API
的具體實現能夠直接從StatefulRedisConnection
實例獲取,見其接口定義:
public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> { boolean isMulti(); RedisCommands<K, V> sync(); RedisAsyncCommands<K, V> async(); RedisReactiveCommands<K, V> reactive(); }
值得注意的是,在不指定編碼解碼器RedisCodec
的前提下,RedisClient
建立的StatefulRedisConnection
實例通常是泛型實例StatefulRedisConnection<String,String>
,也就是全部命令API
的KEY
和VALUE
都是String
類型,這種使用方式能知足大部分的使用場景。固然,必要的時候能夠定製編碼解碼器RedisCodec<K,V>
。
先構建RedisCommands
實例:
private static RedisCommands<String, String> COMMAND; @BeforeClass public static void beforeClass() { COMMAND = CONNECTION.sync(); }
基本使用:
@Test public void testSyncPing() throws Exception { String pong = COMMAND.ping(); Assertions.assertThat(pong).isEqualToIgnoringCase("PONG"); } @Test public void testSyncSetAndGet() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); COMMAND.set("name", "throwable", setArgs); String value = COMMAND.get("name"); log.info("Get value: {}", value); } // Get value: throwable
同步API
在全部命令調用以後會當即返回結果。若是熟悉Jedis
的話,RedisCommands
的用法其實和它相差不大。
先構建RedisAsyncCommands
實例:
private static RedisAsyncCommands<String, String> ASYNC_COMMAND; @BeforeClass public static void beforeClass() { ASYNC_COMMAND = CONNECTION.async(); }
基本使用:
@Test public void testAsyncPing() throws Exception { RedisFuture<String> redisFuture = ASYNC_COMMAND.ping(); log.info("Ping result:{}", redisFuture.get()); } // Ping result:PONG
RedisAsyncCommands
全部方法執行返回結果都是RedisFuture
實例,而RedisFuture
接口的定義以下:
public interface RedisFuture<V> extends CompletionStage<V>, Future<V> { String getError(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; }
也就是,RedisFuture
能夠無縫使用Future
或者JDK
1.8中引入的CompletableFuture
提供的方法。舉個例子:
@Test public void testAsyncSetAndGet1() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs); // CompletableFuture#thenAccept() future.thenAccept(value -> log.info("Set命令返回:{}", value)); // Future#get() future.get(); } // Set命令返回:OK @Test public void testAsyncSetAndGet2() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); CompletableFuture<Void> result = (CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs) .thenAcceptBoth(ASYNC_COMMAND.get("name"), (s, g) -> { log.info("Set命令返回:{}", s); log.info("Get命令返回:{}", g); }); result.get(); } // Set命令返回:OK // Get命令返回:throwable
若是能熟練使用CompletableFuture
和函數式編程技巧,能夠組合多個RedisFuture
完成一些列複雜的操做。
Lettuce
引入的反應式編程框架是Project Reactor,若是沒有反應式編程經驗能夠先自行了解一下Project Reactor
。
構建RedisReactiveCommands
實例:
private static RedisReactiveCommands<String, String> REACTIVE_COMMAND; @BeforeClass public static void beforeClass() { REACTIVE_COMMAND = CONNECTION.reactive(); }
根據Project Reactor
,RedisReactiveCommands
的方法若是返回的結果只包含0或1個元素,那麼返回值類型是Mono
,若是返回的結果包含0到N(N大於0)個元素,那麼返回值是Flux
。舉個例子:
@Test public void testReactivePing() throws Exception { Mono<String> ping = REACTIVE_COMMAND.ping(); ping.subscribe(v -> log.info("Ping result:{}", v)); Thread.sleep(1000); } // Ping result:PONG @Test public void testReactiveSetAndGet() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); REACTIVE_COMMAND.set("name", "throwable", setArgs).block(); REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value)); Thread.sleep(1000); } // Get命令返回:throwable @Test public void testReactiveSet() throws Exception { REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block(); Flux<String> flux = REACTIVE_COMMAND.smembers("food"); flux.subscribe(log::info); REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block(); Thread.sleep(1000); } // meat // bread // fish
舉個更加複雜的例子,包含了事務、函數轉換等:
@Test public void testReactiveFunctional() throws Exception { REACTIVE_COMMAND.multi().doOnSuccess(r -> { REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe(); REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe(); }).flatMap(s -> REACTIVE_COMMAND.exec()) .doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded())) .subscribe(); Thread.sleep(1000); } // OK // 2 // Discarded:false
這個方法開啓一個事務,先把counter
設置爲1,再將counter
自增1。
非集羣模式下的發佈訂閱依賴於定製的鏈接StatefulRedisPubSubConnection
,集羣模式下的發佈訂閱依賴於定製的鏈接StatefulRedisClusterPubSubConnection
,二者分別來源於RedisClient#connectPubSub()
系列方法和RedisClusterClient#connectPubSub()
:
// 多是單機、普通主從、哨兵等非集羣模式的客戶端 RedisClient client = ... StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub(); connection.addListener(new RedisPubSubListener<String, String>() { ... }); // 同步命令 RedisPubSubCommands<String, String> sync = connection.sync(); sync.subscribe("channel"); // 異步命令 RedisPubSubAsyncCommands<String, String> async = connection.async(); RedisFuture<Void> future = async.subscribe("channel"); // 反應式命令 RedisPubSubReactiveCommands<String, String> reactive = connection.reactive(); reactive.subscribe("channel").subscribe(); reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
// 使用方式其實和非集羣模式基本一致 RedisClusterClient clusterClient = ... StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub(); connection.addListener(new RedisPubSubListener<String, String>() { ... }); RedisPubSubCommands<String, String> sync = connection.sync(); sync.subscribe("channel"); // ...
這裏用單機同步命令的模式舉一個Redis
鍵空間通知(Redis Keyspace Notifications)的例子:
@Test public void testSyncKeyspaceNotification() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) // 注意這裏只能是0號庫 .withDatabase(0) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection<String, String> redisConnection = redisClient.connect(); RedisCommands<String, String> redisCommands = redisConnection.sync(); // 只接收鍵過時的事件 redisCommands.configSet("notify-keyspace-events", "Ex"); StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub(); connection.addListener(new RedisPubSubAdapter<>() { @Override public void psubscribed(String pattern, long count) { log.info("pattern:{},count:{}", pattern, count); } @Override public void message(String pattern, String channel, String message) { log.info("pattern:{},channel:{},message:{}", pattern, channel, message); } }); RedisPubSubCommands<String, String> commands = connection.sync(); commands.psubscribe("__keyevent@0__:expired"); redisCommands.setex("name", 2, "throwable"); Thread.sleep(10000); redisConnection.close(); connection.close(); redisClient.shutdown(); } // pattern:__keyevent@0__:expired,count:1 // pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name
實際上,在實現RedisPubSubListener
的時候能夠單獨抽離,儘可能不要設計成匿名內部類的形式。
事務相關的命令就是WATCH
、UNWATCH
、EXEC
、MULTI
和DISCARD
,在RedisCommands
系列接口中有對應的方法。舉個例子:
// 同步模式 @Test public void testSyncMulti() throws Exception { COMMAND.multi(); COMMAND.setex("name-1", 2, "throwable"); COMMAND.setex("name-2", 2, "doge"); TransactionResult result = COMMAND.exec(); int index = 0; for (Object r : result) { log.info("Result-{}:{}", index, r); index++; } } // Result-0:OK // Result-1:OK
Redis
的Pipeline
也就是管道機制能夠理解爲把多個命令打包在一次請求發送到Redis
服務端,而後Redis
服務端把全部的響應結果打包好一次性返回,從而節省沒必要要的網絡資源(最主要是減小網絡請求次數)。Redis
對於Pipeline
機制如何實現並無明確的規定,也沒有提供特殊的命令支持Pipeline
機制。Jedis
中底層採用BIO
(阻塞IO)通信,因此它的作法是客戶端緩存將要發送的命令,最後須要觸發而後同步發送一個巨大的命令列表包,再接收和解析一個巨大的響應列表包。Pipeline
在Lettuce
中對使用者是透明的,因爲底層的通信框架是Netty
,因此網絡通信層面的優化Lettuce
不須要過多幹預,換言之能夠這樣理解:Netty
幫Lettuce
從底層實現了Redis
的Pipeline
機制。可是,Lettuce
的異步API
也提供了手動Flush
的方法:
@Test public void testAsyncManualFlush() { // 取消自動flush ASYNC_COMMAND.setAutoFlushCommands(false); List<RedisFuture<?>> redisFutures = Lists.newArrayList(); int count = 5000; for (int i = 0; i < count; i++) { String key = "key-" + (i + 1); String value = "value-" + (i + 1); redisFutures.add(ASYNC_COMMAND.set(key, value)); redisFutures.add(ASYNC_COMMAND.expire(key, 2)); } long start = System.currentTimeMillis(); ASYNC_COMMAND.flushCommands(); boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0])); Assertions.assertThat(result).isTrue(); log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start); } // Lettuce cost:1302 ms
上面只是從文檔看到的一些理論術語,可是現實是骨感的,對比了下Jedis
的Pipeline
提供的方法,發現了Jedis
的Pipeline
執行耗時比較低:
@Test public void testJedisPipeline() throws Exception { Jedis jedis = new Jedis(); Pipeline pipeline = jedis.pipelined(); int count = 5000; for (int i = 0; i < count; i++) { String key = "key-" + (i + 1); String value = "value-" + (i + 1); pipeline.set(key, value); pipeline.expire(key, 2); } long start = System.currentTimeMillis(); pipeline.syncAndReturnAll(); log.info("Jedis cost:{} ms", System.currentTimeMillis() - start); } // Jedis cost:9 ms
我的猜想Lettuce
可能底層並不是合併全部命令一次發送(甚至多是單條發送),具體可能須要抓包才能定位。依此來看,若是真的有大量執行Redis
命令的場景,不妨可使用Jedis
的Pipeline
。
注意:由上面的測試推斷RedisTemplate
的executePipelined()
方法是假的Pipeline
執行方法,使用RedisTemplate
的時候請務必注意這一點。
Lettuce
中執行Redis
的Lua
命令的同步接口以下:
public interface RedisScriptingCommands<K, V> { <T> T eval(String var1, ScriptOutputType var2, K... var3); <T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4); <T> T evalsha(String var1, ScriptOutputType var2, K... var3); <T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4); List<Boolean> scriptExists(String... var1); String scriptFlush(); String scriptKill(); String scriptLoad(V var1); String digest(V var1); }
異步和反應式的接口方法定義差很少,不一樣的地方就是返回值類型,通常咱們經常使用的是eval()
、evalsha()
和scriptLoad()
方法。舉個簡單的例子:
private static RedisCommands<String, String> COMMANDS; private static String RAW_LUA = "local key = KEYS[1]\n" + "local value = ARGV[1]\n" + "local timeout = ARGV[2]\n" + "redis.call('SETEX', key, tonumber(timeout), value)\n" + "local result = redis.call('GET', key)\n" + "return result;"; private static AtomicReference<String> LUA_SHA = new AtomicReference<>(); @Test public void testLua() throws Exception { LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA)); String[] keys = new String[]{"name"}; String[] args = new String[]{"throwable", "5000"}; String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args); log.info("Get value:{}", result); } // Get value:throwable
爲了Redis
的高可用,通常會採用普通主從(Master/Replica
,這裏筆者稱爲普通主從模式,也就是僅僅作了主從複製,故障須要手動切換)、哨兵和集羣。普通主從模式能夠獨立運行,也能夠配合哨兵運行,只是哨兵提供自動故障轉移和主節點提高功能。普通主從和哨兵均可以使用MasterSlave
,經過入參包括RedisClient
、編碼解碼器以及一個或者多個RedisURI
獲取對應的Connection
實例。
這裏注意一點,MasterSlave
中提供的方法若是隻要求傳入一個RedisURI
實例,那麼Lettuce
會進行拓撲發現機制,自動獲取Redis
主從節點信息;若是要求傳入一個RedisURI
集合,那麼對於普通主從模式來講全部節點信息是靜態的,不會進行發現和更新。
拓撲發現的規則以下:
Master/Replica
)模式,不須要感知RedisURI
指向從節點仍是主節點,只會進行一次性的拓撲查找全部節點信息,此後節點信息會保存在靜態緩存中,不會更新。拓撲發現機制的提供API
爲TopologyProvider
,須要瞭解其原理的能夠參考具體的實現。
對於集羣(Cluster
)模式,Lettuce
提供了一套獨立的API
。
另外,若是Lettuce
鏈接面向的是非單個Redis
節點,鏈接實例提供了數據讀取節點偏好(ReadFrom
)設置,可選值有:
MASTER
:只從Master
節點中讀取。MASTER_PREFERRED
:優先從Master
節點中讀取。SLAVE_PREFERRED
:優先從Slavor
節點中讀取。SLAVE
:只從Slavor
節點中讀取。NEAREST
:使用最近一次鏈接的Redis
實例讀取。假設如今有三個Redis
服務造成樹狀主從關係以下:
首次動態節點發現主從模式的節點信息須要以下構建鏈接:
@Test public void testDynamicReplica() throws Exception { // 這裏只須要配置一個節點的鏈接信息,不必定須要是主節點的信息,從節點也能夠 RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build(); RedisClient redisClient = RedisClient.create(uri); StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri); // 只從從節點讀取數據 connection.setReadFrom(ReadFrom.SLAVE); // 執行其餘Redis命令 connection.close(); redisClient.shutdown(); }
若是須要指定靜態的Redis
主從節點鏈接屬性,那麼能夠這樣構建鏈接:
@Test public void testStaticReplica() throws Exception { List<RedisURI> uris = new ArrayList<>(); RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build(); RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build(); RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build(); uris.add(uri1); uris.add(uri2); uris.add(uri3); RedisClient redisClient = RedisClient.create(); StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uris); // 只從主節點讀取數據 connection.setReadFrom(ReadFrom.MASTER); // 執行其餘Redis命令 connection.close(); redisClient.shutdown(); }
因爲Lettuce
自身提供了哨兵的拓撲發現機制,因此只須要隨便配置一個哨兵節點的RedisURI
實例便可:
@Test public void testDynamicSentinel() throws Exception { RedisURI redisUri = RedisURI.builder() .withPassword("你的密碼") .withSentinel("localhost", 26379) .withSentinelMasterId("哨兵Master的ID") .build(); RedisClient redisClient = RedisClient.create(); StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri); // 只容許從從節點讀取數據 connection.setReadFrom(ReadFrom.SLAVE); RedisCommands<String, String> command = connection.sync(); SetArgs setArgs = SetArgs.Builder.nx().ex(5); command.set("name", "throwable", setArgs); String value = command.get("name"); log.info("Get value:{}", value); } // Get value:throwable
鑑於筆者對Redis
集羣模式並不熟悉,Cluster
模式下的API
使用自己就有比較多的限制,因此這裏只簡單介紹一下怎麼用。先說幾個特性:
下面的API提供跨槽位(Slot
)調用的功能:
RedisAdvancedClusterCommands
。RedisAdvancedClusterAsyncCommands
。RedisAdvancedClusterReactiveCommands
。靜態節點選擇功能:
masters
:選擇全部主節點執行命令。slaves
:選擇全部從節點執行命令,其實就是隻讀模式。all nodes
:命令能夠在全部節點執行。集羣拓撲視圖動態更新功能:
RedisClusterClient#reloadPartitions()
。MOVED/ASK
命令重定向自動更新。Redis
集羣搭建詳細過程能夠參考官方文檔,假設已經搭建好集羣以下(192.168.56.200
是筆者的虛擬機Host):
簡單的集羣鏈接和使用方式以下:
@Test public void testSyncCluster(){ RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect(); RedisAdvancedClusterCommands<String, String> commands = connection.sync(); commands.setex("name",10, "throwable"); String value = commands.get("name"); log.info("Get value:{}", value); } // Get value:throwable
節點選擇:
@Test public void testSyncNodeSelection() { RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect(); RedisAdvancedClusterCommands<String, String> commands = connection.sync(); // commands.all(); // 全部節點 // commands.masters(); // 主節點 // 從節點只讀 NodeSelection<String, String> replicas = commands.slaves(); NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands(); // 這裏只是演示,通常應該禁用keys *命令 Executions<List<String>> keys = nodeSelectionCommands.keys("*"); keys.forEach(key -> log.info("key: {}", key)); connection.close(); redisClusterClient.shutdown(); }
定時更新集羣拓撲視圖(每隔十分鐘更新一次,這個時間自行考量,不能太頻繁):
@Test public void testPeriodicClusterTopology() throws Exception { RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions .builder() .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES)) .build(); redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build()); StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect(); RedisAdvancedClusterCommands<String, String> commands = connection.sync(); commands.setex("name", 10, "throwable"); String value = commands.get("name"); log.info("Get value:{}", value); Thread.sleep(Integer.MAX_VALUE); connection.close(); redisClusterClient.shutdown(); }
自適應更新集羣拓撲視圖:
@Test public void testAdaptiveClusterTopology() throws Exception { RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder() .enableAdaptiveRefreshTrigger( ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT, ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS ) .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS)) .build(); redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build()); StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect(); RedisAdvancedClusterCommands<String, String> commands = connection.sync(); commands.setex("name", 10, "throwable"); String value = commands.get("name"); log.info("Get value:{}", value); Thread.sleep(Integer.MAX_VALUE); connection.close(); redisClusterClient.shutdown(); }
自定義命令是Redis
命令有限集,不過能夠更細粒度指定KEY
、ARGV
、命令類型、編碼解碼器和返回值類型,依賴於dispatch()
方法:
// 自定義實現PING方法 @Test public void testCustomPing() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection<String, String> connect = redisClient.connect(); RedisCommands<String, String> sync = connect.sync(); RedisCodec<String, String> codec = StringCodec.UTF8; String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec)); log.info("PING:{}", result); connect.close(); redisClient.shutdown(); } // PING:PONG // 自定義實現Set方法 @Test public void testCustomSet() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection<String, String> connect = redisClient.connect(); RedisCommands<String, String> sync = connect.sync(); RedisCodec<String, String> codec = StringCodec.UTF8; sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec), new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable")); String result = sync.get("name"); log.info("Get value:{}", result); connect.close(); redisClient.shutdown(); } // Get value:throwable
動態命令是基於Redis
命令有限集,而且經過註解和動態代理完成一些複雜命令組合的實現。主要註解在io.lettuce.core.dynamic.annotation
包路徑下。簡單舉個例子:
public interface CustomCommand extends Commands { // SET [key] [value] @Command("SET ?0 ?1") String setKey(String key, String value); // SET [key] [value] @Command("SET :key :value") String setKeyNamed(@Param("key") String key, @Param("value") String value); // MGET [key1] [key2] @Command("MGET ?0 ?1") List<String> mGet(String key1, String key2); /** * 方法名做爲命令 */ @CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME) String mSet(String key1, String value1, String key2, String value2); } @Test public void testCustomDynamicSet() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection<String, String> connect = redisClient.connect(); RedisCommandFactory commandFactory = new RedisCommandFactory(connect); CustomCommand commands = commandFactory.getCommands(CustomCommand.class); commands.setKey("name", "throwable"); commands.setKeyNamed("throwable", "doge"); log.info("MGET ===> " + commands.mGet("name", "throwable")); commands.mSet("key1", "value1","key2", "value2"); log.info("MGET ===> " + commands.mGet("key1", "key2")); connect.close(); redisClient.shutdown(); } // MGET ===> [throwable, doge] // MGET ===> [value1, value2]
Lettuce
有不少高階使用特性,這裏只列舉我的認爲經常使用的兩點:
更多其餘特性能夠自行參看官方文檔。
客戶端資源的設置與Lettuce
的性能、併發和事件處理相關。線程池或者線程組相關配置佔據客戶端資源配置的大部分(EventLoopGroups
和EventExecutorGroup
),這些線程池或者線程組是鏈接程序的基礎組件。通常狀況下,客戶端資源應該在多個Redis
客戶端之間共享,而且在再也不使用的時候須要自行關閉。筆者認爲,客戶端資源是面向Netty
的。注意:除非特別熟悉或者花長時間去測試調整下面提到的參數,不然在沒有經驗的前提下憑直覺修改默認值,有可能會踩坑。
客戶端資源接口是ClientResources
,實現類是DefaultClientResources
。
構建DefaultClientResources
實例:
// 默認 ClientResources resources = DefaultClientResources.create(); // 建造器 ClientResources resources = DefaultClientResources.builder() .ioThreadPoolSize(4) .computationThreadPoolSize(4) .build()
使用:
ClientResources resources = DefaultClientResources.create(); // 非集羣 RedisClient client = RedisClient.create(resources, uri); // 集羣 RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris); // ...... client.shutdown(); clusterClient.shutdown(); // 關閉資源 resources.shutdown();
客戶端資源基本配置:
屬性 | 描述 | 默認值 |
---|---|---|
ioThreadPoolSize |
I/O 線程數 |
Runtime.getRuntime().availableProcessors() |
computationThreadPoolSize |
任務線程數 | Runtime.getRuntime().availableProcessors() |
客戶端資源高級配置:
屬性 | 描述 | 默認值 |
---|---|---|
eventLoopGroupProvider |
EventLoopGroup 提供商 |
- |
eventExecutorGroupProvider |
EventExecutorGroup 提供商 |
- |
eventBus |
事件總線 | DefaultEventBus |
commandLatencyCollectorOptions |
命令延時收集器配置 | DefaultCommandLatencyCollectorOptions |
commandLatencyCollector |
命令延時收集器 | DefaultCommandLatencyCollector |
commandLatencyPublisherOptions |
命令延時發佈器配置 | DefaultEventPublisherOptions |
dnsResolver |
DNS 處理器 |
JDK或者Netty 提供 |
reconnectDelay |
重連延時配置 | Delay.exponential() |
nettyCustomizer |
Netty 自定義配置器 |
- |
tracing |
軌跡記錄器 | - |
非集羣客戶端RedisClient
的屬性配置:
Redis
非集羣客戶端RedisClient
自己提供了配置屬性方法:
RedisClient client = RedisClient.create(uri); client.setOptions(ClientOptions.builder() .autoReconnect(false) .pingBeforeActivateConnection(true) .build());
非集羣客戶端的配置屬性列表:
屬性 | 描述 | 默認值 |
---|---|---|
pingBeforeActivateConnection |
鏈接激活以前是否執行PING 命令 |
false |
autoReconnect |
是否自動重連 | true |
cancelCommandsOnReconnectFailure |
重連失敗是否拒絕命令執行 | false |
suspendReconnectOnProtocolFailure |
底層協議失敗是否掛起重連操做 | false |
requestQueueSize |
請求隊列容量 | 2147483647(Integer#MAX_VALUE) |
disconnectedBehavior |
失去鏈接時候的行爲 | DEFAULT |
sslOptions |
SSL配置 |
- |
socketOptions |
Socket 配置 |
10 seconds Connection-Timeout, no keep-alive, no TCP noDelay |
timeoutOptions |
超時配置 | - |
publishOnScheduler |
發佈反應式信號數據的調度器 | 使用I/O 線程 |
集羣客戶端屬性配置:
Redis
集羣客戶端RedisClusterClient
自己提供了配置屬性方法:
RedisClusterClient client = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES)) .enableAllAdaptiveRefreshTriggers() .build(); client.setOptions(ClusterClientOptions.builder() .topologyRefreshOptions(topologyRefreshOptions) .build());
集羣客戶端的配置屬性列表:
屬性 | 描述 | 默認值 |
---|---|---|
enablePeriodicRefresh |
是否容許週期性更新集羣拓撲視圖 | false |
refreshPeriod |
更新集羣拓撲視圖週期 | 60秒 |
enableAdaptiveRefreshTrigger |
設置自適應更新集羣拓撲視圖觸發器RefreshTrigger |
- |
adaptiveRefreshTriggersTimeout |
自適應更新集羣拓撲視圖觸發器超時設置 | 30秒 |
refreshTriggersReconnectAttempts |
自適應更新集羣拓撲視圖觸發重連次數 | 5 |
dynamicRefreshSources |
是否容許動態刷新拓撲資源 | true |
closeStaleConnections |
是否容許關閉陳舊的鏈接 | true |
maxRedirects |
集羣重定向次數上限 | 5 |
validateClusterNodeMembership |
是否校驗集羣節點的成員關係 | true |
引入鏈接池依賴commons-pool2
:
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.7.0</version> </dependency
基本使用以下:
@Test public void testUseConnectionPool() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig); try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) { RedisCommands<String, String> command = connection.sync(); SetArgs setArgs = SetArgs.Builder.nx().ex(5); command.set("name", "throwable", setArgs); String n = command.get("name"); log.info("Get value:{}", n); } pool.close(); redisClient.shutdown(); }
其中,同步鏈接的池化支持須要用ConnectionPoolSupport
,異步鏈接的池化支持須要用AsyncConnectionPoolSupport
(Lettuce
5.1以後才支持)。
漸進式刪除Hash中的域-屬性:
@Test public void testDelBigHashKey() throws Exception { // SCAN參數 ScanArgs scanArgs = ScanArgs.Builder.limit(2); // TEMP遊標 ScanCursor cursor = ScanCursor.INITIAL; // 目標KEY String key = "BIG_HASH_KEY"; prepareHashTestData(key); log.info("開始漸進式刪除Hash的元素..."); int counter = 0; do { MapScanCursor<String, String> result = COMMAND.hscan(key, cursor, scanArgs); // 重置TEMP遊標 cursor = ScanCursor.of(result.getCursor()); cursor.setFinished(result.isFinished()); Collection<String> fields = result.getMap().values(); if (!fields.isEmpty()) { COMMAND.hdel(key, fields.toArray(new String[0])); } counter++; } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished())); log.info("漸進式刪除Hash的元素完畢,迭代次數:{} ...", counter); } private void prepareHashTestData(String key) throws Exception { COMMAND.hset(key, "1", "1"); COMMAND.hset(key, "2", "2"); COMMAND.hset(key, "3", "3"); COMMAND.hset(key, "4", "4"); COMMAND.hset(key, "5", "5"); }
漸進式刪除集合中的元素:
@Test public void testDelBigSetKey() throws Exception { String key = "BIG_SET_KEY"; prepareSetTestData(key); // SCAN參數 ScanArgs scanArgs = ScanArgs.Builder.limit(2); // TEMP遊標 ScanCursor cursor = ScanCursor.INITIAL; log.info("開始漸進式刪除Set的元素..."); int counter = 0; do { ValueScanCursor<String> result = COMMAND.sscan(key, cursor, scanArgs); // 重置TEMP遊標 cursor = ScanCursor.of(result.getCursor()); cursor.setFinished(result.isFinished()); List<String> values = result.getValues(); if (!values.isEmpty()) { COMMAND.srem(key, values.toArray(new String[0])); } counter++; } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished())); log.info("漸進式刪除Set的元素完畢,迭代次數:{} ...", counter); } private void prepareSetTestData(String key) throws Exception { COMMAND.sadd(key, "1", "2", "3", "4", "5"); }
漸進式刪除有序集合中的元素:
@Test public void testDelBigZSetKey() throws Exception { // SCAN參數 ScanArgs scanArgs = ScanArgs.Builder.limit(2); // TEMP遊標 ScanCursor cursor = ScanCursor.INITIAL; // 目標KEY String key = "BIG_ZSET_KEY"; prepareZSetTestData(key); log.info("開始漸進式刪除ZSet的元素..."); int counter = 0; do { ScoredValueScanCursor<String> result = COMMAND.zscan(key, cursor, scanArgs); // 重置TEMP遊標 cursor = ScanCursor.of(result.getCursor()); cursor.setFinished(result.isFinished()); List<ScoredValue<String>> scoredValues = result.getValues(); if (!scoredValues.isEmpty()) { COMMAND.zrem(key, scoredValues.stream().map(ScoredValue<String>::getValue).toArray(String[]::new)); } counter++; } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished())); log.info("漸進式刪除ZSet的元素完畢,迭代次數:{} ...", counter); } private void prepareZSetTestData(String key) throws Exception { COMMAND.zadd(key, 0, "1"); COMMAND.zadd(key, 0, "2"); COMMAND.zadd(key, 0, "3"); COMMAND.zadd(key, 0, "4"); COMMAND.zadd(key, 0, "5"); }
我的認爲,spring-data-redis
中的API
封裝並非很優秀,用起來比較重,不夠靈活,這裏結合前面的例子和代碼,在SpringBoot
腳手架項目中配置和整合Lettuce
。先引入依賴:
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.1.8.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>5.1.8.RELEASE</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> <scope>provided</scope> </dependency> </dependencies>
通常狀況下,每一個應用應該使用單個Redis
客戶端實例和單個鏈接實例,這裏設計一個腳手架,適配單機、普通主從、哨兵和集羣四種使用場景。對於客戶端資源,採用默認的實現便可。對於Redis
的鏈接屬性,比較主要的有Host
、Port
和Password
,其餘能夠暫時忽略。基於約定大於配置的原則,先定製一系列屬性配置類(其實有些配置是能夠徹底共用,可是考慮到要清晰描述類之間的關係,這裏拆分多個配置屬性類和多個配置方法):
@Data @ConfigurationProperties(prefix = "lettuce") public class LettuceProperties { private LettuceSingleProperties single; private LettuceReplicaProperties replica; private LettuceSentinelProperties sentinel; private LettuceClusterProperties cluster; } @Data public class LettuceSingleProperties { private String host; private Integer port; private String password; } @EqualsAndHashCode(callSuper = true) @Data public class LettuceReplicaProperties extends LettuceSingleProperties { } @EqualsAndHashCode(callSuper = true) @Data public class LettuceSentinelProperties extends LettuceSingleProperties { private String masterId; } @EqualsAndHashCode(callSuper = true) @Data public class LettuceClusterProperties extends LettuceSingleProperties { }
配置類以下,主要使用@ConditionalOnProperty
作隔離,通常狀況下,不多有人會在一個應用使用一種以上的Redis
鏈接場景:
@RequiredArgsConstructor @Configuration @ConditionalOnClass(name = "io.lettuce.core.RedisURI") @EnableConfigurationProperties(value = LettuceProperties.class) public class LettuceAutoConfiguration { private final LettuceProperties lettuceProperties; @Bean(destroyMethod = "shutdown") public ClientResources clientResources() { return DefaultClientResources.create(); } @Bean @ConditionalOnProperty(name = "lettuce.single.host") public RedisURI singleRedisUri() { LettuceSingleProperties singleProperties = lettuceProperties.getSingle(); return RedisURI.builder() .withHost(singleProperties.getHost()) .withPort(singleProperties.getPort()) .withPassword(singleProperties.getPassword()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.single.host") public RedisClient singleRedisClient(ClientResources clientResources, @Qualifier("singleRedisUri") RedisURI redisUri) { return RedisClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.single.host") public StatefulRedisConnection<String, String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) { return singleRedisClient.connect(); } @Bean @ConditionalOnProperty(name = "lettuce.replica.host") public RedisURI replicaRedisUri() { LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica(); return RedisURI.builder() .withHost(replicaProperties.getHost()) .withPort(replicaProperties.getPort()) .withPassword(replicaProperties.getPassword()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.replica.host") public RedisClient replicaRedisClient(ClientResources clientResources, @Qualifier("replicaRedisUri") RedisURI redisUri) { return RedisClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.replica.host") public StatefulRedisMasterSlaveConnection<String, String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient, @Qualifier("replicaRedisUri") RedisURI redisUri) { return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri); } @Bean @ConditionalOnProperty(name = "lettuce.sentinel.host") public RedisURI sentinelRedisUri() { LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel(); return RedisURI.builder() .withPassword(sentinelProperties.getPassword()) .withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort()) .withSentinelMasterId(sentinelProperties.getMasterId()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.sentinel.host") public RedisClient sentinelRedisClient(ClientResources clientResources, @Qualifier("sentinelRedisUri") RedisURI redisUri) { return RedisClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.sentinel.host") public StatefulRedisMasterSlaveConnection<String, String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient, @Qualifier("sentinelRedisUri") RedisURI redisUri) { return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri); } @Bean @ConditionalOnProperty(name = "lettuce.cluster.host") public RedisURI clusterRedisUri() { LettuceClusterProperties clusterProperties = lettuceProperties.getCluster(); return RedisURI.builder() .withHost(clusterProperties.getHost()) .withPort(clusterProperties.getPort()) .withPassword(clusterProperties.getPassword()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.cluster.host") public RedisClusterClient redisClusterClient(ClientResources clientResources, @Qualifier("clusterRedisUri") RedisURI redisUri) { return RedisClusterClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.cluster") public StatefulRedisClusterConnection<String, String> clusterConnection(RedisClusterClient clusterClient) { return clusterClient.connect(); } }
最後爲了讓IDE
識別咱們的配置,能夠添加IDE
親緣性,/META-INF
文件夾下新增一個文件spring-configuration-metadata.json
,內容以下:
{ "properties": [ { "name": "lettuce.single", "type": "club.throwable.spring.lettuce.LettuceSingleProperties", "description": "單機配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" }, { "name": "lettuce.replica", "type": "club.throwable.spring.lettuce.LettuceReplicaProperties", "description": "主從配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" }, { "name": "lettuce.sentinel", "type": "club.throwable.spring.lettuce.LettuceSentinelProperties", "description": "哨兵配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" }, { "name": "lettuce.single", "type": "club.throwable.spring.lettuce.LettuceClusterProperties", "description": "集羣配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" } ] }
若是想IDE
親緣性作得更好,能夠添加/META-INF/additional-spring-configuration-metadata.json
進行更多細節定義。簡單使用以下:
@Slf4j @Component public class RedisCommandLineRunner implements CommandLineRunner { @Autowired @Qualifier("singleRedisConnection") private StatefulRedisConnection<String, String> connection; @Override public void run(String... args) throws Exception { RedisCommands<String, String> redisCommands = connection.sync(); redisCommands.setex("name", 5, "throwable"); log.info("Get value:{}", redisCommands.get("name")); } } // Get value:throwable
本文算是基於Lettuce
的官方文檔,對它的使用進行全方位的分析,包括主要功能、配置都作了一些示例,限於篇幅部分特性和配置細節沒有分析。Lettuce
已經被spring-data-redis
接納做爲官方的Redis
客戶端驅動,因此值得信賴,它的一些API
設計確實比較合理,擴展性高的同時靈活性也高。我的建議,基於Lettuce
包自行添加配置到SpringBoot
應用用起來會駕輕就熟,畢竟RedisTemplate
實在太笨重,並且還屏蔽了Lettuce
一些高級特性和靈活的API
。
參考資料:
(本文完 c-14-d e-a-20190928 最近事太多...)
技術公衆號(《Throwable文摘》),不按期推送筆者原創技術文章(毫不抄襲或者轉載):