__sync_fetch_and_add

最近在公司離職的前輩寫的代碼哪裏看到了__sync_fetch_and_add這個東東.比較好奇.找些資料學習學習html

http://www.lxway.com/4091061956.htmlinux

 http://www.cnblogs.com/FrankTan/archive/2010/12/11/1903377.htmlios

可以使用的環境: gcc.version > 4.1.2c++

做用:提供多線程下變量的加減和邏輯運算的原子操做程序員

 正文以下:算法

最近編碼須要實現多線程環境下的計數器操做,統計相關事件的次數。下面是一些學習心得和體會。不敢妄稱原創,基本是學習筆記。遇到相關的引用,我會致謝。
    固然咱們知道,count++這種操做不是原子的。一個自加操做,本質是分紅三步的:
     1 從緩存取到寄存器
     2 在寄存器加1
     3 存入緩存。編程

mov eax,dword ptr [a]
add eax,1
mov dword ptr [a],eax


    因爲時序的因素,多個線程操做同一個全局變量,會出現問題。這也是併發編程的難點。在目前多核條件下,這種困境會愈來愈彰顯出來。
    最簡單的處理辦法就是加鎖保護,這也是我最初的解決方案。看下面的代碼:緩存

1       pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;
2 
3       pthread_mutex_lock(&count_lock);
4       global_int++;
5       pthread_mutex_unlock(&count_lock);
linux 變量 : pthread_mutex_t
linux 函數 : pthread_mutex_lock; pthread_mutex_unlock

    後來在網上查找資料,找到了__sync_fetch_and_add系列的命令安全

     __sync_fetch_and_add系列一共有十二個函數,有加/減/與/或/異或/等函數的原子性操做函數,數據結構

__snyc_fetch_and_add : 先fetch而後自加,返回的是自加之前的值
__snyc_add_and_fetch : 先自加而後返回,返回的是自加之後的值 (參照 ++i 和 i++)


__snyc_fetch_and_add的一個簡單使用
1
int count = 4; 2 __sync_fetch_and_add(&count, 1); // __sync_fetch_and_add(&count, 1) == 4 3 cout<<count<<endl; //--->count=5

    對於多線程對全局變量進行自加,咱們就不再用理線程鎖了

下面這行代碼,和上面被pthread_mutex保護的那行代碼做用是同樣的,並且也是線程安全的。

__sync_fetch_and_add( &global_int, 1 );

    下面是這羣函數的全家福,你們看名字就知道是這些函數是幹啥的了。

 1 //在用gcc編譯的時候要加上選項 -march=i686
 2 type __sync_fetch_and_add (type *ptr, type value, ...);
 3 type __sync_fetch_and_sub (type *ptr, type value, ...);
 4 type __sync_fetch_and_or (type *ptr, type value, ...);
 5 type __sync_fetch_and_and (type *ptr, type value, ...);
 6 type __sync_fetch_and_xor (type *ptr, type value, ...);
 7 type __sync_fetch_and_nand (type *ptr, type value, ...);
 8 type __sync_add_and_fetch (type *ptr, type value, ...);
 9 type __sync_sub_and_fetch (type *ptr, type value, ...);
10 type __sync_or_and_fetch (type *ptr, type value, ...);
11 type __sync_and_and_fetch (type *ptr, type value, ...);
12 type __sync_xor_and_fetch (type *ptr, type value, ...);
13 type __sync_nand_and_fetch (type *ptr, type value, ...);

__sync_fetch_and_add,速度是線程鎖的6~7倍

type能夠是1,2,3或者8字節長度的int類型,即

 

 1 int8_t    
 2 uint8_t
 3 
 4 int16_t
 5 uint16_t
 6 
 7 int32_t
 8 uint32_t
 9 
10 int64_t
11 uint64_t

 

