最近在公司離職的前輩寫的代碼哪裏看到了__sync_fetch_and_add這個東東.比較好奇.找些資料學習學習html
http://www.lxway.com/4091061956.htmlinux
http://www.cnblogs.com/FrankTan/archive/2010/12/11/1903377.htmlios
可以使用的環境: gcc.version > 4.1.2c++
做用:提供多線程下變量的加減和邏輯運算的原子操做程序員
正文以下:算法
最近編碼須要實現多線程環境下的計數器操做,統計相關事件的次數。下面是一些學習心得和體會。不敢妄稱原創,基本是學習筆記。遇到相關的引用,我會致謝。
固然咱們知道,count++這種操做不是原子的。一個自加操做,本質是分紅三步的:
1 從緩存取到寄存器
2 在寄存器加1
3 存入緩存。編程
因爲時序的因素,多個線程操做同一個全局變量,會出現問題。這也是併發編程的難點。在目前多核條件下,這種困境會愈來愈彰顯出來。
最簡單的處理辦法就是加鎖保護,這也是我最初的解決方案。看下面的代碼:緩存
1 pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER; 2 3 pthread_mutex_lock(&count_lock); 4 global_int++; 5 pthread_mutex_unlock(&count_lock);
linux 變量 : pthread_mutex_t
linux 函數 : pthread_mutex_lock; pthread_mutex_unlock
後來在網上查找資料,找到了__sync_fetch_and_add系列的命令安全
__sync_fetch_and_add系列一共有十二個函數,有加/減/與/或/異或/等函數的原子性操做函數,數據結構
__snyc_fetch_and_add : 先fetch而後自加,返回的是自加之前的值
__snyc_add_and_fetch : 先自加而後返回,返回的是自加之後的值 (參照 ++i 和 i++)
__snyc_fetch_and_add的一個簡單使用
1 int count = 4; 2 __sync_fetch_and_add(&count, 1); // __sync_fetch_and_add(&count, 1) == 4 3 cout<<count<<endl; //--->count=5
對於多線程對全局變量進行自加,咱們就不再用理線程鎖了。
下面這行代碼,和上面被pthread_mutex保護的那行代碼做用是同樣的,並且也是線程安全的。
__sync_fetch_and_add( &global_int, 1 );
下面是這羣函數的全家福,你們看名字就知道是這些函數是幹啥的了。
1 //在用gcc編譯的時候要加上選項 -march=i686 2 type __sync_fetch_and_add (type *ptr, type value, ...); 3 type __sync_fetch_and_sub (type *ptr, type value, ...); 4 type __sync_fetch_and_or (type *ptr, type value, ...); 5 type __sync_fetch_and_and (type *ptr, type value, ...); 6 type __sync_fetch_and_xor (type *ptr, type value, ...); 7 type __sync_fetch_and_nand (type *ptr, type value, ...); 8 type __sync_add_and_fetch (type *ptr, type value, ...); 9 type __sync_sub_and_fetch (type *ptr, type value, ...); 10 type __sync_or_and_fetch (type *ptr, type value, ...); 11 type __sync_and_and_fetch (type *ptr, type value, ...); 12 type __sync_xor_and_fetch (type *ptr, type value, ...); 13 type __sync_nand_and_fetch (type *ptr, type value, ...);
__sync_fetch_and_add,速度是線程鎖的6~7倍
type能夠是1,2,3或者8字節長度的int類型,即
1 int8_t 2 uint8_t 3 4 int16_t 5 uint16_t 6 7 int32_t 8 uint32_t 9 10 int64_t 11 uint64_t
後面的可擴展參數(...)用來指出哪些變量須要memory barrier,由於目前gcc實現的是full barrier(相似於linux kernel 中的mb(),表示這個操做以前的全部內存操做不會被重排序到這個操做以後),因此能夠略掉這個參數。
恩.再找個帖子學習學習.http://blog.csdn.net/hzhsan/article/details/25124901
有一個概念叫過無鎖化編程, 知道linux支持的哪些操做是具備原子特性的是理解和設計無鎖化編程算法的基礎
除了上面提到的12個外 還有4個能夠實現互斥鎖的功能
//如下兩個函數提供原子的比較和交換, 若是*ptr = oldValue, 就將newValue寫入*ptr //第一個函數在相等並寫入的狀況下返回true //第二個函數返回操做以前的值 bool __sync_bool_compare_and_swap(type* ptr, type oldValue, type newValue, ....); type __sync_val_compare_and_swap(type* ptr, type oldValue, type newValue, ....); //將*ptr設爲value並返回*ptr操做以前的值 type __sync_lock_test_and_set(type *ptr, type value, ....); //置*ptr爲0 void __sync_lock_release(type* ptr, ....);
1 __sync_synchronize(...) 2 3 //做用 : 發出一個full barrier 4 /*關於memory barrier,cpu會對咱們的指令進行排序,通常說來會提升程序的效率,但有時候可能形成咱們不但願獲得的結果,舉一個例子,好比咱們有一個硬件設備,它有4個寄存器,當你發出一個操做指令的時候,一個寄存器存的是你的操做指令(好比READ),兩個寄存器存的是參數(好比是地址和size),最後一個寄存器是控制寄存器,在全部的參數都設置好以後向其發出指令,設備開始讀取參數,執行命令,程序可能以下:*/ 5 write1(dev.register_size, size); 6 write1(dev.register_addr, addr); 7 write1(dev.register_cmd, Read); 8 write1(dev.register_control, GO); 9 /*若是最後一條write1被換到了前幾條語句以前,那麼確定不是咱們所指望的,這時候咱們能夠在最後一條語句以前加入一個memory barrier,強制cpu執行完前面的寫入之後再執行最後一條:*/ 10 write1(dev.register_size, size); 11 write1(dev.register_addr, addr); 12 write1(dev.register_cmd, Read); 13 __sync_synchronize(); 14 write1(dev.register_control, GO); 15 16 //memory barrier有幾種類型: 17 //acquire barrier : 不容許將barrier以後的內存讀取指令移到barrier以前(linux kernel中的wmb()) 18 //release barrier : 不容許將barrier以前的內存讀取指令移到barrier以後 (linux kernel中的rmb()) 19 //full barrier : 以上兩種barrier的合集(linux kernel中的mb()) 20 21 //好吧,說實話這個函數的說明基本沒看懂
最後從網上找一個代碼寫一寫:http://blog.csdn.net/hzhsan/article/details/25837189
測試場景:假設有一個應用:如今有一個全局變量,用來計數,再建立10個線程併發執行,每一個線程中循環對這個全局變量進行++操做(i++),循環加2000000次。
因此很容易知道,這必然會涉及到併發互斥操做。下面經過三種方式[傳統互斥量加鎖方式, no lock不加鎖的方式, 原子函數方式]來實現這種併發操做。並對比出其在效率上的不一樣之處。
這裏先貼上代碼,共5個文件:2個用於作時間統計的文件:timer.h timer.cpp。這兩個文件是臨時封裝的,只用來計時,能夠沒必要細看。
1 //timer.h 用於計時 2 3 #ifndef TIMER_H_ 4 #define TIMER_H_ 5 6 #include <sys/time.h> 7 8 class Timer 9 { 10 public: 11 Timer(); 12 Timer(const Timer& t) = delete; 13 ~Timer(); 14 15 void start(); 16 void stop(); 17 void reset(); 18 19 double costTime(); 20 21 private: 22 struct timeval t1; 23 struct timeval t2; 24 bool b1, b2; 25 }; |
//timer.cpp
|
1 //thread_function.h -->多線程要調用的函數 2 #ifndef THREAD_FUNCTION_H_ 3 #define THREAD_FUNCTION_H_ 4 void* thread_lock_execFunc(void* arg); 5 void* thread_nolock_execFunc(void* arg); 6 void* thread_atom_execFunc(void* arg); 7 #endif
|
1 //thread_function.cpp 2 #include "thread_function.h" 3 #include "lock.h" 4 #include <pthread.h> 5 #include <unistd.h> 6 7 extern volatile int count; 8 struct LOCK; 9 10 void* thread_lock_execFunc(void* arg) 11 { 12 13 14 for (int i = 0; i < 2000000; ++i) 15 { 16 pthread_mutex_lock(reinterpret_cast<pthread_mutex_t*>(arg)); 17 ++count; 18 pthread_mutex_unlock(reinterpret_cast<pthread_mutex_t*>(arg)); 19 } 20 21 return NULL; 22 } 23 24 void* thread_nolock_execFunc(void* arg) 25 { 26 LOCK* pLock = reinterpret_cast<LOCK*>(arg); 27 for (int i = 0; i < 2000000; ++i) 28 { 29 while(!(__sync_bool_compare_and_swap(&(pLock->mutex), pLock->use, 1))) 30 { 31 usleep(100000); 32 } 33 ++count; 34 __sync_bool_compare_and_swap(&(pLock->mutex), pLock->unUse, 0); 35 } 36 return NULL; 37 } 38 39 void* thread_atom_execFunc(void* arg) 40 { 41 for (int i = 0; i < 2000000; ++i) 42 { 43 __sync_fetch_and_add(&count, 1); 44 } 45 46 return NULL; 47 }
|
1 //lock.h --->給mainnolock.cpp使用的類 2 #ifndef LOCK_H_ 3 #define LOCK_H_ 4 struct LOCK 5 { 6 int mutex; 7 int use; 8 int unUse; 9 LOCK() : mutex(0), use(0), unUse(1) 10 { 11 } 12 }; 13 #endif
1 //mainlock.cpp 使用mutex加鎖方式的多線程 2 #include <iostream> 3 #include <pthread.h> 4 #include <iomanip> 5 6 #include "timer.h" 7 #include "thread_function.h" 8 9 using namespace std; 10 11 pthread_mutex_t mutex_lock; 12 volatile int count = 0; 13 14 int main( int argc, char** argv) 15 { 16 pthread_mutex_init(&mutex_lock, NULL); 17 18 Timer timer; 19 timer.start(); 20 21 /*test thread begin*/ 22 pthread_t thread_ids[10]; 23 24 for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i) 25 { 26 pthread_create(&thread_ids[i], NULL, thread_lock_execFunc, &mutex_lock); 27 } 28 29 for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i) 30 { 31 pthread_join(thread_ids[i], NULL); 32 } 33 /*test thread end*/ 34 35 timer.stop(); 36 cout<<setiosflags(ios::fixed)<<setprecision(4)<<"lock cost["<<timer.costTime()<<"]second"<<endl; 37 return 0; 38 } |
1 //main_nolock.cpp 使用__sync_compare_and_swap的多線程 2 #include <iostream> 3 #include <pthread.h> 4 #include <unistd.h> 5 #include <iomanip> 6 #include "timer.h" 7 #include "thread_function.h" 8 #include "lock.h" 9 10 using namespace std; 11 12 volatile int count = 0; 13 14 int main(int argc, char** argv) 15 { 16 LOCK lock; 17 18 Timer timer; 19 timer.start(); 20 21 /*test thread begin*/ 22 pthread_t thread_ids[10]; 23 for (int i = 0; i < sizeof(thread_ids) / sizeof(pthread_t); ++i) 24 { 25 pthread_create(&thread_ids[i], NULL, thread_nolock_execFunc, &lock); 26 } 27 28 for (int i = 0; i < sizeof(thread_ids) / sizeof(pthread_t); ++i) 29 { 30 pthread_join(thread_ids[i], NULL); 31 } 32 /*test thread end*/ 33 34 timer.stop(); 35 cout<<setiosflags(ios::fixed)<<setprecision(4)<<"nolock cost["<<timer.costTime()<<"]\n"; 36 return 0; 37 } |
1 //main_atomic.cpp 使用__sync_fetch_and_add的多線程 2 #include<iostream> 3 #include<pthread.h> 4 #include<unistd.h> 5 #include<iomanip> 6 #include "timer.h" 7 #include "thread_function.h" 8 9 using namespace std; 10 11 volatile int count = 0; 12 13 int main(int argc, char** argv) 14 { 15 Timer timer; 16 timer.start(); 17 18 /*pthread begin*/ 19 pthread_t thread_ids[10]; 20 21 for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i) 22 { 23 pthread_create(&thread_ids[i], NULL, thread_atom_execFunc, NULL); 24 } 25 26 for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i) 27 { 28 pthread_join(thread_ids[i], NULL); 29 } 30 31 /*pthread end*/ 32 33 timer.stop(); 34 cout<<setiosflags(ios::fixed)<<setprecision(4)<<"atomic cost["<<timer.costTime()<<"]\n"; 35 return 0; 36 } |
1 //makefile 2 3 CC = g++ 4 CFLAGS = -g -lpthread -std=c++11 5 6 OBJS_LOCK = main_lock.o timer.o thread_function.o 7 OBJS_UNLOCK = main_nolock.o timer.o thread_function.o 8 OBJS_ATOMICLOCK = main_atomic.o timer.o thread_function.o 9 10 INC = timer.h thread_function.h lock.h 11 12 lock : $(OBJS_LOCK) $(INC) 13 $(CC) -o mainlock $(OBJS_LOCK) $(CFLAGS) 14 rm *.o 15 16 nolock : $(OBJS_UNLOCK) $(INC) 17 $(CC) -o mainnolock $(OBJS_UNLOCK) $(CFLAGS) 18 rm *.o 19 20 atomiclock : $(OBJS_ATOMICLOCK) $(INC) 21 $(CC) -o mainatomic $(OBJS_ATOMICLOCK) $(CFLAGS) 22 23 main_lock.o : main_lock.cpp 24 $(CC) -c main_lock.cpp $(CFLAGS) 25 26 main_nolock.o : main_nolock.cpp 27 $(CC) -c main_nolock.cpp $(CFLAGS) 28 29 main_atomic.o : main_atomic.cpp 30 $(CC) -c main_atomic.cpp $(CFLAGS) 31 32 timer.o : timer.cpp 33 $(CC) -c timer.cpp $(CFLAGS) 34 35 thread_function.o : thread_function.cpp 36 $(CC) -c thread_function.cpp $(CFLAGS) 37 38 clean: 39 rm *.o
執行makefile
make lock
make nolock
make atomiclock
而後生成3個可執行文件
運行這3個可執行文件:
另外:針對main_nolock.cpp而言,做者提到了一個現象
在thread_function.cpp中, 隨着一下代碼的改變,運行時間會有變化 while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )); while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )) usleep(1); while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10); while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100); while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(1000); while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10000); while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000); 執行時間的關係是 : T(;)<T(1)<T(10)<T(100)<T(1000)<T(10000)>T(100000) |
經過編程測試及測試得出結論:
一、若是是想用全局變量來作統計操做。而又不得不考慮多線程間的互斥訪問的話,最好使用編譯器支持的原子操做函數。再知足互斥訪問的前提下,編程最簡單,效率最高。
二、lock-free,無鎖編程方式確實可以比傳統加鎖方式效率高。因此在高併發程序中採用無鎖編程的方式能夠進一步提升程序效率。可是得對無鎖方式有足夠熟悉的瞭解,否則效率反而會更低並且容易出錯。(好比在某些狀況下main_nolock比main_lock的效率還要低)
在學習一個無鎖化編程的分析帖子 http://blog.csdn.net/hzhsan/article/details/25141421
Lock-free 算法一般比基於鎖的算法要好:
可是 lock-freedom 並非萬能藥。下面是一些很明顯的不利因素:
比較項目
|
無鎖編程
|
分佈式編程
|
|
1
|
加速比性能
|
取決於競爭方式,除非也採用分佈式競爭,不然不如分佈式鎖競爭的性能
|
加速比和CPU核數成正比關係,接近於單核多任務時的性能
|
2
|
實現的功能
|
有限
|
不受限制
|
3
|
程序員掌握難易程度
|
難度過高,過於複雜,普通程序員沒法掌握,目前世界上只有少數幾我的掌握。
|
和單核時代的數據結構算法難度差很少,普通程序員能夠掌握
|
4
|
現有軟件的移植
|
使用無鎖算法後,以往的算法須要廢棄掉,沒法複用
|
能夠繼承已有的算法,在已有程序基礎上重構便可。
|
可在分佈計算機系統的幾臺計算機上同時協調執行的程序設計方法,分佈式程序設計的主要特徵是分佈和通訊。採用分佈式程序設計方法設計程序時,一個程序由若干個可獨立執行的程序模塊組成。這些程序模塊分佈於一個分佈式計算機系統的幾臺計算機上同時執行。分佈在各臺計算機上的程序模塊是相互關聯的,它們在執行中須要交換數據,即通訊。只有經過通訊,各程序模塊才能協調地完成一個共同的計算任務。採用分佈式程序設計方法解決計算問題時,必須提供用以進行分佈式程序設計的語言和設計相應的分佈式算法。分佈式程序設計語言與經常使用的各類程序設計語言的主要區別,在於它具備程序分佈和通訊的功能。所以,分佈式程序設計語言,每每能夠由一種程序設計語言增長分佈和通訊的功能而構成。分佈式算法和適用於多處理器系統的並行算法,都具備並行執行的特色,但它們是有區別的。設計分佈式算法時,必須保證明現算法的各程序模塊間不會有公共變量,它們只能經過通訊來交換數據。此外,設計分佈式算法時,每每須要考慮堅決性,即當系統中幾臺計算機失效時,算法還是有效的。