【APUE】Chapter11 Threads

看完了APUE第三版的Chapter11 Threads,跟着書上的demo走了一遍,而且參考了這個blog(http://www.cnblogs.com/chuyuhuashi/p/4447817.html)的很是好的example。html

下面的內容就是看書過程當中記錄的,能夠做爲一個參考,但決不能代替看APUE原著。面試

原本想在本身的mac上跑(畢竟也叫unix系統),後來發現mac上有些pthread的庫支持的不全(好比,沒有barrier),就改到了centos server上跑。centos

(一)Thread Identification數組

  1. 線程的id只在建立它的進程中有效安全

  2. pthread_t惟一標示一個線程,可是不一樣系統對於pthread_t的實現是不同的(有的是long有的是unsigned integer,有的甚至是structure)數據結構

(二)Thread Creation多線程

#include "apue.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>


pthread_t ntid;

void printids(const char *s)
{
    pid_t pid;
    pthread_t tid;

    pid = getpid();
    tid = pthread_self();
    printf("%s pid %lu tid %lu (0x%lx)\n",
        s ,
        (unsigned long)pid, 
        (unsigned long)tid, 
        (unsigned long)tid);
}

void * thr_fn(void *arg)
{
    printids("new thread: ");
    return ((void *)0);
}

int main(int argc, char const *argv[])
{
    int err;
    err = pthread_create(&ntid, NULL, thr_fn, NULL);
    printids("main thread: ");
    sleep(1);
    exit(0);
}

結果以下:dom

這個例子主要看不一樣系統下tid的值的不一樣。函數

(三)Thread Termination學習

第一個例子

#include "apue.h"
#include <pthread.h>

void * thr_fn1(void *arg)
{
    printf(("thread 1 returning\n"));
    return ((void *)11);
}

void * thr_fn2(void *arg)
{
    printf(("thread 2 exiting\n"));
    pthread_exit((void *)22);
}

int main(int argc, char const *argv[])
{
    int err;
    pthread_t tid1, tid2;
    void *tret;
    err = pthread_create(&tid1, NULL, thr_fn1, NULL);
    if (err!=0)
        err_exit(err, "can't create thread 1");
    err = pthread_create(&tid2, NULL, thr_fn2, NULL);
    if (err!=0)
        err_exit(err, "can't create thread 2");
    sleep(1);
    err = pthread_join(tid1, &tret);
    if (err!=0)
        err_exit(err, "can't join with thread 1");
    printf("thread 1 exit code %ld\n",(long)tret);
    err = pthread_join(tid2, &tret);
    if (err!=0)
        err_exit(err, "can't join with thread 2");
    printf("thread 2 exit code %ld\n",(long)tret);
    return 0;
}

執行結果:

1. 這裏有個的err記錄的是pthread_create()或pthread_join()是否執行成功:0或者非0與具體線程函數返回值沒有關係

2. 即便是子線程已經執行完畢了,在main中調用pthread_join(..., &tret)依然能夠得到這個子線程的返回值,記錄在tret中

第二個例子(這個例子被我在書上的例子基礎上修改過了,紅色的兩行)

#include "apue.h"
#include <pthread.h>

struct foo
{
    int a, b, c, d;
};

void printfoo(const char *s, const struct foo *fp)
{
    printf("%s",s);
    printf(" structure at 0x%lx\n", (unsigned long)fp);
    printf(" foo.a = %d\n", fp->a);
    printf(" foo.b = %d\n", fp->b);
    printf(" foo.c = %d\n", fp->c);
    printf(" foo.d = %d\n", fp->d);
}

void * thr_fn1(void *arg)
{
    struct foo *foo = malloc(sizeof(struct foo));
    foo->a = 1;
    foo->b = 2;
    foo->c = 3;
    foo->d = 4;
    printfoo("thread 1:\n", foo);
    pthread_exit((void *)foo);
}

void * thr_fn2(void *arg)
{
    printf("thread 2: ID is %lu\n", (unsigned long)pthread_self());
    pthread_exit((void *)0);
}

int main(int argc, char const *argv[])
{
    int err;
    pthread_t tid1, tid2;
    struct foo *fp;
    err = pthread_create(&tid1, NULL, thr_fn1, NULL);
    err = pthread_join(tid1, (void *)&fp);
    sleep(1);
    printf("parent starting second thread\n");
    err = pthread_create(&tid2, NULL, thr_fn2, NULL);
    sleep(1);
    printfoo("parent:\n", fp);
    free(fp);
    exit(0);
}

運行結果以下:

若是在一個子線程中建立的資源要保證子線程退出後仍是能夠用的,就要用malloc動態分配。

固然,若是忘記了free(fp)就產生了內存泄露。

第三個例子

#include "apue.h"
#include <pthread.h>

void cleanup(void *arg)
{
    printf("cleanup: %s\n", (char *)arg);
}

void * thr_fn1(void *arg)
{
    printf("thread 1 start\n");
    pthread_cleanup_push(cleanup, "thread 1 first handler");
    pthread_cleanup_push(cleanup, "thread 1 second handler");
    printf("thread 1 push complete\n");
    if (arg)
        pthread_exit((void *)1);
    pthread_cleanup_pop(0);
    pthread_cleanup_pop(0);
    return ((void *)1);
}

void * thr_fn2(void *arg)
{
    printf("thread 2 start\n");
    //if (arg)
    //    pthread_exit((void *)2);
    pthread_cleanup_push(cleanup, "thread 2 first handler");
    pthread_cleanup_push(cleanup, "thread 2 second handler");
    printf("thread 2 push complete\n");
    pthread_cleanup_pop(0);
    pthread_cleanup_pop(0);
    pthread_exit((void *)2);
}

int main(int argc, char const *argv[])
{
    int err;
    pthread_t tid1, tid2;
    void *tret;
    err = pthread_create(&tid1, NULL, thr_fn1, (void *)1);
    err = pthread_create(&tid2, NULL, thr_fn2, (void *)1);
    err = pthread_join(tid1, &tret);
    if (err!=0)
        err_exit(err, "can't join with thread 1");
    printf("thread 1 exit code %ld\n",(long)tret);
    err = pthread_join(tid2, &tret);
    if (err!=0)
        err_exit(err, "can't join with thread 2");
    printf("thread 2 exit code %ld\n",(long)tret);
    exit(0);
}

運行結果以下:

這個例子主要講的是termination中的cleanup的用法:

1. 若是線程退出的時候,是調用pthread_exit,系統就會檢查是否有清理函數在棧中;若是有,則按push的逆序,一個個彈出來並執行。

2. push和pop必須是成對出現的,有個push就有一個pop,必須這樣的緣由是push和pop都用了macro實現,不這麼作complie通不過。

  但這裏就會有個容易含糊的地方:thr_fn1中,pop出如今exit以後,若是arg不是NULL,那麼pop確定執行不了啊?

  沒錯,若是arg不是NULL,pop確定執行不了,可是也必須出現(相似能夠不發揮做用,可是必須開會佔個位置

3. 若是餵給pop的參數是0,則就直接把push進去的函數彈出來就完事了,並不執行(如上面的例子);若是餵給pop的參數不是0,好比下面這樣:

void * thr_fn2(void *arg)
{
    printf("thread 2 start\n");
    //if (arg)
    //    pthread_exit((void *)2);
    pthread_cleanup_push(cleanup, "thread 2 first handler");
    pthread_cleanup_push(cleanup, "thread 2 second handler");
    printf("thread 2 push complete\n");
    pthread_cleanup_pop(1);
    pthread_cleanup_pop(1);
    pthread_exit((void *)2);
}

執行結果就是下面這樣了:

4. 這個例子還說明了:thread1和thread2在被主線程join以前都執行完了,可是這個時候thread1和thread2的status are also retained。

(四)Thread Synchronization

這部份內容書上的例子並非能夠run,而是按部就班,經過對某種需求場景下線程同步的不斷改進變化來讓讀者加深理解。

一共有兩個場景,兩者的關係是漸進豐富的關係

場景1說的是互斥量mutex的基本用法:「structure ojbect是動態分配的,而且一旦分配了資源,就可能被其餘線程用上;所以對於object須要上鎖保護,防止還有其餘線程用着這個object的時候,這個object就被destory了;具體的作法是在structure object中維護一個計數變量f_count,記錄當前有多少個其餘線程用着這個ojbect。」

上面說的基本就是書上的原話,有些抽象,能夠舉個不恰當的例子來幫助理解:

  能夠想象成快遞員(structure ojbect)與交給快遞員的快遞(用着object的線程);只有分配給這個快遞員全部的快遞都送完了以後(f_count爲0),才能告訴這個快遞員「你去休息吧(destory);不然快遞員手上還有快遞就休息了,這個快遞就瞎了。

  下面在分析代碼的時候,爲了便於理解,就用快遞員和快遞分別代指object和threads了。

代碼以下:

#include <stdlib.h>
#include <pthread.h>

struct foo {
    int             f_count;
    pthread_mutex_t f_lock;
    int             f_id;
    /* ... more stuff here ... */
};

struct foo *
foo_alloc(int id) /* allocate the object */
{
    struct foo *fp;

    if ((fp = malloc(sizeof(struct foo))) != NULL) {
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
            free(fp);
            return(NULL);
        }
        /* ... continue initialization ... */
    }
    return(fp);
}

void
foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}