後面的可擴展參數(...)用來指出哪些變量須要memory barrier,由於目前gcc實現的是full barrier(相似於linux kernel 中的mb(),表示這個操做以前的全部內存操做不會被重排序到這個操做以後),因此能夠略掉這個參數。

 

恩.再找個帖子學習學習.http://blog.csdn.net/hzhsan/article/details/25124901

有一個概念叫過無鎖化編程, 知道linux支持的哪些操做是具備原子特性的是理解和設計無鎖化編程算法的基礎

除了上面提到的12個外 還有4個能夠實現互斥鎖的功能

//如下兩個函數提供原子的比較和交換, 若是*ptr = oldValue, 就將newValue寫入*ptr
//第一個函數在相等並寫入的狀況下返回true
//第二個函數返回操做以前的值

bool __sync_bool_compare_and_swap(type* ptr, type oldValue, type newValue, ....);

type __sync_val_compare_and_swap(type* ptr, type oldValue, type newValue, ....);

//將*ptr設爲value並返回*ptr操做以前的值
type __sync_lock_test_and_set(type *ptr, type value, ....);

//置*ptr爲0
void __sync_lock_release(type* ptr, ....);
 1 __sync_synchronize(...)
 2 
 3 //做用 : 發出一個full barrier
 4 /*關於memory barrier,cpu會對咱們的指令進行排序,通常說來會提升程序的效率,但有時候可能形成咱們不但願獲得的結果,舉一個例子,好比咱們有一個硬件設備,它有4個寄存器,當你發出一個操做指令的時候,一個寄存器存的是你的操做指令(好比READ),兩個寄存器存的是參數(好比是地址和size),最後一個寄存器是控制寄存器,在全部的參數都設置好以後向其發出指令,設備開始讀取參數,執行命令,程序可能以下:*/
 5 write1(dev.register_size, size);
 6 write1(dev.register_addr, addr);
 7 write1(dev.register_cmd, Read);
 8 write1(dev.register_control, GO);
 9 /*若是最後一條write1被換到了前幾條語句以前,那麼確定不是咱們所指望的,這時候咱們能夠在最後一條語句以前加入一個memory barrier,強制cpu執行完前面的寫入之後再執行最後一條:*/
10 write1(dev.register_size, size);
11 write1(dev.register_addr, addr);
12 write1(dev.register_cmd, Read);
13 __sync_synchronize();
14 write1(dev.register_control, GO);
15 
16 //memory barrier有幾種類型:
17 //acquire barrier : 不容許將barrier以後的內存讀取指令移到barrier以前(linux kernel中的wmb())
18 //release barrier : 不容許將barrier以前的內存讀取指令移到barrier以後 (linux kernel中的rmb())
19 //full barrier    : 以上兩種barrier的合集(linux kernel中的mb())
20 
21 //好吧,說實話這個函數的說明基本沒看懂

 

最後從網上找一個代碼寫一寫:http://blog.csdn.net/hzhsan/article/details/25837189

測試場景:假設有一個應用:如今有一個全局變量,用來計數,再建立10個線程併發執行,每一個線程中循環對這個全局變量進行++操做(i++),循環加2000000次。

因此很容易知道,這必然會涉及到併發互斥操做。下面經過三種方式[傳統互斥量加鎖方式, no lock不加鎖的方式, 原子函數方式]來實現這種併發操做。並對比出其在效率上的不一樣之處。

這裏先貼上代碼,共5個文件:2個用於作時間統計的文件:timer.h  timer.cpp。這兩個文件是臨時封裝的,只用來計時,能夠沒必要細看。

 1 //timer.h 用於計時
 2 
 3 #ifndef TIMER_H_
 4 #define TIMER_H_
 5 
 6 #include <sys/time.h>
 7 
 8 class Timer
 9 {
10     public:
11         Timer();
12         Timer(const Timer& t) = delete;
13         ~Timer();
14 
15         void start();
16         void stop();
17         void reset();
18 
19         double costTime();
20 
21     private: 
22         struct timeval t1;
23         struct timeval t2;
24         bool b1, b2;
25 };
//timer.cpp 
1
#include "timer.h" 2 #include <iostream> 3 4 using namespace std; 5 6 Timer::Timer():b1(false), b2(false) 7 { 8 } 9 Timer::~Timer() 10 { 11 } 12 void Timer::start() 13 { 14 gettimeofday(&t1, NULL); 15 b1 = true; 16 b2 = false; 17 } 18 void Timer::stop() 19 { 20 gettimeofday(&t2, NULL); 21 b2 = true; 22 } 23 void Timer::reset() 24 { 25 b1 = false; 26 b2 = false; 27 } 28 double Timer::costTime() 29 { 30 if (!b1) 31 { 32 cout<<"error, do not call function start()"<<endl; 33 cout<<"the right sequence : start() ..... stop() costTime()"<<endl; 34 35 return 0; 36 } 37 38 if (!b2) 39 { 40 cout<<"error, do not call function stop()"<<endl; 41 cout<<"the right sequence : start() ..... stop() costTime()"<<endl; 42 return 0; 43 } 44 45 size_t sec = t2.tv_sec - t1.tv_sec; 46 double usec = t2.tv_usec - t1.tv_usec; 47 48 if (sec < 0) 49 { 50 cout<<"error, call stop() before start()"<<endl; 51 cout<<"the right sequence : start() ..... stop() costTime()"<<endl; 52 return 0; 53 } 54 55 if (usec < 0) 56 { 57 usec += 1000000; 58 --sec; 59 if (sec < 0) 60 { 61 cout<<"error, call stop() before start()"<<endl; 62 cout<<"the right sequence : start() ..... stop() costTime()"<<endl; 63 return 0; 64 } 65 } 66 67 return sec + usec * 1.0 / 1000000; 68 }

 

1 //thread_function.h  -->多線程要調用的函數
2 #ifndef THREAD_FUNCTION_H_
3 #define THREAD_FUNCTION_H_
4 void* thread_lock_execFunc(void* arg);
5 void* thread_nolock_execFunc(void* arg);
6 void* thread_atom_execFunc(void* arg);
7 #endif

 

 1 //thread_function.cpp
 2 #include "thread_function.h"
 3 #include "lock.h"
 4 #include <pthread.h>
 5 #include <unistd.h>
 6 
 7 extern volatile int count;
 8 struct LOCK;
 9 
10 void* thread_lock_execFunc(void* arg)
11 {
12 
13 
14     for (int i = 0; i < 2000000; ++i)
15     {
16         pthread_mutex_lock(reinterpret_cast<pthread_mutex_t*>(arg));
17         ++count;
18         pthread_mutex_unlock(reinterpret_cast<pthread_mutex_t*>(arg));
19     }
20 
21     return NULL;
22 }
23 
24 void* thread_nolock_execFunc(void* arg)
25 {
26     LOCK* pLock = reinterpret_cast<LOCK*>(arg);
27     for (int i = 0; i < 2000000; ++i)
28     {
29         while(!(__sync_bool_compare_and_swap(&(pLock->mutex), pLock->use, 1)))
30         {
31             usleep(100000);
32         }
33         ++count;
34         __sync_bool_compare_and_swap(&(pLock->mutex), pLock->unUse, 0);
35     }
36     return NULL;
37 }
38 
39 void* thread_atom_execFunc(void* arg)
40 {
41     for (int i = 0; i < 2000000; ++i)
42     {
43         __sync_fetch_and_add(&count, 1);
44     }
45 
46     return NULL;
47 }

 

 1 //lock.h --->給mainnolock.cpp使用的類
 2 #ifndef LOCK_H_
 3 #define LOCK_H_
 4 struct LOCK
 5 {
 6     int mutex;
 7     int use;
 8     int unUse;
 9     LOCK() : mutex(0), use(0), unUse(1)
10     {
11     }
12 };
13 #endif
 1 //mainlock.cpp  使用mutex加鎖方式的多線程
 2 #include <iostream>
 3 #include <pthread.h>
 4 #include <iomanip>
 5 
 6 #include "timer.h"
 7 #include "thread_function.h"
 8 
 9 using namespace std;
