筆記整理--Linux多線程

Unix高級環境編程系列筆記 (2013/11/17 14:26:38)

Unix高級環境編程系列筆記

   經過這篇文字,您將可以解答以下問題:javascript

  • 如何來標識一個線程?php

  • 如何建立一個新線程?html

  • 如何實現單個線程的退出?java

  • 如何使調用線程阻塞等待指定線程的退出,並得到退出線程的返回碼?linux

  • 如何經過一個線程讓另一個線程退出?c++

  • 如何實現線程退出時的清理動做?程序員

  • Unix系統如何實現線程之間的同步?web

  • 什麼狀況會發生線程死鎖,如何避免死鎖?shell

  • 讀寫鎖的使用方法。數據庫

  • 什麼是條件變量,它有什麼做用?

  • 如何使用條件變量?

  • 1. 如何來標識一個線程?

       表示進程號的爲pid_t類型,表示線程號的是pthread_t類型。pthread_t是一個結構體而不是整型。

       使用pthread_equal肯定兩個線程號是否相等:

    #include 
       
       
       
       
        
        
        
        int pthread_equal(pthread_t tid1, pthread_t tid2);Returns: nonzero if equal, 0 otherwise
       
       
       
       

       使用pthread_self函數來獲取線程的ID:

    #include 
       
       
       
       
        
        
        
        pthread_t pthread_self(void);Returns: the thread ID of the calling thread
       
       
       
       

    2.如何建立一個新線程?

       使用pthread_create函數建立一個新線程。


    如下是代碼片斷:
    #include  
    int pthread_create(pthread_t *restrict tidp, const pthread_attr_t *restrict attr, void *(*start_rtn)(void), void *restrict arg);
    Returns: 0 if OK, error number on failure


       當該函數成功返回的時候,tidp所指向的內存位置將被分配給新建立的帶有thread ID的線程。

       attr用來定製各類線程參數。

       新建立的線程將在start_rtn函數所指向的地址開始運行,該函數接受一個參數無類型的指針arg做爲參數

       線程建立時沒法保證哪一個線程會先運行。新建立的線程能夠訪問進程地址空間,而且繼承了調用線程的浮點環境以及信號量掩碼,但對於線程的未決信號量也將會被清除。

       下面的這段程序建立新的線程,並打印線程id,新線程經過pthread_self函數獲取本身的線程ID。

    #include "apue.h"
    #include 
       
       
       
       
        
        
        
        pthread_t ntid;void printids(const char *s){    pid_t      pid;    pthread_t  tid;    pid = getpid();    tid = pthread_self();    printf("%s pid %u tid %u (0x%x)\\n", s, (unsigned int)pid,      (unsigned int)tid, (unsigned int)tid);}void * thr_fn(void *arg){    printids("new thread: ");    return((void *)0);}int main(void){    int     err;    err = pthread_create(&ntid, NULL, thr_fn, NULL);    if (err != 0)        err_quit("can\'t create thread: %s\\n", strerror(err));    printids("main thread:");    sleep(1);    exit(0);}
       
       
       
       

    3. 如何實現單個線程的退出?

       若是一個線程調用了exit, _Exit, 或者_exit,將致使整個進程的終止。要實現單個線程的退出,能夠採用以下方式:

       o 線程能夠簡單的從start routine返回,返回值就是線程的退出代碼。

       o 線程能夠被同一進程中的其它線程終止。

       o 線程調用pthread_exit

    #include 
       
       
       
       
        
        
        
        void pthread_exit(void *rval_ptr);
       
       
       
       

    4.如何使調用線程阻塞等待指定線程的退出,並得到退出線程的返回碼?

    #include 
       
       
       
       
        
        
        
        int pthread_join(pthread_t thread, void **rval_ptr);Returns: 0 if OK, error number on failure
       
       
       
       

       調用線程將會被阻塞直到指定的線程終止。若是線程簡單的從start routine返回則rval_ptr將包含返回代碼。若是線程是被撤銷(調用pthread_exit)的,rval_ptr指向的內存地址將被設置爲PTHREAD_CANCELED.

       經過調用pthread_join,咱們自動的將一個線程變成分離狀態,這樣就能夠實現資源的回收。若是線程已經處於分離狀態,調用pthread_join將會失敗,並返回EINVAL。

       若是咱們對於線程的返回值不感興趣,能夠將rval_ptr設置成NULL。

       一段有缺陷的代碼:

    #include "apue.h"
    #include 
       
       
       
       
        
        
        
        struct foo {    int a, b, c, d;};voidprintfoo(const char *s, const struct foo *fp){    printf(s);    printf("  structure at 0x%x\\n", (unsigned)fp);    printf("  foo.a = %d\\n", fp->a);    printf("  foo.b = %d\\n", fp->b);    printf("  foo.c = %d\\n", fp->c);    printf("  foo.d = %d\\n", fp->d);}void *thr_fn1(void *arg){    struct foo  foo = {1, 2, 3, 4};    printfoo("thread 1:\\n", &foo);    pthread_exit((void *)&foo);}void *thr_fn2(void *arg){    printf("thread 2: ID is %d\\n", pthread_self());    pthread_exit((void *)0);}intmain(void){    int         err;    pthread_t   tid1, tid2;    struct foo  *fp;    err = pthread_create(&tid1, NULL, thr_fn1, NULL);    if (err != 0)        err_quit("can\'t create thread 1: %s\\n", strerror(err));    err = pthread_join(tid1, (void *)&fp);    if (err != 0)        err_quit("can\'t join with thread 1: %s\\n", strerror(err));    sleep(1);    printf("parent starting second thread\\n");    err = pthread_create(&tid2, NULL, thr_fn2, NULL);    if (err != 0)        err_quit("can\'t create thread 2: %s\\n", strerror(err));    sleep(1);    printfoo("parent:\\n", fp);    exit(0);}
       
       
       
       

       注意,pthread_create 和 pthread_exit函數的無類型指針能夠傳遞複雜的結構信息,但這個結構所使用的內存在調用者完成後必須仍然有效(分配在堆上或者是靜態變量),不然就會出現使用無效的錯誤。這段代碼中thr_fn1函數中變量foo分配在棧上,但該線程退出後,主線程經過pthread_join獲取foo的地址並進行操做(調用printfoo函數時)就會出現錯誤,由於此時thr_fn1已經退出它的棧已經被銷燬。

    5.如何經過一個線程讓另一個線程退出?

       調用pthread_cancel函數將致使tid所指向的線程終止運行。可是,一個線程能夠選擇忽略其它線程控制該線程什麼時候退出。注意,該函數並不等待線程終止,它僅僅提出要求。

    #include 
       
       
       
       
        
        
        
        int pthread_cancel(pthread_t tid);Returns: 0 if OK, error number on failure
       
       
       
       

    6.如何實現線程退出時的清理動做?

       線程能夠創建多個清理處理程序,這些程序記錄在棧中,也就是說他們的執行順序與註冊順序想法。使用以下函數註冊清理函數:

       void pthread_cleanup_push(void (*rtn)(void *), void *arg);

       void pthread_cleanup_pop(int execute);

       rtn將被調用,並傳以arg參數,引發該函數調用的狀況以下:

       o 調用pthread_exit

       o 對於退出請求的反應

       o 以非0參數調用pthread_cleanup_push

       若是pthread_cleanup_pop的參數非0則僅僅移除該處理函數而不執行。

       若是函數已經處於分離狀態,則當它退出時線程底層的存儲資源會被當即回收。處於分離狀態的線程,若是調用pthread_join來等待其退出將會出現錯誤。

       經過下列函數可讓進程處於分離狀態:

    #include 
       
       
       
       
        
        
        
        int pthread_detach(pthread_t tid);Returns: 0 if OK, error number on failure
       
       
       
       

    7.Unix系統如何實現線程之間的同步?

       使用pthreads mutual-exclusion interfaces。引入了mutex,用pthread_mutex_t類型來表示。在使用這個變量以前,咱們首先要將其初始化,或者賦值爲PTHREAD_MUTEX_INITIALIZER(僅僅用於靜態分配的mutexs),或者調用pthread_mutex_init。若是咱們動態的爲mutex分配空間(例如經過調用malloc),咱們須要在調用free釋放內存以前調用pthread_mutex_destroy。

       函數定義以下:


    如下是代碼片斷:
    #include  
    int pthread_mutex_init(pthread_mutex_t *restrict mutex,  const pthread_mutexattr_t *restrict attr);
    int pthread_mutex_destroy(pthread_mutex_t *mutex);
    Both return: 0 if OK, error number on failure


       初始化mutex時參數attr用來指定mutex的屬性,要使用默認值將它設置爲NULL。

       使用以下函數對mutex進行加鎖或解鎖:

    int pthread_mutex_lock(pthread_mutex_t *mutex);
    int pthread_mutex_trylock(pthread_mutex_t *mutex);
    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    All return: 0 if OK, error number on failure
    

       注意當mutex已經被加鎖則 pthread_mutex_lock會阻塞。若是一個線程沒法忍受阻塞,能夠調用pthread_mutex_trylock來加鎖,加鎖失敗則當即返回EBUSY。

    8.什麼狀況會發生線程死鎖,如何避免死鎖?

       若是一個線程對mutex加兩次鎖則顯然會致使死鎖。但實際上死鎖的狀況要複雜的多:when we use more than one mutex in our programs, a deadlock can occur if we allow one thread to hold a mutex and block while trying to lock a second mutex at the same time that another thread holding the second mutex tries to lock the first mutex. Neither thread can proceed, because each needs a resource that is held by the other, so we have a deadlock.

       死鎖能夠經過控制加鎖的順序來避免。有兩個mutex A和B,若是全部的線程老是先對A加鎖再對B加鎖就不會產生死鎖。但實際應用中可能很難保證這種順序加鎖的方式,這種狀況下,可使用pthread_mutex_trylock來避免死鎖的發生。

    9.讀寫鎖的使用方法。

       讀寫鎖的初始化與銷燬:


    如下是代碼片斷:
    #include  
    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
    Both return: 0 if OK, error number on failure
   對於讀寫鎖的初始化與銷燬獨佔鎖相似。


   加鎖與解鎖:

#include 
  
  
  
  
   
   
   
   int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);All return: 0 if OK, error number on failure
  
  
  
  

   對於讀者的數量會有限制,所以調用 pthread_rwlock_rdlock時須要檢查返回值。

   在正確使用的狀況下,不須要檢查pthread_rwlock_wrlock和pthread_rwlock_unlock的返回值。

   條件加鎖:

#include 
  
  
  
  
   
   
   
   int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);Both return: 0 if OK, error number on failure
  
  
  
  

10.什麼是條件變量,它有什麼做用?

   條件變量是線程可用的另一種同步機制。條件變量給多個線程提供了一個會合的場所。條件變量與互斥量一塊兒使用時,容許線程以無競爭的方式等待特定條件的發生。條件自己是由互斥量保護的。線程在改變狀態前必須首先鎖住互斥量,其它線程在得到互斥量以前不會覺察到這種變化。

11.如何使用條件變量?

   條件變量的類型爲pthread_cond_t ,其初始化與銷燬的方式與mutex相似,注意靜態變量能夠經過指定常量PTHREAD_COND_INITIALIZER來進行初始化。

#include 
  
  
  
  
   
   
   
   int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);int pthread_cond_destroy(pthread_cond_t *cond);Both return: 0 if OK, error number on failure
  
  
  
  

   使用pthread_cond_wait來等待條件變成真。

   函數定義以下:


如下是代碼片斷:
#include  
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,  pthread_mutex_t *restrict mutex, const struct timespec *restrict timeout);

Both return: 0 if OK, error number on failure
   調用者把鎖住的mutex傳遞給pthread_cond_wait,函數把調用線程放到等待條件變量的線程列表上,而後對互斥量解鎖,這兩個操做是原子操做。這樣就關閉了條件檢查和線程進入休眠狀態等待條件改變這兩個操做之間的時間窗口。pthread_cond_wait返回時,mutex會再次被鎖住。


   注意,調用成功返回後,線程須要從新計算條件變量,由於其它線程可能已經改變了條件。

   有兩個函數用於通知線程一個條件已經被知足。pthread_cond_signal函數用來喚醒一個等待條件知足的線程, pthread_cond_broadcast用來喚醒全部等待條件知足的線程。

   他們的定義爲:

#include 
  
  
  
  
   
   
   
   int pthread_cond_signal(pthread_cond_t *cond);int pthread_cond_broadcast(pthread_cond_t *cond);Both return: 0 if OK, error number on failure
  
  
  
  

   下面的這段代碼實現了相似於生產者消費者模型的程序,生產者經過enqueue_msg將消息放入隊列,併發送信號通知給消費者線程。消費者線程被喚醒而後處理消息。

#include 
  
  
  
  
   
   
   
   struct msg {    struct msg *m_next;    /* ... more stuff here ... */};struct msg *workq;pthread_cond_t qready = PTHREAD_COND_INITIALIZER;pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;voidprocess_msg(void){    struct msg *mp;    for (;;) {        pthread_mutex_lock(&qlock);        while (workq == NULL)            pthread_cond_wait(&qready, &qlock); /*get msg from the queue*/        mp = workq;        workq = mp->m_next;        pthread_mutex_unlock(&qlock);        /* now process the message mp */    }}voidenqueue_msg(struct msg *mp){    pthread_mutex_lock(&qlock);    /*put msg in queue*/    mp->m_next = workq;    workq = mp;    pthread_mutex_unlock(&qlock);    pthread_cond_signal(&qready);}
  
  
  
  

   在pthread_cond_signal發送消息以前並不須要佔用鎖,由於一旦線程被喚醒後經過while發現沒有要處理的msg存在則會再次陷入睡眠。若是系統不能容忍這種競爭環境,則須要在unlock以前調用cond_signal,可是在多處理器機器上,這樣會致使多線程被喚醒而後當即進入阻塞(cond_signal喚醒線程,但因爲咱們仍佔用着鎖,因此這些線程又會當即阻塞)。


Linux多線程編程詳細解析----條件變量 pthread_cond_t - IT-Homer - 博客頻道 - CSDN.NET - Google Chrome (2013/9/5 22:01:51)

Linux多線程編程詳細解析----條件變量 pthread_cond_t

分類: Linux/Shell   9642人閱讀   評論(0)   收藏   舉報

Linux操做系統下的多線程編程詳細解析----條件變量

 

1.初始化條件變量pthread_cond_init

#include <pthread.h>

int pthread_cond_init(pthread_cond_t *cv,

const pthread_condattr_t *cattr);

返回值:函數成功返回0;任何其餘返回值都表示錯誤

初始化一個條件變量。當參數cattr爲空指針時,函數建立的是一個缺省的條件變量。不然條件變量的屬性將由cattr中的屬性值來決定。調用 pthread_cond_init函數時,參數cattr爲空指針等價於cattr中的屬性爲缺省屬性,只是前者不須要cattr所佔用的內存開銷。這個函數返回時,條件變量被存放在參數cv指向的內存中。

能夠用宏PTHREAD_COND_INITIALIZER來初始化靜態定義的條件變量,使其具備缺省屬性。這和用pthread_cond_init函數動態分配的效果是同樣的。初始化時不進行錯誤檢查。如:

pthread_cond_t cv = PTHREAD_COND_INITIALIZER;

不能由多個線程同時初始化一個條件變量。當須要從新初始化或釋放一個條件變量時,應用程序必須保證這個條件變量未被使用。

 

2.阻塞在條件變量上pthread_cond_wait

#include <pthread.h>

int pthread_cond_wait(pthread_cond_t *cv,

pthread_mutex_t *mutex);

返回值:函數成功返回0;任何其餘返回值都表示錯誤

函數將解鎖mutex參數指向的互斥鎖,並使當前線程阻塞在cv參數指向的條件變量上。

被阻塞的線程能夠被pthread_cond_signal函數,pthread_cond_broadcast函數喚醒,也可能在被信號中斷後被喚醒。

pthread_cond_wait函數的返回並不意味着條件的值必定發生了變化,必須從新檢查條件的值。

pthread_cond_wait函數返回時,相應的互斥鎖將被當前線程鎖定,即便是函數出錯返回。

通常一個條件表達式都是在一個互斥鎖的保護下被檢查。當條件表達式未被知足時,線程將仍然阻塞在這個條件變量上。當另外一個線程改變了條件的值並向條件變量發出信號時,等待在這個條件變量上的一個線程或全部線程被喚醒,接着都試圖再次佔有相應的互斥鎖。

阻塞在條件變量上的線程被喚醒之後,直到pthread_cond_wait()函數返回以前條件的值都有可能發生變化。因此函數返回之後,在鎖定相應的互斥鎖以前,必須從新測試條件值。最好的測試方法是循環調用pthread_cond_wait函數,並把知足條件的表達式置爲循環的終止條件。如:

pthread_mutex_lock();

while (condition_is_false)

pthread_cond_wait();

pthread_mutex_unlock();

阻塞在同一個條件變量上的不一樣線程被釋放的次序是不必定的。

注意:pthread_cond_wait()函數是退出點,若是在調用這個函數時,已有一個掛起的退出請求,且線程容許退出,這個線程將被終止並開始執行善後處理函數,而這時和條件變量相關的互斥鎖仍將處在鎖定狀態。

 

3.解除在條件變量上的阻塞pthread_cond_signal

#include <pthread.h>

int pthread_cond_signal(pthread_cond_t *cv);

返回值:函數成功返回0;任何其餘返回值都表示錯誤

函數被用來釋放被阻塞在指定條件變量上的一個線程。

必須在互斥鎖的保護下使用相應的條件變量。不然對條件變量的解鎖有可能發生在鎖定條件變量以前,從而形成死鎖。

喚醒阻塞在條件變量上的全部線程的順序由調度策略決定,若是線程的調度策略是SCHED_OTHER類型的,系統將根據線程的優先級喚醒線程。

若是沒有線程被阻塞在條件變量上,那麼調用pthread_cond_signal()將沒有做用。

 

4.阻塞直到指定時間pthread_cond_timedwait

#include <pthread.h>

#include <time.h>

int pthread_cond_timedwait(pthread_cond_t *cv,

pthread_mutex_t *mp, const structtimespec * abstime);

返回值:函數成功返回0;任何其餘返回值都表示錯誤

函數到了必定的時間,即便條件未發生也會解除阻塞。這個時間由參數abstime指定。函數返回時,相應的互斥鎖每每是鎖定的,即便是函數出錯返回。

注意:pthread_cond_timedwait函數也是退出點。

超時時間參數是指一天中的某個時刻。使用舉例:

pthread_timestruc_t to;

to.tv_sec = time(NULL) + TIMEOUT;

to.tv_nsec = 0;

超時返回的錯誤碼是ETIMEDOUT。

 

5.釋放阻塞的全部線程pthread_cond_broadcast

#include <pthread.h>

int pthread_cond_broadcast(pthread_cond_t *cv);

返回值:函數成功返回0;任何其餘返回值都表示錯誤

函數喚醒全部被pthread_cond_wait函數阻塞在某個條件變量上的線程,參數cv被用來指定這個條件變量。當沒有線程阻塞在這個條件變量上時,pthread_cond_broadcast函數無效。

因爲pthread_cond_broadcast函數喚醒全部阻塞在某個條件變量上的線程,這些線程被喚醒後將再次競爭相應的互斥鎖,因此必須當心使用pthread_cond_broadcast函數。

 

6.釋放條件變量pthread_cond_destroy

#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cv);

返回值:函數成功返回0;任何其餘返回值都表示錯誤

釋放條件變量。

注意:條件變量佔用的空間並未被釋放。

 

7.喚醒丟失問題

在線程未得到相應的互斥鎖時調用pthread_cond_signal或pthread_cond_broadcast函數可能會引發喚醒丟失問題。

喚醒丟失每每會在下面的狀況下發生:

  1. 一個線程調用pthread_cond_signal或pthread_cond_broadcast函數;
  2. 另外一個線程正處在測試條件變量和調用pthread_cond_wait函數之間;
  3. 沒有線程正在處在阻塞等待的狀態下。

 

轉載聲明: 本文轉自 http://pzs1237.blog.163.com/blog/static/29813006200952335454934/

 

===============================================================================

 

條件鎖pthread_cond_t

 

說明,
等待線程
1。使用pthread_cond_wait前要先加鎖
2。pthread_cond_wait內部會解鎖,而後等待條件變量被其它線程激活
3。pthread_cond_wait被激活後會再自動加鎖

激活線程:
1。加鎖(和等待線程用同一個鎖)
2。pthread_cond_signal發送信號
3。解鎖
激活線程的上面三個操做在運行時間上都在等待線程的pthread_cond_wait函數內部。

程序示例:

[cpp]   view plain copy print ?
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3. #include <unistd.h>  
  4.   
  5. pthread_mutex_t count_lock;  
  6. pthread_cond_t count_nonzero;  
  7. unsigned count = 0;  
  8.   
  9. void *decrement_count(void *arg)  
  10. {  
  11.     pthread_mutex_lock(&count_lock);  
  12.     printf("decrement_count get count_lock/n");  
  13.     while(count == 0)  
  14.     {  
  15.         printf("decrement_count count == 0 /n");  
  16.         printf("decrement_count before cond_wait /n");  
  17.         pthread_cond_wait(&count_nonzero, &count_lock);  
  18.         printf("decrement_count after cond_wait /n");  
  19.     }  
  20.   
  21.     count = count + 1;  
  22.     pthread_mutex_unlock(&count_lock);  
  23. }  
  24.   
  25. void *increment_count(void *arg)  
  26. {  
  27.     pthread_mutex_lock(&count_lock);  
  28.     printf("increment_count get count_lock /n");  
  29.     if(count == 0)  
  30.     {  
  31.         printf("increment_count before cond_signal /n");  
  32.         pthread_cond_signal(&count_nonzero);  
  33.         printf("increment_count after cond_signal /n");  
  34.     }  
  35.   
  36.     count = count + 1;  
  37.     pthread_mutex_unlock(&count_lock);  
  38. }  
  39.   
  40. int main(void)  
  41. {  
  42.     pthread_t tid1, tid2;  
  43.   
  44.     pthread_mutex_init(&count_lock, NULL);  
  45.     pthread_cond_init(&count_nonzero, NULL);  
  46.   
  47.     pthread_create(&tid1, NULL, decrement_count, NULL);  
  48.     sleep(2);  
  49.     pthread_create(&tid2, NULL, increment_count, NULL);  
  50.   
  51.     sleep(10);  
  52.     pthread_exit(0);  
  53.   
  54.     return 0;  
  55. }  


運行結果:

[work@db-testing-com06-vm3.db01.baidu.com pthread]$ gcc -o pthread_cond pthread_cond.c -lpthread
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ ./pthread_cond 
decrement_count get count_lock
decrement_count count == 0 
decrement_count before cond_wait 
increment_count get count_lock 
increment_count before cond_signal 
increment_count after cond_signal 
decrement_count after cond_wait

 

轉載聲明: 本文轉自 http://egeho123.blogbus.com/logs/10821816.html

 

===============================================================================

 

多線程編程,條件變量pthread_cond_t應用

 

程序代碼:

 

[cpp]   view plain copy print ?
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3. #include <unistd.h>  
  4.   
  5. pthread_mutex_t counter_lock;  
  6. pthread_cond_t counter_nonzero;  
  7. int counter = 0;  
  8. int estatus = -1;  
  9.   
  10. void *decrement_counter(void *argv);  
  11. void *increment_counter(void *argv);  
  12.   
  13. int main(int argc, char **argv)  
  14. {  
  15.     printf("counter: %d/n", counter);  
  16.     pthread_t thd1, thd2;  
  17.     int ret;  
  18.   
  19.     ret = pthread_create(&thd1, NULL, decrement_counter, NULL);  
  20.     if(ret){  
  21.         perror("del:/n");  
  22.         return 1;  
  23.     }  
  24.   
  25.     ret = pthread_create(&thd2, NULL, increment_counter, NULL);  
  26.     if(ret){  
  27.         perror("inc: /n");  
  28.         return 1;  
  29.     }  
  30.   
  31.     int counter = 0;  
  32.     while(counter != 10){  
  33.         printf("counter: %d/n", counter);  
  34.         sleep(1);  
  35.         counter++;  
  36.     }  
  37.   
  38.     return 0;  
  39. }  
  40.   
  41. void *decrement_counter(void *argv)  
  42. {  
  43.     pthread_mutex_lock(&counter_lock);  
  44.     while(counter == 0)  
  45.         pthread_cond_wait(&counter_nonzero, &counter_lock);  
  46.   
  47.     counter--;  
  48.     pthread_mutex_unlock(&counter_lock);  
  49.   
  50.     return &estatus;  
  51. }  
  52.   
  53. void *increment_counter(void *argv)  
  54. {  
  55.     pthread_mutex_lock(&counter_lock);  
  56.     if(counter == 0)  
  57.         pthread_cond_signal(&counter_nonzero);  
  58.   
  59.     counter++;  
  60.     pthread_mutex_unlock(&counter_lock);  
  61.   
  62.     return &estatus;  
  63. }  

 

運行結果:
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ gcc -o pthread_cond2 pthread_cond2.c -lpthread
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ ./pthread_cond2                               
counter: 0
counter: 0
counter: 1
counter: 2
counter: 3
counter: 4
counter: 5
counter: 6
counter: 7
counter: 8
counter: 9

 

調試程序的運行過程:

一、開始時   counter 爲0  (main)
二、ret = pthread_create(&thrd1, NULL, decrement_counter, NULL)處生成一個thrd1線程運行decrement_counter(),
此線程內函數運行流程爲:
先鎖定  互斥鎖(count_lock) 若是 counter爲0,此線程被阻塞在 條件 變量(count_nonzero)上.同時釋放互斥鎖count_lock( wait內部會先釋放鎖,等待signal激活後自動再加上鎖).

 

