APUE 學習筆記(八) 線程同步

1. 進程的全部信息對該進程內的全部線程都是共享的

包括 可執行的程序文本、程序全局內存、堆內存以及文件描述符數組

線程包含了表示進程內執行環境必需的信息,包括線程ID、寄存器值、棧、調度優先級和策略、信號屏蔽字、線程私有數據
 
判斷線程相等時 採用 pthread_equal 函數
 
線程建立時並不能保證哪一個線程會先執行,不能在線程調度上作出任何假設
 
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>

void printid(const char* str)
{
    pid_t     pid = getpid();
    pthread_t tid = pthread_self();
    fprintf(stdout, "%s pid:%u,tid:%u\n", str, (unsigned int)pid, (unsigned int)tid);
}

void* thread_func(void* arg)
{
    printid("new thread: ");
    return (void*)0;
}

int main(int argc, char* argv[])
{
    pthread_t tid;
    int ret = pthread_create(&tid, NULL, thread_func, NULL);
    if (ret != 0) {
        fprintf(stderr, "pthread_create error\n");
        return -1;
    }
    printid("main thread: ");
    sleep(1);
    return 0;
}
這段代碼中須要兩個地方須要處理主線程和新線程之間的競爭:
(1)主線程須要休眠,若是主線程不休眠,就可能在新線程有機會運行以前就退出終止了
(2)新線程經過 pthread_self 函數獲取本身的線程ID,而不是從共享內存讀出或者從線程的啓動例程以參數的形式接收到
pthread_create 會經過第一個參數返回新建線程的線程ID,而新線程並不能安全的使用這個ID,由於 新線程可能在 主線程調用 pthread_create 回填線程ID以前就運行了,這時新線程使用的是未經初始化的ID,並非正確的線程ID
 
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <assert.h>

#define NTHREADS 5

void* PrintHello(void* threadId)
{
    int tid = ((int)threadId);
    fprintf(stdout, "Hello world, thread %d\n", tid);
    pthread_exit(NULL);
}

int main(int argc, char* argv[])
{
    pthread_t threads[NTHREADS];
    int rc = 0;
    for (int i = 0; i < NTHREADS; ++i) {
        fprintf(stdout, "In main: creating thread %d\n", i);
        rc = pthread_create(&threads[i], NULL, PrintHello, (void*)i);
        if (rc != 0) {
            fprintf(stderr, "error:return code from pthread_create is %d\n", rc);
            exit(-1);
        }
    }
    pthread_exit(NULL);
}

上述代碼,咱們建立了5個線程,每一個線程打印一條包含線程編號的語句安全

能夠預想到:每次運行程序時,結果不盡相同。由於 線程建立時並不能保證哪一個線程會先執行,不能在線程調度上作出任何假設數據結構

 

 

假如咱們將上述代碼中
多線程

 rc = pthread_create(&threads[i], NULL, PrintHello, (void*)i);
 void* PrintHello(void* threadId)
 {
     int tid = ((int)threadId);
     fprintf(stdout, "Hello world, thread %d\n", tid);
     pthread_exit(NULL);
 }

改成如下:併發

rc = pthread_create(&threads[i], NULL, PrintHello, (void*)&i);
void* PrintHello(void* threadId)
{
    int tid = *((int*)threadId);
    fprintf(stdout, "Hello world, thread %d\n", tid);
    pthread_exit(NULL);
}

僅有的差異就是線程執行函數的參數傳遞不一樣,執行改過以後的程序:函數

 

咱們能夠看到程序執行結果徹底不一樣而且不正確性能

對於修改前:直接傳遞 變量i 的值,這是值語義,以後線程操做的只是 變量i 的副本,跟原來的 變量i 沒有任何關係,沒有競爭出現spa

對於修改後:傳遞的是 變量i 的地址(地址),這是引用語義,以後線程操做的是 原變量i,這時多個線程就出現了競爭,操作系統

                 由於這時 變量i 的地址是共享內存,對全部線程可見,其他5個線程經過共享內存在讀這個變量i,而主線程經過 i++在寫這個變量值線程

                 這之間並無任何同步,因此5個線程讀取的值並不正確

 

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>
#include <math.h>

#define NTHREADS 5