10 
11 pthread_mutex_t mutex_lock;
12 volatile int count = 0;
13 
14 int main( int argc, char** argv)
15 {
16     pthread_mutex_init(&mutex_lock, NULL);
17 
18     Timer timer;
19     timer.start();
20     
21     /*test thread begin*/
22     pthread_t thread_ids[10];
23     
24     for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i)
25     {
26         pthread_create(&thread_ids[i], NULL, thread_lock_execFunc, &mutex_lock);
27     }
28     
29     for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i)
30     {
31         pthread_join(thread_ids[i], NULL);
32     }
33     /*test thread end*/
34 
35     timer.stop();
36     cout<<setiosflags(ios::fixed)<<setprecision(4)<<"lock cost["<<timer.costTime()<<"]second"<<endl;
37     return 0;
38 }
 1 //main_nolock.cpp 使用__sync_compare_and_swap的多線程
 2 #include <iostream>
 3 #include <pthread.h>
 4 #include <unistd.h>
 5 #include <iomanip>
 6 #include "timer.h"
 7 #include "thread_function.h"
 8 #include "lock.h"
 9 
10 using namespace std;
11 
12 volatile int count = 0;
13 
14 int main(int argc, char** argv)
15 {
16     LOCK lock;
17 
18     Timer timer;
19     timer.start();
20 
21     /*test thread begin*/
22     pthread_t thread_ids[10];
23     for (int i = 0; i < sizeof(thread_ids) / sizeof(pthread_t); ++i)
24     {
25         pthread_create(&thread_ids[i], NULL, thread_nolock_execFunc, &lock);
26     }
27 
28     for (int i = 0; i < sizeof(thread_ids) / sizeof(pthread_t); ++i)
29     {
30         pthread_join(thread_ids[i], NULL);
31     }
32     /*test thread end*/
33     
34     timer.stop();
35     cout<<setiosflags(ios::fixed)<<setprecision(4)<<"nolock cost["<<timer.costTime()<<"]\n";    
36     return 0;
37 }
 1 //main_atomic.cpp 使用__sync_fetch_and_add的多線程
 2 #include<iostream>
 3 #include<pthread.h>
 4 #include<unistd.h>
 5 #include<iomanip>
 6 #include "timer.h"
 7 #include "thread_function.h"
 8 
 9 using namespace std;
10 
11 volatile int count = 0;
12 
13 int main(int argc, char** argv)
14 {
15     Timer timer;
16     timer.start();
17 
18     /*pthread begin*/
19     pthread_t thread_ids[10];
20 
21     for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i)
22     {
23         pthread_create(&thread_ids[i], NULL, thread_atom_execFunc, NULL);
24     }
25 
26     for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i)
27     {
28         pthread_join(thread_ids[i], NULL);
29     }
30 
31     /*pthread end*/
32 
33     timer.stop();
34     cout<<setiosflags(ios::fixed)<<setprecision(4)<<"atomic cost["<<timer.costTime()<<"]\n";
35     return 0;
36 }
 1 //makefile
 2 
 3 CC = g++
 4 CFLAGS = -g -lpthread -std=c++11
 5 
 6 OBJS_LOCK = main_lock.o timer.o thread_function.o
 7 OBJS_UNLOCK = main_nolock.o timer.o thread_function.o
 8 OBJS_ATOMICLOCK = main_atomic.o timer.o thread_function.o
 9 
