本文介紹一下redis中zset的使用。首先說一下我本地的實驗環境:java
redis版本:6.0.7redis
springboot-redis版本:spring
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.1.6.RELEASE</version> </dependency>
裏面使用到的spring-data-redis版本:api
2.1.9.RELEASE
裏面使用到的lettuce鏈接池版本:springboot
5.1.7.RELEASE
以前在文章《redis靈魂拷問:聊一聊redis底層數據結構》中講過redis的數據結構了,zset用到了3種數據結構,壓縮列表、跳錶,而且使用哈希表來保存value:score鍵值對。markdown
當同時知足下面2個條件時會用到壓縮列表,不然會用跳錶:數據結構
集合中元素都小於64字節ide
固然這個也是能夠配置的,在redis.conf文件中:spring-boot
# Similarly to hashes and lists, sorted sets are also specially encoded in # order to save a lot of space. This encoding is only used when the length and # elements of a sorted set are below the following limits: zset-max-ziplist-entries 128 zset-max-ziplist-value 64
由於使用哈希表保存分數,因此zset查找分數的命令時間複雜度是o(1)。網站
跳錶的數據結構咱們再回顧一下,看下圖:
跳錶中的元素是按照分數有序排列的,每一個元素都有指向後一個元素的指針,因此跳錶能夠很方便地進行範圍查詢,查找一個元素的複雜度是O(log(N)),從這個元素經過指針就能夠找到後面的M個元素,因此複雜度是O(log(N)+M)。
注意:下面的命令我用java代碼來實現,註解中寫了每一個命令的原生命令和時間複雜度,使用的時候你們能夠根據每一個命令的複雜度來進行取捨。
/** * ZADD * 時間複雜度 O(log(N)),n是sorted set中元素個數 */ public void add(){ //批量添加 Set<DefaultTypedTuple> zset1 = new HashSet<>(); zset1.add(new DefaultTypedTuple("v1",30.0)); zset1.add(new DefaultTypedTuple("v2",20.0)); zset1.add(new DefaultTypedTuple("v3",40.0)); zset1.add(new DefaultTypedTuple("v4",10.0)); //單個添加 redisTemplate.opsForZSet().add("zset1", zset1); //加入後排序[v5, v4, v2, v1, v3] redisTemplate.opsForZSet().add("zset1", "v5", 5); }
/** * ZREM * 複雜度O(M*log(N)) * @return 刪除元素個數 */ public Long remove(){ Set<DefaultTypedTuple> zset6 = new HashSet<>(); zset6.add(new DefaultTypedTuple("v1",30.0)); zset6.add(new DefaultTypedTuple("v2",20.0)); zset6.add(new DefaultTypedTuple("v3",40.0)); zset6.add(new DefaultTypedTuple("v4",10.0)); zset6.add(new DefaultTypedTuple("v5",5.0)); redisTemplate.opsForZSet().add("zset6", zset6); //返回2 return redisTemplate.opsForZSet().remove("zset6", "v4", "v5"); } /** * ZREMRANGEBYRANK * 複雜度O(log(N)+M) * @return 刪除元素個數 */ public Long removeRange(){ Set<DefaultTypedTuple> zset7 = new HashSet<>(); zset7.add(new DefaultTypedTuple("v1",10.0)); zset7.add(new DefaultTypedTuple("v2",20.0)); zset7.add(new DefaultTypedTuple("v3",30.0)); zset7.add(new DefaultTypedTuple("v4",40.0)); zset7.add(new DefaultTypedTuple("v5",50.0)); redisTemplate.opsForZSet().add("zset7", zset7); //返回3,此時zset中只剩[v1,v5] return redisTemplate.opsForZSet().removeRange("zset7", 1, 3); } /** * ZREMRANGEBYSCORE * 複雜度O(log(N)+M) * @return 刪除元素個數 */ public Long removeRangeByScore(){ Set<DefaultTypedTuple> zset8 = new HashSet<>(); zset8.add(new DefaultTypedTuple("v1",10.0)); zset8.add(new DefaultTypedTuple("v2",20.0)); zset8.add(new DefaultTypedTuple("v3",30.0)); zset8.add(new DefaultTypedTuple("v4",40.0)); zset8.add(new DefaultTypedTuple("v5",50.0)); redisTemplate.opsForZSet().add("zset8", zset8); //返回3,此時zset中只剩[v4,v5] return redisTemplate.opsForZSet().removeRangeByScore("zset8", 10, 30); }
/** * ZCARD * 返回元素個數 * 複雜度 O(1) * * @return */ public Long zCard(){ Set<DefaultTypedTuple> zset1 = new HashSet<>(); zset1.add(new DefaultTypedTuple("v1",30.0)); zset1.add(new DefaultTypedTuple("v2",20.0)); zset1.add(new DefaultTypedTuple("v3",40.0)); zset1.add(new DefaultTypedTuple("v4",10.0)); zset1.add(new DefaultTypedTuple("v5",5.0)); redisTemplate.opsForZSet().add("zset1", zset1); //[v5, v4, v2, v1, v3],返回5 return redisTemplate.opsForZSet().zCard("zset1"); }
/** * ZCOUNT * 時間複雜度O(log(N)) * 返回分數是min~max之間的元素個數, 閉區間 * @return */ public Long count(){ Set<DefaultTypedTuple> zset1 = new HashSet<>(); zset1.add(new DefaultTypedTuple("v1",30.0)); zset1.add(new DefaultTypedTuple("v2",20.0)); zset1.add(new DefaultTypedTuple("v3",40.0)); zset1.add(new DefaultTypedTuple("v4",10.0)); zset1.add(new DefaultTypedTuple("v5",5.0)); redisTemplate.opsForZSet().add("zset1", zset1); //["v1",30.0, "v2",20.0, "v3",40.0, "v4",10.0, "v5",5.0],返回2 return redisTemplate.opsForZSet().count("zset1", 20.0, 30.0); }
/** * ZRANK * 時間複雜度O(log(N)) * * @return 返回元素的正序索引位置 */ public Long rank(){ Set<DefaultTypedTuple> zset1 = new HashSet<>(); zset1.add(new DefaultTypedTuple("v1",30.0)); zset1.add(new DefaultTypedTuple("v2",20.0)); zset1.add(new DefaultTypedTuple("v3",40.0)); zset1.add(new DefaultTypedTuple("v4",10.0)); zset1.add(new DefaultTypedTuple("v5",5.0)); redisTemplate.opsForZSet().add("zset1", zset1); //[v5, v4, v2, v1, v3],這裏輸出0 return redisTemplate.opsForZSet().rank("zset1", "v5"); } /** * ZREVRANK * 時間複雜度O(log(N)) * 跟rank相反,返回元素逆序的位置 * * @return 返回元素的逆序索引位置 */ public Long reverseRank(){ Set<DefaultTypedTuple> zset1 = new HashSet<>(); zset1.add(new DefaultTypedTuple("v1",30.0)); zset1.add(new DefaultTypedTuple("v2",20.0)); zset1.add(new DefaultTypedTuple("v3",40.0)); zset1.add(new DefaultTypedTuple("v4",10.0)); zset1.add(new DefaultTypedTuple("v5",5.0)); redisTemplate.opsForZSet().add("zset1", zset1); //[v5, v4, v2, v1, v3],這裏輸出4 return redisTemplate.opsForZSet().reverseRank("zset1", "v5"); }
/** * ZRANGE/ZREVRANGE命令 * 複雜度O(log(N)+M),N是有序集合中的元素,M是返回的元素個數 *注意: * 1.索引下標從0開始 * 2.ZREVRANGE對應逆序輸出,這裏不給出示例 * * @return 返回指定索引範圍內的元素,注意,這裏是閉區間, 若是end傳入-1,就是從start到最後一個元素 */ public Set range(){ Set<DefaultTypedTuple> zset1 = new HashSet<>(); zset1.add(new DefaultTypedTuple("v1",30.0)); zset1.add(new DefaultTypedTuple("v2",20.0)); zset1.add(new DefaultTypedTuple("v3",40.0)); zset1.add(new DefaultTypedTuple("v4",10.0)); zset1.add(new DefaultTypedTuple("v5",5.0)); redisTemplate.opsForZSet().add("zset1", zset1); //輸出[v4, v2, v1, v3],總共5個元素,索引從1開始 return redisTemplate.opsForZSet().range("zset1", 1, -1); } /** * ZRANGEBYLE/ZRANGEBYLEX命令 * 複雜度O(log(N)+M),N是有序集合中的元素,M是返回的元素個數 *注意: * 1.這個命令用於元素分數相同的有序集合 * 2.spring的RedisZSetCommands.Range不生效 * 3.ZRANGEBYLEX對應逆序輸出,當前客戶端不支持這個命令 * * @return 返回指定索引範圍內的元素 */ public Set rangeByLex(){ Set<DefaultTypedTuple> zset3 = new HashSet<>(); zset3.add(new DefaultTypedTuple("a",0d)); zset3.add(new DefaultTypedTuple("b",0d)); zset3.add(new DefaultTypedTuple("c",0d)); zset3.add(new DefaultTypedTuple("d",0d)); zset3.add(new DefaultTypedTuple("e",0d)); zset3.add(new DefaultTypedTuple("f",0d)); zset3.add(new DefaultTypedTuple("g",0d)); redisTemplate.opsForZSet().add("zset3", zset3); RedisZSetCommands.Range range = RedisZSetCommands.Range.range(); //下面range賦值不生效,給lt賦值後返回空 //range.lt("f"); range.gt("c"); RedisZSetCommands.Limit limit = new RedisZSetCommands.Limit(); limit.offset(0); limit.count(5); //返回[a, b, c, d, e] return redisTemplate.opsForZSet().rangeByLex("zset3", range, limit); } /** * ZRANGEBYSCORE/ZRANGEBYSCORE命令 * 複雜度O(log(N)+M),N是有序集合中的元素,M是返回的元素個數 *注意: * 這個命令是閉區間 * ZRANGEBYSCORE命令對應逆序輸出 * * @return 返回指定索引範圍內的元素 */ public Set rangeByScore(){ Set<DefaultTypedTuple> zset4 = new HashSet<>(); zset4.add(new DefaultTypedTuple("v1",30.0)); zset4.add(new DefaultTypedTuple("v2",20.0)); zset4.add(new DefaultTypedTuple("v3",40.0)); zset4.add(new DefaultTypedTuple("v4",10.0)); zset4.add(new DefaultTypedTuple("v5",5.0)); redisTemplate.opsForZSet().add("zset4", zset4); //返回[v4, v2, v1] return redisTemplate.opsForZSet().rangeByScore("zset4", 10, 30); }
/** * ZSCAN命令 * 複雜度O(1) * 注意: * * * @return 返回指定索引範圍內的元素 */ public void scan(){ Set<DefaultTypedTuple> zset9 = new HashSet<>(); for (int i = 1; i <= 1000; i ++){ zset9.add(new DefaultTypedTuple("v" + i,i * 10.0)); } redisTemplate.opsForZSet().add("zset9", zset9); ScanOptions.ScanOptionsBuilder scanOptionsBuilder = new ScanOptions.ScanOptionsBuilder(); //這個count參數其實也不起做用,數據量小,好比20個,咱們設置了10,會所有輸出;數據量10000個,咱們輸入2000,也是輸出1000個 scanOptionsBuilder.count(2000); //這裏使用v*居然匹配不到 scanOptionsBuilder.match("*"); Cursor<ZSetOperations.TypedTuple> cursor = redisTemplate.opsForZSet().scan("zset9", scanOptionsBuilder.build()); System.out.println("======================"); cursor.forEachRemaining(r -> System.out.println(r.getValue() + ":" + r.getScore())); /** * 下面是輸出1000行中的前5行: * v490:4900.0 * v573:5730.0 * v643:6430.0 * v733:7330.0 * v408:4080.0 */ }
/** * ZINCRBY命令 * 增長元素分數 * 複雜度 O(log(N)),n是zset中元素個數 * * @return 增長分數後的元素值 */ public Double incrementScore(){ Set<DefaultTypedTuple> zset5 = new HashSet<>(); zset5.add(new DefaultTypedTuple("v1",30.0)); zset5.add(new DefaultTypedTuple("v2",20.0)); zset5.add(new DefaultTypedTuple("v3",40.0)); zset5.add(new DefaultTypedTuple("v4",10.0)); zset5.add(new DefaultTypedTuple("v5",5.0)); redisTemplate.opsForZSet().add("zset5", zset5); //返回15.0 return redisTemplate.opsForZSet().incrementScore("zset5", "v5", 10d); } /** * ZSCAN命令 * 複雜度O(1) * @return 查找元素的分數 */ public Double score(){ Set<DefaultTypedTuple> zset14 = new HashSet<>(); zset14.add(new DefaultTypedTuple("v1",10.0)); zset14.add(new DefaultTypedTuple("v2",20.0)); zset14.add(new DefaultTypedTuple("v3",30.0)); zset14.add(new DefaultTypedTuple("v4",40.0)); zset14.add(new DefaultTypedTuple("v5",50.0)); redisTemplate.opsForZSet().add("zset14", zset14); //返回30.0 return redisTemplate.opsForZSet().score("zset14", "v3"); }
/** * ZINTER/ZINTERSTORE * * 複雜度O(N*K)+O(M*log(M)) ,N是元素少的zset的元素數量,K是2個zset的元素總數,M是返回結果 */ public void intersectAndStore(){ Set<DefaultTypedTuple> zset10 = new HashSet<>(); zset10.add(new DefaultTypedTuple("v1",10.0)); zset10.add(new DefaultTypedTuple("v3",30.0)); zset10.add(new DefaultTypedTuple("v5",50.0)); zset10.add(new DefaultTypedTuple("v6",60.0)); redisTemplate.opsForZSet().add("zset10", zset10); Set<DefaultTypedTuple> zset11 = new HashSet<>(); zset11.add(new DefaultTypedTuple("v1",10.0)); zset11.add(new DefaultTypedTuple("v2",20.0)); zset11.add(new DefaultTypedTuple("v3",30.0)); zset11.add(new DefaultTypedTuple("v4",40.0)); zset11.add(new DefaultTypedTuple("v5",50.0)); redisTemplate.opsForZSet().add("zset11", zset11); redisTemplate.opsForZSet().intersectAndStore("zset10", "zset11", "zsetinter9and11"); ScanOptions scanOptions = ScanOptions.NONE; Cursor<ZSetOperations.TypedTuple> cursor = redisTemplate.opsForZSet().scan("zsetinter9and11", scanOptions); System.out.println("======================"); cursor.forEachRemaining(r -> System.out.println(r.getValue() + ":" + r.getScore())); /** * 輸出結果以下 * v1:20.0 * v3:60.0 * v5:100.0 */ } /** * ZUNIONSTORE * * 複雜度:O(N)+O(M log(M)),其中N是2個zset的元素總數,M是返回的元素個數 */ public void unionAndStore(){ Set<DefaultTypedTuple> zset12 = new HashSet<>(); zset12.add(new DefaultTypedTuple("v1",10.0)); zset12.add(new DefaultTypedTuple("v2",20.0)); zset12.add(new DefaultTypedTuple("v3",30.0)); redisTemplate.opsForZSet().add("zset12", zset12); Set<DefaultTypedTuple> zset13 = new HashSet<>(); zset13.add(new DefaultTypedTuple("v4",40.0)); zset13.add(new DefaultTypedTuple("v5",50.0)); zset13.add(new DefaultTypedTuple("v6",60.0)); redisTemplate.opsForZSet().add("zset13", zset13); redisTemplate.opsForZSet().unionAndStore("zset12", "zset13", "zsetinter12and13"); ScanOptions scanOptions = ScanOptions.NONE; Cursor<ZSetOperations.TypedTuple> cursor = redisTemplate.opsForZSet().scan("zsetinter12and13", scanOptions); System.out.println("======================"); cursor.forEachRemaining(r -> System.out.println(r.getValue() + ":" + r.getScore())); /** * 輸出結果以下 * v1:10.0 * v2:20.0 * v3:30.0 * v4:40.0 * v5:50.0 * v6:60.0 */ }
做爲隊列2個命令:ZPOPMAX/ZPOPMIN,讓當前分數最高/最低的元素出隊,複雜度O(log(N)*M) ,當前spring版本客戶端不支持
zset保存了分數值,因此對於閱讀量、點擊量排行等場景能夠很方便的使用。
假如一個博客網站上有10篇文章,咱們要統計今天閱讀量排名前2位的文章,咱們能夠先初始化一個10篇文章的zset,代碼以下:
Set<DefaultTypedTuple> articles = new HashSet<>(); articles.add(new DefaultTypedTuple("article1",0d)); articles.add(new DefaultTypedTuple("article2",0d)); articles.add(new DefaultTypedTuple("article3",0d)); articles.add(new DefaultTypedTuple("article4",0d)); articles.add(new DefaultTypedTuple("article5",0d)); articles.add(new DefaultTypedTuple("article6",0d)); articles.add(new DefaultTypedTuple("article7",0d)); articles.add(new DefaultTypedTuple("article8",0d)); articles.add(new DefaultTypedTuple("article9",0d)); articles.add(new DefaultTypedTuple("article10",0d)); redisTemplate.opsForZSet().add("articles", articles);
每當有1篇文章被閱讀時,咱們就把分數加1,好比第一篇:
redisTemplate.opsForZSet().incrementScore("articles", "article1", 1d);
日終時,咱們找出排名前2位的文章:
redisTemplate.opsForZSet().range("articles", 0, 1);
跟上面的場景相似,假如咱們要找出銷售量前2位的商品,咱們也能夠初始化一個商品zset,分數就是銷售量,每次售出一件商品時分數值加1,最後range命令去除前2個商品。
好比咱們要對1萬個手機號排名,咱們能夠把姓名做爲key,把手機號score存入zset中,代碼以下:
Set<DefaultTypedTuple> phones = new HashSet<>(); phones.add(new DefaultTypedTuple("張三",18605556899)); redisTemplate.opsForZSet().add("phones", phones);
咱們能夠隨便找出一個幸運手機號,好比6000
redisTemplate.opsForZSet().range("articles", 5999, 5999);
zset使用了壓縮列表、跳錶的數據結構,而且使用哈希表來保存value:score鍵值對。
range命令得益於底層使用了跳錶,複雜度並不高,可是會隨着返回元素的數量而增長。zscan命令複雜度很低,可是spring提供的api不友好,超過1000須要分頁的時候,就很差用了。元素個數少於1000時使用zscan命令一次取出是最快的。
交集並集的複雜度很高,若是有bigkey的狀況,會嚴重阻塞主線程,建議通常不要使用。能夠把2個zset的元素取出來,在應用內存中進行交集並集運算,這樣不會阻塞redis主線程。
因爲api和版本限制,本文並無列出zset的全部命令,你們能夠查看官網:
https://redis.io/commands/zunionstore