如何設計並實現一個線程安全的 Map ?(下篇)

在上篇中,咱們已經討論過如何去實現一個 Map 了,而且也討論了諸多優化點。在下篇中,咱們將繼續討論如何實現一個線程安全的 Map。說到線程安全,須要從概念開始提及。php

線程安全就是若是你的代碼塊所在的進程中有多個線程在同時運行,而這些線程可能會同時運行這段代碼。若是每次運行結果和單線程運行的結果是同樣的,並且其餘的變量的值也和預期的是同樣的,就是線程安全的。html

若是代碼塊中包含了對共享數據的更新操做,那麼這個代碼塊就多是非線程安全的。可是若是代碼塊中相似操做都處於臨界區之中,那麼這個代碼塊就是線程安全的。node

一般有如下兩類避免競爭條件的方法來實現線程安全:c++

第一類 —— 避免共享狀態

  1. 可重入 Re-entrancy)

一般在線程安全的問題中,最多見的代碼塊就是函數。讓函數具備線程安全的最有效的方式就是使其可重入。若是某個進程中全部線程均可以併發的對函數進行調用,而且不管他們調用該函數的實際執行狀況怎麼樣,該函數均可以產生預期的結果,那麼就能夠說這個函數是可重入的。git

若是一個函數把共享數據做爲它的返回結果或者包含在它返回的結果中,那麼該函數就確定不是一個可重入的函數。任何內含了操做共享數據的代碼的函數都是不可重入的函數。github

爲了實現線程安全的函數,把全部代碼都置放於臨界區中是可行的。可是互斥量的使用總會耗費必定的系統資源和時間,使用互斥量的過程總會存在各類博弈和權衡。因此請合理使用互斥量保護好那些涉及共享數據操做的代碼。算法

注意:可重入只是線程安全的充分沒必要要條件,並非充要條件。這個反例在下面會講到。編程

  1. 線程本地存儲

若是變量已經被本地化,因此每一個線程都有本身的私有副本。這些變量經過子程序和其餘代碼邊界保留它們的值,而且是線程安全的,由於這些變量都是每一個線程本地存儲的,即便訪問它們的代碼可能被另外一個線程同時執行,依舊是線程安全的。數組

  1. 不可變量

對象一旦初始化之後就不能改變。這意味着只有只讀數據被共享,這也實現了固有的線程安全性。可變(不是常量)操做能夠經過爲它們建立新對象,而不是修改現有對象的方式去實現。 Java,C#和
Python 中的字符串的實現就使用了這種方法。緩存

第二類 —— 線程同步

第一類方法都比較簡單,經過代碼改造就能夠實現。可是若是遇到必定要進行線程中共享數據的狀況,第一類方法就解決不了了。這時候就出現了第二類解決方案,利用線程同步的方法來解決線程安全問題。

今天就從線程同步開始提及。


一. 線程同步理論

在多線程的程序中,多以共享數據做爲線程之間傳遞數據的手段。因爲一個進程所擁有的至關一部分虛擬內存地址均可以被該進程中全部線程共享,因此這些共享數據大可能是之內存空間做爲載體的。若是兩個線程同時讀取同一塊共享內存但獲取到的數據卻不一樣,那麼程序很容易出現一些 bug。

爲了保證共享數據一致性,最簡單而且最完全的方法就是使該數據成爲一個不變量。固然這種絕對的方式在大多數狀況下都是不可行的。好比函數中會用到一個計數器,記錄函數被調用了幾回,這個計數器確定就不能被設爲常量。那這種必須是變量的狀況下,還要保證共享數據的一致性,這就引出了臨界區的概念。

臨界區的出現就是爲了使該區域只能被串行的訪問或者執行。臨界區能夠是某個資源,也能夠是某段代碼。保證臨界區最有效的方式就是利用線程同步機制。

先介紹2種共享數據同步的方法。

1. 互斥量

在同一時刻,只容許一個線程處於臨界區以內的約束稱爲互斥,每一個線程在進入臨界區以前,都必須先鎖定某個對象,只有成功鎖定對象的線程才能容許進入臨界區,不然就會阻塞。這個對象稱爲互斥對象或者互斥量。

通常咱們平常說的互斥鎖就能達到這個目的。

互斥量能夠有多個,它們所保護的臨界區也能夠有多個。先從簡單的提及,一個互斥量和一個臨界區。

(一) 一個互斥量和一個臨界區

上圖就是一個互斥量和一個臨界區的例子。當線程1先進入臨界區的時候,當前臨界區處於未上鎖的狀態,因而它便先將臨界區上鎖。線程1獲取到臨界區裏面的值。

這個時候線程2準備進入臨界區,因爲線程1把臨界區上鎖了,因此線程2進入臨界區失敗,線程2由就緒狀態轉成睡眠狀態。線程1繼續對臨界區的共享數據進行寫入操做。

當線程1完成全部的操做之後,線程1調用解鎖操做。當臨界區被解鎖之後,會嘗試喚醒正在睡眠的線程2。線程2被喚醒之後,由睡眠狀態再次轉換成就緒狀態。線程2準備進入臨界區,當臨界區此到處於未上鎖的狀態,線程2便將臨界區上鎖。

通過 read、write 一系列操做之後,最終在離開臨界區的時候會解鎖。

線程在離開臨界區的時候,必定要記得把對應的互斥量解鎖。這樣其餘因臨界區被上鎖而致使睡眠的線程還有機會被喚醒。因此對同一個互斥變量的鎖定和解鎖必須成對的出現。既不能夠對一個互斥變量進行重複的鎖定,也不能對一個互斥變量進行屢次的解鎖。

若是對一個互斥變量鎖定屢次可能會致使臨界區最終永遠阻塞。可能有人會問了,對一個未鎖定的互斥變成解鎖屢次會出現什麼問題呢?

在 Go 1.8 以前,雖然對互斥變量解鎖屢次不會引發任何 goroutine 的阻塞,可是它可能引發一個運行時的恐慌。Go 1.8 以前的版本,是能夠嘗試恢復這個恐慌的,可是恢復之後,可能會致使一系列的問題,好比重複解鎖操做的 goroutine 會永久的阻塞。因此 Go 1.8 版本之後此類運行時的恐慌就變成了不可恢復的了。因此對互斥變量反覆解鎖就會致使運行時操做,最終程序異常退出。

(二) 多個互斥量和一個臨界區

在這種狀況下,極容易產生線程死鎖的狀況。因此儘可能不要讓不一樣的互斥量所保護的臨界區重疊。

上圖這個例子中,一個臨界區中存在2個互斥量:互斥量 A 和互斥量
B。

線程1先鎖定了互斥量 A ,接着線程2鎖定了互斥量 B。當線程1在成功鎖定互斥量 B 以前永遠不會釋放互斥量 A。一樣,線程2在成功鎖定互斥量 A 以前永遠不會釋放互斥量 B。那麼這個時候線程1和線程2都因沒法鎖定本身須要鎖定的互斥量,都由 ready 就緒狀態轉換爲 sleep 睡眠狀態。這是就產生了線程死鎖了。

線程死鎖的產生緣由有如下幾種:

    1. 系統資源競爭
    1. 進程推薦順序非法
    1. 死鎖必要條件(必要條件中任意一個不知足,死鎖都不會發生)
      (1). 互斥條件
      (2). 不剝奪條件
      (3). 請求和保持條件
      (4). 循環等待條件