void
foo_rele(struct foo *fp) /* release a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    if (--fp->f_count == 0) { /* last reference */
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } else {
        pthread_mutex_unlock(&fp->f_lock);
    }
}

這份代碼不難理解:

1. foo_hold線程的功能是:分配給快遞員一個新的快遞(f_count++)

2. foo_rele線程的功能是:判斷快遞員身上是否是沒有快遞要送了,若是沒有了就讓他休息(pthread_mutex_destory, free)

foo_hold與foo_rele中都有多ojbect中f_count的操做,所以mutex互斥量就派上用場了:即對f_count的加和讀都是線程安全的。mutex的做用就是:

防止一遍得知快遞員身上已經沒有快遞能夠休息了;同時,又來了一個新的快遞分配給這個快遞員。

場景二說的是死鎖問題,分析了避免死鎖的兩種方法以及兩者的trade-off

場景二在場景一的基礎上進行了「立體」豐富:

1. 如今有29個快遞員據點

2. 每一個快遞員據點的快遞員數目是動態變化的(會有新的快遞員加入;也有快遞員退出,相似場景一,快遞員送完快遞就歇了)

3. 若是新來了一個快遞員,那麼就會給這個快遞員一個編號ID,快遞員進入哪一個據點是由它的編號ID決定的(在代碼中其實就是一個HASH函數)