三、與此同時主程序還在運行,建立另外一個線程thrd2運行 increment_counter,
此線程內的函數流程以下:
先鎖定  互斥鎖(count_lock)【 wait內部釋放鎖的互斥鎖】 若是 counter爲0, 喚醒在 條件 變量(count_nonzero)上的線程即thrd1.可是因爲有互斥鎖count_lock【 signal激活後,wait內部又自動加上鎖了】, thrd1仍是在等待. 而後count++,釋放互斥鎖,.......thrd1因爲互斥鎖釋放,從新判斷counter是否是爲0,若是爲0再把線程阻塞在 條件 變量count_nonzero上,但這時counter已經爲1了.因此線程繼續運行.counter--釋放互斥鎖......( 退出後,運行主線程main
四、與此主程序間隔打印counter運行一段時間退出.

 注:更清晰的運行流程請詳見以下「改進代碼」

後記,在編譯的時候加上 -lpthread
改進代碼:

[cpp]   view plain copy print ?
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3. #include <unistd.h>  
  4.   
  5. pthread_mutex_t counter_lock;  
  6. pthread_cond_t counter_nonzero;  
  7. int counter = 0;  
  8. int estatus = -1;  
  9.   
  10. void *decrement_counter(void *argv);  
  11. void *increment_counter(void *argv);  
  12.   
  13. int main(int argc, char **argv)  
  14. {  
  15.     printf("counter: %d/n", counter);  
  16.     pthread_t thd1, thd2;  
  17.     int ret;  
  18.   
  19.     ret = pthread_create(&thd1, NULL, decrement_counter, NULL);  
  20.     if(ret){  
  21.         perror("del:/n");  
  22.         return 1;  
  23.     }  
  24.   
  25.     ret = pthread_create(&thd2, NULL, increment_counter, NULL);  
  26.     if(ret){  
  27.         perror("inc: /n");  
  28.         return 1;  
  29.     }  
  30.   
  31.     int counter = 0;  
  32.     while(counter != 10){  
  33.         printf("counter(main): %d/n", counter);  
  34.         sleep(1);  
  35.         counter++;  
  36.     }  
  37.   
  38.     return 0;  
  39. }  
  40.   
  41. void *decrement_counter(void *argv)  
  42. {  
  43.     printf("counter(decrement): %d/n", counter);  
  44.     pthread_mutex_lock(&counter_lock);  
  45.     while(counter == 0)  
  46.         pthread_cond_wait(&counter_nonzero, &counter_lock); //進入阻塞(wait),等待激活(signal)  
  47.       
  48.     printf("counter--(before): %d/n", counter);      
  49.     counter--; //等待signal激活後再執行  
  50.     printf("counter--(after): %d/n", counter);      
  51.     pthread_mutex_unlock(&counter_lock);   
  52.   
  53.     return &estatus;  
  54. }  
  55.   
  56. void *increment_counter(void *argv)  
  57. {  
  58.     printf("counter(increment): %d/n", counter);  
  59.     pthread_mutex_lock(&counter_lock);  
  60.     if(counter == 0)  
  61.         pthread_cond_signal(&counter_nonzero); //激活(signal)阻塞(wait)的線程(先執行完signal線程,而後再執行wait線程)  
  62.   
  63.     printf("counter++(before): %d/n", counter);      
  64.     counter++;   
  65.     printf("counter++(after): %d/n", counter);      
  66.     pthread_mutex_unlock(&counter_lock);  
  67.   
  68.     return &estatus;  
  69. }  

運行結果:
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ gcc -o pthread_cond2 pthread_cond2.c -lpthread
[work@db-testing-com06-vm3.db01.baidu.com pthread]$ ./pthread_cond2                                
counter: 0
counter(main): 0
counter(decrement): 0
counter(increment): 0
counter++(before): 0
counter++(after): 1
counter--(before): 1
counter--(after): 0
counter(main): 1
counter(main): 2
counter(main): 3
counter(main): 4
counter(main): 5
counter(main): 6
counter(main): 7
counter(main): 8
counter(main): 9

多線程--條件變量 - DZQABC - 博客園 - Google Chrome (2013/9/5 22:00:10)

條件變量函數

 

操做

相關函數說明

初始化條件變量

pthread_cond_init 語法

基於條件變量阻塞

pthread_cond_wait 語法

解除阻塞特定線程

pthread_cond_signal 語法

在指定的時間以前阻塞

pthread_cond_timedwait 語法

在指定的時間間隔內阻塞

pthread_cond_reltimedwait_np 語法

解除阻塞全部線程

pthread_cond_broadcast 語法

銷燬條件變量狀態

pthread_cond_destroy 語法

初始化條件變量

使用 pthread_cond_init(3C) 能夠將 cv 所指示的條件變量初始化爲其缺省值,或者指定已經使用 pthread_condattr_init() 設置的條件變量屬性。

pthread_cond_init 語法

int pthread_cond_init(pthread_cond_t *cv,

const pthread_condattr_t *cattr);
#include <pthread.h>



pthread_cond_t cv;

pthread_condattr_t cattr;

int ret;



/* initialize a condition variable to its default value */

ret = pthread_cond_init(&cv, NULL);



/* initialize a condition variable */

ret = pthread_cond_init(&cv, &cattr);

cattr 設置爲 NULL。將 cattr 設置爲 NULL 與傳遞缺省條件變量屬性對象的地址等效,可是沒有內存開銷。對於 Solaris 線程,請參見cond_init 語法。

使用 PTHREAD_COND_INITIALIZER 宏能夠將以靜態方式定義的條件變量初始化爲其缺省屬性。PTHREAD_COND_INITIALIZER 宏與動態分配具備 null 屬性的 pthread_cond_init() 等效,可是不進行錯誤檢查。

多個線程決不能同時初始化或從新初始化同一個條件變量。若是要從新初始化或銷燬某個條件變量,則應用程序必須確保該條件變量未被使用。

pthread_cond_init 返回值

pthread_cond_init() 在成功完成以後會返回零。其餘任何返回值都表示出現了錯誤。若是出現如下任一狀況,該函數將失敗並返回對應的值。

EINVAL

描述:

cattr 指定的值無效。

EBUSY

描述:

條件變量處於使用狀態。

EAGAIN

描述:

必要的資源不可用。

ENOMEM

描述:

內存不足,沒法初始化條件變量。

基於條件變量阻塞

使用 pthread_cond_wait(3C) 能夠以原子方式釋放 mp 所指向的互斥鎖,並致使調用線程基於 cv 所指向的條件變量阻塞。對於 Solaris 線程,請參見cond_wait 語法。

pthread_cond_wait 語法

int pthread_cond_wait(pthread_cond_t *cv,pthread_mutex_t *mutex);
#include <pthread.h>



pthread_cond_t cv;

pthread_mutex_t mp;

int ret;



/* wait on condition variable */

ret = pthread_cond_wait(&cv, &mp);

阻塞的線程能夠經過 pthread_cond_signal()  pthread_cond_broadcast() 喚醒,也能夠在信號傳送將其中斷時喚醒。

不能經過 pthread_cond_wait() 的返回值來推斷與條件變量相關聯的條件的值的任何變化。必須從新評估此類條件。

pthread_cond_wait() 例程每次返回結果時調用線程都會鎖定而且擁有互斥鎖,即便返回錯誤時也是如此。

該條件得到信號以前,該函數一直被阻塞。該函數會在被阻塞以前以原子方式釋放相關的互斥鎖,並在返回以前以原子方式再次獲取該互斥鎖。

一般,對條件表達式的評估是在互斥鎖的保護下進行的。若是條件表達式爲假,線程會基於條件變量阻塞。而後,當該線程更改條件值時,另外一個線程會針對條件變量發出信號。這種變化會致使全部等待該條件的線程解除阻塞並嘗試再次獲取互斥鎖。

必須從新測試致使等待的條件,而後才能從 pthread_cond_wait() 處繼續執行。喚醒的線程從新獲取互斥鎖並從 pthread_cond_wait() 返回以前,條件可能會發生變化。等待線程可能並未真正喚醒。建議使用的測試方法是,將條件檢查編寫爲調用 pthread_cond_wait()  while() 循環。

pthread_mutex_lock();

while(condition_is_false)

pthread_cond_wait();

pthread_mutex_unlock();

若是有多個線程基於該條件變量阻塞,則沒法保證按特定的順序獲取互斥鎖。


注 –

pthread_cond_wait() 是取消點。若是取消處於暫掛狀態,而且調用線程啓用了取消功能,則該線程會終止,並在繼續持有該鎖的狀況下開始執行清除處理程序。


pthread_cond_wait 返回值

pthread_cond_wait() 在成功完成以後會返回零。其餘任何返回值都表示出現了錯誤。若是出現如下狀況,該函數將失敗並返回對應的值。

EINVAL

描述:

cv  mp 指定的值無效。

解除阻塞一個線程

對於基於 cv 所指向的條件變量阻塞的線程,使用 pthread_cond_signal(3C) 能夠解除阻塞該線程。對於 Solaris 線程,請參見cond_signal 語法。

pthread_cond_signal 語法

int pthread_cond_signal(pthread_cond_t *cv);
#include <pthread.h>



pthread_cond_t cv;

int ret;



/* one condition variable is signaled */

ret = pthread_cond_signal(&cv);

應在互斥鎖的保護下修改相關條件,該互斥鎖用於得到信號的條件變量中。不然,可能在條件變量的測試和 pthread_cond_wait() 阻塞之間修改該變量,這會致使無限期等待。

調度策略可肯定喚醒阻塞線程的順序。對於 SCHED_OTHER,將按優先級順序喚醒線程。

若是沒有任何線程基於條件變量阻塞,則調用 pthread_cond_signal() 不起做用。


示例 4–8 使用 pthread_cond_wait()  pthread_cond_signal()

 

pthread_mutex_t count_lock;

pthread_cond_t count_nonzero;

unsigned count;



decrement_count()

{

pthread_mutex_lock(&count_lock);

while (count == 0)

pthread_cond_wait(&count_nonzero, &count_lock);

count = count - 1;

pthread_mutex_unlock(&count_lock);

}



increment_count()

{

pthread_mutex_lock(&count_lock);

if (count == 0)

pthread_cond_signal(&count_nonzero);

count = count + 1;

pthread_mutex_unlock(&count_lock);

}

pthread_cond_signal 返回值

pthread_cond_signal() 在成功完成以後會返回零。其餘任何返回值都表示出現了錯誤。若是出現如下狀況,該函數將失敗並返回對應的值。

EINVAL

描述:

cv 指向的地址非法。

說明了如何使用 pthread_cond_wait()  pthread_cond_signal()

在指定的時間以前阻塞

pthread_cond_timedwait(3C) 的用法與 pthread_cond_wait() 的用法基本相同,區別在於在由 abstime 指定的時間以後 pthread_cond_timedwait() 再也不被阻塞。

pthread_cond_timedwait 語法

int pthread_cond_timedwait(pthread_cond_t *cv,

pthread_mutex_t *mp, const struct timespec *abstime);
#include <pthread.h>

#include <time.h>



pthread_cond_t cv;

pthread_mutex_t mp;

timestruct_t abstime;

int ret;



/* wait on condition variable */

ret = pthread_cond_timedwait(&cv, &mp, &abstime);

pthread_cond_timewait() 每次返回時調用線程都會鎖定而且擁有互斥鎖,即便 pthread_cond_timedwait() 返回錯誤時也是如此。 對於 Solaris 線程

pthread_cond_timedwait() 函數會一直阻塞,直到該條件得到信號,或者最後一個參數所指定的時間已過爲止。


注 –

pthread_cond_timedwait() 也是取消點。



示例 4–9 計時條件等待

 

pthread_timestruc_t to;

pthread_mutex_t m;

pthread_cond_t c;

...

pthread_mutex_lock(&m);

to.tv_sec = time(NULL) + TIMEOUT;

to.tv_nsec = 0;

while (cond == FALSE) {

err = pthread_cond_timedwait(&c, &m, &to);

if (err == ETIMEDOUT) {

/* timeout, do something */

break;

}

}

pthread_mutex_unlock(&m);

pthread_cond_timedwait 返回值

pthread_cond_timedwait() 在成功完成以後會返回零。其餘任何返回值都表示出現了錯誤。若是出現如下任一狀況,該函數將失敗並返回對應的值。

EINVAL

描述:

cv  abstime 指向的地址非法。

ETIMEDOUT

描述:

abstime 指定的時間已過。

超時會指定爲當天時間,以便在不從新計算值的狀況下高效地從新測試條件,如示例 4–9 中所示。

在指定的時間間隔內阻塞

pthread_cond_reltimedwait_np(3C) 的用法與 pthread_cond_timedwait() 的用法基本相同,惟一的區別在於 pthread_cond_reltimedwait_np() 會採用相對時間間隔而不是未來的絕對時間做爲其最後一個參數的值。

pthread_cond_reltimedwait_np 語法

int  pthread_cond_reltimedwait_np(pthread_cond_t *cv, 

pthread_mutex_t *mp,

const struct timespec *reltime);
#include <pthread.h>

#include <time.h>



pthread_cond_t cv;

pthread_mutex_t mp;

timestruct_t reltime;

int ret;



/* wait on condition variable */

ret = pthread_cond_reltimedwait_np(&cv, &mp, &reltime);

pthread_cond_reltimedwait_np() 每次返回時調用線程都會鎖定而且擁有互斥鎖,即便 pthread_cond_reltimedwait_np() 返回錯誤時也是如此。對於 Solaris 線程,請參見 cond_reltimedwait(3C)pthread_cond_reltimedwait_np() 函數會一直阻塞,直到該條件得到信號,或者最後一個參數指定的時間間隔已過爲止。


注 –

pthread_cond_reltimedwait_np() 也是取消點。


pthread_cond_reltimedwait_np 返回值

pthread_cond_reltimedwait_np() 在成功完成以後會返回零。其餘任何返回值都表示出現了錯誤。若是出現如下任一狀況,該函數將失敗並返回對應的值。

EINVAL

描述:

cv  reltime 指示的地址非法。

ETIMEDOUT

描述:

reltime 指定的時間間隔已過。

解除阻塞全部線程

對於基於 cv 所指向的條件變量阻塞的線程,使用 pthread_cond_broadcast(3C) 能夠解除阻塞全部這些線程,這由 pthread_cond_wait() 來指定。

pthread_cond_broadcast 語法

int pthread_cond_broadcast(pthread_cond_t *cv);
#include <pthread.h>



pthread_cond_t cv;

int ret;



/* all condition variables are signaled */

ret = pthread_cond_broadcast(&cv);

若是沒有任何線程基於該條件變量阻塞,則調用 pthread_cond_broadcast() 不起做用。對於 Solaris 線程,請參見cond_broadcast 語法。

因爲 pthread_cond_broadcast() 會致使全部基於該條件阻塞的線程再次爭用互斥鎖,所以請謹慎使用 pthread_cond_broadcast()。例如,經過使用pthread_cond_broadcast(),線程可在資源釋放後爭用不一樣的資源量,如示例 4–10 中所示。


示例 4–10 條件變量廣播

 

pthread_mutex_t rsrc_lock;

pthread_cond_t rsrc_add;

unsigned int resources;



get_resources(int amount)

{

pthread_mutex_lock(&rsrc_lock);

while (resources < amount) {

pthread_cond_wait(&rsrc_add, &rsrc_lock);

}

resources -= amount;

pthread_mutex_unlock(&rsrc_lock);

}



add_resources(int amount)

{

pthread_mutex_lock(&rsrc_lock);

resources += amount;

pthread_cond_broadcast(&rsrc_add);

pthread_mutex_unlock(&rsrc_lock);

}

請注意,在 add_resources() 中,首先更新 resources 仍是首先在互斥鎖中調用 pthread_cond_broadcast() 可有可無。

應在互斥鎖的保護下修改相關條件,該互斥鎖用於得到信號的條件變量中。不然,可能在條件變量的測試和 pthread_cond_wait() 阻塞之間修改該變量,這會致使無限期等待。

pthread_cond_broadcast 返回值

pthread_cond_broadcast() 在成功完成以後會返回零。其餘任何返回值都表示出現了錯誤。若是出現如下狀況,該函數將失敗並返回對應的值。

EINVAL

描述:

cv 指示的地址非法。

銷燬條件變量狀態

使用 pthread_cond_destroy(3C) 能夠銷燬與 cv 所指向的條件變量相關聯的任何狀態。對於 Solaris 線程,請參見cond_destroy 語法。

pthread_cond_destroy 語法

int pthread_cond_destroy(pthread_cond_t *cv);
#include <pthread.h>



pthread_cond_t cv;

int ret;



/* Condition variable is destroyed */

ret = pthread_cond_destroy(&cv);

請注意,沒有釋放用來存儲條件變量的空間。

pthread_cond_destroy 返回值

pthread_cond_destroy() 在成功完成以後會返回零。其餘任何返回值都表示出現了錯誤。若是出現如下狀況,該函數將失敗並返回對應的值。

EINVAL

描述:

cv 指定的值無效。

注意:pthread_cond_destroy 銷燬一個條件變量,釋放它擁有的資源。進入 pthread_cond_destroy 以前,必須沒有在該條件變量上等待的線程。
Attempting to destroy a condition variable upon which other threads are currently blocked results in undefined behavior.

喚醒丟失問題

若是線程未持有與條件相關聯的互斥鎖,則調用 pthread_cond_signal()  pthread_cond_broadcast() 會產生喚醒丟失錯誤。

知足如下全部條件時,即會出現喚醒丟失問題:

  • 一個線程調用 pthread_cond_signal()  pthread_cond_broadcast()

  • 另外一個線程已經測試了該條件,可是還沒有調用 pthread_cond_wait()

  • 沒有正在等待的線程

    信號不起做用,所以將會丟失

僅當修改所測試的條件但未持有與之相關聯的互斥鎖時,纔會出現此問題。只要僅在持有關聯的互斥鎖同時修改所測試的條件,便可調用 pthread_cond_signal() pthread_cond_broadcast(),而不管這些函數是否持有關聯的互斥鎖。

生成方和使用者問題

併發編程中收集了許多標準的衆所周知的問題,生成方和使用者問題只是其中的一個問題。此問題涉及到一個大小限定的緩衝區和兩類線程(生成方和使用者),生成方將項放入緩衝區中,而後使用者從緩衝區中取走項。

生成方必須在緩衝區中有可用空間以後才能向其中放置內容。使用者必須在生成方向緩衝區中寫入以後才能從中提取內容。

條件變量表示一個等待某個條件得到信號的線程隊列。

示例 4–11 中包含兩個此類隊列。一個隊列 (less) 針對生成方,用於等待緩衝區中出現空位置。另外一個隊列 (more) 針對使用者,用於等待從緩衝槽位的空位置中提取其中包含的信息。該示例中還包含一個互斥鎖,由於描述該緩衝區的數據結構一次只能由一個線程訪問。


示例 4–11 生成方和使用者的條件變量問題

 

typedef struct {

char buf[BSIZE];

int occupied;

int nextin;

int nextout;

pthread_mutex_t mutex;

pthread_cond_t more;

pthread_cond_t less;

} buffer_t;



buffer_t buffer;

如示例 4–12 中所示,生成方線程獲取該互斥鎖以保護 buffer 數據結構,而後,緩衝區肯定是否有空間可用於存放所生成的項。若是沒有可用空間,生成方線程會調用pthread_cond_wait()pthread_cond_wait() 會致使生成方線程鏈接正在等待 less 條件得到信號的線程隊列。less 表示緩衝區中的可用空間。

與此同時,在調用 pthread_cond_wait() 的過程當中,該線程會釋放互斥鎖的鎖定。正在等待的生成方線程依賴於使用者線程在條件爲真時發出信號,如示例 4–12 中所示。該條件得到信號時,將會喚醒等待 less 的第一個線程。可是,該線程必須再次鎖定互斥鎖,而後才能從 pthread_cond_wait() 返回。

獲取互斥鎖可確保該線程再次以獨佔方式訪問緩衝區的數據結構。該線程隨後必須檢查緩衝區中是否確實存在可用空間。若是空間可用,該線程會向下一個可用的空位置中進行寫入。

與此同時,使用者線程可能正在等待項出如今緩衝區中。這些線程正在等待條件變量 more。剛在緩衝區中存儲內容的生成方線程會調用 pthread_cond_signal() 以喚醒下一個正在等待的使用者。若是沒有正在等待的使用者,此調用將不起做用。

最後,生成方線程會解除鎖定互斥鎖,從而容許其餘線程處理緩衝區的數據結構。


示例 4–12 生成方和使用者問題:生成方

 

void producer(buffer_t *b, char item)

{

pthread_mutex_lock(&b->mutex);



while (b->occupied >= BSIZE)

pthread_cond_wait(&b->less, &b->mutex);



assert(b->occupied < BSIZE);



b->buf[b->nextin++] = item;



b->nextin %= BSIZE;

b->occupied++;



/* now: either b->occupied < BSIZE and b->nextin is the index

of the next empty slot in the buffer, or

b->occupied == BSIZE and b->nextin is the index of the

next (occupied) slot that will be emptied by a consumer

(such as b->nextin == b->nextout) */



pthread_cond_signal(&b->more);



pthread_mutex_unlock(&b->mutex);

}

請注意 assert() 語句的用法。除非在編譯代碼時定義了 NDEBUG,不然 assert() 在其參數的計算結果爲真(非零)時將不執行任何操做。若是參數的計算結果爲假(零),則該程序會停止。在多線程程序中,此類斷言特別有用。若是斷言失敗,assert() 會當即指出運行時問題。assert() 還有另外一個做用,即提供有用的註釋。

 /* now: either b->occupied ... 開頭的註釋最好以斷言形式表示,可是因爲語句過於複雜,沒法用布爾值表達式來表示,所以將用英語表示。

斷言和註釋都是不變量的示例。這些不變量是邏輯語句,在程序正常執行時不該將其聲明爲假,除非是線程正在修改不變量中提到的一些程序變量時的短暫修改過程當中。固然,只要有線程執行語句,斷言就應當爲真。

使用不變量是一種極爲有用的方法。即便沒有在程序文本中聲明不變量,在分析程序時也應將其視爲不變量。

每次線程執行包含註釋的代碼時,生成方代碼中表示爲註釋的不變量始終爲真。若是將此註釋移到緊挨 mutex_unlock() 的後面,則註釋不必定仍然爲真。若是將此註釋移到緊跟assert() 以後的位置,則註釋仍然爲真。

所以,不變量可用於表示一個始終爲真的屬性,除非一個生成方或一個使用者正在更改緩衝區的狀態。線程在互斥鎖的保護下處理緩衝區時,該線程可能會暫時聲明不變量爲假。可是,一旦線程結束對緩衝區的操做,不變量即會恢復爲真。

示例 4–13 給出了使用者的代碼。該邏輯流程與生成方的邏輯流程相對稱。


示例 4–13 生成方和使用者問題:使用者

 

char consumer(buffer_t *b)

{

char item;

pthread_mutex_lock(&b->mutex);

while(b->occupied <= 0)

pthread_cond_wait(&b->more, &b->mutex);



assert(b->occupied > 0);



item = b->buf[b->nextout++];

b->nextout %= BSIZE;

b->occupied--;



/* now: either b->occupied > 0 and b->nextout is the index

of the next occupied slot in the buffer, or

b->occupied == 0 and b->nextout is the index of the next

(empty) slot that will be filled by a producer (such as

b->nextout == b->nextin) */



pthread_cond_signal(&b->less);

pthread_mutex_unlock(&b->mutex);



return(item);

}