想避免線程死鎖的狀況發生有如下幾種方法能夠解決:

    1. 預防死鎖
      (1). 資源有序分配法(破壞環路等待條件)
      (2). 資源原子分配法(破壞請求和保持條件)
    1. 避免死鎖
      銀行家算法
    1. 檢測死鎖
      死鎖定理(資源分配圖化簡法),這種方法雖然能夠檢測,可是沒法預防,檢測出來了死鎖還須要配合解除死鎖的方法才行。

完全解決死鎖有如下幾種方法:

    1. 剝奪資源
    1. 撤銷進程
    1. 試鎖定 — 回退
      若是在執行一個代碼塊的時候,須要前後(順序不定)鎖定兩個變量,那麼在成功鎖定其中一個互斥量以後應該使用試鎖定的方法來鎖定另一個變量。若是試鎖定第二個互斥量失敗,就把已經鎖定的第一個互斥量解鎖,並從新對這兩個互斥量進行鎖定和試鎖定。

如上圖,線程2在鎖定互斥量 B 的時候,再試鎖定互斥量 A,此時鎖定失敗,因而就把互斥量 B 也一塊兒解鎖。接着線程1會來鎖定互斥量 A。此時也不會出現死鎖的狀況。

    1. 固定順序鎖定

這種方式就是讓線程1和線程2都按照相同的順序鎖定互斥量,都按成功鎖定互斥量1之後才能去鎖定互斥量2 。這樣就能保證在一個線程徹底離開這些重疊的臨界區以前,不會有其餘一樣須要鎖定那些互斥量的線程進入到那裏。

(三) 多個互斥量和多個臨界區

多個臨界區和多個互斥量的狀況就要看是否會有衝突的區域,若是出現相互交集的衝突區域,後進臨界區的線程就會進入睡眠狀態,直到該臨界區的線程完成任務之後,再被喚醒。

通常狀況下,應該儘可能少的使用互斥量。每一個互斥量保護的臨界區應該在合理範圍內並儘可能大。可是若是發現多個線程會頻繁出入某個較大的臨界區,而且它們之間常常存在訪問衝突,那麼就應該把這個較大的臨界區劃分的更小一點,並使用不一樣的互斥量保護起來。這樣作的目的就是爲了讓等待進入同一個臨界區的線程數變少,從而下降線程被阻塞的機率,並減小它們被迫進入睡眠狀態的時間,這從必定程度上提升了程序的總體性能。

在說另一個線程同步的方法以前,回答一下文章開頭留下的一個疑問:可重入只是線程安全的充分沒必要要條件,並非充要條件。這個反例在下面會講到。

這個問題最關鍵的一點在於:mutex 是不可重入的

舉個例子:

在下面這段代碼中,函數 increment_counter 是線程安全的,但不是可重入的。

#include <pthread.h>

int increment_counter () {
    static int counter = 0;
    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

    pthread_mutex_lock(&mutex);

    // only allow one thread to increment at a time
    ++counter;
    // store value before any other threads increment it further
    int result = counter;    

    pthread_mutex_unlock(&mutex);

    return result;
}複製代碼

上面的代碼中,函數 increment_counter 能夠在多個線程中被調用,由於有一個互斥鎖 mutex 來同步對共享變量 counter 的訪問。可是若是這個函數用在可重入的中斷處理程序中,若是在
pthread_mutex_lock(&mutex) 和 pthread_mutex_unlock(&mutex)
之間產生另外一個調用函數 increment_counter 的中斷,則會第二次執行此函數,此時因爲 mutex 已被 lock,函數會在 pthread_mutex_lock(&mutex) 處阻塞,而且因爲 mutex 沒有機會被
unlock,阻塞會永遠持續下去。簡言之,問題在於 pthread 的 mutex 是不可重入的。

解決辦法是設定 PTHREAD_MUTEX_RECURSIVE 屬性。然而對於給出的問題而言,專門使用一個 mutex 來保護一次簡單的增量操做顯然過於昂貴,所以 c++11 中的 原子變量&action=edit&redlink=1) 提供了一個可以使此函數既線程安全又可重入(並且還更簡潔)的替代方案:

#include <atomic>

int increment_counter () {
    static std::atomic<int> counter(0);

    // increment is guaranteed to be done atomically
    int result = ++counter;

    return result;
}複製代碼

在 Go 中,互斥量在標準庫代碼包 sync 中的 Mutex 結構體表示的。sync.Mutex 類型只有兩個公開的指針方法,Lock 和 Unlock。前者用於鎖定當前的互斥量,後者則用於對當前的互斥量進行解鎖。

2. 條件變量

在線程同步的方法中,還有一個能夠與互斥量相提並論的同步方法,條件變量。

條件變量與互斥量不一樣,條件變量的做用並非保證在同一時刻僅有一個線程訪問某一個共享數據,而是在對應的共享數據的狀態發生變化時,通知其餘所以而被阻塞的線程。條件變量老是與互斥變量組合使用的。

這類問題其實很常見。先用生產者消費者的例子來舉例。

若是不用條件變量,只用互斥量,來看看會發生什麼後果。

生產者線程在完成添加操做以前,其餘的生產者線程和消費者線程都沒法進行操做。同一個商品也只能被一個消費者消費。

若是隻用互斥量,可能會出現2個問題。

    1. 生產者線程得到了互斥量之後,卻發現商品已滿,沒法再添加新的商品了。因而該線程就會一直等待。新的生產者也進入不了臨界區,消費者也沒法進入。這時候就死鎖了。
    1. 消費者線程得到了互斥量之後,卻發現商品是空的,沒法消費了。這個時候該線程也是會一直等待。新的生產者和消費者也都沒法進入。這時候一樣也死鎖了。

這就是隻用互斥量沒法解決的問題。在多個線程之間,急需一套同步的機制,能讓這些線程都協做起來。

條件變量就是你們熟悉的 P - V 操做了。這塊你們應該比較熟悉,因此簡單的過一下。

P 操做就是 wait 操做,它的意思就是阻塞當前線程,直到收到該條件變量發來的通知。

V 操做就是 signal 操做,它的意思就是讓該條件變量向至少一個正在等待它通知的線程發送通知,以表示某個共享數據的狀態已經變化。

Broadcast 廣播通知,它的意思就是讓條件變量給正在等待它通知的全部線程發送通知,以表示某個共享數據的狀態已經發生改變。

signal 能夠操做屢次,若是操做3次,就表明發了3次信號通知。如上圖。

P - V 操做設計美妙之處在於,P 操做的次數與 V 操做的次數是相同的。wait 多少次,signal 對應的有多少次。看上圖,這個循環就是這麼的奇妙。

生產者消費者問題

這個問題能夠形象的描述成像上圖這樣,門衛守護着臨界區的安全。售票廳記錄着當前 semaphone 的值,它也控制着門衛是否打開臨界區。

臨界區只容許一個線程進入,當已經有一個線程了,再來一個線程,就會被 lock 住。售票廳也會記錄當前阻塞的線程數。

當以前的線程離開之後,售票廳就會告訴門衛,容許一個線程進入臨界區。

用 P-V 僞代碼來描述生產者消費者:

初始變量:

semaphore  mutex = 1; // 臨界區互斥信號量
semaphore  empty = n; // 空閒緩衝區個數
semaphore  full = 0; // 緩衝區初始化爲空複製代碼

生產者線程:

producer()
{
  while(1) {
    produce an item in nextp;
    P(empty);
    P(mutex);
    add nextp to buffer;
    V(mutex);
    V(full);
  }
}複製代碼

消費者線程:

consumer()
{
  while(1) {
    P(full);
    P(mutex);
    remove an item from buffer;
    V(mutex);
    V(empty);
    consume the item;
  }
}複製代碼

雖然在生產者和消費者單個程序裏面 P,V 並非成對的,可是整個程序裏面 P,V 仍是成對的。