10 INC = timer.h thread_function.h lock.h
11 
12 lock : $(OBJS_LOCK) $(INC) 
13     $(CC) -o mainlock $(OBJS_LOCK) $(CFLAGS)
14     rm *.o
15 
16 nolock : $(OBJS_UNLOCK) $(INC)
17     $(CC) -o mainnolock $(OBJS_UNLOCK) $(CFLAGS)
18     rm *.o
19 
20 atomiclock : $(OBJS_ATOMICLOCK) $(INC)
21     $(CC) -o mainatomic $(OBJS_ATOMICLOCK) $(CFLAGS)
22 
23 main_lock.o : main_lock.cpp 
24     $(CC) -c main_lock.cpp $(CFLAGS)
25 
26 main_nolock.o : main_nolock.cpp 
27     $(CC) -c main_nolock.cpp $(CFLAGS)
28 
29 main_atomic.o : main_atomic.cpp
30     $(CC) -c  main_atomic.cpp $(CFLAGS)
31 
32 timer.o : timer.cpp 
33     $(CC) -c timer.cpp $(CFLAGS)
34 
35 thread_function.o : thread_function.cpp
36     $(CC) -c thread_function.cpp $(CFLAGS)
37 
38 clean:
39     rm *.o

執行makefile

make lock

make nolock

make atomiclock

而後生成3個可執行文件 

運行這3個可執行文件:

另外:針對main_nolock.cpp而言,做者提到了一個現象

在thread_function.cpp中, 隨着一下代碼的改變,運行時間會有變化

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )); 

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )) usleep(1);

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10);

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100);

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(1000);

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10000);

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);

執行時間的關係是  :    T(;)<T(1)<T(10)<T(100)<T(1000)<T(10000)>T(100000)

 經過編程測試及測試得出結論:
一、若是是想用全局變量來作統計操做。而又不得不考慮多線程間的互斥訪問的話,最好使用編譯器支持的原子操做函數。再知足互斥訪問的前提下,編程最簡單,效率最高。

二、lock-free,無鎖編程方式確實可以比傳統加鎖方式效率高。因此在高併發程序中採用無鎖編程的方式能夠進一步提升程序效率。可是得對無鎖方式有足夠熟悉的瞭解,否則效率反而會更低並且容易出錯。(好比在某些狀況下main_nolock比main_lock的效率還要低)

在學習一個無鎖化編程的分析帖子 http://blog.csdn.net/hzhsan/article/details/25141421

Lock-free 算法一般比基於鎖的算法要好:

  • 從其定義來看,它們是 wait-free 的,能夠確保線程永遠不會阻塞。
  • 狀態轉變是原子性的,以致於在任何點失敗都不會惡化數據結構
  • 由於線程永遠不會阻塞,因此當同步的細粒度是單一原子寫或比較交換時,它們一般能夠帶來更高的吞吐量
  • 在某些狀況下,lock-free 算法會有更少的同步寫操做(好比 Interlocked 操做),所以純粹從性能來看,它可能更便宜

可是 lock-freedom 並非萬能藥。下面是一些很明顯的不利因素:

  • 樂觀的併發使用會對 hot data structures 致使 livelock。
  • 代碼須要大量困難的測試。一般其正確性取決於對目標機器內存模型的正確解釋。
  • 基於衆多緣由,lock-free 代碼很難編寫和維護
