Redis原子性寫入HASH結構數據並設置過時時間

Redis中提供了原子性命令SETEX或SET來寫入STRING類型數據並設置Key的過時時間:git

> SET key value EX 60 NX
ok
> SETEX key 60 value
ok

但對於HASH結構則沒有這樣的命令,只能先寫入數據而後設置過時時間:github

> HSET key field value
ok
> EXPIRE key 60
ok

這樣就帶了一個問題:HSET命令執行成功而EXPIRE命令執行失敗(如命令未能成功發送到Redis服務器),那麼數據將不會過時。針對這個問題,本文提供了幾種解決方案:redis

Lua腳本

向Redis中寫入HASH結構的Lua腳本以下:服務器

local fieldIndex=3
local valueIndex=4
local key=KEYS[1]
local fieldCount=ARGV[1]
local expired=ARGV[2]
for i=1,fieldCount,1 do
  redis.pcall('HSET',key,ARGV[fieldIndex],ARGV[valueIndex])
  fieldIndex=fieldIndex+2
  valueIndex=valueIndex+2
end
redis.pcall('EXPIRE',key,expired)

使用Redis命令行工具執行Lua腳本,須要將腳本內容單行化,並以分號間隔不一樣的命令:async

 

>  SCRIPT LOAD "local fieldIndex=3;local valueIndex=4;local key=KEYS[1];local fieldCount=ARGV[1];local expired=ARGV[2];for i=1,fieldCount,1 do redis.pcall('HSET',key,ARGV[fieldIndex],ARGV[valueIndex]) fieldIndex=fieldIndex+2 valueIndex=valueIndex+2 end;redis.pcall('EXPIRE',key,expired);"
"e03e7868920b7669d1c8c8b16dcee86ebfac650d"
> evalsha e03e7868920b7669d1c8c8b16dcee86ebfac650d 1 key 2 1000 field1 value1 field2 value2
nil

 

寫入結果:函數

使用StackExchange.Redis執行Lua腳本:工具

public async Task WriteAsync(string key, IDictionary<string, string> valueDict, TimeSpan expiry)
{
    async Task func()
    {
        if (valueDict.Empty())
        {
            return;
        }
        var luaScriptPath = $"{AppDomain.CurrentDomain.BaseDirectory}/Lua/HSET.lua";
        var script = File.ReadAllText(luaScriptPath);
        var seconds = (int)Math.Ceiling(expiry.TotalSeconds);
        var fieldCount = valueDict.Count;
        var redisValues = new RedisValue[fieldCount * 2 + 2];
        redisValues[0] = fieldCount;
        redisValues[1] = seconds;
        var i = 2;
        foreach (var item in valueDict)
        {
            redisValues[i] = item.Key;
            redisValues[i + 1] = item.Value;
            i += 2;
        }
        //await Database.ScriptEvaluateAsync(script, new RedisKey[] { key, fieldCount.ToString(), seconds.ToString() }, redisValues);
        await Database.ScriptEvaluateAsync(script, new RedisKey[] { key }, redisValues);
    }

    await ExecuteCommandAsync(func, $"redisError:hashWrite:{key}");
}

事務

 

Redis官方文檔在事務一節中指出:Redis命令只會在有語法錯誤或對Key使用了錯誤的數據類型時執行失敗。所以,只要咱們保證將正確的寫數據和設置過時時間的命令做爲一個總體發送到服務器端便可,使用Lua腳本正式基於此。lua

 

StackExchange.Redis官方文檔中關於事務的說明,參見:Transactionsspa

 

如下是代碼實現:命令行

public async Task<bool> WriteAsync(string key, IDictionary<string, string> valueDict, TimeSpan expiry)
{
    var tranc = Database.CreateTransaction();
    foreach (var item in valueDict)
    {
        tranc.HashSetAsync(key, item.Key, item.Value);
    }
    tranc.KeyExpireAsync(key, expiry);
    return await tranc.ExecuteAsync();
}

佔位符

這種方案比較差,思路以下,共分爲4步,每一步都有可能失敗:

  • 先寫入一個特殊的值,如Nil表示無數據
  • 若第一步操做成功,則Key被寫入Redis。而後對Key設置過時時間。若第一步失敗,則Key未寫入Redis,設置過時時間會失敗
  • 若成功設置Key的過時時間則像Redis中寫入有效數據
  • 刪除第一步中設置的特殊值

