【分佈式鎖】RedLock 實現分佈式鎖

 

併發是程序開發中不可避免的問題,根據系統面向用戶、功能場景的不一樣,併發的重視程度會有不一樣。從程序的角度來講,併發意味着相同的時間點執行了相同的代碼,而有些狀況是不被容許的,好比:轉帳、搶購佔庫存等,若是沒有作好臨界條件的驗證,會帶來很是嚴重的後果。追根結底是由於併發引發的數據不一致問題,面對併發,咱們一般會採用鎖來優化。redis

場景模擬

以下模擬搶購的示例代碼(C#):算法

// 有10個商品庫存
private static int stockCount = 10;

public bool Buy()
{
    // 模擬執行的邏輯代碼花費的時間
    Thread.Sleep(new Random().Next(100,500));
    if (stockCount > 0)
    {
        stockCount--;
        return true;
    }
    return false;
}
var test = new Test();

Parallel.For(1, 16, (i) =>
{
    var stopwatch = new Stopwatch();
    stopwatch.Start();
    var data = test.Buy();
    stopwatch.Stop();
    Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}");
});
Console.ReadKey();

模擬並行調用 Buy 方法 15 次(內部使用的是線程池,因此 ThreadId 會有重複),實際上只有 10 個庫存,返回結果卻顯示 11 個請求都購買成功了。數據庫

單機部署模式解決方案

在單機部署模式下,咱們只須要加 lock(){} 就能夠解決問題:json

// 有10個商品庫存
private static int stockCount = 10;

private static object obj = new object();

public bool Buy()
{
    lock (obj)
    {
        // 模擬執行的邏輯代碼花費的時間
        Thread.Sleep(new Random().Next(100, 500));
        if (stockCount > 0)
        {
            stockCount--;
            return true;
        }
        return false;
    }
}

從輸出結果中能夠看出,確實只有10個請求是顯示購買成功,但同時發現部分請求的執行時間明顯變長,這就是加鎖帶來的最直觀影響,當某個線程得到鎖以後,在沒有釋放以前,其餘線程只能繼續等待,併發越高,更多的線程須要等待輪流被處理。架構

各類語言通常都提供了鎖的實現,用法大同小異,語言自己實現的鎖只能做用於當前進程內,因此在單機模式部署的系統中使用基本沒什麼問題。併發

集羣部署模式解決方案(分佈式鎖)

在集羣模式下,系統部署於多臺機器(一個系統運行在多個進程中),語言自己實現的鎖只能確保當前進程內有效(基於內存),多進程就沒辦法共享鎖狀態,這時咱們就得考慮採用分佈式鎖,分佈式鎖能夠採用 數據庫ZooKeeperRedis 等來實現,最終都是爲了達到在不一樣的進程、線程內能共享鎖狀態的目的。app

這裏將介紹基於 Redis 的 RedLock.net 來解決分佈式下的併發問題,RedLock.net 是 RedLock 分佈式鎖算法的 .NET 版實現 (大部分語言都有對應的實現,查看) ,RedLock 分佈式鎖算法是由 Redis 的做者提出。框架

 

 

RedLock 簡介

RedLock 的思想是使用多臺 Redis Master ,節點徹底獨立,節點間不須要進行數據同步,由於 Master-Slave 架構一旦 Master 發生故障時數據沒有複製到 Slave,被選爲 Master 的 Slave 就丟掉了鎖,另外一個客戶端就能夠再次拿到鎖。鎖經過 setNX(原子操做) 命令設置,在有效時間內當得到鎖的數量大於 (n/2+1) 表明成功,失敗後須要向全部節點發送釋放鎖的消息。dom

獲取鎖:async

SET resource_name my_random_value NX PX 30000

釋放鎖:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

