在多線程編程中,咱們經常使用互斥鎖來保證全局變量的線程安全,例如 pthread 中的 pthread_mutex,mach 中的 semaphore。他們經過 lock & unlock 或是 up & down 的方式來維護資源的狀態,保證只有特定個數的線程能得到特定個數的資源。html
那麼單單從軟件層面可否真正的實現互斥鎖呢?答案是否認的,由於不管如何,程序的互斥狀態都須要存儲在內存中,在多線程操做互斥狀態時,是沒法保證互斥狀態的線程安全的。也就是說,必須經過硬件支持,同時在處理器指令集層面提供原語,才能實現真正的互斥,也就是題目中提到的 Exclusive。編程
本文將介紹 ARM 指令集中與 Exclusive 相關的指令,經過學習這些指令,你不只可以理解鎖的本質和實現原理,還能掌握在彙編層面保證讀寫一致性的方法。安全
在 上一篇文章 中,咱們介紹了用於限制 CPU 亂序執行的內存屏障指令。相似的,Acquire 和 Release 也屬於內存屏障,也是爲了防止亂序執行帶來邏輯錯誤。bash
Acquire 用於修飾內存讀取指令,一條 read-acquire 的讀指令會禁止它後面的內存操做指令被提早執行,即後續內存操做指令重排時沒法向上越過屏障,下圖[1]直觀的描述了這一功能:多線程
Release 用於修飾內存寫指令,一條 write-release 的寫指令會禁止它上面的內存操做指令被滯後到寫指令完成後才執行,即寫指令以前的內存操做指令重排時午安向下越過屏障,下圖[2]直觀描述了這一功能: 併發
爲了在硬件層面支持讀寫互斥,就須要判斷一個地址是否已被其餘處理器或核心修改,在 ARM 處理器中包含了被稱爲 Exclusive Monitor 的狀態機來維護內存的互斥狀態,從而保證讀寫一致性[2]。jsp
狀態機的起始狀態爲 Open,對於某地址的 Load-Exclusive 讀操做會將讀取的地址標記爲 Exclusive 狀態;在對同一地址進行 Store-Exclusive 寫操做會先檢查 Monitor 是否處於 Exclusive 狀態,若處於該狀態則將內容寫入,並將狀態置爲 Open,若是在寫入前發現 Monitor 已經處於 Open 狀態,說明有其餘處理器或核心已經寫入內容,本次寫入失敗。函數
簡言之,Load-Exclusive 讀指令將讀取的地址標記爲 Exclusive,Store-Exclusive 執行只能寫入狀態爲 Exclusive 的地址,並在成功後將地址從新標記爲 Open,經過這種方式便可保證多讀單寫,即保證了讀取效率,又防止了多寫帶來的一致性問題。佈局
對於多核的體系結構,ARM 將 Exclusive Monitor 分爲 Local 和 Global 兩種。post
若是內存地址被標記爲 Nonshareable,則它的可見性被侷限在處理器內,對於這類內存的互斥狀態只須要維護在處理其內部的 Local Monitor 中。
Local Monitor 只在處理器內維護了狀態,因爲不涉及多多處理器的狀態共享,不須要對真正的內存進行標記,所以它的硬件既能夠經過對內存地址進行標記實現,也能夠經過追蹤指令的執行實現。這也要求不進行內存共享的代碼在使用 Local Monitor 編程時不能以 Local Monitor 會對地址進行檢查爲前提[2]。
對於多處理器併發編程,能夠經過在被標記爲 Shareable 的內存單元中定義一個 mutex 信號量實現,爲了保證 mutex 的多讀單寫,須要藉助於全部處理器共享的硬件結構 Global Monitor,他會記錄特定處理器對共享內存的 Exclusive 狀態,從而保證多處理器併發時的多讀單寫。
Compare and Swap 簡稱爲 CAS,是無鎖編程中最經常使用的方式,它在修改某個共享的值 a
時,首先讀取 a
的值,拷貝兩份,分別存儲爲 pre_a
和 new_a
,將 new_a
的值進行修改,在將 new_a
寫回到內存以前,先檢查內存中的 a
是否等於 pre_a
,若等於則說明 a
的值未被他人修改,此時能夠將 new_a
寫入內存,不然說明當前讀到的 a
已經不是最新的,寫入失敗。
顯然,經過 CAS 和自旋鎖搭配便可實現無鎖的互斥寫,可是 CAS 中的關鍵步驟 Compare & Swap 必須具備原子性,不然可能 Compare 時發現值未變化,但在 Compare 和 Swap 的間隙中有他人修改了值,從而致使多寫。
上述基本概念中咱們介紹了三種概念,分別是 Acquire and Release
, Exclusive Monitor
和 Compare and Swap
,在彙編層面,他們都有特定的指令支持。
LDXR 即 LDR 的 Exclusive 版本,它的用法與 LDR 徹底一致,區別在於它含有 Load-Exclusive 語義,即將讀取的內存單元狀態置爲 Exclusive。
STXR 即 STR 的 Exclusive 版本,因爲須要是否 Store 成功,他相比於 STR 多了一個 32 位寄存器的參數用於接收執行結果,用法爲:
STXR Ws, Xt, [Xn|SP{,#0}]
複製代碼
即嘗試將 Xt
寫入 [Xn|SP{,#0}]
,若是寫入成功則將 0 寫入 Ws,不然將非 0 寫入,它經常和 CBZ 指令搭配,若是寫入失敗則跳回到 LDXR,從新執行一遍 LDXR & STXR 操做,直至成功。
下面的例子給出了使用 LDXR & STXR 實現原子加一的過程:
; extern int atom_add(int *val);
_atom_add:
mov x9, x0 ; 備份 x0,爲了失敗時恢復
ldxr w0, [x9] ; 從val所在的內存中讀取一個 int,並標記 Exclusive
add w0, w0, #1
stxr w8, w0, [x9] ; 嘗試寫回 val 位置,寫入結果保存在 w8
cbz w8, atom_add_done ; 若是 w8 爲 0 說明成功,跳到程序結束
mov x0, x9 ; 恢復備份的 x0,從新執行 atom_add
b _atom_add
atom_add_done:
ret
複製代碼
一樣的例子存在於 libkern 提供的 OSAtomicAdd32 函數:
;int32_t OSAtomicAdd32(int32_t __theAmount, volatile int32_t *__theValue);
ldxr w8, [x1]
add w8, w8, w0
stxr w9, w8, [x1]
cbnz w9, _OSAtomicAdd32
mov x0, x8
ret lr
複製代碼
除了 Exclusive 語義外,LDXR & STXR 還有其 Acquire-Release 語義的 LDAXR & STLXR 版本,用於保證執行順序。對於單純的 Atomic Add 操做,前者已經足夠;若是涉及到相似於 上一篇文章 提到的讀寫等待操做,則須要經過後者強保證不被亂序執行干擾。
ARM 提供了多條指令直接完成 Compare and Swap 操做,其中 CAS 是最基礎的版本,它的使用方法以下[4]:
CAS Xs, Xt, [Xn|SP{,#0}] ; 64-bit, no memory ordering
複製代碼
嘗試將 Xt
與內存中的值進行交換,首先比較 Xs
是否等於內存中的 [Xn|SP{,#0}]
,若是相等則將 Xt
寫入內存,同時將內存中的值寫回到 Xs
,所以只要在 CAS 以後判斷 Xs
是否等於 Xt
便可知道是否寫入成功,若是寫入失敗則 Xs
的值應爲原始值,即 Xs
≠ Xt
,若是寫入成功則內存中的值已被更新,即 Xs
= Xt
。
下面的例子採用 CAS 方式一樣實現了原子加一操做:
; extern int cas_add(int *val);
_cas_add:
mov x9, x0
ldr w10, [x9]
mov w11, w10 ; w11 is used to check cas status
add w10, w10, #1
cas w11, w10, [x9]
cmp w10, w11 ; if cas succeed, w11 = <new value in memory> = w10
b.ne _cas_add
mov w0, w10
ret
複製代碼
注意:爲了在 iOS 系統上編譯包含 CAS 指令的內容,須要給 .s 文件添加一個 Compile Flag: -march=armv8.1-a
[5]。
一樣的,CAS 也有其含有 Acquire-Release 語義的版本,分別是含有 Acquire 語義的 CASA, 含有 Release 語義的 CASL,和同時包含 Acquire-Release 兩種語義的 CASAL。
你們若是想親自實踐和驗證這些指令,能夠複用下面給出的實驗代碼,本文上述代碼大部分出自這些代碼。
main.m 中的代碼,可新建一個 iOS Project 並在 main.m 中添加這些代碼:
// main.m
#include <pthread.h>
#define N 100
extern int atom_add(int *val);
extern int cas_add(int *val);
int as[10000] = {0};
int flags[10000] = {0};
int counter = 0;
void* pthread_add(void *arg1) {
int idx = *(int *)arg1;
// in this way will break the assert
// as[idx] += 1;
cas_add(as + idx);
__asm__ __volatile__("dmb sy");
atom_add(flags + idx);
return NULL;
}
void* pthread_end(void *arg1) {
int idx = *(int *)arg1;
while (flags[idx] != N);
assert(as[idx] == N);
printf("a = %d\n", as[idx]);
return NULL;
}
void test(int idx) {
printf("begin test %d\n", idx);
int n = N;
pthread_t threads[n + 1];
for (NSInteger i = 0; i < n; i++) {
int *copyIdx = calloc(1, 4);
*copyIdx = idx;
pthread_create(threads + i, NULL, &pthread_add, (void *)copyIdx);
}
for (NSInteger i = 0; i < n; i++) {
pthread_detach(threads[i]);
}
pthread_create(threads + n, NULL, (void *)pthread_end, (void *)(&idx));
pthread_detach(threads[n]);
}
int main(int argc, char * argv[]) {
printf("atom_add at %p\n", atom_add);
int round = 0;
while (true) {
test(round++);
}
// omit codes...
}
複製代碼
兩種方式實現原子加一的彙編代碼,須要添加Compile Flag: -march=armv8.1-a
。
; exclusive.s
.section __TEXT,__text, regular, pure_instructions
.p2align 2
.global _atom_add, _cas_add
_atom_add:
mov x9, x0
ldxr w0, [x9]
add w0, w0, #1
stxr w8, w0, [x9]
cbz w8, atom_add_done
mov x0, x9
b _atom_add
atom_add_done:
ret
_cas_add:
mov x9, x0
ldr w10, [x9]
mov w11, w10 ; w11 is used to check cas status
add w10, w10, #1
cas w11, w10, [x9]
cmp w10, w11 ; if cas succeed, w11 = new value in memory = w10
b.ne _cas_add
mov w0, w10
ret
複製代碼