4. 在同一個據點中的快遞員,後面進來的快遞員B只認識在他以前來這個據點的那一個快遞員A(單鏈表數據結構)

5. 接着4,只有經過快遞員B才能找到快遞員A;若是某個據點只有一個快遞員,那麼在據點就能夠找到了

6. 這裏還有一個問題:快遞員身上的快遞是怎麼來的?這個不用考慮,只須要知道這也是一個獨立的過程就行了

如今須要作的工做就是,分配新來快遞員到這29個據點,而且根據快遞員身上快遞的數量找到快遞員,而且決定是否讓快遞員休息(例子中不用管快遞員的身上的快遞是怎麼來的,只須要判斷有仍是沒有)。

場景二比場景一困難在哪呢?

1. 如何找到某個快遞員?只能經過他的編號ID找到所在的據點,而後再按照快遞員入據點的順序順藤摸瓜(遍歷單鏈表)。

  若是正訪問某個快遞員的時候,他前面那個快遞員已經歇了呢,線索不就斷了麼?對據點結構的同步保護結構的鎖,hashlock

2. 若是已經找到了某個快遞員,那麼設計到的問題就是場景一的問題。對某個ojbect的同步保護的鎖,f_lock

這裏面涉及到兩個鎖的同步問題,就可能會出現Dead Lock的問題,先看代碼:

#include <stdlib.h>
#include <pthread.h>

#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)

struct foo *fh[NHASH];

pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

struct foo {
    int             f_count;
    pthread_mutex_t f_lock;
    int             f_id;
    struct foo     *f_next; /* protected by hashlock */
    /* ... more stuff here ... */
};

struct foo *
foo_alloc(int id) /* allocate the object */
{
    struct foo    *fp;
    int            idx;

    if ((fp = malloc(sizeof(struct foo))) != NULL) {
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
            free(fp);
            return(NULL);
        }
        idx = HASH(id);
        pthread_mutex_lock(&hashlock);
        fp->f_next = fh[idx];
        fh[idx] = fp;
 pthread_mutex_lock(&fp->f_lock); pthread_mutex_unlock(&hashlock); /* ... continue initialization ... */
        pthread_mutex_unlock(&fp->f_lock);
    }
    return(fp);
}

void
foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}

struct foo *
foo_find(int id) /* find an existing object */
{
    struct foo    *fp;

    pthread_mutex_lock(&hashlock);
    for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) {
        if (fp->f_id == id) {
            foo_hold(fp);
            break;
        }
    }
    pthread_mutex_unlock(&hashlock);
    return(fp);
}

void
foo_rele(struct foo *fp) /* release a reference to the object */
{
    struct foo    *tfp;
    int            idx;

    pthread_mutex_lock(&fp->f_lock);
    if (fp->f_count == 1) { /* last reference */
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_lock(&hashlock);
        pthread_mutex_lock(&fp->f_lock);
        /* need to recheck the condition */
        if (fp->f_count != 1) {
            fp->f_count--;
            pthread_mutex_unlock(&fp->f_lock);
            pthread_mutex_unlock(&hashlock);
            return;
        }
        /* remove from list */
        idx = HASH(fp->f_id);
        tfp = fh[idx];
        if (tfp == fp) {
            fh[idx] = fp->f_next;
        } else {
            while (tfp->f_next != fp)
                tfp = tfp->f_next;
            tfp->f_next = fp->f_next;
        }
        pthread_mutex_unlock(&hashlock);
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } else {
        fp->f_count--;
        pthread_mutex_unlock(&fp->f_lock);
    }
}

上面的代碼是不會出現死鎖的,可是須要搞清楚上面的代碼爲何這麼設計。

APUE書中已經對代碼的設計進行了解釋,這裏我記錄一下書上沒有的提到的內容。

問題一:foo_rele中爲何要先對hashlock上鎖,再對f_lock上鎖呢?

假如其餘函數設計不變,foo_rele的上鎖順序顛倒一下就可能出現以下場景:

a. foo_rele已經得到對某個object的鎖了,準備請求hashlock的鎖

b. 同時,foo_find已經得到了hashlock的鎖了,準備請求ojbect的鎖

若是foo_rele和foo_find針對的都是同一個ojbect,那麼死鎖了:foo_rele佔着ojbect等着hashlock,foo_find佔着hashlock等着ojbect,雙方互相等着,就都鎖住了

問題二:foo_alloc新產生一個ojbect的時候,爲何要在釋放hashlock以前,先對這個object的f_lock上鎖呢?(紅字)

若是已經生成了一個ojbect了(這個時候object還不對外可見);可是一旦執行了「fh[idx]=fp」,這個ojbect就在「據點」中能夠被找到了!