線程的分離狀態 pthread_attr_setdetachstate 函數使用 - 輕飄飛揚 - 博客頻道 - CSDN.NET - Google Chrome (2013/9/5 16:53:00)

 

線程的分離狀態 pthread_attr_setdetachstate 函數使用

分類: 【Linux應用開發】   800人閱讀   評論(0)   收藏   舉報

在任何一個時間點上,線程是可結合的(joinable),或者是分離的(detached)。一個可結合的線程可以被其餘線程收回其資源和殺死;在被其餘線程回收以前,它的存儲器資源(如棧)是不釋放的。相反,一個分離的線程是不能被其餘線程回收或殺死的,它的存儲器資源在它終止時由系統自動釋放。

        線程的分離狀態決定一個線程以什麼樣的方式來終止本身。在默認狀況下線程是非分離狀態的,這種狀況下,原有的線程等待建立的線程結束。只有當pthread_join()函數返回時,建立的線程纔算終止,才能釋放本身佔用的系統資源。而分離線程不是這樣子的,它沒有被其餘的線程所等待,本身運行結束了,線程也就終止了,立刻釋放系統資源。程序員應該根據本身的須要,選擇適當的分離狀態。因此若是咱們在建立線程時就知道不須要了解線程的終止狀態,則能夠pthread_attr_t結構中的detachstate線程屬性,讓線程以分離狀態啓動。

設置線程分離狀態的函數爲pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)。第二個參數可選爲PTHREAD_CREATE_DETACHED(分離線程)和 PTHREAD _CREATE_JOINABLE(非分離線程)。這裏要注意的一點是,若是設置一個線程爲分離線程,而這個線程運行又很是快,它極可能在pthread_create函數返回以前就終止了,它終止之後就可能將線程號和系統資源移交給其餘的線程使用,這樣調用pthread_create的線程就獲得了錯誤的線程號。要避免這種狀況能夠採起必定的同步措施,最簡單的方法之一是能夠在被建立的線程裏調用pthread_cond_timewait函數,讓這個線程等待一下子,留出足夠的時間讓函數pthread_create返回。設置一段等待時間,是在多線程編程裏經常使用的方法。可是注意不要使用諸如wait()之類的函數,它們是使整個進程睡眠,並不能解決線程同步的問題。

另一個可能經常使用的屬性是線程的優先級,它存放在結構sched_param中。用函數pthread_attr_getschedparam和函數pthread_attr_setschedparam進行存放,通常說來,咱們老是先取優先級,對取得的值修改後再存放回去。

線程等待——正確處理線程終止

#include <pthread.h>

void pthread_exit(void *retval);

void pthread_join(pthread_t th,void *thread_return);//掛起等待th結束,*thread_return=retval;

int pthread_detach(pthread_t th);

若是線程處於joinable狀態,則只能只能被建立他的線程等待終止。

在Linux平臺默認狀況下,雖然各個線程之間是相互獨立的,一個線程的終止不會去通知或影響其餘的線程。可是已經終止的線程的資源並不會隨着線程的終止而獲得釋放,咱們須要調用 pthread_join() 來得到另外一個線程的終止狀態而且釋放該線程所佔的資源。(說明:線程處於joinable狀態下)

調用該函數的線程將掛起,等待 th 所表示的線程的結束。 thread_return 是指向線程 th 返回值的指針。須要注意的是 th 所表示的線程必須是 joinable 的,即處於非 detached(遊離)狀態;而且只能夠有惟一的一個線程對 th 調用 pthread_join() 。若是 th 處於 detached 狀態,那麼對 th 的 pthread_join() 調用將返回錯誤。

若是不關心一個線程的結束狀態,那麼也能夠將一個線程設置爲 detached 狀態,從而讓操做系統在該線程結束時來回收它所佔的資源。將一個線程設置爲detached 狀態能夠經過兩種方式來實現。一種是調用 pthread_detach() 函數,能夠將線程 th 設置爲 detached 狀態。另外一種方法是在建立線程時就將它設置爲 detached 狀態,首先初始化一個線程屬性變量,而後將其設置爲 detached 狀態,最後將它做爲參數傳入線程建立函數 pthread_create(),這樣所建立出來的線程就直接處於 detached 狀態。

建立 detach 線程:

pthread_t tid;

pthread_attr_t attr;

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

pthread_create(&tid, &attr, THREAD_FUNCTION, arg);

總之爲了在使用 pthread 時避免線程的資源在線程結束時不能獲得正確釋放,從而避免產生潛在的內存泄漏問題,在對待線程結束時,要確保該線程處於 detached 狀態,否着就須要調用 pthread_join() 函數來對其進行資源回收。

linux c學習筆記----消息隊列(ftok,msgget,msgsnd,msgrcv,msgctl) - 知知爲知知 - ITeye技術網站 - Google Chrome (2013/7/3 19:18:54)

 


ftok()

#include <sys/types.h>

#include <sys/ipc.h>

函數原型:

 key_t  ftok( const  char * pathname , int   proj_id  );

參數:

  pathname 就時你指定的文件名(該文件必須是存在並且能夠訪問的)id是子序號,雖         然爲int,可是隻有8個比特被使用(0-255)

返回值: 成功時候返回key_t 類型的key值,失敗返回-1


msgget

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

函數原型: int    msgget ( key_t  key , int  msgflg );

函數描述:創建消息隊列

參數:

msgget()函數的第一個參數是消息隊列對象的關鍵字(key),函數將它與已有的消息隊
列對象的關鍵字進行比較來判斷消息隊列對象是否已經建立。而函數進行的具體操做是 第二個參數,msgflg 控制的。它能夠取下面的幾個值:
IPC_CREAT 若是消息隊列對象不存在,則建立之,不然則進行打開操做;
IPC_EXCLIPC_CREAT 一塊兒使用(用」|」鏈接),若是消息對象不存在則建立之,否     則產生一個錯誤並返回。

返回值:

成功時返回隊列ID,失敗返回-1,錯誤緣由存於error

EEXIST (Queue exists, cannot create)
EIDRM (Queue is marked for deletion)
ENOENT (Queue does not exist)
ENOMEM (Not enough memory to create queue)
ENOSPC (Maximum queue limit exceeded)




msgsnd函數:將消息送入消息隊列

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h

函數原型:int  msgsnd ( int msgid ,  struct msgbuf*msgp , int msgsz, int msgflg );

參數說明:

傳給msgsnd()函數的第一個參數msqid 是消息隊列對象的標識符(由msgget()函數得

到),第二個參數msgp 指向要發送的消息所在的內存,第三個參數msgsz 是要發送信息     的長度(字節數),能夠用如下的公式計算:
msgsz = sizeof(struct mymsgbuf) - sizeof(long);
第四個參數是控制函數行爲的標誌,能夠取如下的值:
0,忽略標誌位;
IPC_NOWAIT,若是消息隊列已滿,消息將不被寫入隊列,控制權返回調用函數的線
程。若是不指定這個參數,線程將被阻塞直到消息被能夠被寫入。

smgbuf結構體定義以下:

struct smgbuf

{

                     long   mtype;

                    char   mtext [x] ;  //長度由msgsz決定

}


msgflg 可設置爲 IPC_NOWAIT 。若是消息隊列已滿或其餘狀況沒法送入消息,則當即  返回 EAGIN

返回: 0 on success