void* busywork(void* ptr)
{
    int tid = (int)ptr;
    fprintf(stdout, "Thread %d starting...\n", tid);
    double result = 0.0;
    for (int i = 0; i < 1000000; ++i) {
        result = result + sin(i) * tan(i);
    }
    fprintf(stdout, "Thread %d done. Result = %e\n", tid, result);
    pthread_exit((void*)ptr);
}

int main(int argc, char* argv[])
{
    pthread_t thread[NTHREADS];
    pthread_attr_t attr;

    /* Initialize and set thread detached attribute */
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    for (int i = 0; i < NTHREADS; ++i) {
        fprintf(stdout, "Main: creating thread %d\n", i);
        int rc = pthread_create(&thread[i], &attr, busywork, (void*)i);
        if (rc != 0) {
            fprintf(stderr, "error:return code from pthread_create is %d\n", rc);
            exit(-1);
        }
    }

    /* Free attribute and wait for the other threads */
    void* status;
    pthread_attr_destroy(&attr);
    for (int i = 0; i < NTHREADS; ++i) {
        int rc = pthread_join(thread[i], &status);
        if (rc != 0) {
            fprintf(stderr, "error:return code from pthread_join id %d\n", rc);
            exit(-1);
        }
        fprintf(stdout, "Main:completed join with thread %d having a status of %d\n", i, (int)status);
    }
    fprintf(stdout, "Main: program completed. Exiting\n");
    pthread_exit(NULL);
}

 

2.線程能夠經過phread_cancel函數來請求取消同一進程中的其它線程

3.進程原語和線程原語的比較

在默認狀況下,線程的終止狀態會保存到對該線程調用pthread_join,若是線程已經處於分離狀態,那麼不能用pthread_join函數等待它的終止狀態。

 

3.線程同步

當多個控制線程共享相同的內存時,須要確保每一個線程都看到一致的數據視圖
 
(1)讀寫互斥
在變量修改時間多於1個存儲器訪問週期的處理器結構中,當存儲器讀與存儲器寫這兩個週期交叉時,就有潛在的不一致性

 

爲了解決這個問題,線程必須使用鎖,在同一時間只容許一個線程訪問該變量

 

(2)寫寫互斥
當多個線程在同一時間修改同一變量時,也須要同步
變量遞增操做:
a.從內存單元讀入寄存器
b.在寄存器中進行變量值的增長
c.把新值寫回內存單元
 
若是修改操做是原子操做,那麼就不存在競爭。然而在現代計算機系統中,存儲器訪問須要多個總線週期,一般在多個處理器上交叉進程
 
(3) 互斥量
對互斥量進行加鎖之後,任何其它試圖再次對互斥量加鎖的線程將會被阻塞直到當前線程釋放該互斥鎖
 
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

struct foo {
    int        f_count;
    pthread_t  f_lock;
    /* more stuff here... */
};

struct foo* foo_alloc(void)
{
    struct foo* fp = malloc(sizeof(struct foo));
    if (fp != NULL) {
        fp->f_count = 1;
        int ret = pthread_mutex_init(&fp->f_lock, NULL);
        if (ret != 0) {
            free(fp);
            return NULL;
        }
    }
    return fp;
}

/* increase a reference to the object */
void foo_increase(struct foo* fp)
{
    assert(fp != NULL);
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}
/* decrease a reference to the object */
void foo_decrease(struct foo* fp)
{
    assert(fp != NULL);
    pthread_mutex_lock(&fp->f_lock);
    if (--fp->f_count == 0) {
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } else {
        pthread_mutex_unlock(&fp->f_lock);
    }
}
 

4.避免死鎖