再若是,哪一個不開眼的,這個時候正好foo_find到這個object,而且得到了object的f_count鎖了,就會出現以下的場景:

  foo_alloc正對新生成的object內容初始化着呢,foo_find同時就找到了這個ojbect而且扔給其餘人去用了,這就瞎了,ojbect就不一樣步了

想明白上面兩個問題,也就能夠理解書上的代碼設計了。

總結一下解決方法:爲了不死鎖狀況出現,在鎖不太多的狀況下,保證每一個線程的上鎖順序要一致。即,

線程A : lock(mutex_a) lock(mutex_b)

線程B :lock(mutex_a) lock(mutex_b)

若是是下面的狀況:

線程A : lock(mutex_b) lock(mutex_a)

線程B :lock(mutex_a) lock(mutex_b)

極可能就死鎖了,並且這種死鎖仍是不按期的,很難debug。

乍一看就是上鎖的順序保持一致唄,沒啥難的啊。可是考慮實際狀況中,業務邏輯多是比較複雜的,兩個鎖可能被業務邏輯分割的比較遠,解鎖的位置也不必定在哪。就像上面書上的demo,這種上鎖解鎖的過程是須要仔細設計的,這也就體現出來技術含量了。

面對一樣的問題,書上給了另外一種避免死鎖的程序設計思路:

#include <stdlib.h>
#include <pthread.h>

#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)

struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

struct foo {
    int             f_count; /* protected by hashlock */
    pthread_mutex_t f_lock;
    int             f_id;
    struct foo     *f_next; /* protected by hashlock */
    /* ... more stuff here ... */
};

struct foo *
foo_alloc(int id) /* allocate the object */
{
    struct foo    *fp;
    int            idx;

    if ((fp = malloc(sizeof(struct foo))) != NULL) {
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
            free(fp);
            return(NULL);
        }
        idx = HASH(id);
        pthread_mutex_lock(&hashlock);
        fp->f_next = fh[idx];
        fh[idx] = fp;
        pthread_mutex_lock(&fp->f_lock);
        pthread_mutex_unlock(&hashlock);
        /* ... continue initialization ... */
        pthread_mutex_unlock(&fp->f_lock);
    }
    return(fp);
}

void
foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&hashlock);
    fp->f_count++;
    pthread_mutex_unlock(&hashlock);
}

struct foo *
foo_find(int id) /* find an existing object */
{
    struct foo    *fp;

    pthread_mutex_lock(&hashlock);
    for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) {
        if (fp->f_id == id) {
            fp->f_count++;
            break;
        }
    }
    pthread_mutex_unlock(&hashlock);
    return(fp);
}

