redis 支持對 list,set 和 zset 元素的排序,排序的時間複雜度是 O(N+M*log(M))。(N 是集合大小,M 爲返回元素的數量)java
sort key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC|DESC] [ALPHA] [STORE destination]
假設如今有用戶數據以下:
redis
127.0.0.1:6379> sadd uid 1 2 3 4 127.0.0.1:6379> mset user_name_1 admin user_level_1 9999 127.0.0.1:6379> mset user_name_2 jack user_level_2 10 127.0.0.1:6379> mset user_name_3 peter user_level_3 25 127.0.0.1:6379> mset user_name_4 mary user_level_4 70
首先,直接利用集合內的元素作排序操做:spring
127.0.0.1:6379> sort uid 1) "1" 2) "2" 3) "3" 4) "4"
接着,咱們來試試 [BY pattern] 和 [GET pattern [GET pattern ...]] 操做:ide
127.0.0.1:6379> sort uid by user_name_* get # get user_name_* get user_level_* alpha 1) "1" 2) "admin" 3) "9999" 4) "2" 5) "jack" 6) "10" 7) "4" 8) "mary" 9) "70" 10) "3" 11) "peter" 12) "25"
這個語句有點晦澀,試着這麼理解 「by user_name_* 」, user_name_* 是一個佔用符,它先取出 uid 中的值,而後再用這個值拼接成外部鍵,而真正進行排序的正是這些外部鍵值;「get # get user_name_* get user_level_* 」 的含義也能夠這麼理解,get # 表示返回本身元素,[get pattern] 表示返回外部鍵值。工具
redis 的事務機制主要是由下面的幾個指令來完成:性能
當 redis 接受到 multi 指令時,這個鏈接會進入一個事務上下文,該鏈接後續的命令並非當即執行,而是先放到一個隊列中;當從鏈接受到 exec 命令後,redis 會順序的執行隊列中的全部命令。並將全部命令的運行結果打包到一塊兒返回給 client。而後此鏈接就結束事務上下文。ui
redis 將是否有 watch 命令分爲普通類型事務和 CAS(Check And Set)類型事務,無 watch 命令的爲普通類型事務,有 watch 命令的爲 CAS類型事務。code
事務失敗的緣由能夠分爲靜態錯誤(如不存在的命令)和運行時錯誤(如 CAS 錯誤、對 string 用 lpop 操做等)。靜態錯誤會在提交 exec 時返回錯誤信息,使事務不能執行;而除 CAS 之外的運行時錯誤不會阻止事務繼續執行。所以,Redis 的事務機制並不具備原子性。server
127.0.0.1:6379> multi OK 127.0.0.1:6379> lpush list java QUEUED 127.0.0.1:6379> scard list QUEUED 127.0.0.1:6379> exec 1) (integer) 1 2) (error) WRONGTYPE Operation against a key holding the wrong kind of value 127.0.0.1:6379> lrange list 0 -1 1) "java"
能夠看到,即便命令錯誤,事務依然沒有被回滾。所以,redis 的事務機制過於原始,不建議使用。blog
須要注意的是,若是咱們使用 AOF 的方式持久化,可能存在事務被部分寫入的狀況(事務執行過程當中 redis 掛掉等)從而致使 redis 啓動失敗退出,可使用 redis-check-aof 工具進行修復。
在事務中 redis 提供了隊列,能夠批量執行任務,這樣性能就比較高,但使用 multi…exec 事務命令是有系統開銷的,由於它會檢測對應的鎖和序列化命令。有時咱們但願在沒有任何附加條件的狀況下使用隊列批量執行一系列命令,這時可使用 redis的流水線(pipeline)技術。
看看如何在 spring-data-redis 中使用 pipeline 功能,須要注意的是 RedisTemplate 的序列化須要使用 StringRedisSerializer,不能使用 JdkSerializationRedisSerializer,不然會致使序列化失敗。
@Test public void pipeline() { // 1.executePipelined 重寫 入參 RedisCallback 的doInRedis方法 List<Object> resultList = redisTemplate.executePipelined(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection redisConnection) throws DataAccessException { // 2.redisConnection 給本次管道內添加 要一次性執行的多條命令 // 2.1 一個set操做 redisConnection.set("hello".getBytes(), "world".getBytes()); // 2.2一個批量mset操做 Map<byte[], byte[]> tuple = new HashMap(); tuple.put("m_hello_1".getBytes(), "m_world_1".getBytes()); tuple.put("m_hello_2".getBytes(), "m_world_2".getBytes()); tuple.put("m_hello_3".getBytes(), "m_world_3".getBytes()); redisConnection.mSet(tuple); // 2.3一個get操做 redisConnection.get("m_hello_1".getBytes()); // 3 這裏必定要返回null,最終pipeline的執行結果,纔會返回給最外層 return null; } }); // 4. 最後對redis pipeline管道操做返回結果進行判斷和業務補償 for (Object str : resultList) { System.out.println(str); } }
繼 RxJava、Reactor、CompletableFuture 以及 Spring 的事件驅動模型後,咱們又要接觸一種觀察者模式啦!redis 做爲一個pub/sub server,在訂閱者和發佈者之間起到了消息路由的功能。訂閱者能夠經過 subscribe 和 psubscribe 命令向 redis server 訂閱本身感興趣的channel ;發佈者經過 publish 命令向 redis server 的 channel 發送消息,訂閱該 channel 的所有 client 都會收到此消息。一個 client 能夠訂閱多個 channel,也能夠向多個 channel 發送消息。
訂閱 channel:
127.0.0.1:6379> subscribe channel Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "channel" 3) (integer) 1 1) "message" 2) "channel" 3) "Hello,World"
發佈 channel 消息:
127.0.0.1:6379> publish channel Hello,World (integer) 1
接下來咱們來看看在 spring-data-redis 中如何實現發佈訂閱功能。首先咱們須要一個消息監聽器,只要讓它實現 MessageListener 接口便可:
public class ChannelListener implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { System.out.println("channel is:" + new String(message.getChannel())); System.out.println("channel content:" + new String(message.getBody())); } }
那麼,下面怎麼作呢?固然是把消息監聽器和 channel 綁定在一塊兒,讓消息監聽器知道處理哪一個 channel 的消息:
/** * redis 消息監聽器容器, 綁定消息監聽器和 channel * * @param connectionFactory * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //訂閱了一個叫 channel 的通道 container.addMessageListener(channelAdapter(), new PatternTopic("channel")); //這個 container 能夠添加多個 messageListener return container; } /** * 消息監聽器適配器 */ @Bean public MessageListenerAdapter channelAdapter() { return new MessageListenerAdapter(new ChannelListener()); }
接下來,讓咱們試着往這個 channel 發佈一個消息吧!
@Test public void publish() { redisTemplate.convertAndSend("channel", "Hello,World"); }