兩個線程都在相互請求另外一個線程擁有的資源,這兩個線程都沒法向前運行,就產生了死鎖
  1 #include <unistd.h>
  2 #include <stdio.h>
  3 #include <stdlib.h>
  4 #include <assert.h>
  5 #include <pthread.h>
  6 
  7 #define NHASH 29
  8 #define HASH(fp)  (((unsigned long)(fp)) % NHASH)
  9 
 10 pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
 11 
 12 struct foo {
 13     struct foo*      f_next;
 14     int              f_count;
 15     pthread_mutex_t  f_lock;
 16     int              f_id;
 17     /* more stuff here... */
 18 };
 19 
 20 struct foo* fh[NHASH];
 21 
 22 struct foo* foo_alloc(void)
 23 {
 24     struct foo* fp = malloc(sizeof(struct foo));
 25     if (fp != NULL) {
 26         fp->f_count = 1;
 27         int ret = pthread_mutex_init(&fp->f_lock, NULL);
 28         if (ret != 0) {
 29             free(fp);
 30             return NULL;
 31         }
 32         int idx = HASH(fp);
 33         pthread_mutex_lock(&hashlock);
 34         fp->f_next = fh[idx];
 35         fh[idx] = fp->f_next;
 36         pthread_mutex_lock(&fp->f_lock);
 37         pthread_mutex_unlock(&hashlock);
 38 
 39         /*  continue initialization...... */
 40         pthread_mutex_unlock(&fp->f_lock);
 41     }
 42     return fp;
 43 }
 44 
 45 /* increase a reference to the object */
 46 void foo_increase(struct foo* fp)
 47 {
 48     assert(fp != NULL);
 49     pthread_mutex_lock(&fp->f_lock);
 50     fp->f_count++;
 51     pthread_mutex_unlock(&fp->f_lock);
 52 }
 53 
 54 /* find an existing object */
 55 struct foo* foo_find(int id)
 56 {
 57     struct foo* fp;
 58     int idx = HASH(fp);
 59     pthread_mutex_lock(&hashlock);
 60     for (fp = fh[idx]; fp != NULL; fp = fp->f_next) {
 61         if (fp->f_id == id) {
 62             foo_increase(fp);
 63             break;
 64         }
 65     }
 66     pthread_mutex_unlock(&hashlock);
 67     return fp;
 68 }
 69 
 70 /* decrease a reference to the object */
 71 void foo_decrease(struct foo* fp)
 72 {
 73     assert(fp != NULL);
 74     struct foo* tfp = NULL;
 75     int idx = 0;
 76     pthread_mutex_lock(&fp->f_lock);
 77     if (fp->f_count == 1) {
 78         pthread_mutex_unlock(&fp->f_lock);
 79         pthread_mutex_lock(&hashlock);
 80         pthread_mutex_lock(&fp->f_lock);
 81         /* need to recheck the condition */
 82         if (fp->f_count != 1) {
 83             fp->f_count--;
 84             pthread_mutex_unlock(&fp->f_lock);
 85             pthread_mutex_unlock(&hashlock);
 86             return;
 87         }
 88 
 89         /* remove from list */
 90         idx = HASH(fp);
 91         tfp = fh[idx];
 92         if (tfp == fp) {
 93             fh[idx] = fp->f_next;
 94         } else {
 95             while (tfp->f_next != fp) {
 96                 tfp = tfp->f_next;
 97             }
 98             tfp->f_next = fp->f_next;
 99         }
100         pthread_mutex_unlock(&hashlock);
101         pthread_mutex_unlock(&fp->f_lock);
102         pthread_mutex_destroy(&fp->f_lock);
103         free(fp);
104 
105     } else {
106         fp->f_count--;
107         pthread_mutex_unlock(&fp->f_lock);
108     }
109 }
上述代碼描述了兩個互斥量的使用方法
當同時須要兩個互斥量時,老是讓它們 以相同的順序加鎖,以免死鎖
 
hashlock維護着一個用於跟蹤foo數據結構的散列列表,這樣就保護foo結構中的fh散列表和f_next散列鏈字段
foo結構中的f_lock互斥量保護對foo結構中的其餘字段的訪問
 
在foo_decrease函數中,首先須要讀取 f_count 這個時候咱們必須對其加鎖訪問,防止其它線程在此過程當中修改此變量
互斥讀取到f_count變量後,若是它是最後一個引用,則須要從散列表中刪除這個結構,這時須要讀取全局的數據結構fh結構體數組,咱們必須 先對 結構體互斥量解鎖(第78行),才能夠從新得到散列表鎖(第79行),而後從新獲取結構體互斥量(第80行),由於 兩個互斥量的加鎖順序必須保持絕對一致。線程可能爲了知足鎖順序而阻塞,其它線程可能就在此阻塞過程當中對結構引用計數加1,因此必須從新檢查條件。也就是說 在第78行至80行之間,其它線程是能夠對結構體引用計數加1的
 上述代碼過於複雜,能夠簡化以下:
 1 #include <unistd.h>
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 #include <assert.h>
 5 #include <pthread.h>
 6 
 7 #define NHASH 29
 8 #define HASH(fp)  (((unsigned long)(fp)) % NHASH)
 9 
