使用redis的zset實現高效分頁查詢(附完整代碼)

1、需求

移動端系統裏有用戶和文章,文章可設置權限對部分用戶開放。現要實現的功能是,用戶瀏覽本身能看的最新文章,並能夠上滑分頁查看。前端

 

2、數據庫表設計

 涉及到的數據庫表有:用戶表TbUser、文章表TbArticle、用戶可見文章表TbUserArticle。其中,TbUserArticle的結構和數據以下圖,字段有:自增加主鍵id、用戶編號uid、文章編號aid。redis

 

自增加主鍵和分佈式增加主鍵如何選:算法

TbUserArticle的主鍵是自增id,它有個缺陷是,當你的數據庫有主從複製時,主從庫的自增可能因死鎖等緣由致使不一樣步。不過,咱們能夠知道,這裏的TbUserArticle的主鍵id不會作其它表的外鍵,因此能夠是自增id。不像用戶表的主鍵,它就不能用自增id,由於用戶表主鍵(uid)會常常出如今其它表中,當主從庫自增不一致時,不少有uid字段的表數據在從庫中就不正確了。用戶表主鍵最好是用分佈式增加主鍵算法生成的id(好比Snowflake雪花算法)。sql

那麼你可能就要說了,TbUserArticle的主鍵爲何不直接用雪花算法產生,無論有沒有用,先讓主從庫主鍵值一致老是有恃無恐。要知道,雪花算法產生的id通常是18位,而redis的zset的score是double類型,只能表達到16位"整數"部分(精確的說是9007199254740992=2的53次方)。所以,TbUserArticle的主鍵選擇自增id。數據庫

主鍵通常都要選自增id或分佈式增加id,這種主鍵好處多多,它符合自增加(物理存儲時都是在末尾追加數據,減小數據移動)、惟一性、長度小、查詢快的特性,是彙集索引的很好選擇。api

 

3、redis緩存設計-zset

zset的做法及其優勢說明:數組

1.zset的score倒序取數能夠很好的知足取最新數據的需求。緩存

2.用TbUserArticle的文章編號當value,用自增加id當score。自增id的惟一性可很方便的取下一頁數據,直接取小於上次最後一筆的score便可(用lastScore表示)。而若是用文章的時間作score,則要考慮兩筆文章的時間是同分同秒問題,當lastScore落在同分同秒的兩篇文章之間時,就尷尬了,雖然有解,但麻煩了一點。有時的場景你用不了自增id當score,只能用文章時間,那怎麼解決呢,方案就是當是同分同秒時,再根據文章編號作比較就行了,zset的score相同時,也是再根據value排序的。數據結構

3.當新增或從新添加一項時,zset也會保持score排序。而若是用的是redis的list,通常就得從db重載緩存,新增進來的數據項就算是最新的,也不敢直接添加到list第一筆,由於併發狀況下,保證不了最新就是在第一筆;至於從新添加進非最新項,那更是要從db取數從新裝載緩存(通常是直接刪除緩存,要用的時候才裝載)。併發

4.第一次從db加載數據到zset時,可只取前N筆到zset。由於咱們移動端的數據瀏覽,通常是隻看最新N筆,當看到昨天瀏覽過的數據通常就不會再往下瀏覽。

5.控制zset爲固定長度,防止一直增加,一是減小緩存開銷,二是隊列長度越短操做性能越高。並且redis服務端有兩個參數:zset-max-ziplist-entries(zset隊列長度,默認值128)和 zset-max-ziplist-value(zset每項大小,默認值64字節),它們的做用是,當zset長度小於128,且每一個元素的大小小於64字節時,會啓用ziplist(壓縮雙向鏈表),它的內存空間能夠減小8倍左右,並且操做性能也更快。若是不知足這兩個條件則是普通的skiplist(跳躍表)。另,數據結構hash和list默認長度是512。若是系統有100萬個用戶,每一個用戶都有本身的隊列緩存,那麼使用ziplist將節省很是大的內存空間,並提高很大的性能。

注意,當從zset移除一項數據,則看場景是否須要清空隊列。不然有可能添加進來了一項很舊的數據,它會跑到緩存隊列最底部,若是此舊數據比db中未進隊列的數據還舊,那麼隊列中的數據就不正確了。此時,用戶滑到緩存最後一頁時,就有可能瀏覽到這項不正確的數據,爲何是「有可能」,由於當取到zset最後一筆,極可能不夠一頁,而不夠一頁就會從db直接取一頁;而當又添加進一項新數據,這項舊數據就會被T出隊列(由於隊列保持固定長度)。最佳方案是搞個臨界值處理此問題。 而若是添加到zset的數據都是最新數據,則不會有此問題。通常是這種狀況,才能夠用自增id當score。

 當用惟一主鍵id作score時,這但是很是有用,你能夠直接根據id定位到項了,至於如何大用它,我會再出篇博客。

 

