《讀書筆記》程序員的自我修養之線程安全問題

場景:因爲多線程程序處於一個多變的環境中,可訪問的全局變量和堆數據隨時均可能被其餘線程改變。緩存

一個經典實例來闡述多個線程同時訪問一個共享數據所形成的後果。安全

線程1多線程

線程2併發

i=1函數

++i優化

--iui

首先要明白++i的實現步驟以下:spa

(1)    讀取i到某個寄存器X;線程

(2)    X++;code

(3)    將X的內容存儲回i。

若是線程1和線程2併發執行,則執行順序以下:

執行序號

執行指令

語句執行後變量值

線程

1

i=1

i=1,X[1]=未知

1

2

X[1]=i

i=1,X[1]=1

1

3

X[2]=i

i=1,X[2]=1

2

4

X[1]++

i=1,X[1]=2

1

5

X[2]--

i=1,X[2]=0

2

6

i=X[1]

i=2,X[1]=2

1

7

i=X[2]

i=0,X[2]=0

2

  • 從程序邏輯來看,兩個線程都執行完以後,i的值應該爲1,可是實際的狀況下,可能爲0,1,2;
  • 形成這個問題的緣由是++i這條語句會被編譯爲3條彙編代碼。
  • 可見,兩個程序同時讀寫同一個共享數據時會致使意想不到的後果。

 爲了不上述問題,有以下幾種方式:

  1. 原子指令:單指令操做,執行時不會被打斷(僅適用於簡單場合)
  2. 同步:一個線程訪問數據未結束時,其餘線程禁止對相同數據的訪問。
  3. 鎖:同步最多見的方法。

註解:鎖是一種非強制機制,每個線程在訪問數據或資源以前首先試圖獲取(Acquire)鎖,並在訪問結束以後釋放(Release)鎖。當鎖已經被佔用的時候試圖獲取鎖時,線程會等待,直到鎖從新可用。

 

鎖的分類:

1)  二元信號量(只有兩種狀態:佔用和非佔用,適合只能被惟一線程訪問的資源)

2)  多元信號量,也叫信號量(一個初始值爲N的信號量容許N個線程併發訪問)

獲取過程:

將信號量的值減1;

若是信號量的值小於0,則進入等待狀態,不然繼續執行。

釋放過程

將信號量的值加1;

若是信號量的值小於1,喚醒一個等待中的線程。

3)  互斥量(與二元信號量類似,資源僅容許被一個線程訪問)

二元信號量與互斥量的區別:二元信號量:在整個系統能夠被人以線程獲取並釋放,即,同一個信號量能夠被系統中的一個線程獲取後以後,能夠由另外一個線程釋放;互斥量則要求哪一個線程獲取了互斥量,哪一個線程就要負責釋放這個鎖。

4)  臨界區(其做用範圍僅限於本進程,其餘進程沒法獲取該鎖)

5)  讀寫鎖(適用於讀多,寫少的場合)

讀寫鎖有兩種獲取方式(共享的和獨佔的)

6)  條件變量(可讓許多線程一塊兒等待某個事件的發生,當事件發生時,全部的線程能夠一塊兒恢復執行。多元信號量,只能讓一個線程恢復執行。)

 

編譯器的過分優化也可能形成線程安全的問題,看以下幾個例子。

例1:

x = 0;

Thread1 Thread2

lock(); lock();

x++; x++;

unlock(); unlock();

因爲有lock和unlock的保護,x++的行爲不會被併發所破壞,那麼x的值彷佛必然是2了。其實否則,若是編譯器爲了提升x的訪問速度,把x放到了某個寄存器裏(不一樣線程的寄存器是各自獨立的),所以若是Thread1先得到鎖,則程序的執行多是以下狀況:

  •  [Thread1]讀取x的值到某個寄存器R[1](R[1]=0)
  •  [Thread1] R[1]++(因爲以後可能還要訪問x,所以Thread1暫時不講R[1]寫回x)
  •  [Thread2]讀取x的值到某個寄存器R[2](R[2]=0)
  •  [Thread2] R[2]++(R[2]=1)
  •  [Thread2]將R[2]寫回至x(x=1)

可見,在這樣的狀況下,即便正確的加鎖,也不能保證多線程安全。

例2:

x = y = 0;

Thread1 Thread2

x = 1; y = 1;

r1 = y; r2 = x;

r1和r2至少有一個爲1,邏輯上不可能同時爲0.然而,事實上r1 = r2 = 0的狀況確實可能發生。編譯器在進行優化的時候,可能爲了效率而交換絕不相干的兩條相鄰指令(如x=1和r1=y)的執行順序。

則變爲:

x = y = 0;

Thread1 Thread2

r1 = y; r2 = x;

x = 1; y = 1;

解決方法:

可使用volatile來阻止過分優化,volatile主要作兩件事
1)阻止編譯器爲了提升速度將一個變量緩存到寄存器內而不寫回
2)阻止編譯器調整操做volatile變量的指令順序。

這個方法能夠解決第一個問題,但不能徹底解決第二個問題,由於volatile可以阻止編譯器調整順序,也沒法阻止CPU動態調度換序。

 

例3:

volatile T* pInst = 0;

T* GetInstance()
{
       if (pInst == NULL)
       {
              lock();
              if(pInst == NULL)
                     pInst = new T;
              unlock();
       }
       return pInst;
}

當函數返回時,pInst老是指向一個有效的對象。而lock和unclock防止了多線程競爭致使的麻煩。

然而,實際上這樣的代碼是有問題的,問題來源於CPU的亂序執行。

C++裏的new操做包含兩個步驟:

(1)    分配內存

(2)    調用構造函數

因此pInst = new T包含了三個步驟:

(1)    分配內存

(2)    在內存的位置上調用構造函數

(3)    將內存的地址賦給pInst

在這3步中,(2)(3)的順序是能夠顛倒的,也就是說可能出現這種狀況:pInst的值已經不是NULL,但對象仍然沒有構造完畢。這時候若是出現另一個對GetInstance的併發調用,此時第一個 if內的表達式pInst == NULL爲false,因此這個調用會直接返回還沒有構造徹底的對象的地址(pInst)以提供給用戶使用。

從上面的例子中能夠看出,阻止CPU換序是必需的。但目前並不存在可移植的阻止換序的方法。一般狀況下是調用CPU提供的一條指令(barrier)。

barrier 指令用於阻止CPU將該指令以前的指令交換到barrier以後,反之亦然。

 

所以,例3能夠修改成以下:

#define barrier() _asm_ volatile("lwsync")

volatile T* pInst = 0;

T* GetInstance()

{
       if(!pInst)

       {
              lock();

              if(!pInst)

              {
                     T* temp = new T;

                     barrier();

                     pInst = temp;
              }
              unlock();
       }
       return pInst;
}

因爲barrier的存在,對象的構造必定在barrier執行以前完成,所以,當pInst被賦值時,對象老是無缺的。

相關文章
相關標籤/搜索