CAS原子操做實現無鎖及性能分析html
Author:Echo Chen(陳斌)nginx
Email:chenb19870707@gmail.comshell
Blog:Blog.csdn.net/chen19870707編程
Date:Nov 13th, 2014windows
近期在研究nginx的自旋鎖的時候,又見到了GCC CAS原子操做,因而決定動手分析下CAS實現的無鎖究竟性能怎樣,網上關於CAS實現無鎖的文章很是多。但少有研究這樣的無鎖的性能提高的文章,這裏就以實驗結果和我本身的理解逐步展開。併發
在研究無鎖以前。咱們需要首先了解一下CAS原子操做——Compare & Set,或是 Compare & Swap,現在差點兒所
有的CPU指令都支持CAS的原子操做,X86下相應的是 CMPXCHG 彙編指令。app
你們應該還記得操做系統裏面關於「原子操做」的概念,一個操做是原子的(atomic),假設這個操做所處的層(layer)的更高層不能發現其內部實現與結構。原子操做可以是一個步驟,也可以是多個操做步驟。但是其順序是不可以被打亂,或者分割掉僅僅運行部分。有了這個原子操做這個保證咱們就可以實現無鎖了。函數
CAS原子操做在維基百科中的代碼描寫敘述例如如下:高併發
1: int compare_and_swap(int* reg, int oldval, int newval)2: {
3: ATOMIC();
4: int old_reg_val = *reg;5: if (old_reg_val == oldval)6: *reg = newval;
7: END_ATOMIC();
8: return old_reg_val;9: }
也就是檢查內存*reg裏的值是否是oldval,假設是的話。則對其賦值newval。上面的代碼老是返回old_reg_value,調用者假設需要知道是否更新成功還需要作進一步推斷,爲了方便,它可以變種爲直接返回是否更新成功,例如如下:性能
1: bool compare_and_swap (int *accum, int *dest, int newval)2: {
3: if ( *accum == *dest ) {4: *dest = newval;
5: return true;6: }
7: return false;8: }
除了CAS還有下面原子操做:
- Fetch And Add,通常用來對變量作 +1 的原子操做。
1: << atomic >>
2: function FetchAndAdd(address location, int inc) {3: int value := *location4: *location := value + inc
5: return value6: }
Test-and-set,寫值到某個內存位置並傳回其舊值。彙編指令BST。1: #define LOCKED 1
2:
3: int TestAndSet(int* lockPtr) {4: int oldValue;5:
6: // Start of atomic segment7: // The following statements should be interpreted as pseudocode for8: // illustrative purposes only.9: // Traditional compilation of this code will not guarantee atomicity, the10: // use of shared memory (i.e. not-cached values), protection from compiler11: // optimization, or other required properties.12: oldValue = *lockPtr;
13: *lockPtr = LOCKED;
14: // End of atomic segment15:
16: return oldValue;17: }
- Test and Test-and-set,用來實現多核環境下相互排斥鎖。
1: boolean locked := false // shared lock variable2: procedure EnterCritical() {
3: do {4: while (locked == true) skip // spin until lock seems free5: } while TestAndSet(locked) // actual atomic locking6: }
2.CAS 在各個平臺下的實現
2.1 Linux GCC 支持的 CAS
GCC4.1+版本號中支持CAS的原子操做(完整的原子操做可參看 GCC Atomic Builtins)
1: bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)2: type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
2.2 Windows支持的CAS
在Windows下。你可以使用如下的Windows API來完畢CAS:(完整的Windows原子操做可參看MSDN的InterLocked Functions)
1: InterlockedCompareExchange ( __inout LONG volatile *Target,2: __in LONG Exchange,
3: __in LONG Comperand);
2.3 C++ 11支持的CAS
C++11中的STL中的atomic類的函數可以讓你跨平臺。(完整的C++11的原子操做可參看 Atomic Operation Library)
1: template< class T >2: bool atomic_compare_exchange_weak( std::atomic<T>* obj,3: T* expected, T desired );
4: template< class T >5: bool atomic_compare_exchange_weak( volatile std::atomic<T>* obj,6: T* expected, T desired );
1: #include <stdio.h>
2: #include <stdlib.h>
3: #include <pthread.h>
4: #include <time.h>
5: #include "timer.h"
6:
7: pthread_mutex_t mutex_lock;
8: static volatile int count = 0;
9: void *test_func(void *arg)
10: {
11: int i = 0;
12: for(i = 0; i < 2000000; i++)
13: {
14: pthread_mutex_lock(&mutex_lock);
15: count++;
16: pthread_mutex_unlock(&mutex_lock);
17: }
18: return NULL;
19: }
20:
21: int main(int argc, const char *argv[])
22: {
23: Timer timer; // 爲了計時,暫時封裝的一個類Timer。
24: timer.Start(); // 計時開始
25: pthread_mutex_init(&mutex_lock, NULL);
26: pthread_t thread_ids[10];
27: int i = 0;
28: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
29: {
30: pthread_create(&thread_ids[i], NULL, test_func, NULL);
31: }
32:
33: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
34: {
35: pthread_join(thread_ids[i], NULL);
36: }
37:
38: timer.Stop();// 計時結束
39: timer.Cost_time();// 打印花費時間
40: printf("結果:count = %d\n",count);
41:
42: return 0;
43: }
注:Timer類僅做統計時間用,事實上現在文章最後給出。
1: #include <stdio.h>
2: #include <stdlib.h>
3: #include <pthread.h>
4: #include <unistd.h>
5: #include <time.h>
6: #include "timer.h"
7:
8: int mutex = 0;
9: int lock = 0;
10: int unlock = 1;
11:
12: static volatile int count = 0;
13: void *test_func(void *arg)
14: {
15: int i = 0;
16: for(i = 0; i < 2000000; i++)
17: {
18: while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);
19: count++;
20: __sync_bool_compare_and_swap (&mutex, unlock, 0);
21: }
22: return NULL;
23: }
24:
25: int main(int argc, const char *argv[])
26: {
27: Timer timer;
28: timer.Start();
29: pthread_t thread_ids[10];
30: int i = 0;
31:
32: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
33: {
34: pthread_create(&thread_ids[i], NULL, test_func, NULL);
35: }
36:
37: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
38: {
39: pthread_join(thread_ids[i], NULL);
40: }
41:
42: timer.Stop();
43: timer.Cost_time();
44: printf("結果:count = %d\n",count);
45:
46: return 0;
47: }
48:
1: #include <stdio.h>
2: #include <stdlib.h>
3: #include <pthread.h>
4: #include <unistd.h>
5: #include <time.h>
6: #include "timer.h"
7:
8: static volatile int count = 0;
9: void *test_func(void *arg)
10: {
11: int i = 0;
12: for(i = 0; i < 2000000; i++)
13: {
14: __sync_fetch_and_add(&count, 1);
15: }
16: return NULL;
17: }
18:
19: int main(int argc, const char *argv[])
20: {
21: Timer timer;
22: timer.Start();
23: pthread_t thread_ids[10];
24: int i = 0;
25:
26: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
27: pthread_create(&thread_ids[i], NULL, test_func, NULL);
28: }
29:
30: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
31: pthread_join(thread_ids[i], NULL);
32: }
33:
34: timer.Stop();
35: timer.Cost_time();
36: printf("結果:count = %d\n",count);
37: return 0;
38: }
39:
在同一臺機器上,各執行以上3份代碼10次,並統計平均值。其結果例如如下:(單位微秒)
因而可知。無鎖操做在性能上遠遠優於加鎖操做,消耗時間僅爲加鎖操做的1/3左右,無鎖編程方式確實可以比傳統加鎖方式效率高,經上面測試可以發現,可以快到3倍左右。因此在極力推薦在高併發程序中採用無鎖編程的方式可以進一步提升程序效率。
timer.h
1: #ifndef TIMER_H
2: #define TIMER_H
3:
4: #include <sys/time.h>
5: class Timer6: {
7: public:8: Timer();
9: // 開始計時時間10: void Start();11: // 終止計時時間12: void Stop();13: // 又一次設定14: void Reset();15: // 耗時時間16: void Cost_time();17: private:18: struct timeval t1;19: struct timeval t2;20: bool b1,b2;21: };
22: #endiftimer.cpp
1: #include "timer.h"2: #include <stdio.h>
3:
4: Timer::Timer()
5: {
6: b1 = false;
7: b2 = false;
8: }
9: void Timer::Start()10: {
11: gettimeofday(&t1,NULL);
12: b1 = true;
13: b2 = false;
14: }
15:
16: void Timer::Stop()17: {
18: if (b1 == true)19: {
20: gettimeofday(&t2,NULL);
21: b2 = true;
22: }
23: }
24:
25: void Timer::Reset()26: {
27: b1 = false;
28: b2 = false;
29: }
30:
31: void Timer::Cost_time()32: {
33: if (b1 == false)34: {
35: printf("計時出錯,應該先運行Start()。而後運行Stop(),再來運行Cost_time()");36: return ;37: }
38: else if (b2 == false)39: {
40: printf("計時出錯,應該運行完Stop()。再來運行Cost_time()");41: return ;42: }
43: else44: {
45: int usec,sec;46: bool borrow = false;47: if (t2.tv_usec > t1.tv_usec)48: {
49: usec = t2.tv_usec - t1.tv_usec;
50: }
51: else52: {
53: borrow = true;
54: usec = t2.tv_usec+1000000 - t1.tv_usec;
55: }
56:
57: if (borrow)58: {
59: sec = t2.tv_sec-1 - t1.tv_sec;
60: }
61: else62: {
63: sec = t2.tv_sec - t1.tv_sec;
64: }
65: printf("花費時間:%d秒 %d微秒\n",sec,usec);66: }
67: }
68:
-
Echo Chen:Blog.csdn.net/chen19870707
-