讀者寫者問題——讀者優先,寫者延遲

讀者優先,寫進程被延遲。只要有讀者在讀,後來的讀者均可以隨意進來讀。

讀者要先進入 rmutex ,查看 readcount,而後修改 readcout 的值,最後再去讀數據。對於每一個讀進程都是寫者,都要進去修改 readcount 的值,因此還要單獨設置一個 rmutex 互斥訪問。

初始變量:

int readcount = 0;     // 讀者數量
semaphore  rmutex = 1; // 保證更新 readcount 互斥
semaphore  wmutex = 1; // 保證讀者和寫着互斥的訪問文件複製代碼

讀者線程:

reader()
{
  while(1) {
    P(rmutex);              // 準備進入,修改 readcount,「開門」
    if(readcount == 0) {    // 說明是第一個讀者
      P(wmutex);            // 拿到」鑰匙」,阻止寫線程來寫
    }
    readcount ++;
    V(rmutex);
    reading;
    P(rmutex);              // 準備離開
    readcount --;
    if(readcount == 0) {    // 說明是最後一個讀者
      V(wmutex);            // 交出」鑰匙」,讓寫線程來寫
    }
    V(rmutex);              // 離開,「關門」
  }
}複製代碼

寫者線程:

writer()
{
  while(1) {
    P(wmutex);
    writing;
    V(wmutex);
  }
}複製代碼

讀者寫者問題——寫者優先,讀者延遲

有寫者寫,禁止後面的讀者來讀。在寫者前的讀者,讀完就走。只要有寫者在等待,禁止後來的讀者進去讀。

初始變量:

int readcount = 0;     // 讀者數量
semaphore  rmutex = 1; // 保證更新 readcount 互斥
semaphore  wmutex = 1; // 保證讀者和寫着互斥的訪問文件
semaphore  w = 1;      // 用於實現「寫者優先」複製代碼

讀者線程:

reader()
{
  while(1) {
    P(w);                   // 在沒有寫者的時候才能請求進入
    P(rmutex);              // 準備進入,修改 readcount,「開門」
    if(readcount == 0) {    // 說明是第一個讀者
      P(wmutex);            // 拿到」鑰匙」,阻止寫線程來寫
    }
    readcount ++;
    V(rmutex);
    V(w);
    reading;
    P(rmutex);              // 準備離開
    readcount --;
    if(readcount == 0) {    // 說明是最後一個讀者
      V(wmutex);            // 交出」鑰匙」,讓寫線程來寫
    }
    V(rmutex);              // 離開,「關門」
  }
}複製代碼

寫者線程:

writer()
{
  while(1) {
    P(w);
    P(wmutex);
    writing;
    V(wmutex);
    V(w);
  }
}複製代碼

哲學家進餐問題

假設有五位哲學家圍坐在一張圓形餐桌旁,作如下兩件事情之一:吃飯,或者思考。吃東西的時候,他們就中止思考,思考的時候也中止吃東西。餐桌中間有一大碗意大利麪,每兩個哲學家之間有一隻餐叉。由於用一隻餐叉很難吃到意大利麪,因此假設哲學家必須用兩隻餐叉吃東西。他們只能使用本身左右手邊的那兩隻餐叉。哲學家就餐問題有時也用米飯和筷子而不是意大利麪和餐叉來描述,由於很明顯,吃米飯必須用兩根筷子。

初始變量:

semaphore  chopstick[5] = {1,1,1,1,1}; // 初始化信號量
semaphore  mutex = 1;                  // 設置取筷子的信號量複製代碼

哲學家線程:

Pi()
{
  do {
    P(mutex);                     // 得到取筷子的互斥量
    P(chopstick[i]);              // 取左邊的筷子
    P(chopstick[ (i + 1) % 5 ]);  // 取右邊的筷子
    V(mutex);                     // 釋放取筷子的信號量
    eat;
    V(chopstick[i]);              // 放回左邊的筷子
    V(chopstick[ (i + 1) % 5 ]);  // 放回右邊的筷子
    think;
  }while(1);
}複製代碼

綜上所述,互斥量能夠實現對臨界區的保護,並會阻止競態條件的發生。條件變量做爲補充手段,可讓多方協做更加有效率。

在 Go 的標準庫中,sync 包裏面 sync.Cond 類型表明了條件變量。可是和互斥鎖和讀寫鎖不一樣的是,簡單的聲明沒法建立出一個可用的條件變量,還須要用到 sync.NewCond 函數。

func NewCond( l locker) *Cond複製代碼

*sync.Cond 類型的方法集合中有3個方法,即 Wait、Signal 和 Broadcast 。

二. 簡單的線程鎖方案

實現線程安全的方案最簡單的方法就是加鎖了。

先看看 OC 中如何實現一個線程安全的字典吧。

在 Weex 的源碼中,就實現了一套線程安全的字典。類名叫 WXThreadSafeMutableDictionary。

/** * @abstract Thread safe NSMutableDictionary */
@interface WXThreadSafeMutableDictionary<KeyType, ObjectType> : NSMutableDictionary
@property (nonatomic, strong) dispatch_queue_t queue;
@property (nonatomic, strong) NSMutableDictionary* dict;
@end複製代碼

具體實現以下:

- (instancetype)initCommon
{
    self = [super init];
    if (self) {
        NSString* uuid = [NSString stringWithFormat:@"com.taobao.weex.dictionary_%p", self];
        _queue = dispatch_queue_create([uuid UTF8String], DISPATCH_QUEUE_CONCURRENT);
    }
    return self;
}複製代碼

該線程安全的字典初始化的時候會新建一個併發的 queue。

- (NSUInteger)count
{
    __block NSUInteger count;
    dispatch_sync(_queue, ^{
        count = _dict.count;
    });
    return count;
}

- (id)objectForKey:(id)aKey
{
    __block id obj;
    dispatch_sync(_queue, ^{
        obj = _dict[aKey];
    });
    return obj;
}

- (NSEnumerator *)keyEnumerator
{
    __block NSEnumerator *enu;
    dispatch_sync(_queue, ^{
        enu = [_dict keyEnumerator];
    });
    return enu;
}

- (id)copy{
    __block id copyInstance;
    dispatch_sync(_queue, ^{
        copyInstance = [_dict copy];
    });
    return copyInstance;
}複製代碼

讀取的這些方法都用 dispatch_sync 。

- (void)setObject:(id)anObject forKey:(id<NSCopying>)aKey
{
    aKey = [aKey copyWithZone:NULL];
    dispatch_barrier_async(_queue, ^{
        _dict[aKey] = anObject;
    });
}

- (void)removeObjectForKey:(id)aKey
{
    dispatch_barrier_async(_queue, ^{
        [_dict removeObjectForKey:aKey];
    });
}

- (void)removeAllObjects{
    dispatch_barrier_async(_queue, ^{
        [_dict removeAllObjects];
    });
}複製代碼

和寫入相關的方法都用 dispatch_barrier_async。

再看看 Go 用互斥量如何實現一個簡單的線程安全的 Map 吧。

既然要用到互斥量,那麼咱們封裝一個包含互斥量的 Map 。

type MyMap struct {
    sync.Mutex
    m map[int]int
}

var myMap *MyMap

func init() {
    myMap = &MyMap{
        m: make(map[int]int, 100),
    }
}複製代碼

再簡單的實現 Map 的基礎方法。

func builtinMapStore(k, v int) {
    myMap.Lock()
    defer myMap.Unlock()
    myMap.m[k] = v
}

func builtinMapLookup(k int) int {
    myMap.Lock()
    defer myMap.Unlock()
    if v, ok := myMap.m[k]; !ok {
        return -1
    } else {
        return v
    }
}

func builtinMapDelete(k int) {
    myMap.Lock()
    defer myMap.Unlock()
    if _, ok := myMap.m[k]; !ok {
        return
    } else {
        delete(myMap.m, k)
    }
}複製代碼

實現思想比較簡單,在每一個操做前都加上 lock,在每一個函數結束 defer 的時候都加上 unlock。

這種加鎖的方式實現的線程安全的字典,優勢是比較簡單,缺點是性能不高。文章最後會進行幾種實現方法的性能對比,用數字說話,就知道這種基於互斥量加鎖方式實現的性能有多差了。

在語言原生就自帶線程安全 Map 的語言中,它們的原生底層實現都不是經過單純的加鎖來實現線程安全的,好比 Java 的 ConcurrentHashMap,Go 1.9 新加的 sync.map。

三. 現代線程安全的 Lock - Free 方案 CAS

在 Java 的 ConcurrentHashMap 底層實現中大量的利用了 volatile,final,CAS 等 Lock-Free 技術來減小鎖競爭對於性能的影響。

在 Go 中也大量的使用了原子操做,CAS 是其中之一。比較並交換即 「Compare And Swap」,簡稱 CAS。

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)複製代碼

CAS 會先判斷參數 addr 指向的被操做值與參數 old 的值是否相等。若是至關,相應的函數纔會用參數 new 表明的新值替換舊值。不然,替換操做就會被忽略。

這一點與互斥鎖明顯不一樣,CAS 老是假設被操做的值不曾改變,並一旦確認這個假設成立,就當即進行值的替換。而互斥鎖的作法就更加謹慎,老是先假設會有併發的操做修改被操做的值,並須要使用鎖將相關操做放入臨界區中加以保護。能夠說互斥鎖的作法趨於悲觀,CAS 的作法趨於樂觀,相似樂觀鎖。

CAS 作法最大的優點在於能夠不建立互斥量和臨界區的狀況下,完成併發安全的值替換操做。這樣大大的減小了線程同步操做對程序性能的影響。固然 CAS 也有一些缺點,缺點下一章會提到。

接下來看看源碼是如何實現的。如下以64位爲例,32位相似。

TEXT ·CompareAndSwapUintptr(SB),NOSPLIT,$0-25
    JMP    ·CompareAndSwapUint64(SB)

TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0-25
    JMP    ·CompareAndSwapUint64(SB)

TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0-25
    MOVQ    addr+0(FP), BP
    MOVQ    old+8(FP), AX
    MOVQ    new+16(FP), CX
    LOCK
    CMPXCHGQ    CX, 0(BP)
    SETEQ    swapped+24(FP)
    RET複製代碼

上述實現最關鍵的一步就是 CMPXCHG。

查詢 Intel 的文檔

文檔上說:

比較 eax 和目的操做數(第一個操做數)的值,若是相同,ZF 標誌被設置,同時源操做數(第二個操做)的值被寫到目的操做數,不然,清
ZF 標誌,而且把目的操做數的值寫回 eax。

因而也就得出了 CMPXCHG 的工做原理:

比較 _old 和 (*__ptr) 的值,若是相同,ZF 標誌被設置,同時
_new 的值被寫到 (*__ptr),不然,清 ZF 標誌,而且把 (*__ptr) 的值寫回 _old。

在 Intel 平臺下,會用 LOCK CMPXCHG 來實現,這裏的 LOCK 是 CPU 鎖。

Intel 的手冊對 LOCK 前綴的說明以下:

    1. 確保對內存的讀-改-寫操做原子執行。在 Pentium 及 Pentium 以前的處理器中,帶有 LOCK 前綴的指令在執行期間會鎖住總線,使得其餘處理器暫時沒法經過總線訪問內存。很顯然,這會帶來昂貴的開銷。從 Pentium 4,Intel Xeon 及 P6 處理器開始,Intel 在原有總線鎖的基礎上作了一個頗有意義的優化:若是要訪問的內存區域(area of memory)在 LOCK 前綴指令執行期間已經在處理器內部的緩存中被鎖定(即包含該內存區域的緩存行當前處於獨佔或以修改狀態),而且該內存區域被徹底包含在單個緩存行(cache line)中,那麼處理器將直接執行該指令。因爲在指令執行期間該緩存行會一直被鎖定,其它處理器沒法讀/寫該指令要訪問的內存區域,所以能保證指令執行的原子性。這個操做過程叫作緩存鎖定(cache locking),緩存鎖定將大大下降 LOCK 前綴指令的執行開銷,可是當多處理器之間的競爭程度很高或者指令訪問的內存地址未對齊時,仍然會鎖住總線。
    1. 禁止該指令與以前和以後的讀和寫指令重排序。
    1. 把寫緩衝區中的全部數據刷新到內存中。

看完描述,能夠看出,CPU 鎖主要分兩種,總線鎖和緩存鎖。總線鎖用在老的 CPU 中,緩存鎖用在新的 CPU 中。

所謂總線鎖就是使用 CPU 提供的一個LOCK#信號,當一個處理器在總線上輸出此信號時,其餘處理器的請求將被阻塞住,那麼該 CPU 能夠獨佔使用共享內存。總線鎖的這種方式,在執行期間會鎖住總線,使得其餘處理器暫時沒法經過總線訪問內存。因此總線鎖定的開銷比較大,最新的處理器在某些場合下使用緩存鎖定代替總線鎖定來進行優化。

所謂「緩存鎖定」就是若是緩存在處理器緩存行中內存區域在 LOCK 操做期間被鎖定,當它執行鎖操做回寫內存時,處理器不在總線上產生
LOCK#信號,而是修改內部的內存地址,並容許它的緩存一致性機制來保證操做的原子性,由於緩存一致性機制會阻止同時修改被兩個以上處理器緩存的內存區域數據,當其餘處理器回寫已被鎖定的緩存行的數據時會對緩存行無效。

有兩種狀況處理器沒法使用緩存鎖。

  • 第一種狀況是,當操做的數據不能被緩存在處理器內部,或操做的數據跨多個緩存行(cache line),則處理器會調用總線鎖定。

  • 第二種狀況是:有些處理器不支持緩存鎖定。一些老的 CPU 就算鎖定的內存區域在處理器的緩存行中也會調用總線鎖定。

雖然緩存鎖能夠大大下降 CPU 鎖的執行開銷,可是若是遇到多處理器之間的競爭程度很高或者指令訪問的內存地址未對齊時,仍然會鎖住總線。因此緩存鎖和總線鎖相互配合,效果更佳。

綜上,用 CAS 方式來保證線程安全的方式就比用互斥鎖的方式效率要高不少。

四. CAS 的缺陷

雖然 CAS 的效率高,可是依舊存在3大問題。

1. ABA 問題

線程1準備用 CAS 將變量的值由 A 替換爲 B ,在此以前,線程2將變量的值由 A 替換爲 C ,又由 C 替換爲 A,而後線程1執行 CAS 時發現變量的值仍然爲 A,因此 CAS 成功。但實際上這時的現場已經和最初不一樣了。圖上也爲了分開兩個 A 不一樣,因此用不一樣的顏色標記了。最終線程2把 A 替換成了 B 。這就是經典的 ABA 問題。可是這會致使項目出現什麼問題呢?

