多線程併發編程「鎖」 事

多線程技術是提升系統併發能力的重要技術,在應用多線程技術時須要注意不少問題,如線程退出問題、CPU及內存資源利用問題、線程安全問題等,本文主要講線程安全問題及如何使用「鎖」來解決線程安全問題。

1、相關概念
在瞭解鎖以前,首先闡述一下線程安全問題涉及到的相關概念:程序員

線程安全
若是你的代碼所在的進程中有多個線程在同時運行,而這些線程可能會同時運行這段代碼。若是每次運行結果和單線程運行的結果是同樣的,並且其餘變量的值也和預期的是同樣的,則是線程安全的。線程安全問題是由共享資源引發的,能夠是一個全局變量、一個文件、一個數據庫表中的某條數據,當多個線程同時訪問這類資源的時候,就可能存在線程安全問題。redis

臨界資源
臨界資源是一次僅容許一個進程(線程)使用的共享資源,當其餘進程(線程)訪問該共享資源時須要等待。數據庫

臨界區
臨界區是指一個訪問共享資源的代碼段。數組

線程同步
爲了解決線程安全問題,一般採用「序列化訪問臨界資源」的方案,或者叫「串行化訪問臨界資源」,即在同一時刻,保證只能有一個線程訪問臨界資源,也稱線程同步互斥訪問。安全


鎖是實現線程同步的重要手段,它將包圍的代碼語句塊標記爲臨界區,這樣一次只有一個線程進入臨界區執行代碼。多線程

2、同一個進程內多線程併發鎖
Lock
對於單進程內的多線程併發場景,咱們可使用語言和類庫提供的鎖,如下以C#鎖爲例說明鎖是如何作到線程安全的。先來看一段示例代碼。CountService爲計數服務類,提供了一個參數的構造方法,參數爲是否加鎖,默認爲不加。併發

public class CountService
{
   private int count;
   private readonly object lockObj;
   private readonly bool withLock = true;
   public CountService(bool withLock = false)
   {
       count = 0;
       this.withLock = withLock;
       lockObj = new object();
   }
   public void Increment()
   {
       if (withLock)
       {
           lock (lockObj)
           {
               count++;
           }
       }
       else
           count++;
   }
   public int GetCountValue()
   {
       return count;
   }
}

而後模擬多線程調用,代碼以下:分佈式

class Program
{
    static void Main(string[] args)
    {
        for (int i = 0; i < 10; i++)
        {
            var taskList = new List<Task>();
            CountService service = new CountService(false);
           
            for (int j = 0; j < 1000; j++)
            {
                taskList.Add(
                    Task.Run(() =>
                    {
                        service.Increment();
                    })
                );
            }
            Task.WaitAll(taskList.ToArray());
            Console.WriteLine(service.GetCountValue());
        }
        Console.Read();
    }
}

若是按照單線程執行,預期的結果會在控制檯輸出10個1000,但真實的結果倒是以下圖所示,而且可能每次輸出的結果都不一致。
圖片描述工具

若是在計數服務實例化時,參數改成true,則能夠獲得預期的結果,因此加鎖能夠保證計數服務對象是線程安全的。C#中lock 語句獲取給定對象的互斥鎖(也能夠叫做排它鎖),執行語句塊,而後釋放鎖。 持有鎖時,持有鎖的線程能夠再次獲取並釋放鎖。它能夠阻止任何其餘線程獲取鎖並等待釋放鎖。lock是一個語法糖,它的內部實現使用的是Monitor,至關於以下代碼。
bool isGetLock = false;
//lockObj 是私有靜態變量性能

Monitor.Enter(lockObj, ref isGetLock);
try
{
    do something…
}
finally
{
    if(isGetLock == true)
        Monitor.Exit(lockObj);
}

原理:

那Monitor.Enter和Monitor.Exit 到底是怎麼工做的呢?CRL初始化時在堆中分配一個同步塊數組,每當一個對象在堆中建立的時候,都有兩個額外的開銷字段與它關聯。第一個是「類型對象指針」,值爲類型的「類型對象」的內存地址。第二個是「同步塊索引」,值爲同步塊數據組中的一個整數索引。一個對象在構造時,它的同步塊索引初始化爲-1,代表不引用任何同步塊。而後,調用Monitor.Enter時,CLR在同步塊數組中找到一個空白同步塊,並設置對象的同步塊索引,讓它引用該同步塊。調用Exit時,會檢查是否有其餘任何線程正在等待使用對象的同步塊。若是沒有線程在等待它,同步塊就自由了,會將對象的同步塊索引設回-1,自由的同步塊未來能夠和另外一個對象關聯。下圖反映的就是對象與同步塊的關聯關係。
圖片描述

建議:

NET提供了能夠跨進程使用的鎖,如Mutex、Semaphore等。 Mutex、Semaphore須要先把託管代碼轉成本地用戶模式代碼、再轉換成本地內核代碼。當釋放後須要從新轉換成託管代碼,性能會有必定的損耗,因此儘可能在須要跨進程的場景使用。咱們的實際開發中這種場景很少,本文再也不詳細介紹。可參考微軟官方文檔:https://docs.microsoft.com/zh...
.NET提供了線程安全的集合,這些集合在內部實現了線程同步,咱們能夠直接使用。

對於簡單的狀態更改,如遞增、遞減、求和、賦值等,微軟官方建議使用 Interlocked 類的方法,而不是 lock 語句。雖然 lock 語句是實用的通用工具,但 Interlocked 類提高了更新(必須是原子操做)的性能。如能夠實現如下代碼的替代
圖片描述
圖片描述
注意事項:

避免鎖定能夠被公共訪問的對象lock(this)、lock(typeof(ClassName)) 、lock(public static variable) 、lock(public const variable),都存在可能被其餘代碼鎖定的狀況,這樣會阻塞你本身的代碼。

禁止鎖定字符串在編譯階段若是兩個變量的字符串內容相同的話,CLR會將字符串放在(Intern Pool)駐留池(暫存池)中,以此來保證相同內容的字符串引用的地址是相同的。因此若是有兩個地方都在使用lock("myLock")的話,它們實際鎖住的是同一個對象。
禁止鎖定值類型的對象Monitor的方法參數爲object類型,因此傳遞值類型會致使值類型被裝箱,形成線程在已裝箱對象上獲取鎖。每次調用Moitor.Enter都會在一個徹底不一樣的對象上獲取鎖,因此徹底沒法實現線程同步。

避免死鎖若是兩個線程中的每一個線程都嘗試鎖定另外一個線程已鎖定的資源,則會發生死鎖。咱們應該保證每塊代碼鎖定對象的順序一致。儘可能避免鎖定可被公共訪問的對象,由於私有對象只有咱們本身用,咱們能夠保證鎖的正確使用。咱們還能夠利用Monitor.Enter來檢測死鎖,該方法支持設置獲取鎖的超時時間,好比,Monitor.TryEnter(lockObject, 300),若是在300毫秒內沒有獲取鎖,該方法返回false。

3、分佈式集羣下的多線程併發鎖
C#中,lock(Monitor)、Mutex、Semaphore只適用於單機環境,解決不了分佈式集羣環境中,各節點多線程併發的線程安全問題。對於分佈式場景,咱們可使用分佈式鎖。

經常使用的分佈式鎖有:

Memcached分佈式鎖
Memcached的add命令是原子性操做,只有在key不存在的狀況下,才能add成功,並返回STORED,也就意味着線程獲得了鎖,若是key存在,返回NOT_STORED ,則說明有其餘線程已經拿到鎖。

Redis分佈式鎖
和Memcached的方式相似,利用Redis的set命令。此命令一樣是原子性操做,只有在key不存在的狀況下,才能set成功。當一個線程執行set返回OK,說明key本來不存在,該線程成功獲得了鎖;當一個線程執行set返回-1,說明key已經存在,該線程搶鎖失敗。

Zookeeper分佈式鎖
把ZooKeeper上的一個節點看做是一個鎖,得到鎖就經過建立臨時節點的方式來實現。ZooKeeper 會保證在全部客戶端中,最終只有一個客戶端可以建立成功,那麼就能夠認爲該客戶端得到了鎖。同時,全部沒有獲取到鎖的客戶端就須要到/exclusive_lock 節點上註冊一個子節點變動的Watcher監聽,以便實時監聽到lock節點的變動狀況。等拿到鎖的客戶端執行完業務邏輯後,客戶端就會主動將本身建立的臨時節點刪除,釋放鎖,而後ZooKeeper 會通知全部在 /exclusive_lock 節點上註冊了節點變動 Watcher 監聽的客戶端。這些客戶端在接收到通知後,再次從新發起分佈式鎖獲取請求。

