CAS原子操做實現無鎖及性能分析

 

CAS原子操做實現無鎖及性能分析html

 

  • Author:Echo Chen(陳斌)nginx

  • Email:chenb19870707@gmail.comshell

  • Blog:Blog.csdn.net/chen19870707編程

  • Date:Nov 13th, 2014windows

    近期在研究nginx的自旋鎖的時候,又見到了GCC CAS原子操做,因而決定動手分析下CAS實現的無鎖究竟性能怎樣,網上關於CAS實現無鎖的文章很是多。但少有研究這樣的無鎖的性能提高的文章,這裏就以實驗結果和我本身的理解逐步展開。併發

  • 1.什麼是CAS原子操做

    在研究無鎖以前。咱們需要首先了解一下CAS原子操做——Compare & Set,或是 Compare & Swap,現在差點兒所image有的CPU指令都支持CAS的原子操做,X86下相應的是 CMPXCHG 彙編指令。app

    你們應該還記得操做系統裏面關於「原子操做」的概念,一個操做是原子的(atomic),假設這個操做所處的層(layer)的更高層不能發現其內部實現與結構。原子操做可以是一個步驟,也可以是多個操做步驟。但是其順序是不可以被打亂,或者分割掉僅僅運行部分。有了這個原子操做這個保證咱們就可以實現無鎖了。函數

    CAS原子操做在維基百科中的代碼描寫敘述例如如下:高併發

       1: int compare_and_swap(int* reg, int oldval, int newval)
       2: {
       3:   ATOMIC();
       4:   int old_reg_val = *reg;
       5:   if (old_reg_val == oldval)
       6:      *reg = newval;
       7:   END_ATOMIC();
       8:   return old_reg_val;
       9: }
  • 也就是檢查內存*reg裏的值是否是oldval,假設是的話。則對其賦值newval。上面的代碼老是返回old_reg_value,調用者假設需要知道是否更新成功還需要作進一步推斷,爲了方便,它可以變種爲直接返回是否更新成功,例如如下:性能

  •    1: bool compare_and_swap (int *accum, int *dest, int newval)
       2: {
       3:   if ( *accum == *dest ) {
       4:       *dest = newval;
       5:       return true;
       6:   }
       7:   return false;
       8: }

    除了CAS還有下面原子操做:

    • Fetch And Add,通常用來對變量作 +1 的原子操做。
         1: << atomic >>
         2: function FetchAndAdd(address location, int inc) {
         3:     int value := *location
         4:     *location := value + inc
         5:     return value
         6: }
       
      Test-and-set,寫值到某個內存位置並傳回其舊值。彙編指令BST。

         1: #define LOCKED 1
         2:  
         3: int TestAndSet(int* lockPtr) {
         4:     int oldValue;
         5:  
         6:     // Start of atomic segment
         7:     // The following statements should be interpreted as pseudocode for
         8:     // illustrative purposes only.
         9:     // Traditional compilation of this code will not guarantee atomicity, the
        10:     // use of shared memory (i.e. not-cached values), protection from compiler
        11:     // optimization, or other required properties.
        12:     oldValue = *lockPtr;
        13:     *lockPtr = LOCKED;
        14:     // End of atomic segment
        15:  
        16:     return oldValue;
        17: }
       
    • Test and Test-and-set,用來實現多核環境下相互排斥鎖。
         1: boolean locked := false // shared lock variable
         2: procedure EnterCritical() {
         3:   do {
         4:     while (locked == true) skip // spin until lock seems free
         5:   } while TestAndSet(locked) // actual atomic locking
         6: }

    2.CAS 在各個平臺下的實現

     

    2.1 Linux GCC 支持的 CAS

    GCC4.1+版本號中支持CAS的原子操做(完整的原子操做可參看 GCC Atomic Builtins

       1: bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
       2: type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

    2.2  Windows支持的CAS

    在Windows下。你可以使用如下的Windows API來完畢CAS:(完整的Windows原子操做可參看MSDN的InterLocked Functions

  •    1: InterlockedCompareExchange ( __inout LONG volatile *Target,
       2:                                 __in LONG Exchange,
       3:                                 __in LONG Comperand);

    2.3  C++ 11支持的CAS

    C++11中的STL中的atomic類的函數可以讓你跨平臺。(完整的C++11的原子操做可參看 Atomic Operation Library

       1: template< class T >
       2: bool atomic_compare_exchange_weak( std::atomic<T>* obj,
       3:                                    T* expected, T desired );
       4: template< class T >
       5: bool atomic_compare_exchange_weak( volatile std::atomic<T>* obj,
       6:                                    T* expected, T desired );
     
  • 3.CAS原子操做實現無鎖的性能分析

     

    • image3.1測試方法描寫敘述

     
             這裏由於僅僅是比較性能,因此採用很是easy的方式。建立10個線程併發運行,每個線程中循環對全局變量count進行++操做(i++)。循環加2000000次。這一定會涉及到併發相互排斥操做,在同一臺機器上分析 加普通相互排斥鎖、CAS實現的無鎖、Fetch And Add實現的無鎖消耗的時間,而後進行分析。

     

    3.2 加普通相互排斥鎖代碼

       1: #include <stdio.h>
       2: #include <stdlib.h>
       3: #include <pthread.h>
       4: #include <time.h>
       5: #include "timer.h"
       6:  
       7: pthread_mutex_t mutex_lock;
       8: static volatile int count = 0;
       9: void *test_func(void *arg)
      10: {
      11:         int i = 0;
      12:         for(i = 0; i < 2000000; i++)
      13:         {
      14:                 pthread_mutex_lock(&mutex_lock);
      15:                 count++;
      16:                 pthread_mutex_unlock(&mutex_lock);
      17:         }
      18:         return NULL;
      19: }
      20:  
      21: int main(int argc, const char *argv[])
      22: {
      23:     Timer timer; // 爲了計時,暫時封裝的一個類Timer。
      24:     timer.Start();    // 計時開始
      25:     pthread_mutex_init(&mutex_lock, NULL);
      26:     pthread_t thread_ids[10];
      27:     int i = 0;
      28:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
      29:     {
      30:         pthread_create(&thread_ids[i], NULL, test_func, NULL);
      31:     }
      32:  
      33:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
      34:     {
      35:         pthread_join(thread_ids[i], NULL);
      36:     }
      37:  
      38:     timer.Stop();// 計時結束
      39:     timer.Cost_time();// 打印花費時間
      40:     printf("結果:count = %d\n",count);
      41:  
      42:     return 0;
      43: }

    注:Timer類僅做統計時間用,事實上現在文章最後給出。

     

    3.2 CAS實現的無鎖

     

       1: #include <stdio.h>
       2: #include <stdlib.h>
       3: #include <pthread.h>
       4: #include <unistd.h>
       5: #include <time.h>
       6: #include "timer.h"
       7:  
       8: int mutex = 0;
       9: int lock = 0;
      10: int unlock = 1;
      11:  
      12: static volatile int count = 0;
      13: void *test_func(void *arg)
      14: {
      15:         int i = 0;
      16:         for(i = 0; i < 2000000; i++)
      17:     {
      18:         while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);
      19:          count++;
      20:          __sync_bool_compare_and_swap (&mutex, unlock, 0);
      21:         }
      22:         return NULL;
      23: }
      24:  
      25: int main(int argc, const char *argv[])
      26: {
      27:     Timer timer;
      28:     timer.Start();
      29:     pthread_t thread_ids[10];
      30:     int i = 0;
      31:  
      32:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
      33:     {
      34:             pthread_create(&thread_ids[i], NULL, test_func, NULL);
      35:     }
      36:  
      37:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
      38:     {
      39:             pthread_join(thread_ids[i], NULL);
      40:     }
      41:  
      42:     timer.Stop();
      43:     timer.Cost_time();
      44:     printf("結果:count = %d\n",count);
      45:  
      46:     return 0;
      47: }
      48:  

     

    3.4 Fetch And Add 原子操做

     
       1: #include <stdio.h>
       2: #include <stdlib.h>
       3: #include <pthread.h>
       4: #include <unistd.h>
       5: #include <time.h>
       6: #include "timer.h"
       7:  
       8: static volatile int count = 0;
       9: void *test_func(void *arg)
      10: {
      11:         int i = 0;
      12:         for(i = 0; i < 2000000; i++)
      13:         {
      14:             __sync_fetch_and_add(&count, 1);
      15:         }
      16:         return NULL;
      17: }
      18:  
      19: int main(int argc, const char *argv[])
      20: {
      21:     Timer timer;
      22:     timer.Start();
      23:     pthread_t thread_ids[10];
      24:     int i = 0;
      25:  
      26:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
      27:             pthread_create(&thread_ids[i], NULL, test_func, NULL);
      28:     }
      29:  
      30:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
      31:             pthread_join(thread_ids[i], NULL);
      32:     }
      33:  
      34:     timer.Stop();
      35:     timer.Cost_time();
      36:     printf("結果:count = %d\n",count);
      37:     return 0;
      38: }
      39:  
     

    4 實驗結果和分析

    在同一臺機器上,各執行以上3份代碼10次,並統計平均值。其結果例如如下:(單位微秒)


    image

    因而可知。無鎖操做在性能上遠遠優於加鎖操做,消耗時間僅爲加鎖操做的1/3左右,無鎖編程方式確實可以比傳統加鎖方式效率高,經上面測試可以發現,可以快到3倍左右。因此在極力推薦在高併發程序中採用無鎖編程的方式可以進一步提升程序效率。

    5.時間統計類Timer

    timer.h

       1: #ifndef TIMER_H
       2: #define TIMER_H
       3:  
       4: #include <sys/time.h>
       5: class Timer
       6: {
       7: public:    
       8:     Timer();
       9:     // 開始計時時間
      10:     void Start();
      11:     // 終止計時時間
      12:     void Stop();
      13:     // 又一次設定
      14:     void Reset();
      15:     // 耗時時間
      16:     void Cost_time();
      17: private:
      18:     struct timeval t1;
      19:     struct timeval t2;
      20:     bool b1,b2;
      21: };
      22: #endif

    timer.cpp

       1: #include "timer.h"
       2: #include <stdio.h>
       3:  
       4: Timer::Timer()
       5: {
       6:     b1 = false;
       7:     b2 = false;
       8: }
       9: void Timer::Start()
      10: {
      11:     gettimeofday(&t1,NULL);  
      12:     b1 = true;
      13:     b2 = false;
      14: }
      15:  
      16: void Timer::Stop()
      17: {    
      18:     if (b1 == true)
      19:     {
      20:         gettimeofday(&t2,NULL);  
      21:         b2 = true;
      22:     }
      23: }
      24:  
      25: void Timer::Reset()
      26: {    
      27:     b1 = false;
      28:     b2 = false;
      29: }
      30:  
      31: void Timer::Cost_time()
      32: {
      33:     if (b1 == false)
      34:     {
      35:         printf("計時出錯,應該先運行Start()。而後運行Stop(),再來運行Cost_time()");
      36:         return ;
      37:     }
      38:     else if (b2 == false)
      39:     {
      40:         printf("計時出錯,應該運行完Stop()。再來運行Cost_time()");
      41:         return ;
      42:     }
      43:     else
      44:     {
      45:         int usec,sec;
      46:         bool borrow = false;
      47:         if (t2.tv_usec > t1.tv_usec)
      48:         {
      49:             usec = t2.tv_usec - t1.tv_usec;
      50:         }
      51:         else
      52:         {
      53:             borrow = true;
      54:             usec = t2.tv_usec+1000000 - t1.tv_usec;
      55:         }
      56:  
      57:         if (borrow)
      58:         {
      59:             sec = t2.tv_sec-1 - t1.tv_sec;
      60:         }
      61:         else
      62:         {
      63:             sec = t2.tv_sec - t1.tv_sec;
      64:         }
      65:         printf("花費時間:%d秒 %d微秒\n",sec,usec);
      66:     }
      67: }
      68:  
  • 6.參考

  • 1.http://blog.csdn.net/hzhsan/article/details/25837189      

  •  2.http://coolshell.cn/articles/8239.html
  • -

  • Echo Chen:Blog.csdn.net/chen19870707

       -

  • 相關文章
    相關標籤/搜索