爲何要用分佈式鎖?redis
先上一張截圖,這是在瀏覽別人的博客時看到的.dom
在瞭解爲何要用分佈式鎖以前,咱們應該知道到底什麼是分佈式鎖.分佈式
鎖按照不一樣的維度,有多種分類.好比測試
1.悲觀鎖,樂觀鎖;ui
2.公平鎖,非公平鎖;lua
3.獨享鎖,共享鎖;spa
4.線程鎖,進程鎖;操作系統
等等.線程
咱們平時用的鎖,好比 lock,它是線程鎖,主要用來給方法,代碼塊加鎖.因爲進程的內存單元是被其全部線程共享的,因此線程鎖控制的實際是多個線程對同一塊內存區域的訪問.code
有線程鎖,就必然有進程鎖.顧名思義,進程鎖的目的是控制多個進程對共享資源的訪問.由於進程之間彼此獨立,各個進程是沒法控制其餘進程對資源的訪問,因此只能經過操做系統來控制.好比 Mutex.
可是進程鎖有一個前提,那就是須要多個進程在同一個系統中,若是多個進程不在同一個系統,那就只能使用分佈式鎖來控制了.
分佈式鎖是控制分佈式系統中不一樣系統之間訪問共享資源的一種鎖實現.它和線程鎖,進程鎖的做用都是同樣,只是範圍不同.
因此要實現分佈式鎖,就必須依靠第三方存儲介質來存儲鎖的信息.由於各個進程之間彼此誰都不服誰,只能找一個帶頭大哥咯;
如下示例需引用NUGET: CSRedisCore
示例一
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0"); var lockKey = "lockKey"; var stock = 5;//商品庫存 var taskCount = 10;//線程數量 redisClient.Del(lockKey);//測試前,先把鎖刪了. for (int i = 0; i < taskCount; i++) { Task.Run(() => { //獲取鎖 do { //setnx : key不存在纔會成功,存在則失敗. var success = redisClient.SetNx(lockKey, 1); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖 } while (true); Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費"); if (stock <= 0) { Console.WriteLine($"庫存不足,線程:{Task.CurrentId} 搶購失敗!"); redisClient.Del(lockKey); return; } stock--; //模擬處理業務 Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3))); Console.WriteLine($"線程:{Task.CurrentId} 消費完畢!剩餘 {stock} 個"); //業務處理完後,釋放鎖. redisClient.Del(lockKey); }); }
運行結果:
看起來貌似沒毛病,實際上上述代碼有個致命的問題:
當某個線程拿到鎖以後,若是系統崩潰了,那麼鎖永遠都不會被釋放.所以,咱們應該給鎖加一個過時時間,當時間到了,尚未被主動釋放,咱們就讓redis釋放掉它,以保證其餘消費者能夠拿到鎖,進行消費.
這裏給鎖加過時時間也有講究,不能拿到鎖後再加,好比:
......
//setnx : key不存在纔會成功,存在則失敗. var success = redisClient.SetNx(lockKey, 1); if (success == true) { redisClient.Set(lockKey, 1, expireSeconds: 5); break; }
這樣操做的話,獲取鎖和設置鎖的過時時間就不是原子操做,一樣會出現上面提到的問題.Redis 提供了一個合而爲一的操做能夠解決這個問題.
//set : key存在則失敗,不存在纔會成功,而且過時時間5秒 var success = redisClient.Set(lockKey, 1, expireSeconds: 5, exists: RedisExistence.Nx);
這個問題雖然解決了,但隨之產生了一個新的問題:
假設有3個線程A,B,C
當線程A拿到鎖後執行業務的時候超時了,超過了鎖的過時時間還沒執行完,這時候鎖被Redis釋放了,
因而線程B拿到了鎖並開始執行業務邏輯.
當線程B的業務邏輯還沒執行完的時候,線程A的業務邏輯執行完了,因而乎就跑去釋放掉了鎖.
這時候線程C就能夠拿到鎖開始執行它的業務邏輯.
這不就亂套了麼...
所以,線程在釋放鎖的時候應該判斷這個鎖還屬不屬於本身.
因此,在設置鎖的時候,redis的value值不能像上面代碼那樣,隨便給個1,而應該給一個隨機值,表明當前線程.
var id = Guid.NewGuid().ToString("N"); //獲取鎖 do { //set : key存在則失敗,不存在纔會成功,而且過時時間5秒 var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖 } while (true); Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費");
.........
//業務處理完後,釋放鎖. var value = redisClient.Get<string>(lockKey); if (value == id) { redisClient.Del(lockKey); }
完美了嗎?
不完美.仍是老生常談的問題,取value和刪除key 分了兩步走,不是原子操做.
而且,這裏還不能用pipe,由於須要根據取到的value來決定下一個操做.上面設置過時時間卻是能夠用pipe.
因此,這裏只能用lua.
完整的代碼以下:
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0"); var lockKey = "lockKey"; var stock = 5;//商品庫存 var taskCount = 10;//線程數量 var script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//釋放鎖的redis腳本 redisClient.Del(lockKey);//測試前,先把鎖刪了. for (int i = 0; i < taskCount; i++) { Task.Run(() => { var id = Guid.NewGuid().ToString("N"); //獲取鎖 do { //set : key存在則失敗,不存在纔會成功,而且過時時間5秒 var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖 } while (true); Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費"); if (stock <= 0) { Console.WriteLine($"庫存不足,線程:{Task.CurrentId} 搶購失敗!"); redisClient.Eval(script,lockKey,id); return; } stock--; //模擬處理業務,這裏不考慮失敗的狀況 Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3))); Console.WriteLine($"線程:{Task.CurrentId} 消費完畢!剩餘 {stock} 個"); //業務處理完後,釋放鎖. redisClient.Eval(script, lockKey, id); }); }
這篇文章只介紹了單節點Redis的分佈式鎖,由於單節點,因此不是高可用.
多節點Redis則須要用官方介紹的RedLock,這玩意有點繞,我須要捋一捋.