當咱們在編寫多線程程序時,經常會涉及到多個線程對共享數據的訪問。若是不對這種訪問加以限制,每每會致使程序運行結果與預期不符
編寫代碼時,咱們以及習慣了用鎖去保護數據。那麼,這裏的鎖是什麼?爲何它能知足咱們的要求?它存在於哪裏?html
讓咱們從一個最簡單的例子出發---多個線程併發修改一個全局變量:node
/* 全局變量 */ int g_sum = 0; /* 每一個線程入口 */ void *thread(void* arg) { for(int i = 0; i < 100; i++) { g_sum++; } return NULL; }
在多核
處理器上,若是有兩個線程同時執行上面的累加操做,最終的g_sum
幾乎不多是預期的200
(每一個線程累加100
次),而更傾向因而一個接近200
的隨機值。算法
這是由於CPU
對g_sum
進行累加時,它們都會:1
.從內存中讀取 2
.修改它的值 3
.將新值寫回內存。因爲CPU
之間是獨立的,而內存是共享的,因此就有可能存在一種時序:兩個CPU
前後從內存中讀取了g_sum
的值,並各自對它進行了遞增,最終將新的值寫入g_sum
,這時。兩個線程的兩次累加最終只讓g_sum
增長了1
shell
要解決上面的問題,一個很天然的想法同一時間段內,要想辦法只讓一個線程對全局變量進行讀-修改-寫。咱們能夠用鎖
去保護臨界區
編程
這裏引入了臨界區
的概念。臨界區是指訪問共用資源的程序片斷(好比上面的例子中的"g_sum++")。線程在進入臨界區時加鎖,退出臨界區時解鎖。也就是說,鎖將臨界區"保護"了起來。數組
臨界區
是人們爲一段代碼片斷強加上的概念,但加鎖
和解鎖
不同,它必須實打實地存在於代碼中。那麼問題來了,鎖
應該如何實現 ? 數據結構
爲了回答這個問題,咱們先將鎖
須要具備的特性列出來:多線程
1
. 它須要支持加鎖(lock
)和解鎖(unlock
)兩種操做。2
. 它須要是有狀態(State
)的,它須要記錄當前這把鎖處於Locked
仍是Unlocked
狀態。3
. 鎖的狀態變化必須是原子(Atomic)的4
. 當它處於Locked
狀態時,對其進行加鎖(lock
)的操做,不會成功。併發
第1
條,對實現者來講,一是要提供兩個API
分別對應這兩種操做。post
第2
條,須要一個地方能記錄鎖的狀態,對計算機系統來講,這個地方只能是內存
。
第3
條,將鎖
的狀態記錄在內存中有個和全局變量同樣的問題,那就是如何避免多個線程同時去改變鎖的狀態 ? 總不能用鎖
去保護鎖
吧 ? 好在各個體系的CPU
都提供了這種原子操做的原語, 對x86
來講,就是指令的LOCK
前綴, 它能夠在執行指令時控制住總線,直到指令執行完成。這也就保證了鎖
的狀態修改是經過原子
操做完成的。
第4
條,加鎖操做成功的前提是鎖
的狀態是處於"Unlocked",若是該條件不知足,則本次加鎖操做失敗,那麼失敗之後的行爲呢?不一樣的鎖有不一樣的實現,通常來講有三種可選擇的行爲:1
.當即返回失敗 2
.不斷嘗試再加鎖,直到成功. 3
. 睡眠線程本身,直到能夠得到鎖。
固然,咱們並不須要去重複造鎖
的輪子。
在用戶空間,glibc
提供了諸如spinlock
、semaphore
、rwlock
、mutex
類型的鎖的實現,咱們只要使用API
就行。
int sem_wait(sem_t *sem); int sem_post(sem_t *sem); int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex); .......
在內核空間,Linux
也有相似的實現.
在剛纔的例子中,若是咱們使用了鎖
去保護g_sum
,那麼最終必定能獲得200
。可是,咱們在獲得準確結果的同時也會付出性能的代價。
若是把臨界區
比做成一個獨木橋,那麼線程就是須要過獨木橋的人。 顯然,若是過橋的人(併發訪問臨界區的線程)越多,獨木橋越長(鎖保護的臨界區的範圍越大),那麼其餘人等地就越久(性能就降低地越厲害)。
下面這是在一臺8
核CPU
虛擬機環境下,測試程序的運行結果。
橫座標是併發運行的線程的數目,縱座標是完成相同任務(累加必定次數)時的運行時間。越多的線程會帶來越多的衝突,所以,總的運行時間會逐漸增大。
若是增長臨界區的長度呢(在每次循環中增長一些額外指令),則會獲得下面的結果:
橫座標表示額外的指令,縱座標依然表示時間。
可見,線程的併發越多、臨界區越大都會形成程序性能降低。這也是爲何追求性能的程序會選擇使用每cpu變量
(或者每線程變量),而且儘可能減少鎖保護的粒度。
前面說過,鎖
是有狀態的,而且這個狀態須要保存在內存中。那麼?具體到Linux
平臺,鎖
對象是保存在內核空間仍是用戶空間呢? 在比較早的內核(2.5.7)中,這個對象是保存在內核中的,這是很天然的作法。由於當一個線程(task
)去等待得到一個互斥鎖時,若是獲取不到,那麼它須要將積極睡眠,直到鎖
可用後再被喚醒。
這個過程具體來講,就是將本身的task_struct
掛到鎖
對象的等待鏈表上。當鎖
的持有者unlock
時,內核就能夠從該等待列表上找到並喚醒鏈表上全部task
。
可見,每次用戶的加鎖解鎖操做都必須陷入內核(即便如今沒有其餘線程持有這把鎖)。陷入內核意味着幾百個時鐘就消耗了。在衝突不大的場景中,這種消耗就白白浪費了。
所以,從2.5.7
版本開始,Linux
引入了Futex
(Fast Userspace muTEXes
),即快速的用戶態互斥機制,這個機制是用戶態和內核態共同協做完成的,它將保存鎖
狀態的對象放在用戶態。若是用戶在加鎖時發現鎖
處於(Unlocked
)狀態,那麼就直接修改狀態就行了(fast path
),不須要陷入內核。固然,若是此時鎖處於(Locked
)狀態,仍是須要陷入內核(slow path
)。
那麼咱們如何使用Futex
機制呢?答案是咱們徹底不須要顯示地使用,glibc
庫中的semaphore
、mutex
底層就是使用的Futex
。
鎖
是經過一個狀態的原子操做來保證共享數據的訪問互斥。而無鎖
的意思就是不須要這樣一個狀態。
說到無鎖
,必須提到的就是CAS
指令(也能夠叫CSW
)。CAS
是CompareAndSwap
的縮寫,即比較-交換。不一樣體系的CPU
有不一樣的CAS
的指令實現。在x86
上,就是帶LOCK
前綴的CMPXCHG
指令。因此,CAS
操做是原子的
它的功能用僞代碼描述就是下面這樣(僅爲理解,實際是一條原子指令):
bool compare_and_swap(int *src, int *dest, int newval) { if (*src == *dest) { *src = newval; return true; } else { return false; } }
第一個操做數的內容與第二個操做數的內容相比較, 若是相同,則將第三個操做數賦值給第一個操做數,返回TRUE
, 不然返回FALSE
。
較新版本的gcc
已經內置了CAS
操做的API
(以下)。其餘編譯器也提供了相似的API
,不過這不是本文的重點。
bool __sync_bool_comware_and_swap(type *ptr, type oldval, type newval);
無鎖
一般構建無鎖隊列(Lock-Free Queue
)。顧名思義,無鎖隊列就是指不使用鎖結構
來控制多線程併發互斥的隊列。
咱們知道,隊列是一個典型的先入先出(FIFO
)的數據結構,具備入隊(Enqueue
)和出隊(Dequeue
)兩種操做。併發條件下,多個線程可能在入隊或出隊時會產生競爭。
以單向鏈表爲基礎實現的隊列以下圖所示(有一個Dummy
鏈表頭),線程1和線程2都但願本身能完成入隊操做
一般來講,入隊要完成兩件事:
Next
指向新節點Tail
指向的節點到新入隊的節點若是可使用鎖
,咱們能夠經過將以上兩件事放到一個鎖
的保護範圍內就能完成線程的互斥,那麼對於無鎖呢?
John D.Valois
在《Implemeting Lock-Free Queues》中提出的無鎖隊列的入隊列算法以下(僞代碼):
EnQueue(x) { /* 建立新的節點 n */ n = new node(); n->value = x; n->next = NULL; do { t = tail; // 取得尾節點 succ = CAS(t->next, NULL, n) // 嘗試更新尾節點的Next指向新的節點 if succ != TRUE CAS(tail, t, t->next) // 更新失敗,嘗試將tail向後走 }while(succ != TRUE); CAS(tail, t, n); // 更新隊列的Tail指針,使它指向新的節點 }
這裏的Enqueue
算法中使用了三次CAS
操做。
1
. 第一次CAS
操做更新尾節點的Next指向新的節點。若是在單線程環境中,這個操做一定成功。但在多線程環境,若是有多個線程都在進行Enqueue
操做,那麼在線程T1取得尾節點後,線程T2可能已經完成了新節點的入隊,此時T1的CAS
操做就會失敗,由於此時t->Next
已經不爲NULL
了,而變成了T2新插入的節點。
再強調一遍,CAS
操做會鎖住總線!所以T1和T2只有一個線程會成功,成功的線程會更新尾節點的Next
,另外一個線程會由於CAS
失敗而從新循環。
若是CAS
操做成功,鏈表會變成下面這樣,此時的Tail
指針尚未更新
2
. 若是第一個CAS
失敗,說明有其餘線程在壞事(進行了元素入隊),這個時候第二個CAS
操做會嘗試推動Tail
指針。這樣作是爲了防止第一個CAS
成功的線程忽然掛掉而致使不更新Tail
指針
3
. 第三個CAS
操做更新尾節點的Next
論文中還給出了另外一個版本的入隊算法,以下所示
EnQueue2(x) { /* 建立新的節點 n */ n = new node(); n->value = x; n->next = NULL; oldt = t = tail do { while(t->next != NULL) // 不斷向後到達隊列尾部 t = t->next }while(CAS(t->next, NULL, n) != TRUE); // 更新尾節點的Next指向新的節點 CAS(tail, oldt, n); // 更新隊列的Tail指針,使它指向新的節點 }
與前一個的版本相比,新版本在循環內部增長了不斷向後遍歷的過程,也就是若是Tail
指針後面已經有被其餘線程添加了節點,本線程並不會等待Tail
更新,而是直接向後遍歷。
再來看出隊,論文中給出的出隊算法以下:
DeQueue() { do { h = head; if h->next = NULL error queue_empty; }while (CAS(head, h, h->next)!= TRUE) return h->next->value; }
須要特別注意,該出隊算法不是返回隊首的元素,而是返回Head->Next
節點。完成出隊後,移動Head
指針到剛出隊的元素。算法中使用了一個CAS
操做來控制競爭下的Head
指針更新。另外,算法中並無描述隊列元素的資源釋放。
以鏈表爲基礎的無鎖隊列有一個缺點就是內存的頻繁申請和釋放,在一些語言實現中,這種申請釋放自己就是帶鎖的。包含有鎖操做的行爲天然稱不上是無鎖。所以,更通用的無鎖隊列是基於數組實現的。論文中描述了一種基於數組的無鎖隊列算法,它具備如下一些特性:
1
. 數組預先分配好,也就是能容納的元素個數優先
2
. 使用者能夠將值填入數組,除此以外,數組有三個特殊值:HEAD
, TAIL
和EMPTY
。隊列初始化時(下圖),除了有兩個相鄰的位置是填入HEAD
, TAIL
以外,其餘位置都是EMPTY
。顯然,用戶數據不能再使用這三個值了。。
x
入隊,它會找到TAIL
的位置,而後對該位置和以後的位置執行一次Double-Word CAS
。該操做將<TAIL
, EMPTY
>原子地替換爲<x
, TAIL
>。固然,若是TAIL
後面不是EMPTY
(而是HEAD`),就說明隊列滿了,入隊失敗。HEAD
的位置,一樣利用Double-Word CAS
,將<HEAD
,x
>替換爲<EMPTY
, HEAD
>。固然若是HEAD
後面是EMPTY
,則出隊失敗(此時隊列是空的)。HEAD
和TAIL
的位置,算法使用兩個變量記錄入隊和出隊發生的次數,顯然這兩個變量的改變都是原子遞增的。在某個時刻,隊列多是下面這個樣子
我也用CAS
操做實現了一個隊列,可是沒有用論文中的算法。而更偏向於DPDK
的實現。
struct headtail{ volatile uint32_t head; volatile uint32_t tail; }; struct Queue{ struct headtail prod; struct headtail cons; int array[QUEUE_SIZE]; int capacity; }; int CAS_EnQueue(struct Queue* queue, int val) { uint32_t head; uint32_t idx; bool succ; do{ head = queue->prod.head; if (queue->capacity + queue->cons.tail - head < 1) { /* queue is full */ return -1; } /* move queue->prod.head */ succ = CAS(&queue->prod.head, head, head + 1); }while(!succ); idx = head & queue->capacity; /* set val */ queue->array[idx] = val; /* wait */ while(unlikely(queue->prod.tail != head)) { _mm_pause(); } queue->prod.tail = head + 1; return 0; } int CAS_DeQueue(struct Queue* queue, int* pval) { uint32_t head; uint32_t idx; bool succ; do { head = queue->cons.head; if (queue->prod.tail - head < 1) { /* Queue is Empty */ return -1; } /* forward queue->head */ succ = CAS(&queue->cons.head, head, head + 1); }while(!succ); idx = head & queue->capacity; *pval = queue->array[idx]; /* wait */ while(unlikely(queue->cons.tail != head)) { _mm_pause(); } /* move cons tail */ queue->cons.tail = head + 1; return 0; }
不管是鎖
仍是無鎖
,其實都是一種多線程環境下的同步方式,鎖
的應用更爲普遍,而無鎖
更有一種自旋的味道在裏面,在特定場景下的確能提升性能,好比DPDK
中ring
實際就是無鎖隊列的應用