-1 on error: errno = EAGAIN (queue is full, and IPC_NOWAIT was asserted)
EACCES (permission denied, no write permission)
EFAULT (msgp address isn't accessable – invalid)
EIDRM (The message queue has been removed)
EINTR (Received a signal while waiting to write)
EINVAL (Invalid message queue identifier, nonpositive
message type, or invalid message size)
ENOMEM (Not enough memory to copy message buffer)


msgrcv函數:從消息隊列中讀取消息

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

函數定義:int  msgrcv( int  msgid , struct   msgbuf*  msgp ,  int msgsz ,  long msgtyp, int msgflg);

參數:

函數的前三個參數和msgsnd()函數中對應的參數的含義是相同的。第四個參數mtype

指定了函數從隊列中所取的消息的類型。函數將從隊列中搜索類型與之匹配的消息並將 返回。不過這裏有一個例外。若是mtype 的值是零的話,函數將不作類型檢查而自動返     回隊列中的最舊的消息。第五個參數依然是是控制函數行爲的標誌,取值能夠是:
0,表示忽略;
IPC_NOWAIT,若是消息隊列爲空,則返回一個ENOMSG,並將控制權交回調用函數
的進程。若是不指定這個參數,那麼進程將被阻塞直到函數能夠從隊列中獲得符合條件 消息爲止。若是一個client 正在等待消息的時候隊列被刪除,EIDRM 就會被返回。若是     進程在阻塞等待過程當中收到了系統的中斷信號,EINTR 就會被返回。
MSG_NOERROR,若是函數取得的消息長度大於msgsz,將只返回msgsz 長度的信息,
剩下的部分被丟棄了。若是不指定這個參數,E2BIG 將被返回,而消息則留在隊列中不     被取出。
當消息從隊列內取出後,相應的消息就從隊列中刪除了。

msgbuf:結構體,定義以下:

struct msgbuf

{

                      long  mtype ;  //信息種類

                       char   mtest[x];//信息內容   ,長度由msgsz指定

}


msgtyp:  信息類型。 取值以下:

 msgtyp = 0 ,不分類型,直接返回消息隊列中的第一項

 msgtyp > 0 ,返回第一項 msgtyp與 msgbuf結構體中的mtype相同的信息

msgtyp <0 , 返回第一項 mtype小於等於msgtyp絕對值的信息


msgflg:取值以下:

IPC_NOWAIT ,不阻塞

IPC_NOERROR ,若信息長度超過參數msgsz,則截斷信息而不報錯。



返回值:

成功時返回所獲取信息的長度,失敗返回-1,錯誤信息存於error

Number of bytes copied into message buffer
-1 on error: errno = E2BIG (Message length is greater than
msgsz,no MSG_NOERROR)
EACCES (No read permission)
EFAULT (Address pointed to by msgp is invalid)
EIDRM (Queue was removed during retrieval)
EINTR (Interrupted by arriving signal)
EINVAL (msgqid invalid, or msgsz less than 0)
ENOMSG (IPC_NOWAIT asserted, and no message exists in the queue to satisfy the request)

 

 

 

 例子:

server.c


C代碼     收藏代碼
  1. <span><strong><em><span style="font-size: small;">#include <stdio.h>  
  2. #include <string.h>  
  3. #include <stdlib.h>  
  4. #include <errno.h>  
  5. #include <unistd.h>  
  6. #include <sys/types.h>  
  7. #include <sys/ipc.h>  
  8. #include <sys/stat.h>  
  9. #include <sys/msg.h>  
  10. #define MSG_FILE "server.c"  
  11. #define BUFFER 255  
  12. #define PERM S_IRUSR|S_IWUSR  
  13. struct msgtype {  
  14. long mtype;  
  15. char buffer[BUFFER+1];  
  16. };  
  17. int main()  
  18. {  
  19. struct msgtype msg;  
  20. key_t key;  
  21. int msgid;  
  22. if((key=ftok(MSG_FILE,'a'))==-1)  
  23. {  
  24. fprintf(stderr,"Creat Key Error:%s\a\n",strerror(errno));  
  25. exit(1);  
  26. }  
  27. if((msgid=msgget(key,PERM|IPC_CREAT|IPC_EXCL))==-1)  
  28. {  
  29. fprintf(stderr,"Creat Message Error:%s\a\n",strerror(errno));  
  30. exit(1);  
  31. }  
  32. while(1)  
  33. {  
  34. msgrcv(msgid,&msg,sizeof(struct msgtype),1,0);  
  35. fprintf(stderr,"Server Receive:%s\n",msg.buffer);  
  36. msg.mtype=2;  
  37. msgsnd(msgid,&msg,sizeof(struct msgtype),0);  
  38. }  
  39. exit(0);  
  40. }</span></em></strong></span>  

 client.c


C代碼     收藏代碼
  1. <span><strong><em><span style="font-size: small;">#include <stdio.h>  
  2. #include <string.h>  
  3. #include <stdlib.h>  
  4. #include <errno.h>  
  5. #include <sys/types.h>  
  6. #include <sys/ipc.h>  
  7. #include <sys/msg.h>  
  8. #include <sys/stat.h>  
  9. #define MSG_FILE "server.c"  
  10. #define BUFFER 255  
  11. #define PERM S_IRUSR|S_IWUSR  
  12. struct msgtype {  
  13. long mtype;  
  14. char buffer[BUFFER+1];  
  15. };  
  16. int main(int argc,char **argv)  
  17. {  
  18. struct msgtype msg;  
  19. key_t key;  
  20. int msgid;  
  21. if(argc!=2)  
  22. {  
  23. fprintf(stderr,"Usage:%s string\n\a",argv[0]);  
  24. exit(1);  
  25. }  
  26. if((key=ftok(MSG_FILE,'a'))==-1)  
  27. {  
  28. fprintf(stderr,"Creat Key Error:%s\a\n",strerror(errno));  
  29. exit(1);  
  30. }  
  31. if((msgid=msgget(key,PERM))==-1)  
  32. {  
  33. fprintf(stderr,"Creat Message Error:%s\a\n",strerror(errno));  
  34. exit(1);  
  35. }  
  36. msg.mtype=1;  
  37. strncpy(msg.buffer,argv[1],BUFFER);  
  38. msgsnd(msgid,&msg,sizeof(struct msgtype),0);  
  39. memset(&msg,'\0',sizeof(struct msgtype));  
  40. msgrcv(msgid,&msg,sizeof(struct msgtype),2,0);  
  41. fprintf(stderr,"Client receive:%s\n",msg.buffer);  
  42. exit(0);  
  43. }</span></em></strong></span>  

Linux下c開發 之 線程通訊(轉) - 土貓敢死隊 - 博客園 - Google Chrome (2013/7/3 10:10:22)

Linux下c開發 之 線程通訊(轉)

1.Linux「線程」

     進程與線程之間是有區別的,不過Linux內核只提供了輕量進程的支持,未實現線程模型。Linux是一種「多進程單線程」的操做系統。Linux自己只有進程的概念,而其所謂的「線程」本質上在內核裏仍然是進程。

     你們知道,進程是資源分配的單位,同一進程中的多個線程共享該進程的資源(如做爲共享內存的全局變量)。Linux中所謂的「線程」只是在被建立時clone了父進程的資源,所以clone出來的進程表現爲「線程」,這一點必定要弄清楚。所以,Linux「線程」這個概念只有在打冒號的狀況下才是最準確的。

     目前Linux中最流行的線程機制爲LinuxThreads,所採用的就是線程-進程「一對一」模型,調度交給核心,而在用戶級實現一個包括信號處理在內的線程管理機制。LinuxThreads由Xavier Leroy (Xavier.Leroy@inria.fr)負責開發完成,並已綁定在GLIBC中發行,它實現了一種BiCapitalized面向Linux的Posix 1003.1c 「pthread」標準接口。Linuxthread能夠支持Intel、Alpha、MIPS等平臺上的多處理器系統。

  按照POSIX 1003.1c 標準編寫的程序與Linuxthread 庫相連接便可支持Linux平臺上的多線程,在程序中需包含頭文件pthread. h,在編譯連接時使用命令:

gcc -D -REENTRANT -lpthread xxx. c

  其中-REENTRANT宏使得相關庫函數(如stdio.h、errno.h中函數) 是可重入的、線程安全的(thread-safe),-lpthread則意味着連接庫目錄下的libpthread.a或libpthread.so文件。使用Linuxthread庫須要2.0以上版本的Linux內核及相應版本的C庫(libc 5.2.1八、libc 5.4.十二、libc 6)。

     2.「線程」控制

  線程建立

  進程被建立時,系統會爲其建立一個主線程,而要在進程中建立新的線程,則能夠調用pthread_create:

pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *
(start_routine)(void*), void *arg);

  start_routine爲新線程的入口函數,arg爲傳遞給start_routine的參數。

  每一個線程都有本身的線程ID,以便在進程內區分。線程ID在pthread_create調用時回返給建立線程的調用者;一個線程也能夠在建立後使用pthread_self()調用獲取本身的線程ID:

pthread_self (void) ;

  線程退出

  線程的退出方式有三:

  (1)執行完成後隱式退出;

  (2)由線程自己顯示調用pthread_exit 函數退出;

pthread_exit (void * retval) ;

  (3)被其餘線程用pthread_cance函數終止:

pthread_cance (pthread_t thread) ;

  在某線程中調用此函數,能夠終止由參數thread 指定的線程。

  若是一個線程要等待另外一個線程的終止,可使用pthread_join函數,該函數的做用是調用pthread_join的線程將被掛起直到線程ID爲參數thread的線程終止:

pthread_join (pthread_t thread, void** threadreturn);

3.線程通訊

  線程互斥

  互斥意味着「排它」,即兩個線程不能同時進入被互斥保護的代碼。Linux下能夠經過pthread_mutex_t 定義互斥體機制完成多線程的互斥操做,該機制的做用是對某個須要互斥的部分,在進入時先獲得互斥體,若是沒有獲得互斥體,代表互斥部分被其它線程擁有,此時欲獲取互斥體的線程阻塞,直到擁有該互斥體的線程完成互斥部分的操做爲止。

  下面的代碼實現了對共享全局變量x 用互斥體mutex 進行保護的目的:

int x; // 進程中的全局變量
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL); //按缺省的屬性初始化互斥體變量mutex
pthread_mutex_lock(&mutex); // 給互斥體變量加鎖
… //對變量x 的操做
phtread_mutex_unlock(&mutex); // 給互斥體變量解除鎖

  線程同步

  同步就是線程等待某個事件的發生。只有當等待的事件發生線程才繼續執行,不然線程掛起並放棄處理器。當多個線程協做時,相互做用的任務必須在必定的條件下同步。

  Linux下的C語言編程有多種線程同步機制,最典型的是條件變量(condition variable)。pthread_cond_init用來建立一個條件變量,其函數原型爲:

pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr);

  pthread_cond_wait和pthread_cond_timedwait用來等待條件變量被設置,值得注意的是這兩個等待調用須要一個已經上鎖的互斥體mutex,這是爲了防止在真正進入等待狀態以前別的線程有可能設置該條件變量而產生競爭。pthread_cond_wait的函數原型爲:

pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex);

  pthread_cond_broadcast用於設置條件變量,即便得事件發生,這樣等待該事件的線程將再也不阻塞:

pthread_cond_broadcast (pthread_cond_t *cond) ;

  pthread_cond_signal則用於解除某一個等待線程的阻塞狀態:

pthread_cond_signal (pthread_cond_t *cond) ;

  pthread_cond_destroy 則用於釋放一個條件變量的資源。

  在頭文件semaphore.h 中定義的信號量則完成了互斥體和條件變量的封裝,按照多線程程序設計中訪問控制機制,控制對資源的同步訪問,提供程序設計人員更方便的調用接口。

sem_init(sem_t *sem, int pshared, unsigned int val);

  這個函數初始化一個信號量sem 的值爲val,參數pshared 是共享屬性控制,代表是否在進程間共享。

sem_wait(sem_t *sem);

  調用該函數時,若sem爲無狀態,調用線程阻塞,等待信號量sem值增長(post )成爲有信號狀態;若sem爲有狀態,調用線程順序執行,但信號量的值減一。

sem_post(sem_t *sem);

  調用該函數,信號量sem的值增長,能夠從無信號狀態變爲有信號狀態。

     

 

4.實例

  下面咱們仍是以名的生產者/消費者問題爲例來闡述Linux線程的控制和通訊。一組生產者線程與一組消費者線程經過緩衝區發生聯繫。生產者線程將生產的產品送入緩衝區,消費者線程則從中取出產品。緩衝區有N 個,是一個環形的緩衝池。

#include <stdio.h>
#include <pthread.h>
#define BUFFER_SIZE 16 // 緩衝區數量
struct prodcons
{
// 緩衝區相關數據結構
int buffer[BUFFER_SIZE]; /* 實際數據存放的數組*/
pthread_mutex_t lock; /* 互斥體lock 用於對緩衝區的互斥操做 */
int readpos, writepos; /* 讀寫指針*/
pthread_cond_t notempty; /* 緩衝區非空的條件變量 */
pthread_cond_t notfull; /* 緩衝區未滿的條件變量 */
};
/* 初始化緩衝區結構 */
void init(struct prodcons *b)
{
pthread_mutex_init(&b->lock, NULL);
pthread_cond_init(&b->notempty, NULL);
pthread_cond_init(&b->notfull, NULL);
b->readpos = 0;
b->writepos = 0;
}
/* 將產品放入緩衝區,這裏是存入一個整數*/
void put(struct prodcons *b, int data)
{
pthread_mutex_lock(&b->lock);
/* 等待緩衝區未滿*/
if ((b->writepos + 1) % BUFFER_SIZE == b->readpos)
{
pthread_cond_wait(&b->notfull, &b->lock);
}
/* 寫數據,並移動指針 */
b->buffer[b->writepos] = data;
b->writepos++;
if (b->writepos > = BUFFER_SIZE)
b->writepos = 0;
/* 設置緩衝區非空的條件變量*/
pthread_cond_signal(&b->notempty);
pthread_mutex_unlock(&b->lock);
} 
/* 從緩衝區中取出整數*/
int get(struct prodcons *b)
{
int data;
pthread_mutex_lock(&b->lock);
/* 等待緩衝區非空*/
if (b->writepos == b->readpos)
{
pthread_cond_wait(&b->notempty, &b->lock);
}
/* 讀數據,移動讀指針*/
data = b->buffer[b->readpos];
b->readpos++;
if (b->readpos > = BUFFER_SIZE)
b->readpos = 0;
/* 設置緩衝區未滿的條件變量*/
pthread_cond_signal(&b->notfull);
pthread_mutex_unlock(&b->lock);
return data;
}

/* 測試:生產者線程將1 到10000 的整數送入緩衝區,消費者線
程從緩衝區中獲取整數,二者都打印信息*/
#define OVER ( - 1)
struct prodcons buffer;
void *producer(void *data)
{
int n;
for (n = 0; n < 10000; n++)
{
printf("%d --->\n", n);
put(&buffer, n);
} put(&buffer, OVER);
return NULL;
}

void *consumer(void *data)
{
int d;
while (1)
{
d = get(&buffer);
if (d == OVER)
break;
printf("--->%d \n", d);
}
return NULL;
}

int main(void)
{
pthread_t th_a, th_b;
void *retval;
init(&buffer);
/* 建立生產者和消費者線程*/
pthread_create(&th_a, NULL, producer, 0);
pthread_create(&th_b, NULL, consumer, 0);
/* 等待兩個線程結束*/
pthread_join(th_a, &retval);
pthread_join(th_b, &retval);
return 0;
}

  5.WIN3二、VxWorks、Linux線程類比

  目前爲止,筆者已經創做了《基於嵌入式操做系統VxWorks的多任務併發程序設計》(《軟件報》2006年5~12期連載)、《深刻淺出Win32多線程程序設計》(天極網技術專題)系列,咱們來找出這兩個系列文章與本文的共通點。

   看待技術問題要瞄準其本質,無論是Linux、VxWorks仍是WIN32,其涉及到多線程的部分都是那些內容,無非就是線程控制和線程通訊,它們的許多函數只是名稱不一樣,其實質含義是等價的,下面咱們來列個三大操做系統共同點詳細表單:

事項 WIN32 VxWorks Linux
線程建立 CreateThread taskSpawn pthread_create
線程終止 執行完成後退出;線程自身調用ExitThread函數即終止本身;被其餘線程調用函數TerminateThread函數 執行完成後退出;由線程自己調用exit退出;被其餘線程調用函數taskDelete終止 執行完成後退出;由線程自己調用pthread_exit 退出;被其餘線程調用函數pthread_cance終止
獲取線程ID GetCurrentThreadId taskIdSelf pthread_self
建立互斥 CreateMutex semMCreate pthread_mutex_init
獲取互斥 WaitForSingleObject、WaitForMultipleObjects semTake pthread_mutex_lock
釋放互斥 ReleaseMutex semGive phtread_mutex_unlock
建立信號量 CreateSemaphore semBCreate、semCCreate sem_init
等待信號量 WaitForSingleObject semTake sem_wait
釋放信號量 ReleaseSemaphore semGive sem_post

    6.小結

  本章講述了Linux下多線程的控制及線程間通訊編程方法,給出了一個生產者/消費者的實例,並將Linux的多線程與WIN3二、VxWorks多線程進行了類比,總結了通常規律。鑑於多線程編程已成爲開發併發應用程序的主流方法,學好本章的意義也便不言自明。

Linux線程池(C語言描述) - 互斥量+條件變量同步 - Sean's Blog的日誌 - 網易博客 - Google Chrome (2013/7/2 22:05:18)

Linux線程池(C語言描述) - 互斥量+條件變量同步  

2012-04-24 18:16:50|  分類: Linux C |  標籤:線程池  linux  c  |字號 訂閱

 
 
 

建立線程或者進程的開銷是很大的,爲了防止頻繁的建立線程,提升程序的運行效率,每每都會創建一個線程池用於多線程程序的調度

下面的程序就是完整的線程池實現,主要採用互斥量和條件變量實現同步

 

首先定義頭文件threadpool.h

在該文件中定義了線程池的數據結構和全部的函數

#ifndef THREADPOOL_H_
#define THREADPOOL_H_

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

/**
 * 線程體數據結構
 */
typedef struct runner
{
 void (*callback)(void* arg); // 回調函數指針
 void* arg; // 回調函數的參數
 struct runner* next;
} thread_runner;

/**
 * 線程池數據結構
 */
typedef struct
{
 pthread_mutex_t mutex; //互斥量
 pthread_cond_t cond; // 條件變量
 thread_runner* runner_head; // 線程池中全部等待任務的頭指針
 thread_runner* runner_tail; // 線程池全部等待任務的尾指針
 int shutdown; // 線程池是否銷燬
 pthread_t* threads; // 全部線程
 int max_thread_size; //線程池中容許的活動線程數目
} thread_pool;

/**
 * 線程體
 */
void run(void *arg);

/**
 * 初始化線程池
 *  參數:
 *   pool:指向線程池結構有效地址的動態指針
 *   max_thread_size:最大的線程數
 */
void threadpool_init(thread_pool* pool, int max_thread_size);