void
foo_rele(struct foo *fp) /* release a reference to the object */
{
    struct foo    *tfp;
    int            idx;

    pthread_mutex_lock(&hashlock);
    if (--fp->f_count == 0) { /* last reference, remove from list */
        idx = HASH(fp->f_id);
        tfp = fh[idx];
        if (tfp == fp) {
            fh[idx] = fp->f_next;
        } else {
            while (tfp->f_next != fp)
                tfp = tfp->f_next;
            tfp->f_next = fp->f_next;
        }
        pthread_mutex_unlock(&hashlock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } else {
        pthread_mutex_unlock(&hashlock);
    }
}

上面代碼照比以前的版本簡潔了不少,核心的緣由就是:每一個ojbect的f_count的同步保護交給了hashlock而不是f_lock

1. foo_alloc的時候仍是須要先鎖hashlock再鎖f_lock,這個沒有變化。

2. foo_hold foo_find foo_rele都沒有用到f_lock鎖,都只用了hashlock鎖;既然只用了一個鎖,那麼就不涉及到了死鎖了問題了。

APUE書上的說法是「f_count的保護交給了hashlock」,其實換個角度來講更好「f_lock只在初始化時候保護了object,初始化完成就沒用了」。

對比兩種解決死鎖的思路:

(1)思路一複雜,加鎖解鎖頻繁;可是對hash和ojbect的鎖分的比較清,該鎖誰鎖誰,該放誰就立刻放,可能不會耽誤並行處理

(2)思路二簡潔,每一個鎖負責的職責比較單一;可是每次處理一個ojbect就要對整個hash都鎖住,若是這種處理比較頻繁,可能會影響並行效果

實際中,不得不考慮相似思路一還有思路二的trade-off。

(五)Barrier

Barrier的做用就是讓多個線程coordinate的milestone:你們分頭去幹各自的事情,先幹完的等着後幹完的,然#include "apue.h"

#include <pthread.h>
#include <limits.h>
#include <sys/time.h>

#define NTHR 1
#define NUMNUM 8000000L
#define TNUM (NUMNUM/NTHR)

long nums[NUMNUM];
long snums[NUMNUM];

pthread_barrier_t b;

#define sort qsort

// compare two long integers ( helper fun for sort )
int complong(const void *arg1, const void *arg2)
{
    long l1 = *(long *)arg1;
    long l2 = *(long *)arg2;
    if (l1==l2)
        return 0;
    else if (l1<l2)
        return -1;
    else
        return 1;
}

// worker thread to sort a portion of the set of numbers
void *thr_fn(void *arg)
{
    long idx = (long)arg;
    sort(&nums[idx], TNUM, sizeof(long), complong);
    pthread_barrier_wait(&b);
    // do some work
    return ((void *)0);    
}


// merge the result of individual sorted ranges
void merge()
{
    long idx[NTHR];
    long i, minidx, sidx, num;
    for ( i=0; i<NTHR; ++i ) idx[i] = i * TNUM;
    for ( sidx=0; sidx < NUMNUM; ++sidx )
    {
        num = LONG_MAX;
        for ( i=0; i<NTHR; ++i )
        {
            if ( (idx[i]<(i+1)*TNUM) && (nums[idx[i]]<num) )
            {
                num = nums[idx[i]];
                minidx = i;
            }
        }
        snums[sidx] = nums[idx[minidx]];
        idx[minidx]++;
    }
}

int main()
{
    unsigned long i;
    struct timeval start, end;
    long long startusec, endusec;
    double elapsed;
    int err;
    pthread_t tid;
    // create the initial set of numbers to sort
    srandom(1);
    for ( i=0; i<NUMNUM; ++i )
        nums[i] = random();
    // create 8 threads to sort the numbers
    gettimeofday(&start, NULL);
    pthread_barrier_init(&b, NULL, NTHR+1);
    for ( i=0; i<NTHR; ++i )
    {
        err = pthread_create(&tid, NULL, thr_fn, (void *)(i*TNUM));
        if ( err!=0 )
            err_exit(err, "can't create thread");
    }
    pthread_barrier_wait(&
    merge();
    gettimeofday(&end, NULL);
    startusec = start.tv_sec*1000000 + start.tv_usec;
    endusec = end.tv_sec*1000000 + end.tv_usec;
    elapsed = (double)(endusec-startusec)/1000000.0;
    printf("sort took %.4f seconds\n", elapsed);
    exit(0);
}

上面的代碼,是我在原著基礎上作了一些改動(紅字)。改動的緣由是:因爲試驗平臺系統沒有heapsort()函數,因此就改爲了qsort(),也不影響體會barrier的功能。

對代碼的解釋:

1. 上述的代碼是對8000000個long進行排序,一共用了8個線程,每一個線程對相鄰的1000000個數進行qsort

2. 將8個線程的排序結果合併起來,造成新的數組

這裏

pthread_barrier_init(&b, NULL, NTHR+1);

爲何是NTHR+1呢?由於多出來的這個1是main線程。得多出來一個master線程,等着其他8個worker線程把活兒作完了再往下走。

還有一點,就是同一個平臺上,對8000000個數據進行排序,使用多少個線程合適呢?

試驗結果以下(個人試驗平臺是雙路24線程cpu):

若是是單線程,排序時間大概是4秒;

若是是8線程,排序時間大概是0.8秒;

若是是16線程,排序時間是1.5秒;

能夠獲得結論:多線程在多核平臺上是有速度優點的;可是不能認爲僅僅靠增長線程數就能夠提速,由於線程也是有開銷的;所以,仍是考慮trade-off

(六)Condition Variable

這個部分書上給的例子我以爲不能run,不利於初學者學習體會。

而這個blog(http://www.cnblogs.com/chuyuhuashi/p/4447817.html)中,經過一道常見的多線程面試題目來體會了condition variable的用法。

背景很簡潔:「咱們再來看一道經典的面試題:用 4 個線程瘋狂的打印 abcd 持續 5 秒鐘,可是要按照順序打印,不能是亂序的。

這個問題有能夠有兩種解決方案,一種是chainlock(鏈鎖解決方案),代碼以下:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>

#define NTHR 4
pthread_mutex_t muts[NTHR];

void *thr_func(void *arg)
{
    int n = (int)(arg);
    char c = n+'a';

    while(1)
    {
        pthread_mutex_lock(muts+n);
        printf("%c\n",c);
        pthread_mutex_unlock(muts+(n+1)%NTHR);
    }
    pthread_exit((void *)0);
}

int main()
{
    pthread_t tid[NTHR];
    int i, err;
    for ( i=0; i<NTHR; ++i )
    {
        // initialize mutex and lock them
        pthread_mutex_init(muts+i, NULL);
        pthread_mutex_lock(muts+i);
        // create new thread
        err = pthread_create(tid+i, NULL, thr_func, (void *)i );
        if ( err!=0 )
        {
            fprintf(stderr, "pthread_create():%s\n",strerror(err));
            exit(1);
        }
    }
    pthread_mutex_unlock(muts);
    alarm(2); // wait 5 seconds
    for (i=0; i<NTHR; ++i) pthread_join(tid[i], NULL);
    exit(0);

}

另外一種解決方案是採用condition variable,代碼以下:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>

#define NTHR 4
pthread_mutex_t mut_num = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_num = PTHREAD_COND_INITIALIZER;
volatile int num = 0;

void *thr_fun(void *arg)
{
    int n = (int)arg;
    char ch = n+'a';
    while(1)
    {
        pthread_mutex_lock(&mut_num);
        while ( num!=n )
        {
            pthread_cond_wait(&cond_num, &mut_num);
        }
        printf("%c\n", ch);
        num = (num+1)%NTHR;
        pthread_cond_broadcast(&cond_num);
        pthread_mutex_unlock(&mut_num);
    }
    pthread_exit((void *)0);
}

int main()
{
    int i, err;
    pthread_t tid[NTHR];
    for ( i=0; i<NTHR; ++i ) 
    {
        err = pthread_create(tid+i, NULL, thr_fun, (void *)i);
        if ( err )
        {
            fprintf(stderr, "pthread_create():%s\n",strerror(err));
            exit(1);
        }
    }
    alarm(2);
    for ( i=0; i<NTHR; ++i )
        pthread_join(tid[i],NULL);
    exit(0);
}

兩種代碼的設計思路在原blog中都說明了,再也不贅述。

這兩種方法比較,有什麼trade-off呢?

(1)我以爲主要是採用condition variable的思路,只用一個互斥量mutex就能夠了,從而下降了維護互斥量的代價。

(2)至於效率上,從試驗結果來看:

    chainlock在5秒鐘內打印了700000個字符,而condition variable在相同時間內只打印了3000000個字符,由此能夠反推condition variable的效率在當前問題上是chainlock的1/2。

    若是是其餘場景的話,可能兩者的效率就又有所不一樣了。

相關文章
相關標籤/搜索