Redis單節點的分佈式鎖只須要注意三點就能夠了:api
1.加鎖並設置鎖的過時時間必須是原子操做;app
2.鎖的value值必需要有惟一性;dom
3.釋放鎖的時候要驗證其value值,不是本身加的鎖不能釋放.異步
可是單節點分佈式鎖最大的缺點就是,它只做用在一個Redis節點上,若是該節點掛了,那就掛了.async
那可不能夠經過哨兵機制來保證高可用呢?分佈式
答案是不行.性能
由於Redis在進行主從複製的時候是異步的.測試
假設 clientA 拿到鎖後,在 master 還沒同步到 slave 時,master 發生了故障,這時候 salve 升級爲 master,致使鎖丟失.ui
RedLock 的思想是:假設有5個Redis節點.這些節點徹底相互獨立,不存在主從或者集羣機制,都是 master.而且這5個Redis實例運行在5臺機器上,這樣保證他們不會同時宕掉.spa
客戶端應該按照如下操做來獲取鎖:
1.獲取當前時間戳,假設是T1.
2.依次嘗試從這5個Redis實例獲取鎖.當客戶端向Redis請求獲取鎖時,客戶端應該設置超時時間,而且這個超時時間應該小於鎖的失效時間.好比你的鎖自動失效時間爲10秒,則超時時間應該在5-50毫秒之間.這樣能夠避免Redis已經掛掉的狀況下,客戶端還在等待響應結果.若是Redis沒有在規定時間內響應,客戶端應該儘快嘗試去另一個Redis實例請求獲取鎖.
3.請求完全部的Redis節點後,只有知足以下兩點,纔算真正的獲取到鎖:
1)當前時間 - T1 的時間差小於鎖的過時時間.好比T1=00:00:00,而後從5個Redis節點都拿到了鎖,當前時間是 00:00:05,也就是說獲取鎖一共用了5秒鐘.假設鎖的過時時間是3秒,那麼此次獲取鎖的操做就算失敗了.
2)從(N/2+1)個Redis節點都獲取到鎖.這個很好理解,5個節點,你拿2個,我拿2個,到底算誰的?
總結一句話就是:從開始獲取鎖計時,只要在鎖的過時時間內成功獲取到一半以上的鎖便算成功,不然算失敗.
4.當客戶端獲取到了鎖,鎖的真正有效時間 = 鎖的過時時間 - 獲取鎖所使用的時間(也就是第3步計算出來的時間).
5.若是客戶端因爲某些緣由(好比獲取鎖的實例個數小於N/2+1,或者已經超過了有效時間),沒有獲取到鎖,客戶端便會在全部的Redis實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功),由於可能已經獲取了小於 N/2+1個鎖,必須釋放掉,不然會影響其餘客戶端獲取鎖.
關因而否啓動AOF永久存儲,須要有所取捨.
1.永久啓動,因爲Redis的過時機制是按照unix時間戳走的,因此當咱們重啓Redis後,依然會按照規定的時間過時.可是永久啓動對性能有必定影響;
2.採用默認的1秒1次.若是在1秒內斷電,會致使數據丟失,這時候若是馬上重啓會致使鎖的互斥性實效.
因此有效的解決方案是,採用AOF,1秒1次,無論什麼緣由宕機後,等待必定時間再重啓.這個時間就是鎖的過時時間.
Demo:
安裝官方提供的 RedLock.net
Startup:
public class Startup { private RedLockFactory _redLockFactory; public void ConfigureServices(IServiceCollection services) { services.AddControllers(); var endPoints = new List<RedLockEndPoint> { new DnsEndPoint("127.0.0.1", 6379), new DnsEndPoint("127.0.0.1", 6380), new DnsEndPoint("127.0.0.1", 6381) }; _redLockFactory = RedLockFactory.Create(endPoints); services.AddSingleton(typeof(IDistributedLockFactory), _redLockFactory); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime applicationLifetime) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } //應用程序結束時釋放,由於不是容器建立的對象 applicationLifetime.ApplicationStopping.Register(() => { _redLockFactory.Dispose(); }); app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); } }
測試api:
[ApiController] public class ValuesController : ControllerBase { private static int _stock = 10; private readonly IDistributedLockFactory _distributedLockFactory; public ValuesController(IDistributedLockFactory distributedLockFactory) { _distributedLockFactory = distributedLockFactory; } [Route("lockTest")] [HttpGet] public async Task<int> DistributedLockTest() { // resource 鎖定的資源 var resource = "the-thing-we-are-locking-on"; // expiryTime 鎖的過時時間 var expiry = TimeSpan.FromSeconds(5); // waitTime 等待時間 var wait = TimeSpan.FromSeconds(1); // retryTime 等待時間內,多久重試一次 var retry = TimeSpan.FromMilliseconds(250); using (var redLock = await _distributedLockFactory.CreateLockAsync(resource, expiry, wait, retry)) { if (redLock.IsAcquired) { // 模擬執行業務邏輯 await Task.Delay(new Random().Next(100, 500)); if (stock > 0) { stock--; return stock; } return stock; } Console.WriteLine($"{DateTime.Now} : 獲取鎖失敗"); } return -99; } }
測試控制檯:
static void Main(string[] args) { HttpClient client = new HttpClient(); var result = Parallel.For(0, 20, (i) => { var stopwatch = new Stopwatch(); stopwatch.Start(); var response = client.GetAsync($"http://localhost:5000/locktest").Result; stopwatch.Stop(); var data = response.Content.ReadAsStringAsync().Result; Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}"); }); client.Dispose(); Console.ReadKey(); }
測試結果: