線程同步-生產者消費者問題

http://blog.csdn.net/big_bit/article/details/51356393

在進行多線程編程時,難免還要碰到兩個問題,那就線程間的互斥與同步

線程同步是指線程之間所具有的一種制約關係,一個線程的執行依賴另一個線程的消息,當它沒有得到另一個線程的消息時應等待,直到消息到達時才被喚醒。

線程互斥是指對於共享的進程系統資源,在各單個線程訪問時的排它性。當有若干個線程都要使用某一共享資源時,任何時刻最多隻允許一個線程去使用,其它要使用該資源的線程必須等待,直到佔用資源者釋放該資源。線程互斥可以看成是一種特殊的線程同步(下文統稱爲同步)。

生產者消費者問題就是一個著名的線程同步問題,該問題描述如下:有一個生產者在生產產品,這些產品將提供給若干個消費者去消費,爲了使生產者和消費者能併發執行,在兩者之間設置一個具有多個緩衝區的緩衝池,生產者將它生產的產品放入一個緩衝區中,消費者可以從緩衝區中取走產品進行消費,顯然生產者和消費者之間必須保持同步,即不允許消費者到一個空的緩衝區中取產品,也不允許生產者向一個已經放入產品的緩衝區中再次投放產品。

關於線程同步和互斥的詳細說明可以看:    http://blog.csdn.net/big_bit/article/details/51356381這篇文章

線程間的同步方法大體可分爲兩類:用戶模式和內核模式。顧名思義,內核模式就是指利用系統內核對象的單一性來進行同步,使用時需要切換內核態與用戶態,而用戶模式就是不需要切換到內核態,只在用戶態完成操作。

用戶模式下的方法有:原子操作(例如一個單一的全局變量),臨界區。內核模式下的方法有:事件,信號量,互斥量。下面我們來分別看一下這些方法:

一、互斥鎖或互斥量(mutex)
    下面是用互斥量來解決生產者和消費者問題。爲了現集中體現互斥量這個概念(就是一次只能有一個線程訪問,其他線程阻塞),我們先簡化一下問題:緩衝區或者倉庫無限大(生產者和消費者都可以生產和消費產品,而且產品初始化時候數量就是無限多,這裏我們主要體現),只有一個生產者和一個消費者,我們這個時候就可以把緩衝區設置爲一個互斥量,一次要麼生產者要麼消費者霸佔它。

·  初始化鎖。在Linux下,線程的互斥量數據類型是pthread_mutex_t。在使用前,要對它進行初始化。
靜態分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
動態分配:int pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutex_attr_t *mutexattr);

·  加鎖。對共享資源的訪問,要對互斥量進行加鎖,如果互斥量已經上了鎖,調用線程會阻塞,直到互斥量被解鎖。
int pthread_mutex_lock(pthread_mutex *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);

·  解鎖。在完成了對共享資源的訪問後,要對互斥量進行解鎖。
int pthread_mutex_unlock(pthread_mutex_t *mutex);

·  銷燬鎖。鎖在是使用完成後,需要進行銷燬以釋放資源。
int pthread_mutex_destroy(pthread_mutex *mutex);

接下來我們來看看實現流程:

下面開始代碼實現:

[cpp]   view plain  copy
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3.   
  4. #define LOOP_COUNT 5            //生產者和消費者各自循環次數  
  5. pthread_mutex_t mutex;          //定義一個全局互斥量,在不同函數中  
  6.                                 //初始化和使用  
  7.   
  8. void *producer( void *arg );    //生產者線程  
  9. void *consumer( void *arg );    //消費者線程  
  10.   
  11. int main(int argc , char *argv[]){  
  12.     pthread_t thrd_prod , thrd_cons;  
  13.   
  14.     pthread_mutex_init( &mutex , NULL );    //初始化互斥量  
  15.   
  16.     //創建生產者和消費者線程  
  17.     if( pthread_create( &thrd_prod , NULL, producer ,  
  18.                 NULL ) != 0 )  
  19.         oops( "thread create failed." );  
  20.     sleep(1);                               //保證生產者線程先運行  
  21.   
  22.     if( pthread_create( &thrd_cons , NULL, consumer ,  
  23.                 NULL ) != 0 )  
  24.         oops( "thread create failed." );  
  25.   
  26.     //等待線程結束  
  27.     if( pthread_join( thrd_prod , NULL ) != 0 )  
  28.         oops( " wait thread failed.");  
  29.     if( pthread_join( thrd_cons , NULL ) != 0 )  
  30.         oops( " wait thread failed.");  
  31.   
  32.     pthread_mutex_destroy( &mutex );        //關閉互斥量  
  33.     return 0;  
  34. }  
  35.   
  36. void *producer( void *arg){  
  37.     int count = 0 ;             //循環計數  
  38.   
  39.     while( count++ < LOOP_COUNT ){  
  40.         pthread_mutex_lock( &mutex );   //加鎖  
  41.   
  42.         //成功佔有互斥量,接下來可以對緩衝區(倉庫)進行生產  
  43.         //操作  
  44.         printf( " producer put a product to buffer.\n");  
  45.         sleep(3);               //休眠3秒, 便於程序觀察  
  46.   
  47.         pthread_mutex_unlock( &mutex ); //解鎖  
  48.         sleep(1);               //休眠一秒,防止它又馬上佔據鎖  
  49.     }  
  50. }  
  51. void *consumer( void *arg ){  
  52.     int count = 0 ;             //循環計數  
  53.   
  54.     while( count++ < LOOP_COUNT ){  
  55. //      sleep(2);               //休眠一秒, 便於程序觀察  
  56.         pthread_mutex_lock( &mutex );   //加鎖  
  57.   
  58.         //成功佔有互斥量,接下來可以對緩衝區(倉庫)進行取出  
  59.         //操作  
  60.         printf( " consumer get a product from buffer.\n");  
  61.   
  62.         pthread_mutex_unlock( &mutex ); //解鎖  
  63.         sleep(1);               //休眠一秒,防止它又馬上佔據鎖  
  64.     }  
  65. }  



結果如下:


從結果可以看到,當生產者和消費者成功lock互斥量時,另一個就阻塞等待。

二、讀寫鎖

   讀寫鎖也叫做共享-獨佔鎖,當讀寫鎖以讀模式鎖住時,它是以共享模式鎖住的;當它以寫模式鎖住時,它是以獨佔模式鎖住的。

接下來我們改變一下生產者消費者問題:現在緩衝區或者倉庫無限大(生產者和消費者都可以生產和消費產品,而且產品初始化時候數量就是無限多,這裏我們主要體現),只有一個生產者(讀寫鎖也可以應用到多個生產者問題),但有多個消費者, 我們這個時候就可以把爲生產者設置一個寫鎖,爲每個消費者設置一個讀鎖。

  1. 1.初始化讀寫鎖。

    #include <pthread.h>
    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,constpthread_rwlockattr_t *restrict attr);

    2.加鎖。要在讀模式下鎖定讀寫鎖,需要調用pthread_rwlock_rdlock要在寫模式下鎖定讀寫鎖,需要調用pthread_rwlock_wrlock當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是如果線程希望以寫模式對此鎖進行加鎖,它必須阻塞直到所有的線程釋放讀鎖

    intpthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

    intpthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

    3.解鎖。在完成了對共享資源的訪問後,要對讀寫鎖進行解鎖。
    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

    4.銷燬鎖。在釋放讀寫鎖佔用的內存之前,需要調用pthread_rwlock_destroy做清理工作如果pthread_rwlock_init爲讀寫鎖分配了資源,pthread_rwlock_destroy將釋放這些資源。如果在調用pthread_rwlock_destroy之前就釋放了讀寫鎖佔用的內存空間,那麼分配給這個鎖的資源就丟失了。
    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

[cpp]   view plain  copy
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3.   
  4. #define LOOP_COUNT 2            //生產者和消費者各自循環次數  
  5. #define LOOP_THRD 5             //消費者線程個數  
  6. pthread_rwlock_t rwlock;        //定義一個全局讀寫鎖,在不同函數中  
  7.                                 //初始化和使用  
  8.   
  9. void *producer( void *arg );    //生產者線程  
  10. void *consumer( void *arg );    //消費者線程  
  11.   
  12. int main(int argc , char *argv[]){  
  13.     int thrd_num ,thrd_id[LOOP_THRD]  ;  
  14.     pthread_t thrd_prod , thrd_cons[LOOP_THRD];  
  15.   
  16.     pthread_rwlock_init( &rwlock , NULL );  //初始化互斥量  
  17.   
  18.     //創建一個生產者和多個消費者線程  
  19.     if( pthread_create( &thrd_prod , NULL, producer ,  
  20.                 NULL ) != 0 )  
  21.         oops( "thread create failed." );  
  22.   
  23.     for( thrd_num = 0 ; thrd_num < LOOP_THRD; thrd_num++ ){  
  24.         thrd_id[thrd_num] = thrd_num;       //線程id,注意線程共享變量  
  25.         if( pthread_create( &thrd_cons[thrd_num], NULL, consumer   
  26.                     , <span style="background-color: rgb(255, 0, 0);">(void *)( thrd_id+thrd_num)</span> ) != 0 )  
  27.             oops( "thread %d create failed." , thrd_num );  
  28.     }  
  29.   
  30.     //等待線程結束  
  31.     if( pthread_join( thrd_prod , NULL ) != 0 )  
  32.         oops( " wait thread failed.");  
  33.     for( thrd_num = 0 ; thrd_num < LOOP_THRD; thrd_num++ ){  
  34.         if( pthread_join( thrd_cons[thrd_num] , NULL ) != 0 )  
  35.             oops( " wait thread %d failed." , thrd_num);  
  36. //      printf("wait %d thread.\n" , thrd_num);  
  37.     }  
  38.     pthread_rwlock_destroy( &rwlock );      //關閉互斥量  
  39.     return 0;  
  40. }  
  41.   
  42. void *producer( void *arg){  
  43.     int count = 0 ;             //循環計數  
  44.   
  45.     while( count++ < LOOP_COUNT ){  
  46.         printf( "producer try to lock wrlock.\n");  
  47.         pthread_rwlock_wrlock( &rwlock );   //加鎖  
  48.   
  49.         //成功佔有互斥量,接下來可以對緩衝區(倉庫)進行生產  
  50.         //操作  
  51.         printf( "producer lock successful, producer put a product to buffer.\n");  
  52.           
  53.         /* 
  54.             休眠3秒, 便於程序觀察,可以看到 
  55.             其他讀取線程不能佔據鎖而阻塞 
  56.         */        
  57.         sleep(3);                 
  58.         printf("prducer finished ,unlock wrlock.\n");  
  59.         pthread_rwlock_unlock( &rwlock ); //解鎖  
  60.         sleep(1);                           //休眠一秒, 防止馬上又佔據寫鎖  
  61.     }  
  62. }  
  63. void *consumer( void *arg ){  
  64.     int count = 0 ;                         //循環計數  
  65.     int thrd_id = *( ( int*)arg );  
  66.   
  67. //  printf( "consumer %d ,%#x . \n" , thrd_id ,arg);  
  68.     while( count++ < LOOP_COUNT ){  
  69. //      sleep( thrd_id+1 );                 //休眠一秒, 便於程序觀察  
  70.         printf( "consumer try to lock rdlock.\n" );  
  71.         pthread_rwlock_rdlock( &rwlock );   //加鎖  
  72.   
  73.         //成功佔有互斥量,接下來可以對緩衝區(倉庫)進行取出  
  74.         //操作  
  75.         printf( " consumer locked successful ,consumer %d get a product from buffer."  
  76.                 "\n" , thrd_id);  
  77.         /* 
  78.             休眠3秒, 便於程序觀察,可以看到 
  79.             其他讀取線程能佔據讀鎖 
  80.         */  
  81.         sleep(3);  
  82.         printf("consumer finished ,unlock rdlock.\n");  
  83.         pthread_rwlock_unlock( &rwlock );   //解鎖  
  84.         sleep(thrd_id+1);                           //休眠一秒, 防止馬上又佔據讀鎖  
  85.     }  
  86. }  