/**
 * 向線程池加入任務
 *  參數:
 *   pool:指向線程池結構有效地址的動態指針
 *   callback:線程回調函數
 *   arg:回調函數參數
 */
void threadpool_add_runner(thread_pool* pool, void (*callback)(void *arg), void *arg);

/**
 * 銷燬線程池
 *  參數:
 *   ppool:指向線程池結構有效地址的動態指針地址(二級指針),銷燬後釋放內存,該指針爲NULL
 */
void threadpool_destroy(thread_pool** ppool);

#endif

 

 

 

而後是函數實現threadpool.h

該文件實現了threadpool.h的函數定義

#include "threadpool.h"

#define DEBUG 1

/**
 * 線程體
 */
void run(void *arg)
{
 thread_pool* pool = (thread_pool*) arg;
 while (1)
 {
  // 加鎖
  pthread_mutex_lock(&(pool->mutex));
#ifdef DEBUG
  printf("run-> locked\n");
#endif
  // 若是等待隊列爲0而且線程池未銷燬,則處於阻塞狀態
  while (pool->runner_head == NULL && !pool->shutdown)
  {
   pthread_cond_wait(&(pool->cond), &(pool->mutex));
  }
  //若是線程池已經銷燬
  if (pool->shutdown)
  {
   // 解鎖
   pthread_mutex_unlock(&(pool->mutex));
#ifdef DEBUG
   printf("run-> unlocked and thread exit\n");
#endif
   pthread_exit(NULL);
  }
  // 取出鏈表中的頭元素
  thread_runner *runner = pool->runner_head;
  pool->runner_head = runner->next;
  // 解鎖
  pthread_mutex_unlock(&(pool->mutex));
#ifdef DEBUG
  printf("run-> unlocked\n");
#endif
  // 調用回調函數,執行任務
  (runner->callback)(runner->arg);
  free(runner);
  runner = NULL;
#ifdef DEBUG
  printf("run-> runned and free runner\n");
#endif
 }
 pthread_exit(NULL);
}

/**
 * 初始化線程池
 *  參數:
 *   pool:指向線程池結構有效地址的動態指針
 *   max_thread_size:最大的線程數
 */
void threadpool_init(thread_pool* pool, int max_thread_size)
{
 // 初始化互斥量
 pthread_mutex_init(&(pool->mutex), NULL);
 // 初始化條件變量
 pthread_cond_init(&(pool->cond), NULL);
 pool->runner_head = NULL;
 pool->runner_tail = NULL;
 pool->max_thread_size = max_thread_size;
 pool->shutdown = 0;
 // 建立全部分離態線程
 pool->threads = (pthread_t *) malloc(max_thread_size * sizeof(pthread_t));
 int i = 0;
 for (i = 0; i < max_thread_size; i++)
 {
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  pthread_create(&(pool->threads[i]), &attr, (void*) run, (void*) pool);
 }
#ifdef DEBUG
 printf("threadpool_init-> create %d detached thread\n", max_thread_size);
#endif
}

/**
 * 向線程池加入任務
 *  參數:
 *   pool:指向線程池結構有效地址的動態指針
 *   callback:線程回調函數
 *   arg:回調函數參數
 */
void threadpool_add_runner(thread_pool* pool, void (*callback)(void *arg), void *arg)
{
 // 構造一個新任務
 thread_runner *newrunner = (thread_runner *) malloc(sizeof(thread_runner));
 newrunner->callback = callback;
 newrunner->arg = arg;
 newrunner->next = NULL;
 // 加鎖
 pthread_mutex_lock(&(pool->mutex));
#ifdef DEBUG
 printf("threadpool_add_runner-> locked\n");
#endif
 // 將任務加入到等待隊列中
 if (pool->runner_head != NULL)
 {
  pool->runner_tail->next = newrunner;
  pool->runner_tail = newrunner;
 }
 else
 {
  pool->runner_head = newrunner;
  pool->runner_tail = newrunner;
 }
 // 解鎖
 pthread_mutex_unlock(&(pool->mutex));
#ifdef DEBUG
 printf("threadpool_add_runner-> unlocked\n");
#endif
 // 喚醒一個等待線程
 pthread_cond_signal(&(pool->cond));
#ifdef DEBUG
 printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n");
#endif
}

/**
 * 銷燬線程池
 *  參數:
 *   ppool:指向線程池結構有效地址的動態指針地址(二級指針)
 */
void threadpool_destroy(thread_pool** ppool)
{
 thread_pool *pool = *ppool;
 // 防止2次銷燬
 if (!pool->shutdown)
 {
  pool->shutdown = 1;
  // 喚醒全部等待線程,線程池要銷燬了
  pthread_cond_broadcast(&(pool->cond));
  // 等待全部線程停止
  sleep(1);
#ifdef DEBUG
  printf("threadpool_destroy-> wakeup all waiting threads\n");
#endif
  // 回收空間
  free(pool->threads);
  // 銷燬等待隊列
  thread_runner *head = NULL;
  while (pool->runner_head != NULL)
  {
   head = pool->runner_head;
   pool->runner_head = pool->runner_head->next;
   free(head);
  }

#ifdef DEBUG
  printf("threadpool_destroy-> all runners freed\n");
#endif
  /*條件變量和互斥量也別忘了銷燬*/
  pthread_mutex_destroy(&(pool->mutex));
  pthread_cond_destroy(&(pool->cond));

#ifdef DEBUG
  printf("threadpool_destroy-> mutex and cond destoryed\n");
#endif
  free(pool);
  (*ppool) = NULL;

#ifdef DEBUG
  printf("threadpool_destroy-> pool freed\n");
#endif
 }
}

 

以上就是完整的線城池實現

 

 

下面寫個測試程序來看看

#include "threadpool.h"

void threadrun(void* arg)
{
 int *i = (int *) arg;
 printf("%d\n", *i);
}

int main(void)
{
 thread_pool *pool = malloc(sizeof(thread_pool));
 threadpool_init(pool, 2);
 int i;
 int tmp[3];
 for (i = 0; i < 3; i++)
 {
  tmp[i] = i;
  threadpool_add_runner(pool, threadrun, &tmp[i]);
 }
 sleep(1);
 threadpool_destroy(&pool);
 printf("main-> %p\n",pool);
 printf("main-> test over\n");
 return 0;
}

該函數建立了1個線程池,包含2個線程,而後往線程池中加入了3個任務

並在最後打印了線程池的指針地址(主要是爲了看線程池銷燬後的狀態)

 

運行結果以下

Linux線程池(C語言描述) - 互斥量+條件變量同步 - Seans Blog - Seans Blog

 

輸出的信息主要調試信息,

值得注意的是倒數第二行「main-> (nil)」

說明線程池銷燬後,指向線程池的指針 = NULL(這就是爲何在threadpool_destory函數爲何要用二級指針的緣由)

[C++][Thread] 轉:線程池原理及建立(C++實現) - 痛但快樂着 - 博客頻道 - CSDN.NET - Google Chrome (2013/7/1 10:38:22)

[C++][Thread] 轉:線程池原理及建立(C++實現)

分類: 其餘   360人閱讀   評論(0)   收藏   舉報

      

        看不懂,先收藏
本文給出了一個通用的線程池框架,該框架將與線程執行相關的任務進行了高層次的抽象,使之與具體的執行任務無關。另外該線程池具備動態伸縮性,它能根據執行任務的輕重自動調整線程池中線程的數量。文章的最後,咱們給出一個簡單示例程序,經過該示例程序,咱們會發現,經過該線程池框架執行多線程任務是多麼的簡單。

爲何須要線程池 
目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具備一個共同點,就是單位時間內必須處理數目巨大的鏈接請求,但處理時間卻相對較短。 
傳統多線程方案中咱們採用的服務器模型則是一旦接受到請求以後,即建立一個新的線程,由該線程執行任務。任務執行完畢後,線程退出,這就是是「即時建立,即時銷燬」的策略。儘管與建立進程相比,建立線程的時間已經大大的縮短,可是若是提交給線程的任務是執行時間較短,並且執行次數極其頻繁,那麼服務器將處於不停的建立線程,銷燬線程的狀態。
咱們將傳統方案中的線程執行過程分爲三個過程:T一、T二、T3。 


T1:線程建立時間 
T2:線程執行時間,包括線程的同步等時間 
T3:線程銷燬時間 


那麼咱們能夠看出,線程自己的開銷所佔的比例爲(T1+T3) / (T1+T2+T3)。若是線程執行的時間很短的話,這比開銷可能佔到20%-50%左右。若是任務執行時間很頻繁的話,這筆開銷將是不可忽略的。

除此以外,線程池可以減小建立的線程個數。一般線程池所容許的併發線程是有上界的,若是同時須要併發的線程數超過上界,那麼一部分線程將會等待。而傳統方案中,若是同時請求數目爲2000,那麼最壞狀況下,系統可能須要產生2000個線程。儘管這不是一個很大的數目,可是也有部分機器可能達不到這種要求。

所以線程池的出現正是着眼於減小線程池自己帶來的開銷。線程池採用預建立的技術,在應用程序啓動以後,將當即建立必定數量的線程(N1),放入空閒隊列中。這些線程都是處於阻塞(Suspended)狀態,不消耗CPU,但佔用較小的內存空間。當任務到來後,緩衝池選擇一個空閒線程,把任務傳入此線程中運行。當N1個線程都在處理任務後,緩衝池自動建立必定數量的新線程,用於處理更多的任務。在任務執行完畢後線程也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閒時,大部分線程都一直處於暫停狀態,線程池自動銷燬一部分線程,回收系統資源。
基於這種預建立技術,線程池將線程建立和銷燬自己所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每一個任務所分擔到的線程自己開銷則越小,不過咱們另外可能須要考慮進去線程之間同步所帶來的開銷。


構建線程池框架 

通常線程池都必須具有下面幾個組成部分: 
線程池管理器:用於建立並管理線程池 
工做線程: 線程池中實際執行的線程 
任務接口: 儘管線程池大多數狀況下是用來支持網絡服務器,可是咱們將線程執行的任務抽象出來,造成任務接口,從而是的線程池與具體的任務無關。 
任務隊列:線程池的概念具體到實現則多是隊列,鏈表之類的數據結構,其中保存執行線程。 

咱們實現的通用線程池框架由五個重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此以外框架中還包括線程同步使用的類CThreadMutex和CCondition。
CJob是全部的任務的基類,其提供一個接口Run,全部的任務類都必須從該類繼承,同時實現Run方法。該方法中實現具體的任務邏輯。 
CThread是Linux中線程的包裝,其封裝了Linux線程最常用的屬性和方法,它也是一個抽象類,是全部線程類的基類,具備一個接口Run。 
CWorkerThread是實際被調度和執行的線程類,其從CThread繼承而來,實現了CThread中的Run方法。 
CThreadPool是線程池類,其負責保存線程,釋放線程以及調度線程。 
CThreadManage是線程池與用戶的直接接口,其屏蔽了內部的具體實現。 
CThreadMutex用於線程之間的互斥。 
CCondition則是條件變量的封裝,用於線程之間的同步。 
它們的類的繼承關係以下圖所示: 

線程池的時序很簡單,以下圖所示。CThreadManage直接跟客戶端打交道,其接受須要建立的線程初始個數,並接受客戶端提交的任務。這兒的任務是具體的非抽象的任務。CThreadManage的內部實際上調用的都是CThreadPool的相關操做。CThreadPool建立具體的線程,並把客戶端提交的任務分發給CWorkerThread,CWorkerThread實際執行具體的任務。

理解系統組件 

下面咱們分開來了解系統中的各個組件。 
CThreadManage 
CThreadManage的功能很是簡單,其提供最簡單的方法,其類定義以下: 
class CThreadManage 
{ 
private: 
CThreadPool* m_Pool; 
int m_NumOfThread; 
protected: 
public: 
void SetParallelNum(int num); 
CThreadManage(); 
CThreadManage(int num); 
virtual ~CThreadManage(); 

void Run(CJob* job,void* jobdata); 
void TerminateAll(void); 
}; 
其中m_Pool指向實際的線程池;m_NumOfThread是初始建立時候容許建立的併發的線程個數。另外Run和TerminateAll方法也很是簡單,只是簡單的調用CThreadPool的一些相關方法而已。其具體的實現以下:
CThreadManage::CThreadManage(){ 
m_NumOfThread = 10; 
m_Pool = new CThreadPool(m_NumOfThread); 
} 
CThreadManage::CThreadManage(int num){ 
m_NumOfThread = num; 
m_Pool = new CThreadPool(m_NumOfThread); 
} 
CThreadManage::~CThreadManage(){ 
if(NULL != m_Pool) 
delete m_Pool; 
} 
void CThreadManage::SetParallelNum(int num){ 
m_NumOfThread = num; 
} 
void CThreadManage::Run(CJob* job,void* jobdata){ 
m_Pool->Run(job,jobdata); 
} 
void CThreadManage::TerminateAll(void){ 
m_Pool->TerminateAll(); 
} 
CThread 
CThread 類實現了對Linux中線程操做的封裝,它是全部線程的基類,也是一個抽象類,提供了一個抽象接口Run,全部的CThread都必須實現該Run方法。CThread的定義以下所示:
class CThread 
{ 
private: 
int m_ErrCode; 
Semaphore m_ThreadSemaphore; //the inner semaphore, which is used to realize
unsigned long m_ThreadID; 
bool m_Detach; //The thread is detached 
bool m_CreateSuspended; //if suspend after creating 
char* m_ThreadName; 
ThreadState m_ThreadState; //the state of the thread 
protected: 
void SetErrcode(int errcode){m_ErrCode = errcode;} 
static void* ThreadFunction(void*); 
public: 
CThread(); 
CThread(bool createsuspended,bool detach); 
virtual ~CThread(); 
virtual void Run(void) = 0; 
void SetThreadState(ThreadState state){m_ThreadState = state;} 

bool Terminate(void); //Terminate the threa 
bool Start(void); //Start to execute the thread 
void Exit(void); 
bool Wakeup(void); 

ThreadState GetThreadState(void){return m_ThreadState;} 
int GetLastError(void){return m_ErrCode;} 
void SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);} 
char* GetThreadName(void){return m_ThreadName;} 
int GetThreadID(void){return m_ThreadID;} 

bool SetPriority(int priority); 
int GetPriority(void); 
int GetConcurrency(void); 
void SetConcurrency(int num); 
bool Detach(void); 
bool Join(void); 
bool Yield(void); 
int Self(void); 
}; 
線程的狀態能夠分爲四種,空閒、忙碌、掛起、終止(包括正常退出和非正常退出)。因爲目前Linux線程庫不支持掛起操做,所以,咱們的此處的掛起操做相似於暫停。若是線程建立後不想當即執行任務,那麼咱們能夠將其「暫停」,若是須要運行,則喚醒。有一點必須注意的是,一旦線程開始執行任務,將不能被掛起,其將一直執行任務至完畢。
線程類的相關操做均十分簡單。線程的執行入口是從Start()函數開始,其將調用函數ThreadFunction,ThreadFunction再調用實際的Run函數,執行實際的任務。

CThreadPool 
CThreadPool是線程的承載容器,通常能夠將其實現爲堆棧、單向隊列或者雙向隊列。在咱們的系統中咱們使用STL Vector對線程進行保存。CThreadPool的實現代碼以下:
class CThreadPool 
{ 
friend class CWorkerThread; 
private: 
unsigned int m_MaxNum; //the max thread num that can create at the same time
unsigned int m_AvailLow; //The min num of idle thread that shoule kept 
unsigned int m_AvailHigh; //The max num of idle thread that kept at the same time
unsigned int m_AvailNum; //the normal thread num of idle num; 
unsigned int m_InitNum; //Normal thread num; 
protected: 
CWorkerThread* GetIdleThread(void); 

void AppendToIdleList(CWorkerThread* jobthread); 
void MoveToBusyList(CWorkerThread* idlethread); 
void MoveToIdleList(CWorkerThread* busythread); 

void DeleteIdleThread(int num); 
void CreateIdleThread(int num); 
public: 
CThreadMutex m_BusyMutex; //when visit busy list,use m_BusyMutex to lock and unlock
CThreadMutex m_IdleMutex; //when visit idle list,use m_IdleMutex to lock and unlock
CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock
CThreadMutex m_VarMutex; 

CCondition m_BusyCond; //m_BusyCond is used to sync busy thread list 
CCondition m_IdleCond; //m_IdleCond is used to sync idle thread list 
CCondition m_IdleJobCond; //m_JobCond is used to sync job list 
CCondition m_MaxNumCond; 

vector<CWorkerThread*> m_ThreadList; 
vector<CWorkerThread*> m_BusyList; //Thread List 
vector<CWorkerThread*> m_IdleList; //Idle List 

CThreadPool(); 
CThreadPool(int initnum); 
virtual ~CThreadPool(); 

void SetMaxNum(int maxnum){m_MaxNum = maxnum;} 
int GetMaxNum(void){return m_MaxNum;} 
void SetAvailLowNum(int minnum){m_AvailLow = minnum;} 
int GetAvailLowNum(void){return m_AvailLow;} 
void SetAvailHighNum(int highnum){m_AvailHigh = highnum;} 
int GetAvailHighNum(void){return m_AvailHigh;} 
int GetActualAvailNum(void){return m_AvailNum;} 
int GetAllNum(void){return m_ThreadList.size();} 
int GetBusyNum(void){return m_BusyList.size();} 
void SetInitNum(int initnum){m_InitNum = initnum;} 
int GetInitNum(void){return m_InitNum;} 

void TerminateAll(void); 
void Run(CJob* job,void* jobdata); 
}; 
CThreadPool::CThreadPool() 
{ 
m_MaxNum = 50; 
m_AvailLow = 5; 
m_InitNum=m_AvailNum = 10 ; 
m_AvailHigh = 20; 

m_BusyList.clear(); 
m_IdleList.clear(); 
for(int i=0;i<m_InitNum;i++){ 
CWorkerThread* thr = new CWorkerThread(); 
thr->SetThreadPool(this); 
AppendToIdleList(thr); 
thr->Start(); 
} 
} 

CThreadPool::CThreadPool(int initnum) 
{ 
assert(initnum>0 && initnum<=30); 
m_MaxNum = 30; 
m_AvailLow = initnum-10>0?initnum-10:3; 
m_InitNum=m_AvailNum = initnum ; 
m_AvailHigh = initnum+10; 

m_BusyList.clear(); 
m_IdleList.clear(); 
for(int i=0;i<m_InitNum;i++){ 
CWorkerThread* thr = new CWorkerThread(); 
AppendToIdleList(thr); 
thr->SetThreadPool(this); 
thr->Start(); //begin the thread,the thread wait for job 
} 
} 

CThreadPool::~CThreadPool() 
{ 
TerminateAll(); 
} 

void CThreadPool::TerminateAll() 
{ 
for(int i=0;i < m_ThreadList.size();i++) { 
CWorkerThread* thr = m_ThreadList[i]; 
thr->Join(); 
} 
return; 
} 

CWorkerThread* CThreadPool::GetIdleThread(void) 
{ 
while(m_IdleList.size() ==0 ) 
m_IdleCond.Wait(); 

m_IdleMutex.Lock(); 
if(m_IdleList.size() > 0 ) 
{ 
CWorkerThread* thr = (CWorkerThread*)m_IdleList.front(); 
printf("Get Idle thread %dn",thr->GetThreadID()); 
m_IdleMutex.Unlock(); 
return thr; 
} 
m_IdleMutex.Unlock(); 

return NULL; 
} 

//add an idle thread to idle list 
void CThreadPool::AppendToIdleList(CWorkerThread* jobthread) 
{ 
m_IdleMutex.Lock(); 
m_IdleList.push_back(jobthread); 
m_ThreadList.push_back(jobthread); 
m_IdleMutex.Unlock(); 
} 

//move and idle thread to busy thread 
void CThreadPool::MoveToBusyList(CWorkerThread* idlethread) 
{ 
m_BusyMutex.Lock(); 
m_BusyList.push_back(idlethread); 
m_AvailNum--; 
m_BusyMutex.Unlock(); 

m_IdleMutex.Lock(); 
vector<CWorkerThread*>::iterator pos; 
pos = find(m_IdleList.begin(),m_IdleList.end(),idlethread); 
if(pos !=m_IdleList.end()) 
m_IdleList.erase(pos); 
m_IdleMutex.Unlock(); 
} 

void CThreadPool::MoveToIdleList(CWorkerThread* busythread) 
{ 
m_IdleMutex.Lock(); 
m_IdleList.push_back(busythread); 
m_AvailNum++; 
m_IdleMutex.Unlock(); 

m_BusyMutex.Lock(); 
vector<CWorkerThread*>::iterator pos; 
pos = find(m_BusyList.begin(),m_BusyList.end(),busythread); 
if(pos!=m_BusyList.end()) 
m_BusyList.erase(pos); 
m_BusyMutex.Unlock(); 

m_IdleCond.Signal(); 
m_MaxNumCond.Signal(); 
} 

//create num idle thread and put them to idlelist 
void CThreadPool::CreateIdleThread(int num) 
{ 
for(int i=0;i<num;i++){ 
CWorkerThread* thr = new CWorkerThread(); 
thr->SetThreadPool(this); 
AppendToIdleList(thr); 
m_VarMutex.Lock(); 
m_AvailNum++; 
m_VarMutex.Unlock(); 
thr->Start(); //begin the thread,the thread wait for job 
} 
} 

void CThreadPool::DeleteIdleThread(int num) 
{ 
printf("Enter into CThreadPool::DeleteIdleThreadn"); 
m_IdleMutex.Lock(); 
printf("Delete Num is %dn",num); 
for(int i=0;i<num;i++){ 
CWorkerThread* thr; 
if(m_IdleList.size() > 0 ){ 
thr = (CWorkerThread*)m_IdleList.front(); 
printf("Get Idle thread %dn",thr->GetThreadID()); 
} 

vector<CWorkerThread*>::iterator pos; 
pos = find(m_IdleList.begin(),m_IdleList.end(),thr); 
if(pos!=m_IdleList.end()) 
m_IdleList.erase(pos); 
m_AvailNum--; 
printf("The idle thread available num:%d n",m_AvailNum); 
printf("The idlelist num:%d n",m_IdleList.size()); 
} 
m_IdleMutex.Unlock(); 
} 
void CThreadPool::Run(CJob* job,void* jobdata) 
{ 
assert(job!=NULL); 

//if the busy thread num adds to m_MaxNum,so we should wait 
if(GetBusyNum() == m_MaxNum) 
m_MaxNumCond.Wait(); 

if(m_IdleList.size()<m_AvailLow) 
{ 
if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 
CreateIdleThread(m_InitNum-m_IdleList.size()); 
else 
CreateIdleThread(m_MaxNum-GetAllNum()); 
} 

CWorkerThread* idlethr = GetIdleThread(); 
if(idlethr !=NULL) 
{ 
idlethr->m_WorkMutex.Lock(); 
MoveToBusyList(idlethr); 
idlethr->SetThreadPool(this); 
job->SetWorkThread(idlethr); 
printf("Job is set to thread %d n",idlethr->GetThreadID()); 
idlethr->SetJob(job,jobdata); 
} 
} 
在CThreadPool中存在兩個鏈表,一個是空閒鏈表,一個是忙碌鏈表。Idle鏈表中存放全部的空閒進程,當線程執行任務時候,其狀態變爲忙碌狀態,同時從空閒鏈表中刪除,並移至忙碌鏈表中。在CThreadPool的構造函數中,咱們將執行下面的代碼:
for(int i=0;i<m_InitNum;i++) 
{ 
CWorkerThread* thr = new CWorkerThread(); 
AppendToIdleList(thr); 
thr->SetThreadPool(this); 
thr->Start(); //begin the thread,the thread wait for job 
} 
在該代碼中,咱們將建立m_InitNum個線程,建立以後即調用AppendToIdleList放入Idle鏈表中,因爲目前沒有任務分發給這些線程,所以線程執行Start後將本身掛起。
事實上,線程池中容納的線程數目並非一成不變的,其會根據執行負載進行自動伸縮。爲此在CThreadPool中設定四個變量: 
m_InitNum:處世建立時線程池中的線程的個數。 
m_MaxNum:當前線程池中所容許併發存在的線程的最大數目。 
m_AvailLow:當前線程池中所容許存在的空閒線程的最小數目,若是空閒數目低於該值,代表負載可能太重,此時有必要增長空閒線程池的數目。實現中咱們老是將線程調整爲m_InitNum個。
m_AvailHigh:當前線程池中所容許的空閒的線程的最大數目,若是空閒數目高於該值,代表當前負載可能較輕,此時將刪除多餘的空閒線程,刪除後調整數也爲m_InitNum個。
m_AvailNum:目前線程池中實際存在的線程的個數,其值介於m_AvailHigh和m_AvailLow之間。若是線程的個數始終維持在m_AvailLow和m_AvailHigh之間,則線程既不須要建立,也不須要刪除,保持平衡狀態。所以如何設定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態,是線程池設計必須考慮的問題。
線程池在接受到新的任務以後,線程池首先要檢查是否有足夠的空閒池可用。檢查分爲三個步驟: 
(1)檢查當前處於忙碌狀態的線程是否達到了設定的最大值m_MaxNum,若是達到了,代表目前沒有空閒線程可用,並且也不能建立新的線程,所以必須等待直到有線程執行完畢返回到空閒隊列中。
(2)若是當前的空閒線程數目小於咱們設定的最小的空閒數目m_AvailLow,則咱們必須建立新的線程,默認狀況下,建立後的線程數目應該爲m_InitNum,所以建立的線程數目應該爲( 當前空閒線程數與m_InitNum);可是有一種特殊狀況必須考慮,就是現有的線程總數加上建立後的線程數可能超過m_MaxNum,所以咱們必須對線程的建立區別對待。
if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 
CreateIdleThread(m_InitNum-m_IdleList.size()); 
else 
CreateIdleThread(m_MaxNum-GetAllNum()); 
若是建立後總數不超過m_MaxNum,則建立後的線程爲m_InitNum;若是超過了,則只建立( m_MaxNum-當前線程總數 )個。 
(3)調用GetIdleThread方法查找空閒線程。若是當前沒有空閒線程,則掛起;不然將任務指派給該線程,同時將其移入忙碌隊列。 
當線程執行完畢後,其會調用MoveToIdleList方法移入空閒鏈表中,其中還調用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。

CWorkerThread 
CWorkerThread是CThread的派生類,是事實上的工做線程。在CThreadPool的構造函數中,咱們建立了必定數量的CWorkerThread。一旦這些線程建立完畢,咱們將調用Start()啓動該線程。Start方法最終會調用Run方法。Run方法是個無限循環的過程。在沒有接受到實際的任務的時候,m_Job爲NULL,此時線程將調用Wait方法進行等待,從而處於掛起狀態。一旦線程池將具體的任務分發給該線程,其將被喚醒,從而通知線程從掛起的地方繼續執行。CWorkerThread的完整定義以下:
class CWorkerThread:public CThread 
{ 
private: 
CThreadPool* m_ThreadPool; 
CJob* m_Job; 
void* m_JobData; 

CThreadMutex m_VarMutex; 
bool m_IsEnd; 
protected: 
public: 
CCondition m_JobCond; 
CThreadMutex m_WorkMutex; 
CWorkerThread(); 
virtual ~CWorkerThread(); 
void Run(); 
void SetJob(CJob* job,void* jobdata); 
CJob* GetJob(void){return m_Job;} 
void SetThreadPool(CThreadPool* thrpool); 
CThreadPool* GetThreadPool(void){return m_ThreadPool;} 
}; 
CWorkerThread::CWorkerThread() 
{ 
m_Job = NULL; 
m_JobData = NULL; 
m_ThreadPool = NULL; 
m_IsEnd = false; 
} 
CWorkerThread::~CWorkerThread() 
{ 
if(NULL != m_Job) 
delete m_Job; 
if(m_ThreadPool != NULL) 
delete m_ThreadPool; 
} 

void CWorkerThread::Run() 
{ 
SetThreadState(THREAD_RUNNING); 
for(;;) 
{ 
while(m_Job == NULL) 
m_JobCond.Wait(); 

m_Job->Run(m_JobData); 
m_Job->SetWorkThread(NULL); 
m_Job = NULL; 
m_ThreadPool->MoveToIdleList(this); 
if(m_ThreadPool->m_IdleList.size() > m_ThreadPool->GetAvailHighNum()) 
{ 
m_ThreadPool->DeleteIdleThread(m_ThreadPool->m_IdleList.size()-m_T 
hreadPool->GetInitNum()); 
} 
m_WorkMutex.Unlock(); 
} 
} 
void CWorkerThread::SetJob(CJob* job,void* jobdata) 
{ 
m_VarMutex.Lock(); 
m_Job = job; 
m_JobData = jobdata; 
job->SetWorkThread(this); 
m_VarMutex.Unlock(); 
m_JobCond.Signal(); 
} 
void CWorkerThread::SetThreadPool(CThreadPool* thrpool) 
{ 
m_VarMutex.Lock(); 
m_ThreadPool = thrpool; 
m_VarMutex.Unlock(); 
} 
當線程執行任務以前首先必須判斷空閒線程的數目是否低於m_AvailLow,若是低於,則必須建立足夠的空閒線程,使其數目達到m_InitNum個,而後將調用MoveToBusyList()移出空閒隊列,移入忙碌隊列。當任務執行完畢後,其又調用MoveToIdleList()移出忙碌隊列,移入空閒隊列,等待新的任務。
除了Run方法以外,CWorkerThread中另一個重要的方法就是SetJob,該方法將實際的任務賦值給線程。當沒有任何執行任務即m_Job爲NULL的時候,線程將調用m_JobCond.Wait進行等待。一旦Job被賦值給線程,其將調用m_JobCond.Signal方法喚醒該線程。因爲m_JobCond屬於線程內部的變量,每一個線程都維持一個m_JobCond,只有獲得任務的線程才被喚醒,沒有獲得任務的將繼續等待。不管一個線程什麼時候被喚醒,其都將從等待的地方繼續執行m_Job->Run(m_JobData),這是線程執行實際任務的地方。
在線程執行給定Job期間,咱們必須防止另一個Job又賦給該線程,所以在賦值以前,經過m_VarMutex進行鎖定, Job執行期間,其於的Job將不能關聯到該線程;任務執行完畢,咱們調用m_VarMutex.Unlock()進行解鎖,此時,線程又能夠接受新的執行任務。
在線程執行任務結束後返回空閒隊列前,咱們還須要判斷當前空閒隊列中的線程是否高於m_AvailHigh個。若是超過m_AvailHigh,則必須從其中刪除(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum())個線程,使線程數目保持在m_InitNum個。

CJob 
CJob類相對簡單,其封裝了任務的基本的屬性和方法,其中最重要的是Run方法,代碼以下: 
class CJob 
{ 
private: 
int m_JobNo; //The num was assigned to the job 
char* m_JobName; //The job name 
CThread *m_pWorkThread; //The thread associated with the job 
public: 
CJob( void ); 
virtual ~CJob(); 

int GetJobNo(void) const { return m_JobNo; } 
void SetJobNo(int jobno){ m_JobNo = jobno;} 
char* GetJobName(void) const { return m_JobName; } 
void SetJobName(char* jobname); 
CThread *GetWorkThread(void){ return m_pWorkThread; } 
void SetWorkThread ( CThread *pWorkThread ){ 
m_pWorkThread = pWorkThread; 
} 
virtual void Run ( void *ptr ) = 0; 
}; 
CJob::CJob(void) 
:m_pWorkThread(NULL) 
,m_JobNo(0) 
,m_JobName(NULL) 
{ 
} 
CJob::~CJob(){ 
if(NULL != m_JobName) 
free(m_JobName); 
} 
void CJob::SetJobName(char* jobname) 
{ 
if(NULL !=m_JobName) { 
free(m_JobName); 
m_JobName = NULL; 
} 
if(NULL !=jobname) { 
m_JobName = (char*)malloc(strlen(jobname)+1); 
strcpy(m_JobName,jobname); 
} 
} 
線程池使用示例 
至此咱們給出了一個簡單的與具體任務無關的線程池框架。使用該框架很是的簡單,咱們所須要的作的就是派生CJob類,將須要完成的任務實如今Run方法中。而後將該Job交由CThreadManage去執行。下面咱們給出一個簡單的示例程序
class CXJob:public CJob 
{ 
public: 
CXJob(){i=0;} 
~CXJob(){} 
void Run(void* jobdata) { 
printf("The Job comes from CXJOB\n"); 
sleep(2); 
} 
}; 

class CYJob:public CJob 
{ 
public: 
CYJob(){i=0;} 
~CYJob(){} 
void Run(void* jobdata) { 
printf("The Job comes from CYJob\n"); 
} 
}; 

main() 
{ 
CThreadManage* manage = new CThreadManage(10); 
for(int i=0;i<40;i++) 
{ 
CXJob* job = new CXJob(); 
manage->Run(job,NULL); 
} 
sleep(2); 
CYJob* job = new CYJob(); 
manage->Run(job,NULL); 
manage->TerminateAll(); 
} 
CXJob和CYJob都是從Job類繼承而來,其都實現了Run接口。CXJob只是簡單的打印一句」The Job comes from CXJob」,CYJob也只打印」The Job comes from CYJob」,而後均休眠2秒鐘。在主程序中咱們初始建立10個工做線程。而後分別執行40次CXJob和一次CYJob。
線程池使用後記 
線程池適合場合 
事實上,線程池並非萬能的。它有其特定的使用場合。線程池致力於減小線程自己的開銷對應用所產生的影響,這是有前提的,前提就是線程自己開銷與線程執行任務相比不可忽略。若是線程自己的開銷相對於線程任務執行開銷而言是能夠忽略不計的,那麼此時線程池所帶來的好處是不明顯的,好比對於FTP服務器以及Telnet服務器,一般傳送文件的時間較長,開銷較大,那麼此時,咱們採用線程池未必是理想的方法,咱們能夠選擇「即時建立,即時銷燬」的策略。
總之線程池一般適合下面的幾個場合: 
(1) 單位時間內處理任務頻繁並且任務處理時間短 
(2) 對實時性要求較高。若是接受到任務後在建立線程,可能知足不了實時要求,所以必須採用線程池進行預建立。 
(3) 必須常常面對高突發性事件,好比Web服務器,若是有足球轉播,則服務器將產生巨大的衝擊。此時若是採起傳統方法,則必須不停的大量產生線程,銷燬線程。此時採用動態線程池能夠避免這種狀況的發生。

結束語 
本文給出了一個簡單的通用的與任務無關的線程池的實現,經過該線程池可以極大的簡化Linux下多線程的開發工做。該線程池的進一步完善開發工做還在進行中,但願可以獲得你的建議和支持。
參考資料 
http://www-900.ibm.com/developerWorks/cn/java/j-jtp0730/index.shtml 
POSIX多線程程序設計,David R.Butenhof 譯者:於磊 曾剛,中國電力出版社 
C++面向對象多線程編程,CAMERON HUGHES等著 周良忠譯,人民郵電出版社 
Java Pro,結合線程和分析器池,Edy Yu 
關於做者 
張中慶,西安交通大學軟件所,在讀碩士,目前研究方向爲分佈式網絡與移動中間件,對Linux極其愛好,能夠經過flydish1234@sina.com.cn與我聯繫。

C++實現線程池的經典模型_Linux編程_Linux公社-Linux系統門戶網站 - Google Chrome (2013/7/1 10:23:31)

C++實現線程池的經典模型_Linux編程_Linux公社-Linux系統門戶網站 - Google Chrome (2013/7/1 10:23:31)

何時須要建立線程池呢?簡單的說,若是一個應用須要頻繁的建立和銷燬線程,而任務執行的時間又很是短,這樣線程建立和銷燬的帶來的開銷就不容忽視,這時也是線程池該出場的機會了。若是線程建立和銷燬時間相比任務執行時間能夠忽略不計,則沒有必要使用線程池了。

下面列出線程的一些重要的函數

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,  
                  void *(*start_routine) (void *), void *arg);  
void pthread_exit(void *retval);  
  
int pthread_mutex_destroy(pthread_mutex_t *mutex);  
int pthread_mutex_init(pthread_mutex_t *restrict mutex,  
                      const pthread_mutexattr_t *restrict attr);  
int pthread_mutex_lock(pthread_mutex_t *mutex);  
int pthread_mutex_trylock(pthread_mutex_t *mutex);  
int pthread_mutex_unlock(pthread_mutex_t *mutex);  
  
int pthread_cond_destroy(pthread_cond_t *cond);  
int pthread_cond_init(pthread_cond_t *restrict cond,  
                      const pthread_condattr_t *restrict attr);  
int pthread_cond_destroy(pthread_cond_t *cond);  
int pthread_cond_init(pthread_cond_t *restrict cond,  
                      const pthread_condattr_t *restrict attr);  
int pthread_cond_broadcast(pthread_cond_t *cond);  
int pthread_cond_signal(pthread_cond_t *cond); 


C語言實現簡單線程池 - Newerth - 博客園 - Google Chrome (2013/7/1 10:39:41)

C語言實現簡單線程池

有時咱們會須要大量線程來處理一些相互獨立的任務,爲了不頻繁的申請釋放線程所帶來的開銷,咱們可使用線程池。下面是一個C語言實現的簡單的線程池。

頭文件:

   1: #ifndef THREAD_POOL_H__
   2: #define THREAD_POOL_H__
   3:  
   4: #include <pthread.h>
   5:  
   6: /* 要執行的任務鏈表 */
   7: typedef struct tpool_work {
   8:     void*               (*routine)(void*);       /* 任務函數 */
   9:     void                *arg;                    /* 傳入任務函數的參數 */
  10:     struct tpool_work   *next;                    
  11: }tpool_work_t;
  12:  
  13: typedef struct tpool {
  14:     int             shutdown;                    /* 線程池是否銷燬 */
  15:     int             max_thr_num;                /* 最大線程數 */
  16:     pthread_t       *thr_id;                    /* 線程ID數組 */
  17:     tpool_work_t    *queue_head;                /* 線程鏈表 */
  18:     pthread_mutex_t queue_lock;                    
  19:     pthread_cond_t  queue_ready;    
  20: }tpool_t;
  21:  
  22: /*
  23:  * @brief     建立線程池 
  24:  * @param     max_thr_num 最大線程數
  25:  * @return     0: 成功 其餘: 失敗  
  26:  */
  27: int
  28: tpool_create(int max_thr_num);
  29:  
  30: /*
  31:  * @brief     銷燬線程池 
  32:  */
  33: void
  34: tpool_destroy();
  35:  
  36: /*
  37:  * @brief     向線程池中添加任務
  38:  * @param    routine 任務函數指針
  39:  * @param     arg 任務函數參數
  40:  * @return     0: 成功 其餘:失敗 
  41:  */
  42: int
  43: tpool_add_work(void*(*routine)(void*), void *arg);
  44:  
  45: #endif

實現:

   1: #include <unistd.h>
   2: #include <stdlib.h>
   3: #include <errno.h>
   4: #include <string.h>
   5: #include <stdio.h>
   6:  
   7: #include "tpool.h"
   8:  
   9: static tpool_t *tpool = NULL;
  10:  
  11: /* 工做者線程函數, 從任務鏈表中取出任務並執行 */
  12: static void* 
  13: thread_routine(void *arg)
  14: {
  15:     tpool_work_t *work;
  16:     
  17:     while(1) {
  18:         /* 若是線程池沒有被銷燬且沒有任務要執行,則等待 */
  19:         pthread_mutex_lock(&tpool->queue_lock);
  20:         while(!tpool->queue_head && !tpool->shutdown) {
  21:             pthread_cond_wait(&tpool->queue_ready, &tpool->queue_lock);
  22:         }
  23:         if (tpool->shutdown) {
  24:             pthread_mutex_unlock(&tpool->queue_lock);
  25:             pthread_exit(NULL);
  26:         }
  27:         work = tpool->queue_head;
  28:         tpool->queue_head = tpool->queue_head->next;
  29:         pthread_mutex_unlock(&tpool->queue_lock);
  30:  
  31:         work->routine(work->arg);
  32:         free(work);
  33:     }
  34:     
  35:     return NULL;   
  36: }
  37:  
  38: /*
  39:  * 建立線程池 
  40:  */
  41: int
  42: tpool_create(int max_thr_num)
  43: {
  44:     int i;
  45:  
  46:     tpool = calloc(1, sizeof(tpool_t));
  47:     if (!tpool) {
  48:         printf("%s: calloc failed\n", __FUNCTION__);
  49:         exit(1);
  50:     }
  51:     
  52:     /* 初始化 */
  53:     tpool->max_thr_num = max_thr_num;
  54:     tpool->shutdown = 0;
  55:     tpool->queue_head = NULL;
  56:     if (pthread_mutex_init(&tpool->queue_lock, NULL) !=0) {
  57:         printf("%s: pthread_mutex_init failed, errno:%d, error:%s\n",
  58:             __FUNCTION__, errno, strerror(errno));
  59:         exit(1);
  60:     }
  61:     if (pthread_cond_init(&tpool->queue_ready, NULL) !=0 ) {
  62:         printf("%s: pthread_cond_init failed, errno:%d, error:%s\n", 
  63:             __FUNCTION__, errno, strerror(errno));
  64:         exit(1);
  65:     }
  66:     
  67:     /* 建立工做者線程 */
  68:     tpool->thr_id = calloc(max_thr_num, sizeof(pthread_t));
  69:     if (!tpool->thr_id) {
  70:         printf("%s: calloc failed\n", __FUNCTION__);
  71:         exit(1);
  72:     }
  73:     for (i = 0; i < max_thr_num; ++i) {
  74:         if (pthread_create(&tpool->thr_id[i], NULL, thread_routine, NULL) != 0){
  75:             printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__, 
  76:                 errno, strerror(errno));
  77:             exit(1);
  78:         }
  79:         
  80:     }    
  81:  
  82:     return 0;
  83: }
  84:  
  85: /* 銷燬線程池 */
  86: void
  87: tpool_destroy()
  88: {
  89:     int i;
  90:     tpool_work_t *member;
  91:  
  92:     if (tpool->shutdown) {
  93:         return;
  94:     }
  95:     tpool->shutdown = 1;
  96:  
  97:     /* 通知全部正在等待的線程 */
  98:     pthread_mutex_lock(&tpool->queue_lock);
  99:     pthread_cond_broadcast(&tpool->queue_ready);
 100:     pthread_mutex_unlock(&tpool->queue_lock);
 101:     for (i = 0; i < tpool->max_thr_num; ++i) {
 102:         pthread_join(tpool->thr_id[i], NULL);
 103:     }
 104:     free(tpool->thr_id);
 105:  
 106:     while(tpool->queue_head) {
 107:         member = tpool->queue_head;
 108:         tpool->queue_head = tpool->queue_head->next;
 109:         free(member);
 110:     }
 111:  
 112:     pthread_mutex_destroy(&tpool->queue_lock);    
 113:     pthread_cond_destroy(&tpool->queue_ready);
 114:  
 115:     free(tpool);    
 116: }
 117:  
 118: /* 向線程池添加任務 */
 119: int
 120: tpool_add_work(void*(*routine)(void*), void *arg)
 121: {
 122:     tpool_work_t *work, *member;
 123:     
 124:     if (!routine){
 125:         printf("%s:Invalid argument\n", __FUNCTION__);
 126:         return -1;
 127:     }
 128:     
 129:     work = malloc(sizeof(tpool_work_t));
 130:     if (!work) {
 131:         printf("%s:malloc failed\n", __FUNCTION__);
 132:         return -1;
 133:     }
 134:     work->routine = routine;
 135:     work->arg = arg;
 136:     work->next = NULL;
 137:  
 138:     pthread_mutex_lock(&tpool->queue_lock);    
 139:     member = tpool->queue_head;
 140:     if (!member) {
 141:         tpool->queue_head = work;
 142:     } else {
 143:         while(member->next) {
 144:             member = member->next;
 145:         }
 146:         member->next = work;
 147:     }
 148:     /* 通知工做者線程,有新任務添加 */
 149:     pthread_cond_signal(&tpool->queue_ready);
 150:     pthread_mutex_unlock(&tpool->queue_lock);
 151:  
 152:     return 0;    
 153: }
 154:     
 155:  

測試代碼:

   1: #include <unistd.h>
   2: #include <stdio.h>
   3: #include <stdlib.h>
   4: #include "tpool.h"
   5:  
   6: void *func(void *arg)
   7: {
   8:     printf("thread %d\n", (int)arg);
   9:     return NULL;
  10: }
  11:  
  12: int
  13: main(int arg, char **argv)
  14: {
  15:     if (tpool_create(5) != 0) {
  16:         printf("tpool_create failed\n");
  17:         exit(1);
  18:     }
  19:     
  20:     int i;
  21:     for (i = 0; i < 10; ++i) {
  22:         tpool_add_work(func, (void*)i);
  23:     }
  24:     sleep(2);
  25:     tpool_destroy();
  26:     return 0;
  27: }

這個實現是在調用tpool_destroy以後,僅將當前正在執行的任務完成以後就會退出,咱們也能夠修改代碼使得線程池在執行完任務鏈表中全部任務後再退出。



相關文章
相關標籤/搜索