主要講一下Redis分佈式鎖及常見問題
Redis加鎖的僞代碼:

if(set(key,value,30,NX) == "OK")
{
    try
    {
        do something...
    }
    finally
    {
        del(key)
    }
}

key是鎖的惟一標識,通常是按業務來決定命名。好比要給用戶註冊代碼加鎖,能夠給key命名爲 「lock_user_regist_用戶手機號」。

30爲鎖的超時時間,單位爲秒,若是不設置超時時間,一但獲得鎖的線程在執行任務的過程當中掛掉,來不及顯式地釋放鎖,這塊資源將會永遠被鎖住,別的線程就再也進不來了。設置了超時時間,即便因不可控因素致使了沒有顯式的釋放鎖,最多也就只鎖定這些時間即可自動恢復。可是指定了超時時間,還會引出其餘問題,後邊會講。

NX表明只在鍵不存在時,纔對鍵進行設置操做,並返回OK。

當業務處理完畢,finally中執行redis del指令將鎖刪除。刪除鎖時可能出現一種異常的場景,好比線程A成功獲得了鎖,而且設置的超時時間是30秒。因某些緣由致使線程A執行了很長時間(過了30秒都沒執行完),這時候鎖過時自動釋放,線程B獲得了鎖。隨後,線程A執行完了任務,線程A接着執行del指令來釋放鎖。但這時候線程B還沒執行完,線程A實際上刪除的是線程B加的鎖。如何避免這種狀況呢?能夠在del釋放鎖以前作一個判斷,驗證當前的鎖是否是本身加的鎖。至於具體的實現,能夠在加鎖的時候生成一個隨機數,儘量的不重複,能夠用Guid生成一個隨機字符串作value,並在刪除以前驗證key對應的value是否是當前線程生成的Guid字符串。
加鎖僞代碼:

string value = Guid.NewGuid().ToString();
set(key,value,30,NX);

解鎖僞代碼:

if(value.Equals(redisClient.get(key))
{
    del(key);
}

但這又引出了一個新的問題,判斷及解鎖是兩個獨立的指令,不是原子性操做,這就得須要藉助Lua腳本實現。將解鎖的代碼封裝爲Lua腳本,在須要解鎖的時候,發送執行腳本的指令。

應用上邊講到的方法,儘管咱們避免了線程A誤刪除掉鎖的狀況,可是同一時間有A、B兩個線程在訪問代碼,這自己就不是線程安全的。如何保證線程安全呢?產生該現象的緣由就在於咱們給鎖指定了超時時間,不是說超時時間加的不對,而是咱們應該想辦法能給鎖「續命」,即當過去29秒了,線程A還沒執行完,咱們要有一種機制能夠定時重置一下鎖的超時時間。思路大概爲讓得到鎖的線程開啓一個守護線程,用來重置快要過時的鎖的超時時間,若是超時時間設置爲30秒,守護線程能夠從第29秒開始,每25秒執行一次expire指令,當線程A執行完成後,顯式關掉守護線程。還有一些程序員可能會出現如下寫法,無論if條件有沒有成立,finally都會執行刪除鎖的命令,即便鎖沒有過時也會出現線程鎖被誤刪除的狀況,你們必定要注意。固然若是你已經應用上邊講的改進方案,避免了鎖被其餘線程誤刪,可是這個也是得不償失的,沒有獲取到鎖的線程沒有必要去執行刪除鎖的命令。錯誤的Redis加鎖僞代碼:

try
{
    if(set(key,value,30,NX) == 「OK」)
    {
         do something...        
    }
}
finally
{
    del(key)
}

4、總結本文對多線程併發環境中,保證線程安全的「鎖」方案進行了儘量詳細的講解,平時咱們在設計高性能、低延遲開發方案時,務必要考慮因併發訪問致使的數據安全性問題。

相關文章
相關標籤/搜索