無鎖編程與分佈式編程那個更適合多核CPU?
無鎖編程主要是使用原子操做替代鎖來實現對共享資源的訪問保護,舉個例子,要對某個整數變量進行加1操做的話,用鎖保護操做的代碼以下:
int a = 0;
Lock();
a+= 1;
Unlock();
若是對上述代碼反編譯能夠發現 a+=1;被翻譯成了如下三條彙編指令:
mov eax,dword ptr [a]
add eax,1
mov dword ptr [a],eax
若是在單核系統中,因爲在上述三條指令的任何一條執行完後均可能發生任務切換,好比執行完第1條指令後就發生了任務切換,這時若是有其餘任務來對a進行操做的話,當任務切換回來後,將繼續對a進行操做,極可能出現不可預測的結果,所以上述三條指令必須使用鎖來保護,以使這段時間內其餘任務沒法對a進行操做。
須要注意的是 ,在多核系統中,由於多個CPU核在物理上是並行的,可能發生同時寫的現象;因此必須保證一個CPU核在對共享內存進行寫操做時,其餘CPU核不能寫這塊內存。所以在多核系統中和單核有區別,即便只有一條指令,也須要要加鎖保護。
若是使用原子操做來實現上述加1操做的話,例如使用VC裏的InterlockedIncrement來操做的話,那麼對a的加1操做須要如下語句
InterlockedIncrement (&a);
這條語句最終的實際加1操做會被翻譯成如下一條帶lock前綴的彙編指令:
lock xadd dword ptr [ecx],eax
使用原子操做時,在進行實際的寫操做時,使用了lock指令,這樣就能夠阻止其餘任務寫這塊內存,避免出現數據競爭現象。原子操做速度比鎖快,通常要快一倍以上。
使用lock前綴的指令實際上在系統中是使用了內存柵障(memory barrier),當原子操做在進行時,其餘任務都不能對內存操做,會影響其餘任務的執行。所以這種原子操做實際上屬於一種激烈競爭的鎖,不過因爲它的操做時間很快,所以能夠當作是一種極細粒度鎖。
在無鎖(Lock-free)編程環境中,主要使用的原子操做爲 CAS(Compare and Swap)操做,在VC裏對應的操做爲InterlockedCompareExchange或者InterlockedCompareExchangeAcquire;若是是64位的操做,須要使用InterlockedCompareExchange64或者InterlockedCompareExchangeAcquire64。 使用這種原子操做替代鎖的最大的一個好處是它是非阻塞的。
 

 

 
比較項目
無鎖編程
分佈式編程
1
加速比性能
取決於競爭方式,除非也採用分佈式競爭,不然不如分佈式鎖競爭的性能
加速比和CPU核數成正比關係,接近於單核多任務時的性能
2
實現的功能
有限
不受限制
3
程序員掌握難易程度
難度過高,過於複雜,普通程序員沒法掌握,目前世界上只有少數幾我的掌握。
和單核時代的數據結構算法難度差很少,普通程序員能夠掌握
4
現有軟件的移植
使用無鎖算法後,以往的算法須要廢棄掉,沒法複用
能夠繼承已有的算法,在已有程序基礎上重構便可。

 

 
從上表的四個方面的綜合比較能夠看出, 無鎖編程的實用價值是遠遠不如分佈式編程的,所以分佈式編程比無鎖編程更適合多核CPU系統

 

可在分佈計算機系統的幾臺計算機上同時協調執行的程序設計方法,分佈式程序設計的主要特徵是分佈和通訊。採用分佈式程序設計方法設計程序時,一個程序由若干個可獨立執行的程序模塊組成。這些程序模塊分佈於一個分佈式計算機系統的幾臺計算機上同時執行。分佈在各臺計算機上的程序模塊是相互關聯的,它們在執行中須要交換數據,即通訊。只有經過通訊,各程序模塊才能協調地完成一個共同的計算任務。採用分佈式程序設計方法解決計算問題時,必須提供用以進行分佈式程序設計的語言和設計相應的分佈式算法。分佈式程序設計語言與經常使用的各類程序設計語言的主要區別,在於它具備程序分佈和通訊的功能。所以,分佈式程序設計語言,每每能夠由一種程序設計語言增長分佈和通訊的功能而構成。分佈式算法和適用於多處理器系統的並行算法,都具備並行執行的特色,但它們是有區別的。設計分佈式算法時,必須保證明現算法的各程序模塊間不會有公共變量,它們只能經過通訊來交換數據。此外,設計分佈式算法時,每每須要考慮堅決性,即當系統中幾臺計算機失效時,算法還是有效的。

本站公眾號
   歡迎關注本站公眾號,獲取更多信息