結果如下:

可以看到當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是如果線程希望以寫模式對此鎖進行加鎖,它必須阻塞直到所有的線程釋放讀鎖。雖然讀寫鎖的實現各不相同,但當讀寫鎖處於讀模式鎖住狀態時,如果有另外的線程試圖以寫模式加鎖,讀寫鎖通常會阻塞隨後的讀模式鎖請求(貌似在程序裏面沒有體現出來)。這樣可以避免讀模式鎖長期佔用,而等待的寫模式鎖請求一直得不到滿足
 另外我要說明的一點就是,傳遞參數 arg 爲(void *)( thrd_id+thrd_num),我一開始並沒有定義一個數組thrd_cons[LOOP_THRD]來存儲線程編號的, 而是直接傳thrd_num的地址,但通過在線程
int thrd_id = *( ( int*)arg );
// printf( "consumer %d ,%#x . \n" , thrd_id ,arg);
 這兩句話就可以知道,當傳遞的是thrd_num地址時候,由於進程的所有信息對該進程的所有線程都是共享的,包括可執行的程序文本、程序的全局內存和堆內存、棧以及文件描述符。地址, 由於進程的所有信息對該進程的所有線程都是共享的,包括可執行的程序文本、程序的全局內存和堆內存、棧以及文件描述符。 thrd_num的值會隨着線程的執行而發生改變,系統調度頻率之快是我們無法想像的,所以thrd_num的值也是動態改變的。

三、條件變量(cond)

與互斥鎖不同,條件變量是用來等待而不是用來上鎖的。條件變量用來自動阻塞一個線程,直到某特殊情況發生爲止。通常條件變量和互斥鎖同時使用。條件變量分爲兩部分:條件和變量。條件本身是由互斥量保護的。線程在改變條件狀態前先要鎖住互斥量。條件變量使我們可以睡眠等待某種條件出現。條件變量是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:一個線程等待"條件變量的條件成立"而掛起;另一個線程使"條件成立"(給出條件成立信號)。條件的檢測是在互斥鎖的保護下進行的。如果一個條件爲假,一個線程自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個線程改變了條件,它發信號給關聯的條件變量,喚醒一個或多個等待它的線程,重新獲得互斥鎖,重新評價條件。如果兩進程共享可讀寫的內存,條件變量可以被用來實現這兩進程間的線程同步。

1.初始化條件變量。
靜態態初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;
動態初始化,int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t *cond_attr);

2.等待條件成立。釋放鎖,同時阻塞等待條件變量爲真才行。timewait()設置等待時間,仍未signal,返回ETIMEOUT(加鎖保證只有一個線程wait)
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex 
*mutex,consttimespec *abstime);

3.激活條件變量。pthread_cond_signal,pthread_cond_broadcast(激活所有等待線程)
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); //
解除所有線程的阻塞

4.清除條件變量。無線程等待,否則返回EBUSY
int pthread_cond_destroy(pthread_cond_t *cond); 

    接下來我們又改變一下生產者消費者問題:現在緩衝區或者倉庫大小爲BUFSIZE,只有一個生產者和一個消費者(其實也適用於多個生產者和消費者), 我們這個時候就可以把緩衝區設置爲一個互斥量,一次要麼生產者要麼消費者霸佔它。但接下來處理方式與互斥量有所不同:假如生產者成功佔據鎖(緩衝區),這時它不能馬上開始往裏面生產東西,要先判斷緩衝區是不是滿的,如果緩衝區滿了,那麼生產者就會把自己放到等待條件的線程列表上,然後對互斥量進行解鎖,這是一個原子操作。如果緩衝區不滿則可以生產產品,然後給消費者發送notempty信號,表示緩衝區有產品了, 你可以yy了。然後解鎖互斥量。假如是消費者成功佔據鎖(緩衝區),同樣它要檢查緩衝區是不是空的,如果空,那麼消費者就會把自己放到等待條件的線程列表上,然後對互斥量進行解鎖。如果不空,消費者開始yy,然後給生產者發送nofull信號, 表示緩衝區有位置可以生產了, 你快生產吧。然後解鎖互斥量。就這樣,生產者消費者和諧同步工作着。

 