4、代碼實現 

從redis緩存按頁取數通常要考慮的點:

1.當根據cacheKey未取到數據時(多是緩存過時了致使redis無此cacheKey數據),則觸發重載數據(reload):從db取limit N筆數據,裝載到redis zset隊列中,並直接取N筆的第一頁數據返回;
2.若是db自己也無對應數據,則添加"no_db_mark"標識到cacheKey隊列中,下次請求則不會再觸發db重載數據;
3.當取到緩存末尾時,從db取一頁數據直接返回。這種狀況是不多的,要根據業務場景合理規劃緩存長度。

上代碼:

代碼註釋比較詳細和有用,請直接看代碼。其中,批量添加數據到zset的函數AddItemsToZset頗有用,它使用lua一次性添加數據到zset(注意,使用lua時,要保證lua執行快,不然它會阻塞其它命令的執行),經測試:AddItemsToZset添加1w筆數據,只須要39ms;10w筆須要448ms。由於咱們只取前N筆數據到緩存,所以通常不會添加超過1w筆。

  1     /// <summary>
  2     /// 分頁取數幫助類
  3     /// </summary>
  4     public class PageDataHelper
  5     {
  6         public readonly static string NoDbDataMark = "no_db_data";//在zset中標識db也無數據
  7         public static RedisHandle RedisClient = new RedisHandle();//redis操做對象示例
  8         public static DbHandleBase DbHandle = new SqlServerHandle("Data Source=.;Initial Catalog=Test;User Id=sa;Password=123ewq;");//db操做對象示例
  9         /// <summary>
 10         /// 按頁取數。返回文章編號列表。
 11         /// </summary>
 12         /// <param name="lastInfo">上一頁最後一筆的score,若是爲空,則說明是取第一頁。</param>
 13         /// <param name="getPast">true,用戶上滑瀏覽下一頁數據;false,用戶上滑瀏覽最新一頁數據</param>
 14         /// <returns>返回key-value列表,key就是文章編號,value就是自增id(可用於lastScore)</returns>
 15         public static IDictionary<string, double> GetUserPageData(string uid, int pageSize, string lastInfo, bool getPast)
 16         {
 17             long lastScore = 0;
 18             //1.解析lastInfo信息。->getPast爲false,則固定取最新第一頁數據,不用解析。lastInfo爲空,則也不用解析,默認第一頁
 19             if (getPast && !string.IsNullOrWhiteSpace(lastInfo))
 20             {
 21                 lastScore = long.Parse(lastInfo);//外層有try..catch..
 22             }
 23             string cacheKey = $"usr:art:{uid}";
 24             bool isFirstPage = lastScore <= 0;
 25             using (IRedisClient redis = RedisClient.GetRedisClient())
 26             {
 27                 if (isFirstPage)
 28                 {
 29                     //2.第一頁取數
 30                     var items = redis.GetRangeWithScoresFromSortedSetDesc(cacheKey, 0, pageSize - 1);
 31                     if (items.Count == 0)
 32                     {
 33                         //2.1 無數據時,則從db reload數據
 34                         items = ReloadDataToRedis(redis, cacheKey, uid, pageSize);
 35                         if (items.Count == 0 && pageSize > 0)
 36                         {
 37                             //若是db中也無數據,則向zset中添加一筆NoDbDataMark標識
 38                             redis.AddItemToSortedSet(cacheKey, NoDbDataMark, double.MaxValue);
 39                         }
 40                     }
 41                     else if (items.Count == 1 && items.ContainsKey(NoDbDataMark))
 42                     {
 43                         //2.2若是取到的是NoDbDataMark標識,則說明是空數據,則要Clear,返回空列表
 44                         items.Clear();
 45                     }
 46                     //設置緩存有效期,要根據業務場景合理設置緩存有效期,這邊以7天爲例。
 47                     redis.ExpireEntryIn(cacheKey, new TimeSpan(7, 0, 0, 0));
 48                     //2.3 第一頁,有多少就返回多少數據。數據若是不夠一頁,說明自己數據不夠。
 49                     return items;
 50                 }
 51                 else
 52                 {
 53                     //3.第二頁(及以後)取數
 54                     var items = GetPageDataByLastScoreFromRedis(redis, cacheKey, pageSize, lastScore);
 55                     if (items.Count < pageSize)
 56                     {
 57                         //3.1 若是取不夠數據時,就到db取。若是db也不能取到一頁數據,前端會顯示無更多數據,不會一直db取。
 58                         return GetPageDataByLastScoreFromDb(uid, pageSize, lastScore);
 59                     }
 60                     //3.2 若是緩存數據足夠,則返回緩存的數據。
 61                     return items;
 62                 }
 63             }
 64         }
 65         public static Dictionary<string, double> ReloadDataToRedis(IRedisClient redis, string cacheKey, string uid, int pageSize, string bizId = "")
 66         {
 67             //1.db取數 取top 1000筆數據。不須要全取到緩存。
 68             IEnumerable<dynamic> models;
 69             using (var conn = DbHandle.CreateConnectionAndOpen())
 70             {
 71                 var sql = $"select top 1000 id,aid from TbUserArticle where uid=@uid order by id desc;";// limit 1000;";
 72                 models = conn.Query<dynamic>(sql, new { uid = uid });
 73             }
 74             if (models.Count() <= 0) return new Dictionary<string, double>();
 75             //2.數據加載到redis緩存。
 76             var itemsParam = new Dictionary<string, double>();
 77             foreach (dynamic model in models)
 78             {
 79                 itemsParam.Add((string)model.aid, (double)model.id);
 80             }
 81             //使用lua一次性添加數據到緩存。lua語句要執行快,經測試添加1w筆數據,只須要39ms;10w筆須要448ms。由於sql中有limit,所以通常不會添加超過1w筆。
 82             //由於是原子性操做、而且是zset結構,這邊不須要加鎖。db取到數據應第一時間加載到redis。
 83             AddItemsToZset(redis, cacheKey, itemsParam, true, true);
 84             if (pageSize <= 0) return null;
 85             //3.直接由models返回第一頁數據。
 86             return models.Take(pageSize).ToDictionary(x => (string)x.aid, y => (double)y.id);
 87         }
 88 
 89         public static Dictionary<string, double> GetPageDataByLastScoreFromDb(string uid, int pageSize, double lastScore)
 90         {
 91             //db取一頁數據。
 92             var sql = $"select top {pageSize} id,aid from TbUserArticle where uid=@uid and id<{lastScore}order by id desc;";// limit {pageSize};";
 93             using (var conn = DbHandle.CreateConnectionAndOpen())
 94             {
 95                 return conn.Query<dynamic>(sql, new { uid = uid }).ToDictionary(x => (string)x.aid, y => (double)y.id);
 96             }
 97         }
 98         #region 通用函數
 99         /// <summary>
100         /// ZSet第一頁以後的取數,從lastScore開始取pageSize筆數據(第一頁以後纔有lastScore)。
101         /// 使用lua,保證原子性操做。
102         /// </summary>
103         public static Dictionary<string, double> GetPageDataByLastScoreFromRedis(IRedisClient redis, string zsetKey, int pageSize, double lastScore)
104         {
105             //ZREVRANGEBYSCORE: from lastScore to '-inf'.
106             var luaBody = @"local sets = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], '-inf', 'WITHSCORES');
107             local result = {};
108             local index=0;
109             local pageSize=ARGV[2]*1;
110             local lastScore=ARGV[1]*1;
111             for i = 1, #sets, 2 do 
112                 if index>=pageSize then
113                     break;
114                 end
115                 if (lastScore>sets[i+1]*1) then
116                     table.insert(result, sets[i]);
117                     table.insert(result, sets[i+1]);
118                     index=index+1;
119                 end
120             end
121             return result";
122             //ARGV[1]:lastScore ARGV[2]:pageSize
123             var list = redis.ExecLuaAsList(luaBody, new string[] { zsetKey }, new string[] { lastScore.ToString(), pageSize.ToString() });
124             var result = new Dictionary<string, double>();
125             for (var i = 0; i < list.Count; i += 2)
126             {
127                 result.Add(list[i], Convert.ToDouble(list[i + 1]));
128             }
129             return result;
130         }
131         /// <summary>
132         /// 添加一項到zset緩存中。
133         /// </summary>
134         /// <param name="item">要添加到zset的數據項</param>
135         /// <param name="maxCount">控制zset最大長度,若是爲0,則不控制。</param>
136         /// <returns></returns>
137         public static string AddItemToZset(IRedisClient redis, string zsetKey, KeyValuePair<string, double> item, int maxCount = 0)
138         {
139             var items = new Dictionary<string, double>() { { item.Key, item.Value } };
140             return AddItemsToZset(redis, zsetKey, items);
141         }
142         /// <summary>
143         /// 添加多項到zset緩存中。
144         /// </summary>
145         /// <param name="items">要添加到zset的數據列表</param>
146         /// <param name="hasCacheExpire">緩存zsetKey是否有設置緩存有效期。若是有設置緩存有效期,則當緩存中無數據時,多是緩存過時;而若是緩存無有效期,緩存中無數據,就是db和緩存都無數據</param>
147         /// <param name="isReload">是不是reload狀況,true重載狀況;false追加</param>
148         /// <param name="maxCount">控制zset最大長度,若是爲0,則不控制。
149         /// 通常不用控制,只在db取數reload時增長limit,以免全量進緩存;
150         /// 若是當前zsetKey是經常使用緩存,會一直暴漲,則纔要控制zset長度。</param>
151         /// <returns></returns>
152         public static string AddItemsToZset(IRedisClient redis, string zsetKey, Dictionary<string, double> items, bool hasCacheExpire = true
153             , bool isReload = false, int maxCount = 0)
154         {
155             //!isReload,是由於若是isReload=true狀況無數據,則也要進來重載隊列爲無數據(即,若是以前有數據要重載爲無數據)
156             if (!isReload && items.Count <= 0) return null;
157             var argArr = new List<string>(items.Count * 2 + 2);//lua參數數組
158             //var hasCacheExpire = cacheValidTime != null;
159             //第一個lua參數是hasCacheExpire
160             argArr.Add(hasCacheExpire ? "1" : "0");
161             //第二個lua參數是maxCount
162             argArr.Add(maxCount.ToString());
163             //組合lua其它參數列表:ZADD的參數
164             foreach (var item in items)
165             {
166                 //Add score。 //ZADD KEY_NAME SCORE1 VALUE1
167                 argArr.Add(item.Value.ToString());
168                 argArr.Add(item.Key);
169             }
170             #region lua
171             /*
172             * 如下lua命令說明。
173             * 1.ZREVRANGE從大到小取第一筆數據firstMark;
174             * 2.緩存有設置有效期時(hasCacheExpire=1),若是第一筆數據firstMark爲nil,則說明列表是空(失效key、未生成key),則不作任何處理,直接返回字符串not_exist_key。由於多是用戶失效數據,用戶長期未訪問,則不添加,後繼來訪問時重載數據。
175             * 3.若是firstMark標識爲no_db_data,則是被api標識爲db沒數據,而此時因要ZADD數據進來,所以要把此標識刪除。其中,ZREMRANGEBYRANK從小到大刪除,-1是倒數第一筆。
176             * 4.ZADD數據進來
177             * 5.KeepLength保持隊列長度操做。若是隊列長度(由ZCARD獲取)超過指定的maxCount,則從隊列第一筆開始刪除多餘元素,即score最小開始刪除。
178             * 6.maxCount爲>0才KeepLength。返回數值:curCount - maxCount。(能夠用返回值簡單算出隊列當前長度curCount)。若是返回值小於等於0則說明沒有觸發刪除操做。
179             * 7.maxCount爲<=0時,直接返回'no_remove'。
180             */
181             //清空原來,從新加載數據的狀況
182             const string reloadLua = "redis.call('DEL', KEYS[1]) ";
183             //追加數據到zset的狀況
184             const string addToLua =
185             @"local firstMark = redis.call('ZREVRANGE',KEYS[1],0,0);
186             local hasCacheExpire=ARGV[1]*1;
187             if hasCacheExpire==1 and firstMark and firstMark[1]==nil then
188                 return 'not_exist_key';
189             end
190             if firstMark and firstMark[1]=='{0}' then
191                 redis.call('ZREMRANGEBYRANK', KEYS[1], -1,-1);
192             end";
193             const string constAllLua =
194             @"{0}
195             for i=3, #ARGV, 2 
196                 do redis.call('ZADD', KEYS[1], ARGV[i], ARGV[i+1]);  
197             end
198             local maxCount=ARGV[2]*1;
199             if maxCount>0 then
200               local curCount= redis.call('ZCARD', KEYS[1]);
201               local removeCount=curCount - maxCount;
202               if removeCount>0 then
203                 redis.call('ZREMRANGEBYRANK', KEYS[1], 0,removeCount-1);    
204               end  
205               return removeCount;
206             end
207             return 'no_remove';";
208             #endregion
209             var luaBody = string.Format(constAllLua, isReload ? reloadLua : string.Format(addToLua, NoDbDataMark));
210             var luaResult = redis.ExecLuaAsString(luaBody, new string[] { zsetKey }, argArr.ToArray());
211             return luaResult;
212         }
213         #endregion
214     }

返回key-value列表,key就是文章編號,value就是自增id(可用於lastScore)

相關文章
相關標籤/搜索