設想存在這樣一個鏈棧,棧裏面存儲了一個鏈表,棧頂是 A,A 的 next 指針指向 B。在線程1中,要將棧頂元素 A 用 CAS 把它替換成 B。接着線程2來了,線程2將以前包含 A,B 元素的鏈表都 pop 出去。而後 push 進來一個 A - C - D 鏈表,棧頂元素依舊是 A。這時線程1發現 A 沒有發生變化,因而替換成 B。這個時候 B 的 next 其實爲 nil。替換完成之後,線程2操做的鏈表 C - D 這裏就與表頭斷開鏈接了。也就是說線程1 CAS 操做結束,C - D 就被丟失了,再也找不回來了。棧中只剩下 B 一個元素了。這很明顯出現了 bug。

那怎麼解決這種狀況呢?最通用的作法就是加入版本號進行標識。

每次操做都加上版本號,這樣就能夠完美解決 ABA 的問題了。

2. 循環時間可能過長

自旋 CAS 若是長時間不成功,會給 CPU 帶來很是大的執行開銷。若是能支持 CPU 提供的 Pause 指令,那麼 CAS 的效率能有必定的提高。Pause 指令有兩個做用,第一它能夠延遲流水線執行指令(de-pipeline),使 CPU 不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它能夠避免在退出循環的時候因內存順序衝突(memory order violation)而引發 CPU 流水線被清空(CPU pipeline flush),從而提升 CPU 的執行效率。

3. 只能保證一個共享變量的原子操做

CAS 操做只能保證一個共享變量的原子操做,可是保證多個共享變量操做的原子性。通常作法可能就考慮利用鎖了。

不過也能夠利用一個結構體,把兩個變量合併成一個變量。這樣還能夠繼續利用 CAS 來保證原子性操做。

五. Lock - Free 方案舉例

在 Lock - Free方案舉例以前,先來回顧一下互斥量的方案。上面咱們用互斥量實現了 Go 的線程安全的 Map。至於這個 Map 的性能如何,接下來對比的時候能夠看看數據。

1. NO Lock - Free 方案

若是不用 Lock - Free 方案也不用簡單的互斥量的方案,如何實現一個線程安全的字典呢?答案是利用分段鎖的設計,只有在同一個分段內才存在競態關係,不一樣的分段鎖之間沒有鎖競爭。相比於對整個
Map 加鎖的設計,分段鎖大大的提升了高併發環境下的處理能力。

type ConcurrentMap []*ConcurrentMapShared


type ConcurrentMapShared struct {
    items        map[string]interface{}
    sync.RWMutex // 讀寫鎖,保證進入內部 map 的線程安全
}複製代碼

分段鎖 Segment 存在一個併發度。併發度能夠理解爲程序運行時可以同時更新 ConccurentMap 且不產生鎖競爭的最大線程數,實際上就是 ConcurrentMap 中的分段鎖個數。即數組的長度。

var SHARD_COUNT = 32複製代碼

若是併發度設置的太小,會帶來嚴重的鎖競爭問題;若是併發度設置的過大,本來位於同一個 Segment 內的訪問會擴散到不一樣的 Segment 中,CPU cache 命中率會降低,從而引發程序性能降低。

ConcurrentMap 的初始化就是對數組的初始化,而且初始化數組裏面每一個字典。

func New() ConcurrentMap {
    m := make(ConcurrentMap, SHARD_COUNT)
    for i := 0; i < SHARD_COUNT; i++ {
        m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
    }
    return m
}複製代碼

ConcurrentMap 主要使用 Segment 來實現減少鎖粒度,把 Map 分割成若干個 Segment,在 put 的時候須要加讀寫鎖,get 時候只加讀鎖。

既然分段了,那麼針對每一個 key 對應哪個段的邏輯就由一個哈希函數來定。

func fnv32(key string) uint32 {
    hash := uint32(2166136261)
    const prime32 = uint32(16777619)
    for i := 0; i < len(key); i++ {
        hash *= prime32
        hash ^= uint32(key[i])
    }
    return hash
}複製代碼

上面這段哈希函數會根據每次傳入的 string ,計算出不一樣的哈希值。

func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
    return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}複製代碼

根據哈希值對數組長度取餘,取出 ConcurrentMap 中的 ConcurrentMapShared。在 ConcurrentMapShared 中存儲對應這個段的 key - value。

func (m ConcurrentMap) Set(key string, value interface{}) {
    // Get map shard.
    shard := m.GetShard(key)
    shard.Lock()
    shard.items[key] = value
    shard.Unlock()
}複製代碼

上面這段就是 ConcurrentMap 的 set 操做。思路很清晰:先取出對應段內的 ConcurrentMapShared,而後再加讀寫鎖鎖定,寫入 key - value,寫入成功之後再釋放讀寫鎖。

func (m ConcurrentMap) Get(key string) (interface{}, bool) {
    // Get shard
    shard := m.GetShard(key)
    shard.RLock()
    // Get item from shard.
    val, ok := shard.items[key]
    shard.RUnlock()
    return val, ok
}複製代碼

上面這段就是 ConcurrentMap 的 get 操做。思路也很清晰:先取出對應段內的 ConcurrentMapShared,而後再加讀鎖鎖定,讀取 key - value,讀取成功之後再釋放讀鎖。

這裏和 set 操做的區別就在於只須要加讀鎖便可,不用加讀寫鎖。

func (m ConcurrentMap) Count() int {
    count := 0
    for i := 0; i < SHARD_COUNT; i++ {
        shard := m[i]
        shard.RLock()
        count += len(shard.items)
        shard.RUnlock()
    }
    return count
}複製代碼

ConcurrentMap 的 Count 操做就是把 ConcurrentMap 數組的每個分段元素裏面的每個元素都遍歷一遍,計算出總數。

func (m ConcurrentMap) Keys() []string {
    count := m.Count()
    ch := make(chan string, count)
    go func() {
        // 遍歷全部的 shard.
        wg := sync.WaitGroup{}
        wg.Add(SHARD_COUNT)
        for _, shard := range m {
            go func(shard *ConcurrentMapShared) {
                // 遍歷全部的 key, value 鍵值對.
                shard.RLock()
                for key := range shard.items {
                    ch <- key
                }
                shard.RUnlock()
                wg.Done()
            }(shard)
        }
        wg.Wait()
        close(ch)
    }()

    // 生成 keys 數組,存儲全部的 key
    keys := make([]string, 0, count)
    for k := range ch {
        keys = append(keys, k)
    }
    return keys
}複製代碼

上述是返回 ConcurrentMap 中全部 key ,結果裝在字符串數組中。

type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}

func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
    shard := m.GetShard(key)
    shard.Lock()
    v, ok := shard.items[key]
    res = cb(ok, v, value)
    shard.items[key] = res
    shard.Unlock()
    return res
}複製代碼

上述代碼是 Upsert 操做。若是已經存在了,就更新。若是是一個新元素,就用 UpsertCb 函數插入一個新的。思路也是先根據 string 找到對應的段,而後加讀寫鎖。這裏只能加讀寫鎖,由於無論是 update 仍是 insert 操做,都須要寫入。讀取 key 對應的 value 值,而後調用 UpsertCb 函數,把結果更新到 key 對應的 value 中。最後釋放讀寫鎖便可。

UpsertCb 函數在這裏值得說明的是,這個函數是回調返回待插入到 map 中的新元素。這個函數當且僅當在讀寫鎖被鎖定的時候纔會被調用,所以必定不容許再去嘗試讀取同一個 map 中的其餘 key 值。由於這樣會致使線程死鎖。死鎖的緣由是 Go 中 sync.RWLock 是不可重入的。

完整的代碼見concurrent_map.go

這種分段的方法雖然比單純的加互斥量好不少,由於 Segment 把鎖住的範圍進一步的減小了,可是這個範圍依舊比較大,還能再進一步的減小鎖麼?

還有一點就是併發量的設置,要合理,不能太大也不能過小。

2. Lock - Free 方案

在 Go 1.9 的版本中默認就實現了一種線程安全的 Map,摒棄了Segment(分段鎖)的概念,而是啓用了一種全新的方式實現,利用了 CAS 算法,即 Lock - Free 方案。

採用 Lock - Free 方案之後,能比上一個分案,分段鎖更進一步縮小鎖的範圍。性能大大提高。

接下來就讓咱們來看看如何用 CAS 實現一個線程安全的高性能 Map 。

官方是 sync.map 有以下的描述:

這個 Map 是線程安全的,讀取,插入,刪除也都保持着常數級的時間複雜度。多個 goroutines 協程同時調用 Map 方法也是線程安全的。該 Map 的零值是有效的,而且零值是一個空的 Map 。線程安全的 Map 在第一次使用以後,不容許被拷貝。

這裏解釋一下爲什麼不能被拷貝。由於對結構體的複製不但會生成該值的副本,還會生成其中字段的副本。如此一來,本應施加於此的併發線程安全保護也就失效了。

做爲源值賦給別的變量,做爲參數值傳入函數,做爲結果值從函數返回,做爲元素值經過通道傳遞等都會形成值的複製。正確的作法是用指向該類型的指針類型的變量。

Go 1.9 中 sync.map 的數據結構以下:

type Map struct {

    mu Mutex

    // 併發讀取 map 中一部分的內容是線程安全的,這是不須要
    // read 這部分自身讀取就是線程安全的,由於是原子性的。可是存儲的時候仍是須要 Mutex
    // 存儲在 read 中的 entry 在併發讀取過程當中是容許更新的,即便沒有 Mutex 信號量,也是線程安全的。可是更新一個之前刪除的 entry 就須要把值拷貝到 dirty Map 中,而且必需要帶上 Mutex
    read atomic.Value // readOnly

    // dirty 中包含 map 中必需要互斥量 mu 保護才能線程安全的部分。爲了使 dirty 能快速的轉化成 read map,dirty 中包含了 read map 中全部沒有被刪除的 entries
    // 已經刪除過的 entries 不存儲在 dirty map 中。在 clean map 中一個已經刪除的 entry 必定是沒有被刪除過的,而且當新值將要被存儲的時候,它們會被添加到 dirty map 中。
    // 當 dirty map 爲 nil 的時候,下一次寫入的時候會經過 clean map 忽略掉舊的 entries 之後的淺拷貝副原本初始化 dirty map。
    dirty map[interface{}]*entry

    // misses 記錄了 read map 由於須要判斷 key 是否存在而鎖住了互斥量 mu 進行了 update 操做之後的加載次數。
    // 一旦 misses 值大到足夠去複製 dirty map 所需的花費的時候,那麼 dirty map 就被提高到未被修改狀態下的 read map,下次存儲就會建立一個新的 dirty map。
    misses int
}複製代碼

在這個 Map 中,包含一個互斥量 mu,一個原子值 read,一個非線程安全的字典 map,這個字典的 key 是 interface{} 類型,value 是 *entry 類型。最後還有一個 int 類型的計數器。

先來講說原子值。atomic.Value 這個類型有兩個公開的指針方法,Load 和 Store 。Load 方法用於原子地的讀取原子值實例中存儲的值,它會返回一個 interface{} 類型的結果,而且不接受任何參數。Store 方法用於原子地在原子值實例中存儲一個值,它接受一個 interface{} 類型的參數而沒有任何結果。在不曾經過 Store 方法向原子值實例存儲值以前,它的 Load 方法總會返回 nil。

在這個線程安全的字典中,Load 和 Store 的都是一個 readOnly 的數據結構。

// readOnly 是一個不可變的結構體,原子性的存儲在 Map.read 中
type readOnly struct {
    m map[interface{}]*entry
    // 標誌 dirty map 中是否包含一些不在 m 中的 key 。
    amended bool // true if the dirty map contains some key not in m.
}複製代碼

readOnly 中存儲了一個非線程安全的字典,這個字典和上面 dirty map 存儲的類型徹底一致。key 是 interface{} 類型,value 是 *entry 類型。

// entry 是一個插槽,與 map 中特定的 key 相對應
type entry struct {
    p unsafe.Pointer // *interface{}
}複製代碼

p 指針指向 *interface{} 類型,裏面存儲的是 entry 的地址。若是 p \=\= nil,表明 entry 被刪除了,而且 m.dirty \=\= nil。若是 p \=\= expunged,表明 entry 被刪除了,而且 m.dirty != nil ,那麼 entry 從 m.dirty 中丟失了。

除去以上兩種狀況外,entry 都是有效的,而且被記錄在 m.read.m[key] 中,若是 m.dirty!= nil,entry 被存儲在 m.dirty[key] 中。

一個 entry 能夠經過原子替換操做成 nil 來刪除它。當 m.dirty 在下一次被建立,entry 會被 expunged 指針原子性的替換爲 nil,m.dirty[key] 不對應任何 value。只要 p != expunged,那麼一個 entry 就能夠經過原子替換操做更新關聯的 value。若是 p \=\= expunged,那麼一個 entry 想要經過原子替換操做更新關聯的 value,只能在首次設置 m.dirty[key] = e 之後才能更新 value。這樣作是爲了能在 dirty map 中查找到它。

總結一下,sync.map 的數據結構如上。

再看看線程安全的 sync.map 的一些操做。

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 若是 key 對應的 value 不存在,而且 dirty map 包含 read map 中沒有的 key,那麼開始讀取 dirty map 
    if !ok && read.amended {
        // dirty map 不是線程安全的,因此須要加上互斥鎖
        m.mu.Lock()
        // 當 m.dirty 被提高的時候,爲了防止獲得一個虛假的 miss ,因此此時咱們加鎖。
        // 若是再次讀取相同的 key 不 miss,那麼這個 key 值就就不值得拷貝到 dirty map 中。
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // 不管 entry 是否存在,記錄此次 miss 。
            // 這個 key 將會緩慢的被取出,直到 dirty map 提高到 read map
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}複製代碼

上述代碼是 Load 操做。返回的是入參 key 對應的 value 值。若是 value 不存在就返回 nil。dirty map 中會保存一些 read map 裏面不存在的 key,那麼就要讀取出 dirty map 裏面 key 對應的 value。注意讀取的時候須要加互斥鎖,由於 dirty map 是非線程安全的。

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}複製代碼

上面這段代碼是記錄 misses 次數的。只有當 misses 個數大於 dirty map 的長度的時候,會把 dirty map 存儲到 read map 中。而且把 dirty 置空,misses 次數也清零。

在看 Store 操做以前,先說一個 expunged 變量。

// expunged 是一個指向任意類型的指針,用來標記從 dirty map 中刪除的 entry
var expunged = unsafe.Pointer(new(interface{}))複製代碼

expunged 變量是一個指針,用來標記從 dirty map 中刪除的 entry。

func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    // 從 read map 中讀取 key 失敗或者取出的 entry 嘗試存儲 value 失敗,直接返回
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        // e 指向的是非 nil 的
        if e.unexpungeLocked() {
            // entry 先前被刪除了,這就意味着存在一個非空的 dirty map 裏面並無存儲這個 entry
            m.dirty[key] = e
        }
        // 使用 storeLocked 函數以前,必須保證 e 沒有被清除
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        // 已經存儲在 dirty map 中了,表明 e 沒有被清除
        e.storeLocked(&value)
    } else {
        if !read.amended {
            // 到這個 else 中就意味着,當前的 key 是第一次被加到 dirty map 中。
            // store 以前先判斷一下 dirty map 是否爲空,若是爲空,就把 read map 淺拷貝一次。
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        // 在 dirty 中存儲 value
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}複製代碼

Store 優先從 read map 裏面去讀取 key ,而後存儲它的 value。若是 entry 是被標記爲從 dirty map 中刪除過的,那麼還須要從新存儲回 dirty map中。

若是 read map 裏面沒有相應的 key,就去 dirty map 裏面去讀取。dirty map 就直接存儲對應的 value。

最後如何 read map 和 dirty map 都沒有這個 key 值,這就意味着該 key 是第一次被加入到 dirty map 中。在 dirty map 中存儲這個 key 以及對應的 value。

// 當 entry 沒有被刪除的狀況下去存儲一個 value。
// 若是 entry 被刪除了,tryStore 方法返回 false,而且保留 entry 不變
func (e *entry) tryStore(i *interface{}) bool {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}複製代碼

tryStore 函數的實現和 CAS 原理差很少,它會反覆的循環判斷 entry 是否被標記成了 expunged,若是 entry 通過 CAS 操做成功的替換成了 i,那麼就返回 true,反之若是被標記成了 expunged,就返回 false。

// unexpungeLocked 函數確保了 entry 沒有被標記成已被清除。
// 若是 entry 先前被清除過了,那麼在 mutex 解鎖以前,它必定要被加入到 dirty map 中
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}複製代碼

若是 entry 的 unexpungeLocked 返回爲 true,那麼就說明 entry 已經被標記成了 expunged,那麼它就會通過 CAS 操做把它置爲 nil。

再來看看刪除操做的實現。

func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        // 因爲 dirty map 是非線程安全的,因此操做前要加鎖
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            // 刪除 dirty map 中的 key
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}複製代碼

delete 操做的實現比較簡單,若是 read map 中存在 key,就能夠直接刪除,若是不存在 key 而且 dirty map 中有這個 key,那麼就要刪除 dirty map 中的這個 key。操做 dirty map 的時候記得先加上鎖進行保護。

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}複製代碼

刪除 entry 具體的實現如上。這個操做裏面都是原子性操做。循環判斷 entry 是否爲 nil 或者已經被標記成了 expunged,若是是這種狀況就返回 false,表明刪除失敗。不然就 CAS 操做,將 entry 的 p 指針置爲 nil,並返回 true,表明刪除成功。

至此,關於 Go 1.9 中自帶的線程安全的 sync.map 的實現就分析完了。官方的實現裏面基本沒有用到鎖,互斥量的 lock 也是基於 CAS的。read map 也是原子性的。因此比以前加鎖的實現版本性能有所提高。

究竟 Lock - Free 的性能有多強呢?接下來作一下性能測試。

五. 性能對比

性能測試主要針對3個方面,Insert,Get,Delete。測試對象主要針對簡單加互斥鎖的原生 Map ,分段加鎖的 Map,Lock - Free 的 Map 這三種進行性能測試。

性能測試的全部代碼已經放在 github 了,地址在這裏,性能測試用的指令是:

go test -v -run=^$ -bench . -benchmem複製代碼

1. 插入 Insert 性能測試

// 插入不存在的 key (粗糙的鎖)
func BenchmarkSingleInsertAbsentBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        myMap.BuiltinMapStore(strconv.Itoa(i), "value")
    }
}

// 插入不存在的 key (分段鎖)
func BenchmarkSingleInsertAbsent(b *testing.B) {
    m := New()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Set(strconv.Itoa(i), "value")
    }
}

// 插入不存在的 key (syncMap)
func BenchmarkSingleInsertAbsentSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        syncMap.Store(strconv.Itoa(i), "value")
    }
}複製代碼

測試結果:

BenchmarkSingleInsertAbsentBuiltInMap-4          2000000           857 ns/op         170 B/op           1 allocs/op
BenchmarkSingleInsertAbsent-4                    2000000           651 ns/op         170 B/op           1 allocs/op
BenchmarkSingleInsertAbsentSyncMap-4             1000000          1094 ns/op         187 B/op           5 allocs/op複製代碼

實驗結果是分段鎖的性能最高。這裏說明一下測試結果,-4表明測試用了4核 CPU ,2000000 表明循環次數,857 ns/op 表明的是平均每次執行花費的時間,170 B/op 表明的是每次執行堆上分配內存總數,allocs/op 表明的是每次執行堆上分配內存次數。

這樣看來,循環次數越多,花費時間越少,分配內存總數越小,分配內存次數越少,性能就越好。下面的性能圖表中去除掉了第一列循環次數,只花了剩下的3項,因此條形圖越短的性能越好。如下的每張條形圖的規則和測試結果表明的意義都和這裏同樣,下面就再也不贅述了。

// 插入存在 key (粗糙鎖)
func BenchmarkSingleInsertPresentBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    myMap.BuiltinMapStore("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        myMap.BuiltinMapStore("key", "value")
    }
}

// 插入存在 key (分段鎖)
func BenchmarkSingleInsertPresent(b *testing.B) {
    m := New()
    m.Set("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Set("key", "value")
    }
}

// 插入存在 key (syncMap)
func BenchmarkSingleInsertPresentSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    syncMap.Store("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        syncMap.Store("key", "value")
    }
}複製代碼

測試結果:

BenchmarkSingleInsertPresentBuiltInMap-4        20000000            74.6 ns/op           0 B/op           0 allocs/op
BenchmarkSingleInsertPresent-4                  20000000            61.1 ns/op           0 B/op           0 allocs/op
BenchmarkSingleInsertPresentSyncMap-4           20000000           108 ns/op          16 B/op           1 allocs/op複製代碼

從圖中能夠看出,sync.map 在涉及到 Store 這一項的均比其餘二者的性能差。無論插入不存在的 Key 仍是存在的 Key,分段鎖的性能均是目前最好的。

2. 讀取 Get 性能測試

// 讀取存在 key (粗糙鎖)
func BenchmarkSingleGetPresentBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    myMap.BuiltinMapStore("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        myMap.BuiltinMapLookup("key")
    }
}

// 讀取存在 key (分段鎖)
func BenchmarkSingleGetPresent(b *testing.B) {
    m := New()
    m.Set("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Get("key")
    }
}

// 讀取存在 key (syncMap)
func BenchmarkSingleGetPresentSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    syncMap.Store("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        syncMap.Load("key")
    }
}複製代碼

測試結果:

BenchmarkSingleGetPresentBuiltInMap-4           20000000            71.5 ns/op           0 B/op           0 allocs/op
BenchmarkSingleGetPresent-4                     30000000            42.3 ns/op           0 B/op           0 allocs/op
BenchmarkSingleGetPresentSyncMap-4              30000000            40.3 ns/op           0 B/op           0 allocs/op複製代碼

從圖中能夠看出,sync.map 在 Load 這一項的性能很是優秀,遠高於其餘二者。

3. 併發插入讀取混合性能測試

接下來的實現就涉及到了併發插入和讀取了。因爲分段鎖實現的特殊性,分段個數會多多少少影響到性能,那麼接下來的實驗就會對分段鎖分1,16,32,256 這4段進行測試,分別看看性能變化如何,其餘兩種線程安全的 Map 不變。

因爲併發的代碼太多了,這裏就不貼出來了,感興趣的同窗能夠看這裏

下面就直接放出測試結果:

併發插入不存在的 Key 值

BenchmarkMultiInsertDifferentBuiltInMap-4        1000000          2359 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_1_Shard-4          1000000          2039 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_16_Shard-4         1000000          1937 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_32_Shard-4         1000000          1944 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_256_Shard-4        1000000          1991 ns/op         331 B/op          11 allocs/op
BenchmarkMultiInsertDifferentSyncMap-4           1000000          3760 ns/op         635 B/op          33 allocs/op複製代碼

從圖中能夠看出,sync.map 在涉及到 Store 這一項的均比其餘二者的性能差。併發插入不存在的 Key,分段鎖劃分的 Segment 多少與性能沒有關係。

併發插入存在的 Key 值

BenchmarkMultiInsertSameBuiltInMap-4             1000000          1182 ns/op         160 B/op          10 allocs/op
BenchmarkMultiInsertSame-4                       1000000          1091 ns/op         160 B/op          10 allocs/op
BenchmarkMultiInsertSameSyncMap-4                1000000          1809 ns/op         480 B/op          30 allocs/op複製代碼

從圖中能夠看出,sync.map 在涉及到 Store 這一項的均比其餘二者的性能差。

併發的讀取存在的 Key 值

BenchmarkMultiGetSameBuiltInMap-4                2000000           767 ns/op           0 B/op           0 allocs/op
BenchmarkMultiGetSame-4                          3000000           481 ns/op           0 B/op           0 allocs/op
BenchmarkMultiGetSameSyncMap-4                   3000000           464 ns/op           0 B/op           0 allocs/op複製代碼

從圖中能夠看出,sync.map 在 Load 這一項的性能遠超多其餘二者。

併發插入讀取不存在的 Key 值

BenchmarkMultiGetSetDifferentBuiltInMap-4        1000000          3281 ns/op         337 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_1_Shard-4          1000000          3007 ns/op         338 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_16_Shard-4          500000          2662 ns/op         337 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_32_Shard-4         1000000          2732 ns/op         337 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_256_Shard-4        1000000          2788 ns/op         339 B/op          12 allocs/op
BenchmarkMultiGetSetDifferentSyncMap-4            300000          8990 ns/op        1104 B/op          34 allocs/op複製代碼

從圖中能夠看出,sync.map 在涉及到 Store 這一項的均比其餘二者的性能差。併發插入讀取不存在的 Key,分段鎖劃分的 Segment 多少與性能沒有關係。

併發插入讀取存在的 Key 值

BenchmarkMultiGetSetBlockBuiltInMap-4            1000000          2095 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_1_Shard-4              1000000          1712 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_16_Shard-4             1000000          1730 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_32_Shard-4             1000000          1645 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_256_Shard-4            1000000          1619 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlockSyncMap-4                500000          2660 ns/op         480 B/op          30 allocs/op複製代碼

從圖中能夠看出,sync.map 在涉及到 Store 這一項的均比其餘二者的性能差。併發插入讀取存在的 Key,分段鎖劃分的 Segment 越小,性能越好!

4. 刪除 Delete 性能測試

// 刪除存在 key (粗糙鎖)
func BenchmarkDeleteBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    b.RunParallel(func(pb *testing.PB) {
        r := rand.New(rand.NewSource(time.Now().Unix()))
        for pb.Next() {
            // The loop body is executed b.N times total across all goroutines.
            k := r.Intn(100000000)
            myMap.BuiltinMapDelete(strconv.Itoa(k))
        }
    })
}

// 刪除存在 key (分段鎖)
func BenchmarkDelete(b *testing.B) {
    m := New()
    b.RunParallel(func(pb *testing.PB) {
        r := rand.New(rand.NewSource(time.Now().Unix()))
        for pb.Next() {
            // The loop body is executed b.N times total across all goroutines.
            k := r.Intn(100000000)
            m.Remove(strconv.Itoa(k))
        }
    })
}

// 刪除存在 key (syncMap)
func BenchmarkDeleteSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    b.RunParallel(func(pb *testing.PB) {
        r := rand.New(rand.NewSource(time.Now().Unix()))
        for pb.Next() {
            // The loop body is executed b.N times total across all goroutines.
            k := r.Intn(100000000)
            syncMap.Delete(strconv.Itoa(k))
        }
    })
}複製代碼

測試結果:

BenchmarkDeleteBuiltInMap-4                     10000000           130 ns/op           8 B/op           1 allocs/op
BenchmarkDelete-4                               20000000            76.7 ns/op           8 B/op           1 allocs/op
BenchmarkDeleteSyncMap-4                        30000000            45.4 ns/op           8 B/op           0 allocs/op複製代碼

從圖中能夠看出,sync.map 在 Delete 這一項是完美的超過其餘二者的。

六. 總結

本文從線程安全理論基礎開始講了線程安全中一些處理方法。其中涉及到互斥量和條件變量相關知識。從 Lock 的方案談到了 Lock - Free 的 CAS 相關方案。最後針對 Go 1.9 新加的 sync.map 進行了源碼分析和性能測試。

採用了 Lock - Free 方案的 sync.map 測試結果並無想象中的那麼出色。除了 Load 和 Delete 這兩項遠遠甩開其餘二者,凡是涉及到 Store 相關操做的性能均低於其餘二者 Map 的實現。不過這也是有緣由的。

縱觀 Java ConcurrentHashmap 一路的變化:

JDK 6,7 中的 ConcurrentHashmap 主要使用 Segment 來實現減少鎖粒度,把 HashMap 分割成若干個 Segment,在 put 的時候須要鎖住 Segment,get 時候不加鎖,使用 volatile 來保證可見性,當要統計全局時(好比size),首先會嘗試屢次計算 modcount 來肯定,這幾回嘗試中,是否有其餘線程進行了修改操做,若是沒有,則直接返回 size。若是有,則須要依次鎖住全部的 Segment 來計算。

JDK 7 中 ConcurrentHashmap 中,當長度過長碰撞會很頻繁,鏈表的增改刪查操做都會消耗很長的時間,影響性能,因此 JDK8 中徹底重寫了concurrentHashmap,代碼量從原來的1000多行變成了 6000多行,實現上也和原來的分段式存儲有很大的區別。

JDK 8 的 ConcurrentHashmap 主要設計上的變化有如下幾點:

  • 不採用 Segment 而採用 node,鎖住 node 來實現減少鎖粒度。
  • 設計了 MOVED 狀態 當 Resize 的中過程當中線程2還在 put 數據,線程2會幫助 resize。
  • 使用3個 CAS 操做來確保 node 的一些操做的原子性,這種方式代替了鎖。
  • sizeCtl 的不一樣值來表明不一樣含義,起到了控制的做用。

可見 Go 1.9 一上來第一個版本就直接摒棄了 Segment 的作法,採起了 CAS 這種 Lock - Free 的方案提升性能。可是它並無對整個字典進行相似 Java 的 Node 的設計。可是整個 sync.map 在 ns/op ,B/op,allocs/op 這三個性能指標上是普通原生非線程安全 Map 的三倍!

不過相信 Google 應該還會繼續優化這部分吧,畢竟源碼裏面還有幾處 TODO 呢,讓咱們一塊兒其餘 Go 將來版本的發展吧,筆者也會一直持續關注的。

(在本篇文章截稿的時候,筆者又忽然發現了一種分段鎖的 Map 實現,性能更高,它具備負載均衡等特色,應該是目前筆者見到的性能最好的 Go 語言實現的線程安全的 Map ,關於它的實現源碼分析就只能放在下篇博文單獨寫一篇或者之後有空再分析啦)


Reference:
《Go 併發實戰編程》
Split-Ordered Lists: Lock-Free Extensible Hash Tables
Semaphores are Surprisingly Versatile
線程安全
JAVA CAS原理深度分析
Java ConcurrentHashMap 總結

GitHub Repo:Halfrost-Field

Follow: halfrost · GitHub

Source: halfrost.com/go_map_chap…

相關文章
相關標籤/搜索