流程圖我就不畫了,看代碼也能明白過程:

 ---producer過程:lock(mutex)->checknotfull->(if notfull wait until notfull)->produce product->sendnotempty to consumer->unlock(mutex) 

---consumer過程:lock(mutex)->checknotempty->(if notempty wait until notempty)->get productfrom buffer->send notfull to poducer->unlock(mutex)

 


[cpp]   view plain  copy
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3.   
  4. #define LOOP_COUNT 20               //生產者和消費者各自循環次數,也可以說生產商品的總量  
  5. //#define LOOP_THRD 5               //消費者線程個數  
  6. #define BUFSIZE 5                   //緩衝區大小,也就是最多能放多少個產品  
  7.   
  8. pthread_mutex_t mutex;              //定義一個全局互斥量,在不同函數中  
  9.                                     //初始化和使用  
  10. pthread_cond_t notempty , notfull;  //定義兩個條件變量,當作信號投放  
  11. unsigned int prod_pos = 3;          //定義生產者在緩衝區開始生產的位置  
  12. unsigned int cons_pos = 0;          //定義消費者在緩衝區開始消費的位置  
  13.   
  14. void *producer( void *arg );        //生產者線程  
  15. void *consumer( void *arg );        //消費者線程  
  16.   
  17. int main(int argc , char *argv[]){  
  18.     pthread_t thrd_prod , thrd_cons;  
  19.   
  20.     pthread_mutex_init( &mutex , NULL );    //初始化互斥量  
  21.   
  22.     //創建生產者和消費者線程  
  23.     if( pthread_create( &thrd_prod , NULL, producer ,  
  24.                 NULL ) != 0 )  
  25.         oops( "thread create failed." );  
  26.     sleep(1);                               //保證生產者線程先運行  
  27.   
  28.     if( pthread_create( &thrd_cons , NULL, consumer ,  
  29.                 NULL ) != 0 )  
  30.         oops( "thread create failed." );  
  31.   
  32.     //等待線程結束  
  33.     if( pthread_join( thrd_prod , NULL ) != 0 )  
  34.         oops( " wait thread failed.");  
  35.     if( pthread_join( thrd_cons , NULL ) != 0 )  
  36.         oops( " wait thread failed.");  
  37.   
  38.     pthread_mutex_destroy( &mutex );        //關閉互斥量  
  39.     return 0;  
  40. }  
  41.   
  42. void *producer( void *arg){  
  43.     int count = 0 ;             //循環計數  
  44.   
  45.     while( count++ < LOOP_COUNT ){  
  46.         printf( "producer try to lock .\n");  
  47.         pthread_mutex_lock( &mutex );   //加鎖  
  48.   
  49.         /* 
  50.            成功佔有互斥量,接着檢查緩衝區是不是滿了, 
  51.         */  
  52.         if( ( prod_pos + 1 ) % BUFSIZE == cons_pos ){  
  53.             //緩衝區滿了  
  54.             printf( "producer wait not full.\n");  
  55.             pthread_cond_wait( &notfull , &mutex ); //等待條件滿足  
  56.         }  
  57.         //如果沒滿,接下來可以對緩衝區(倉庫)進行生產  
  58. /* 
  59.            成功佔有互斥量,接着檢查緩衝區是不是滿了, 
  60.         */  
  61.         if( ( prod_pos + 1 ) % BUFSIZE == cons_pos ){  
  62.             //緩衝區滿了  
  63.             printf( "producer wait not full.\n");  
  64.             pthread_cond_wait( &notfull , &mutex ); //等待條件滿足  
  65.         }  
  66.         //如果沒滿,接下來可以對緩衝區(倉庫)進行生產  
  67.         //操作  
  68.         printf( "producer lock successful, producer put %d's "  
  69.                 "product to buffer.\n" ,count);  
  70.         prod_pos = ( prod_pos +1 ) % BUFSIZE;  &n
相關文章
相關標籤/搜索