C語言的原子操做

###gcc內建函數 內建gcc在4.0.1版本後就經過其內建函數支持原子操做。在這以前編程真必需要經過參考各類cpu的指令操做手冊,用其彙編指令編寫原子操做。而gcc經過內建函數屏蔽了這些差別。gcc支持以下原子操做:編程

#if (GCC_VERSION >= 40100)
/* 內存訪問柵 */
  #define barrier()             	(__sync_synchronize())
/* 原子獲取 */
  #define AO_GET(ptr)       		({ __typeof__(*(ptr)) volatile *_val = (ptr); barrier(); (*_val); })
/*原子設置,若是原值和新值不同則設置*/
  #define AO_SET(ptr, value)        ((void)__sync_lock_test_and_set((ptr), (value)))
/* 原子交換,若是被設置,則返回舊值,不然返回設置值 */
  #define AO_SWAP(ptr, value)       ((__typeof__(*(ptr)))__sync_lock_test_and_set((ptr), (value)))
/* 原子比較交換,若是當前值等於舊值,則新值被設置,返回舊值,不然返回新值*/
  #define AO_CAS(ptr, comp, value)  ((__typeof__(*(ptr)))__sync_val_compare_and_swap((ptr), (comp), (value)))
/* 原子比較交換,若是當前值等於舊指,則新值被設置,返回真值,不然返回假 */
  #define AO_CASB(ptr, comp, value) (__sync_bool_compare_and_swap((ptr), (comp), (value)) != 0 ? true : false)
/* 原子清零 */
  #define AO_CLEAR(ptr)             ((void)__sync_lock_release((ptr)))
/* 經過值與舊值進行算術與位操做,返回新值 */
  #define AO_ADD_F(ptr, value)      ((__typeof__(*(ptr)))__sync_add_and_fetch((ptr), (value)))
  #define AO_SUB_F(ptr, value)      ((__typeof__(*(ptr)))__sync_sub_and_fetch((ptr), (value)))
  #define AO_OR_F(ptr, value)       ((__typeof__(*(ptr)))__sync_or_and_fetch((ptr), (value)))
  #define AO_AND_F(ptr, value)      ((__typeof__(*(ptr)))__sync_and_and_fetch((ptr), (value)))
  #define AO_XOR_F(ptr, value)      ((__typeof__(*(ptr)))__sync_xor_and_fetch((ptr), (value)))
/* 經過值與舊值進行算術與位操做,返回舊值 */
  #define AO_F_ADD(ptr, value)      ((__typeof__(*(ptr)))__sync_fetch_and_add((ptr), (value)))
  #define AO_F_SUB(ptr, value)      ((__typeof__(*(ptr)))__sync_fetch_and_sub((ptr), (value)))
  #define AO_F_OR(ptr, value)       ((__typeof__(*(ptr)))__sync_fetch_and_or((ptr), (value)))
  #define AO_F_AND(ptr, value)      ((__typeof__(*(ptr)))__sync_fetch_and_and((ptr), (value)))
  #define AO_F_XOR(ptr, value)      ((__typeof__(*(ptr)))__sync_fetch_and_xor((ptr), (value)))
#else
  #error "can not supported atomic operation by gcc(v4.0.0+) buildin function."
#endif	/* if (GCC_VERSION >= 40100) */
/* 忽略返回值,算術和位操做 */
#define AO_INC(ptr)                 ((void)AO_ADD_F((ptr), 1))
#define AO_DEC(ptr)                 ((void)AO_SUB_F((ptr), 1))
#define AO_ADD(ptr, val)            ((void)AO_ADD_F((ptr), (val)))
#define AO_SUB(ptr, val)            ((void)AO_SUB_F((ptr), (val)))
#define AO_OR(ptr, val)			 ((void)AO_OR_F((ptr), (val)))
#define AO_AND(ptr, val)			((void)AO_AND_F((ptr), (val)))
#define AO_XOR(ptr, val)			((void)AO_XOR_F((ptr), (val)))
/* 經過掩碼,設置某個位爲1,並返還新的值 */
#define AO_BIT_ON(ptr, mask)        AO_OR_F((ptr), (mask))
/* 經過掩碼,設置某個位爲0,並返還新的值 */
#define AO_BIT_OFF(ptr, mask)       AO_AND_F((ptr), ~(mask))
/* 經過掩碼,交換某個位,1變0,0變1,並返還新的值 */
#define AO_BIT_XCHG(ptr, mask)      AO_XOR_F((ptr), (mask))

###普通匯編指令 以加法指令操做實現 x = x + n爲例 ,gcc編譯出來的彙編形式上以下:多線程

...
movl 0xc(%ebp), %eax
addl $n, %eax
movl %eax, 0xc(%ebp)
...