RedLock.net 集成

  1. 建立 .NETCore API 項目

  2. Nuget 安裝 RedLock.net

    Install-Package RedLock.net

     

  3. appsettings.json 添加 redis 配置
    {
      "RedisUrl": "127.0.0.1:6379", // 多個用,分割
      ...
    }

     

  4. 添加 ProductService.cs,模擬商品購買
    // 有10個商品庫存,若是同時啓動多個API服務進行測試,這裏改爲存數據庫或其餘方式
    private static int stockCount = 10;
    public async Task<bool> BuyAsync()
    {
        // 模擬執行的邏輯代碼花費的時間
        await Task.Delay(new Random().Next(100, 500));
        if (stockCount > 0)
        {
            stockCount--;
            return true;
        }
        return false;
    }

     

  5. 修改 Startup.cs ,建立 RedLockFactory

    定義 RedLockFactory 變量:

    private RedLockFactory lockFactory;

    添加方法:

    private RedLockFactory GetRedLockFactory()
    {
        var redisUrl = Configuration["RedisUrl"];
        if (string.IsNullOrEmpty(redisUrl))
        {
            throw new ArgumentException("RedisUrl 不能爲空");
        }
        var urls = redisUrl.Split(",").ToList();
        var endPoints = new List<RedLockEndPoint>();
        foreach (var item in urls)
        {
            var arr = item.Split(":");
            endPoints.Add(new DnsEndPoint(arr[0], Convert.ToInt32(arr[1])));
        }
        return RedLockFactory.Create(endPoints);
    }

    在 ConfigureServices 注入 IDistributedLockFactory:

    lockFactory = GetRedLockFactory();
    services.AddSingleton(typeof(IDistributedLockFactory), lockFactory);
    services.AddScoped(typeof(ProductService));

    修改 Configure,應用程序結束時釋放 lockFactory :

    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ...
    
        lifetime.ApplicationStopping.Register(() =>
        {
            lockFactory.Dispose();
        });
    }

     

  6. 在 Controller 添加方法 DistributedLockTest
    private readonly IDistributedLockFactory _distributedLockFactory;
    private readonly ProductService _productService;
    
    public HomeController(IDistributedLockFactory distributedLockFactory,
        ProductService productService)
    {
        _distributedLockFactory = distributedLockFactory;
        _productService = productService;
    }
    
    [HttpGet]
    public async Task<bool> DistributedLockTest()
    {
        var productId = "id";
        // resource 鎖定的對象
        // expiryTime 鎖定過時時間,鎖區域內的邏輯執行若是超過過時時間,鎖將被釋放
        // waitTime 等待時間,相同的 resource 若是當前的鎖被其餘線程佔用,最多等待時間
        // retryTime 等待時間內,多久嘗試獲取一次
        using (var redLock = await _distributedLockFactory.CreateLockAsync(productId, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(20)))
        {
            if (redLock.IsAcquired)
            {
                var result = await _productService.BuyAsync();
                return result;
            }
            else
            {
                Console.WriteLine($"獲取鎖失敗:{DateTime.Now}");
            }
        }
        return false;
    }

     

  7. 調用接口測試
       Parallel.For(1, 16, (i) =>
    {
        var stopwatch = new Stopwatch();
        stopwatch.Start();
        var data = GetAsync($"http://localhost:5000/home/distributedLockTest").Result;
        stopwatch.Stop();
        Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}");
    });

    關於 RedLock 分佈式鎖算法的爭議你們能夠參考:

    How to do distributed locking
    Is Redlock safe?

    總結

    若是使用鎖,必然對性能上會有必定影響,咱們須要根據實際場景來判斷是真正須要。在指定鎖過時時間時要相對合理,避免出現鎖已過時,但邏輯還沒執行完成,這樣就失去了鎖的意義,固然這種狀況下咱們還能夠考慮重入鎖。

    最後推薦一下微軟開源的一個基於 Actor 模型的分佈式框架 Orleans,也能夠達到分佈式鎖的效果。原文地址:https://mp.weixin.qq.com/s/gwiCY6qfLtWLDe2AjlbOsw,http://beckjin.com/2019/01/06/redLock-net/

相關文章
相關標籤/搜索