在讀取Hash的值時,判斷讀到的field的值是不是Nil,如果則刪除並忽略,若不是則處理。

代碼以下:

namespace RedisClient.Imples
{
    public class RedisHashOperator : RedisCommandExecutor, IRedisHashOperator
    {
        private readonly string KeyExpiryPlaceHolder = "expiryPlaceHolder";

        public RedisHashOperator(ILogger<RedisHashOperator> logger, IRedisConnection redisConnection)
            : base(logger, redisConnection)
        {
        }

        public async Task WriteAsync(string key, IDictionary<string, string> valueDict, TimeSpan expiry)
        {
            async Task action()
            {
                if (valueDict.Empty())
                {
                    return;
                }
                var hashList = new List<HashEntry>();
                foreach (var value in valueDict)
                {
                    hashList.Add(new HashEntry(value.Key, value.Value));
                }
                await Database.HashSetAsync(key, hashList.ToArray());
            }

            async Task successed()
            {
                await ExecuteCommandAsync(action, $"redisEorror:hashWrite:{key}");
            }

            await SetKeyExpireAsync(key, expiry, successed);
        }


        public async Task<RedisReadResult<IDictionary<string, string>>> ReadAllFieldsAsync(string key)
        {
            async Task<RedisReadResult<IDictionary<string, string>>> func()
            {
                var redisReadResult = new RedisReadResult<IDictionary<string, string>>();
                if (Database.KeyExists(key) == false)
                {
                    return redisReadResult.Failed();
                }
                var resultList = await Database.HashGetAllAsync(key);
                if (resultList == null)
                {
                    return redisReadResult.Failed();
                }
                var dict = new Dictionary<string, string>();
                if (resultList.Any())
                {
                    foreach (var result in resultList)
                    {
                        if (result.Name == KeyExpiryPlaceHolder || result.Value == KeyExpiryPlaceHolder)
                        {
                            await RemoveKeyExpiryPlaceHolderAsync(key);
                            continue;
                        }
                        dict[result.Name] = result.Value;
                    }
                }
                return redisReadResult.Success(dict);
            }

            return await ExecuteCommandAsync(func, $"redisError:hashReadAll:{key}");
        }


        #region private
        /// <summary>
        /// 設置HASH結構KEY的過時時間
        /// </summary>
        /// <param name="successed">設置過時時間成功以後的回調函數</param>
        private async Task SetKeyExpireAsync(string key, TimeSpan expiry, Func<Task> successed)
        {
            // 確保KEY的過時時間寫入成功以後再執其它的操做
            await Database.HashSetAsync(key, new HashEntry[] { new HashEntry(KeyExpiryPlaceHolder, KeyExpiryPlaceHolder) });
            if (Database.KeyExpire(key, expiry))
            {
                await successed();
            }
            await Database.HashDeleteAsync(key, KeyExpiryPlaceHolder);
        }

        private async Task RemoveKeyExpiryPlaceHolderAsync(string key)
        {
            await Database.HashDeleteAsync(key, KeyExpiryPlaceHolder);
        }
        #endregion

    }
}

文中屢次出現的ExecuteCommandAsync方法主要目的是實現針對異常狀況的統一處理,實現以下:

namespace RedisClient.Imples
{
    public class RedisCommandExecutor
    {
        private readonly ILogger Logger;
        protected readonly IDatabase Database;

        public RedisCommandExecutor(ILogger<RedisCommandExecutor> logger, IRedisConnection redisConnection)
        {
            Logger = logger;
            Database = redisConnection.GetDatabase();
        }

        protected async Task ExecuteCommandAsync(Func<Task> func, string errorMessage = null)
        {
            try
            {
                await func();
            }
            catch (Exception ex)
            {
                if (string.IsNullOrEmpty(errorMessage))
                {
                    errorMessage = ex.Message;
                }
                Logger.LogError(errorMessage, ex);
            }
        }

        protected async Task<T> ExecuteCommandAsync<T>(Func<Task<T>> func, string errorMessage = null)
        {
            try
            {
                return await func();
            }
            catch (Exception ex)
            {
                if (string.IsNullOrEmpty(errorMessage))
                {
                    errorMessage = ex.Message;
                }
                Logger.LogError(errorMessage, ex);
                return default(T);
            }
        }
    }
}
相關文章
相關標籤/搜索