能夠看出,實現這條c語句,須要先將x所在內存0xc(%ebp)中的值裝載到寄存器%eax中,而後用addl指令進行與一個當即數$n進行加操做,以後再寄存器中的結果裝載回原內存中。若是在時序上又另外一個線程也操做該內存中的值,且在指令addl $n, %eax完成以後,時間片切換到了另外一個線程中,該線程進行了該內存的修改操做,並且還會在後續的操做中使用,這個時候發生又發生時間片切換,切回到原線程中,進行movl %eax, 0xc(%ebp)指令覆蓋了前一個線程修改內容,若是在這時再切換到另外一個線程中,該線程就會使用到一個錯誤的值進行後續的操做。 ###gcc原子彙編指令 仍然以加法指令操做實現 x = x + n爲例 ,gcc編譯出來的原子彙編形式上以下:函數

...
mov    $0x1,%eax
lock   xadd %eax,-0x4(%rbp)
mov    %eax,-0x4(%rbp)
...

gcc的原子操做是內建函數經過彙編實現的,統一命名以__sync_xxx()起頭,原子操做作了什麼事情呢?原子操做的原理都是經過彙編指令lock在各類xaddcmpxchgxchg指令前進行鎖定操做內存的總線,並將上述的普通3條指令的操做合併爲一條操做,由於內存與cpu都是經過總線進行數據交換,因此即便其它cpu核也同時(真正意義上的多線程,而不是單核上的時間片切換)要對該內存的存取,也要等待。(由於我不是低層開發人員,因此具體時序和動做我不是太瞭解,只能以應用層的鎖動做理解這裏的總線鎖,若是你瞭解,請更正),而被鎖總線的單核應該不會進行時間片切換,直到該指令完成。 ###優化帶來語句倒置 除了多線程操做同一個內存時會發生數據的一致性錯誤,由於編譯器的優化問題也會形成數據一致性問題。若是你的原意要進行以下的操做:fetch

int a = 0;
int b = 0;
void A() {
    a = 1; 
    b = 2;
}
void B() {
    if (b > 0)
        printf("a :%d\n", a);
}

那麼通過編譯器的優化,A()中的兩條複製語句可能被調換順序,若是兩個線程分別同時執行A()和B(),那麼由於這個緣由,B()可能輸出1,也可能輸出0;解決方法是讓a = 1必定在b = 2執行,那麼在二者之間插入內存柵欄__sync_synchronize()能夠保證前後次序。(由於我對這樣的優化發生狀況不是很明瞭,故這裏不能詳細的描述這樣的優化對同線程產生的影響) ###volatile關鍵字與原子 原子操做的內存,要保證其內容已定是存取最新的,而不是cache中的數據,因此要用volatile關鍵字代表,這樣每次存取cpu直接存取內存,而非cache中的數據,咱們定義一個原子類型:優化

#ifndef AO_T
typedef volatile long AO_T;
#endif

##原子操做與普通C語句的等效操做

這裏用上面定義的宏說明原子操做,等效的C語言非原子的操做爲了保證一致性,咱們使用lock()unlock這個僞語句表示鎖的加鎖和解鎖。固然原子操做要比應用層加鎖快了太多太多。ui

內存柵欄使用

int a = 0;
barrier();
int b = 2;

保證a的複製在b的複製前執行atom

原子獲取

int a = 5;
int b = AO_GET(&a); //b==5;
int a = 5;
lock();
int b = a; 
unlock();

保證讀取a的值是內存中的值,而不是寄存器或cache中的值 ###原子設置線程

int a = 0;
AO_SET(&a, 10); //a==10;
int a = 0;
lock();
a = 10;
unlock();

###原子交換設計

int a = 10;
AO_SWAP(&a, 9);
int a = 10;
lock();
if (a != 9)
    a = 9;
unlock();

###原子比較交換code

int a = 10;
int b = AO_CAS(&a, 10, 9); //b==10, a==9;
int c = AO_CAS(&a, 9, 8); //c==8, a==10;
int a = 10;
int b = 0;
int c = 0;
lock();
if (a == 10) {
    b = a;
    a = 9;
} else {
    b = 10;
}
unlock();
lock();
if (a == 9) {
    b = a;
    a = 8;
} else {
    b = 9;
}
unlock();

AO_CASB()的邏輯與AO_CAS()一致,只是返還一個真假值判斷是否發生了交換,就再也不贅訴了。 ###原子清零

int a = 10;
AO_CLEAR(&a); //a==0;
int a = 10;
lock();
a = 0;
unlock();

###先操做後使用的加減運算和邏輯運算

  • 先加一個數,再使用和值 AO_xxx_F()中的F表示fetch提取的意思
int a = 1;
int b = AO_ADD_F(&a, 10);//a==11, b==11
int a = 1;
int b = 0;
lock();
a += 10;
b = a;
unlock();
  • 其它的運算(減,或,與,異或)與加法操做邏輯同樣,就再也不贅訴了 ###先使用後操做的加減運算與邏輯運算
  • 使用原值,後加上一個數
int a = 1;
int b = AO_F_ADD(&a, 10);//a==11, b==1
int a = 1;
int b = 0;
lock();
b = a;
a += 10;
unlock();

##什麼時候使用原子操做最合適

原子操做最合適用來管理狀態,並且最好是程序發現狀態不符合本身要求是,能夠忽略這個錯誤,繼續運行,或稍後在此嘗試。好比咱們使用一個local static變量存儲當前系統有多少個cpu核,以備給出一些策略,好比之後咱們要實現的自旋鎖中的休眠。代碼以下:

long GetCPUCores()
{
    static long g_CPUCores = 0;
    long gcpus = -1;
    /*原子獲取,若是沒有設置過,則繼續,不然返回這個值*/
    if (likely((gcpus = AO_GET(&g_CPUCores)) != -1)) {
        return gcpus;
    }
    gcpus = sysconf(_SC_NPROCESSORS_CONF);
    if (unlikely(gcpus < 0)) {
        printf("Get number of CPU failure : %s", strerror(errno));
        abort();
    }
    /*原子設置*/
    AO_SET(&g_CPUCores, gcpus);
    return gcpus;
}

若是有多個線程同時調用,或單個線程屢次調用,咱們均可以保證g_CPUCores中數據的有效性,不會出現獲取到一個大於0到假值致使後續的邏輯錯誤。並且這樣的設計,還能夠提升效率,若是獲取的系統參數是一個像

#ifdef __APPLE__
	gtid = syscall(SYS_thread_selfid);
#else
	gtid = syscall(SYS_gettid);
#endif

的真正的系統調用,那麼在結果固定的狀況下,代價是昂貴的,由於程序必需要發起中斷服務,切換到內核空間調用代碼爲SYS_thread_selfid SYS_gettid 的中段服務,從而獲得線程ID(線程是一個輕量級的進程,只不過它的堆空間與其它線程共享,而不是進程那樣是彼此獨立的,我之後會在此細談這個ID值的運用)。

##使用原子操做

改進上一篇文章中說起的結構魔數操做

上一節咱們說過,使用帶魔數字段結構的函數經過判斷、修改魔數作出相應的操做,試想若是兩個線程同時操做魔數字段,確定會帶來衝突,因此咱們將其對應的非原子操做,改成原子操做,代碼以下:

/*
 * 魔數
 * 結構體中設置一個magic的成員變量,已檢查結構體是否被正確初始化
 */
#if !defined(OBJMAGIC)
  #define OBJMAGIC (0xfedcba98)
#endif

/*原子的設置魔數*/
#undef REFOBJ
#define REFOBJ(obj)						     \
	({							     \
		int _old = 0;					     \
		bool _ret = false;				     \
		if (likely((obj))) {				     \
			_old = AO_SWAP(&(obj)->magic, OBJMAGIC); \
		}						     \
		_ret = (_old == OBJMAGIC ? false : true);	     \
		_ret;						     \
	})

/*原子的重置魔數*/
#undef UNREFOBJ
#define UNREFOBJ(obj)							\
	({								\
		bool _ret = false;					\
		if (likely((obj))) {					\
			_ret = AO_CASB(&(obj)->magic, OBJMAGIC, 0);	\
		}							\
		_ret;							\
	})

/*原子的驗證魔數*/
#undef ISOBJ
#define ISOBJ(obj) ((obj) && AO_GET(&(obj)->magic) == OBJMAGIC)

/*斷言魔數*/
#undef ASSERTOBJ
#define ASSERTOBJ(obj) (assert(ISOBJ((obj))))

其實這樣的運用也不能100%的保證多線程下數據的一致性,好比兩個線程A和B,同時在操做一個結構體T: ###原子操做

  1. 初始化操做,initT():
void initT(T *t) {
    REFOBJ(t);
    //other initial operate
}
  1. 處理數據操做,dealT():
void dealT(T *t) {
    if(!ISOBJ(t)) return;
    //other deal operate
}
  1. 銷燬數據,destroyT():
void destroy(T *t) {
    if(!UNREF(t)) return;
    //other destroy
}

###原子操做與時序

  • 考慮下列時序:
  1. A剛完成initT ()中的REFOBJ ()語句,將要真正的初始化T的其它的字段,這時切換到B;
  2. B也調用initT ()中的REFOBJ ()語句,發現結構體的初始化標誌已經設置了,則返回並切換到A;
  3. A開始真正的初始化相關字段,在未處理完成時有切回了B;
  4. B調用dealT()開始處理其它的字段了,結果固然是全處理的是髒數據。
  • 再考慮下列時序:
  1. A和B都使用T交錯的操做了T一段時間,A和B都想銷燬T持有的數據而調用destroy()
  2. 假設A先進入destroy(),而後在UNREF ()調用以前切換到B;
  3. B進入destroy(), 併成功的調用UNREF ()
  4. 在後續的操做中,不論什麼時候發生切換都不會形成數據重複銷燬。

上面狀況出現的根本緣由就是原子操做不原子,由於你企圖使用這樣的原子操做,進行非原子的多步驟的字段初始化操做,這是不會成功的。因此在你使用原子操做時,必定要考慮線程切換帶來的時序問題和你的原子操做能不能使你的操做原子的進行。

##下一步咱們作什麼

咱們將使用原子操做實現一個原子鎖,並說明什麼狀況下應該使用原子鎖,什麼狀況下不該該使用原子鎖。敬請期待哦。

相關文章
相關標籤/搜索