Redis中提供了原子性命令SETEX或SET來寫入STRING類型數據並設置Key的過時時間:git
> SET key value EX 60 NX ok > SETEX key 60 value ok
但對於HASH結構則沒有這樣的命令,只能先寫入數據而後設置過時時間:github
> HSET key field value ok > EXPIRE key 60 ok
這樣就帶了一個問題:HSET命令執行成功而EXPIRE命令執行失敗(如命令未能成功發送到Redis服務器),那麼數據將不會過時。針對這個問題,本文提供了幾種解決方案:redis
向Redis中寫入HASH結構的Lua腳本以下:服務器
local fieldIndex=3 local valueIndex=4 local key=KEYS[1] local fieldCount=ARGV[1] local expired=ARGV[2] for i=1,fieldCount,1 do redis.pcall('HSET',key,ARGV[fieldIndex],ARGV[valueIndex]) fieldIndex=fieldIndex+2 valueIndex=valueIndex+2 end redis.pcall('EXPIRE',key,expired)
使用Redis命令行工具執行Lua腳本,須要將腳本內容單行化,並以分號間隔不一樣的命令:async
> SCRIPT LOAD "local fieldIndex=3;local valueIndex=4;local key=KEYS[1];local fieldCount=ARGV[1];local expired=ARGV[2];for i=1,fieldCount,1 do redis.pcall('HSET',key,ARGV[fieldIndex],ARGV[valueIndex]) fieldIndex=fieldIndex+2 valueIndex=valueIndex+2 end;redis.pcall('EXPIRE',key,expired);" "e03e7868920b7669d1c8c8b16dcee86ebfac650d" > evalsha e03e7868920b7669d1c8c8b16dcee86ebfac650d 1 key 2 1000 field1 value1 field2 value2 nil
寫入結果:函數
使用StackExchange.Redis執行Lua腳本:工具
public async Task WriteAsync(string key, IDictionary<string, string> valueDict, TimeSpan expiry) { async Task func() { if (valueDict.Empty()) { return; } var luaScriptPath = $"{AppDomain.CurrentDomain.BaseDirectory}/Lua/HSET.lua"; var script = File.ReadAllText(luaScriptPath); var seconds = (int)Math.Ceiling(expiry.TotalSeconds); var fieldCount = valueDict.Count; var redisValues = new RedisValue[fieldCount * 2 + 2]; redisValues[0] = fieldCount; redisValues[1] = seconds; var i = 2; foreach (var item in valueDict) { redisValues[i] = item.Key; redisValues[i + 1] = item.Value; i += 2; } //await Database.ScriptEvaluateAsync(script, new RedisKey[] { key, fieldCount.ToString(), seconds.ToString() }, redisValues); await Database.ScriptEvaluateAsync(script, new RedisKey[] { key }, redisValues); } await ExecuteCommandAsync(func, $"redisError:hashWrite:{key}"); }
Redis官方文檔在事務一節中指出:Redis命令只會在有語法錯誤或對Key使用了錯誤的數據類型時執行失敗。所以,只要咱們保證將正確的寫數據和設置過時時間的命令做爲一個總體發送到服務器端便可,使用Lua腳本正式基於此。lua
StackExchange.Redis官方文檔中關於事務的說明,參見:Transactionsspa
如下是代碼實現:命令行
public async Task<bool> WriteAsync(string key, IDictionary<string, string> valueDict, TimeSpan expiry) { var tranc = Database.CreateTransaction(); foreach (var item in valueDict) { tranc.HashSetAsync(key, item.Key, item.Value); } tranc.KeyExpireAsync(key, expiry); return await tranc.ExecuteAsync(); }
這種方案比較差,思路以下,共分爲4步,每一步都有可能失敗:
在讀取Hash的值時,判斷讀到的field的值是不是Nil,如果則刪除並忽略,若不是則處理。
代碼以下:
namespace RedisClient.Imples { public class RedisHashOperator : RedisCommandExecutor, IRedisHashOperator { private readonly string KeyExpiryPlaceHolder = "expiryPlaceHolder"; public RedisHashOperator(ILogger<RedisHashOperator> logger, IRedisConnection redisConnection) : base(logger, redisConnection) { } public async Task WriteAsync(string key, IDictionary<string, string> valueDict, TimeSpan expiry) { async Task action() { if (valueDict.Empty()) { return; } var hashList = new List<HashEntry>(); foreach (var value in valueDict) { hashList.Add(new HashEntry(value.Key, value.Value)); } await Database.HashSetAsync(key, hashList.ToArray()); } async Task successed() { await ExecuteCommandAsync(action, $"redisEorror:hashWrite:{key}"); } await SetKeyExpireAsync(key, expiry, successed); } public async Task<RedisReadResult<IDictionary<string, string>>> ReadAllFieldsAsync(string key) { async Task<RedisReadResult<IDictionary<string, string>>> func() { var redisReadResult = new RedisReadResult<IDictionary<string, string>>(); if (Database.KeyExists(key) == false) { return redisReadResult.Failed(); } var resultList = await Database.HashGetAllAsync(key); if (resultList == null) { return redisReadResult.Failed(); } var dict = new Dictionary<string, string>(); if (resultList.Any()) { foreach (var result in resultList) { if (result.Name == KeyExpiryPlaceHolder || result.Value == KeyExpiryPlaceHolder) { await RemoveKeyExpiryPlaceHolderAsync(key); continue; } dict[result.Name] = result.Value; } } return redisReadResult.Success(dict); } return await ExecuteCommandAsync(func, $"redisError:hashReadAll:{key}"); } #region private /// <summary> /// 設置HASH結構KEY的過時時間 /// </summary> /// <param name="successed">設置過時時間成功以後的回調函數</param> private async Task SetKeyExpireAsync(string key, TimeSpan expiry, Func<Task> successed) { // 確保KEY的過時時間寫入成功以後再執其它的操做 await Database.HashSetAsync(key, new HashEntry[] { new HashEntry(KeyExpiryPlaceHolder, KeyExpiryPlaceHolder) }); if (Database.KeyExpire(key, expiry)) { await successed(); } await Database.HashDeleteAsync(key, KeyExpiryPlaceHolder); } private async Task RemoveKeyExpiryPlaceHolderAsync(string key) { await Database.HashDeleteAsync(key, KeyExpiryPlaceHolder); } #endregion } }
文中屢次出現的ExecuteCommandAsync方法主要目的是實現針對異常狀況的統一處理,實現以下:
namespace RedisClient.Imples { public class RedisCommandExecutor { private readonly ILogger Logger; protected readonly IDatabase Database; public RedisCommandExecutor(ILogger<RedisCommandExecutor> logger, IRedisConnection redisConnection) { Logger = logger; Database = redisConnection.GetDatabase(); } protected async Task ExecuteCommandAsync(Func<Task> func, string errorMessage = null) { try { await func(); } catch (Exception ex) { if (string.IsNullOrEmpty(errorMessage)) { errorMessage = ex.Message; } Logger.LogError(errorMessage, ex); } } protected async Task<T> ExecuteCommandAsync<T>(Func<Task<T>> func, string errorMessage = null) { try { return await func(); } catch (Exception ex) { if (string.IsNullOrEmpty(errorMessage)) { errorMessage = ex.Message; } Logger.LogError(errorMessage, ex); return default(T); } } } }