10 struct foo {
11     struct foo*       f_next;       /* protected by hashlock */
12     int               f_count;      /* protected by hashlock */
13     pthread_mutex_t   f_lock;
14     int               f_id;
15     /* more stuff here */
16 };
17 
18 struct foo* fh[NHASH];
19 pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
20 
21 struct foo* foo_alloc(void)
22 {
23     int idx = 0;
24     struct foo* fp = malloc(sizeof(struct foo));
25     if (fp != NULL) {
26         fp->f_count = 1;
27         int ret = pthread_mutex_init(&fp->f_lock, NULL);
28         if (ret != 0) {
29             free(fp);
30             return NULL;
31         }
32         idx = HASH(fp);
33         pthread_mutex_lock(&hashlock);
34         fp->f_next = fh[idx];
35         fh[idx] = fp->f_next;
36         pthread_mutex_lock(&fp->f_count);
37         pthread_mutex_unlock(&hashlock);
38         /* continue initialization */
39     }
40     return fp;
41 }
42 
43 void foo_increase(struct foo* fp)
44 {
45     assert(fp != NULL);
46     pthread_mutex_lock(&hashlock);
47     fp->f_count++;
48     pthread_mutex_unlock(&hashlock);
49 }
50 
51 struct foo* foo_find(int id)
52 {
53     struct foo* fp = NULL;
54     int idx = HASH(fp);
55     pthread_mutex_lock(&hashlock);
56     for (fp = fh[idx]; fp != NULL; fp = fp->f_next) {
57         if (fp->f_id == id) {
58             fp->f_count++;
59             break;
60         }
61     }
62     pthread_mutex_unlock(&hashlock);
63     return fp;
64 }
65 
66 void foo_decrease(struct foo* fp)
67 {
68     assert(fp != NULL);
69     struct foo* tfp = NULL;
70     int idx = 0;
71 
72     pthread_mutex_lock(&hashlock);
73     if (--fp->f_count == 0) {
74         idx = HASH(fp);
75         tfp = fh[idx];
76         if (tfp == fp) {
77             fh[idx] = fp->f_next;
78         } else {
79             while (tfp->f_next != fp) {
80                 tfp = tfp->f_next;
81             }
82             tfp->f_next = fp->f_next;
83         }
84         pthread_mutex_unlock(&hashlock);
85         pthread_mutex_destroy(&fp->f_lock);
86         free(fp);
87     } else {
88         pthread_mutex_unlock(&hashlock);
89     }
90 }

 

若是鎖的粒度太粗,就會出現不少線程阻塞等待相同的鎖,源自併發性的改善微乎其微
若是鎖的粒度太細,那麼過多的鎖會使系統性能開銷受到影響,並且代碼變得至關複雜
 

5.讀寫鎖

互斥量要麼是鎖住狀態,要麼是不加鎖狀態,並且一次只有一個線程能夠對其加鎖
讀寫鎖有三種狀態:讀狀態加鎖,寫狀態加鎖,不加鎖。一次只有一個線程能夠佔有 寫狀態鎖,多個線程能夠同時佔有多個讀狀態鎖
讀寫鎖很是適合於對數據結構讀的次數遠大於寫的狀況
讀寫鎖也叫作 共享-獨佔鎖
 

6.條件變量

7. 線程屬性

#include<pthread.h>
int pthread_attr_init(pthread_attr_t* attr);
int pthread_attr_destroy(pthread_attr_t* attr);
 
 
若是對現有的某個線程的終止狀態不感興趣的話,可使用pthread_detach函數讓操做系統在線程退出時回收它所佔用的資源
 
pthread_attr_getstack  pthread_attr_setstack 這兩個函數能夠用於管理stackaddr線程屬性
pthread_attr_getstacksize  pthread_attr_setstacksize 這兩個函數能夠用於管理stacksize線程屬性
 

8.互斥量屬性

#include <pthread.h>
int pthread_mutexattr_init(pthread_mutexattr_t* attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t* attr);

 

9.重入、線程安全

若是一個函數在同一時刻能夠被多個線程安全地調用,則該函數是線程安全的。
相關文章
相關標籤/搜索