C++11 併發編程教程 - Part 3 : 鎖的進階與條件變量

上一篇文章中咱們學習瞭如何使用互斥量來解決一些線程同步問題。這一講咱們將進一步討論互斥量的話題,並向你們介紹 C++11 併發庫中的另外一種同步機制 —— 條件變量。 多線程


遞歸鎖
併發

考慮下面這個簡單類:
app

1
2
3
4
5
6
7
8
9
10
11
12
13
struct  Complex {
    std::mutex mutex;
    int  i;
    Complex() : i(0) {}
    void  mul(int  x){
        std::lock_guard<std::mutex> lock(mutex);
        i *= x;
    }
    void  div(int  x){
        std::lock_guard<std::mutex> lock(mutex);
        i /= x;
    }
};

如今你想添加一個操做以便無誤地一併執行上述兩項操做,因而你添加了一個函數: 函數

1
2
3
4
5
void  both(int  x,  int  y){
    std::lock_guard<std::mutex> lock(mutex);
    mul(x);
    div(y);
}

讓咱們來測試這個函數:
學習

1
2
3
4
5
int  main(){
    Complex complex;
    complex.both(32, 23);
    return  0;
}

若是你運行上述測試,你會發現這個程序將永遠不會結束。緣由很簡單,在 both() 函數中,線程將申請鎖,而後調用 mul() 函數,在這個函數[譯註:指 mul() ]中,線程將再次申請該鎖,但該鎖已經被鎖住了。這是死鎖的一種狀況。默認狀況下,一個線程不能重複申請同一個互斥量上的鎖。 測試

這裏有一個簡單的解決辦法:std::recursive_mutex 。這個互斥量可以被同一個線程重複上鎖,下面就是 Complex 結構體的正確實現:
fetch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct  Complex {
    std::recursive_mutex mutex;
    int  i;
    Complex() : i(0) {}
    void  mul(int  x){
        std::lock_guard<std::recursive_mutex> lock(mutex);
        i *= x;
    }
    void  div(int  x){
        std::lock_guard<std::recursive_mutex> lock(mutex);
        i /= x;
    }
    void  both(int  x,  int  y){
        std::lock_guard<std::recursive_mutex> lock(mutex);
        mul(x);
        div(y);
    }
};

這樣一來,程序就能正常的結束了。
this


計時鎖
spa

有些時候,你並不想某個線程永無止境地去等待某個互斥量上的鎖。譬如說你的線程但願在等待某個鎖的時候作點其餘的事情。爲了達到這一目的,標準庫提供了一套解決方案:std::timed_mutex std::recursive_timed_mutex (若是你的鎖須要具有遞歸性的話)。他們具有與 std::mutex 相同的函數:lock()  unlock(),同時還提供了兩個新的函數:try_lock_for()  try_lock_until()  線程

第一個函數,也是最有用的一個,它容許你設置一個超時參數,一旦超時,就算當前尚未得到鎖,函數也會自動返回。該函數在得到鎖以後返回 true,不然 false。下面咱們來看一個簡單示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::timed_mutex mutex;
void  work(){
    std::chrono::milliseconds timeout(100);
    while(true){
        if(mutex.try_lock_for(timeout)){
            std::cout << std::this_thread::get_id() <<  ": do work with the mutex"  << std::endl;
            std::chrono::milliseconds sleepDuration(250);
            std::this_thread::sleep_for(sleepDuration);
            mutex.unlock();
            std::this_thread::sleep_for(sleepDuration);
        }  else  {
            std::cout << std::this_thread::get_id() <<  ": do work without mutex"  << std::endl;
            std::chrono::milliseconds sleepDuration(100);
            std::this_thread::sleep_for(sleepDuration);
        }
    }
}
int  main(){
    std::thread  t1(work);
    std::thread  t2(work);
    t1.join();
    t2.join();
    return  0;
}

(這個示例在實踐中是毫無用處的)

值得注意的是示例中時間間隔聲明:std::chrono::milliseconds 。它一樣是 C++11 的新特性。你能夠獲得多種時間單位:納秒、微妙、毫秒、秒、分以及小時。咱們使用上述某個時間單位以設置try_lock_for() 函數的超時參數。咱們一樣可使用它們並經過 std::this_thread::sleep_for() 函數來設置線程的睡眠時間。示例中剩下的代碼就沒什麼使人激動的了,只是一些使得結果可見的打印語句。注意:這段示例永遠不會結束,你須要本身把他 kill 掉。


Call Once

有時候你但願某個函數在多線程環境中只被執行一次。譬如一個由兩部分組成的函數,第一部分只能被執行一次,而第二部分則在該函數每次被調用時都應該被執行。咱們可使用 std::call_once 函數垂手可得地實現這一功能。下面是針對這一機制的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
std::once_flag flag;
void  do_something(){
    std::call_once(flag, [](){std::cout <<  "Called once"  << std::endl;});
    std::cout <<  "Called each time"  << std::endl;
}
int  main(){
    std::thread  t1(do_something);
    std::thread  t2(do_something);
    std::thread  t3(do_something);
    std::thread  t4(do_something);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    return  0;
}

每個 std::call_once 函數都有一個 std::once_flag 變量與之匹配。在上例中我使用了 Lambda 表達式[譯註:此處意譯]來做爲只被執行一次的代碼,而使用函數指針以及 std::function 對象也一樣可行。


條件變量

條件變量維護着一個線程列表,列表中的線程都在等待該條件變量上的另外某個線程將其喚醒。[譯註:原文對於如下內容的闡釋有誤,故譯者參照cppreference.com `條件變量` 一節進行翻譯] 每一個想要在 std::condition_variable 上等待的線程都必須首先得到一個 std::unique_lock 鎖。[譯註:條件變量的] wait 操做會自動地釋放鎖並掛起對應的線程。當條件變量被通知時,掛起的線程將被喚醒,鎖將會被再次申請。

一個很是好的例子就是有界緩衝區。它是一個環形緩衝,擁有肯定的容量、起始位置以及結束位置。下面就是使用條件變量實現的一個有界緩衝區。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct  BoundedBuffer {
    int* buffer;
    int  capacity;
    int  front;
    int  rear;
    int  count;
    std::mutex lock;
    std::condition_variable not_full;
    std::condition_variable not_empty;
    BoundedBuffer(int  capacity) : capacity(capacity), front(0), rear(0), count(0) {
        buffer =  new  int[capacity];
    }
    ~BoundedBuffer(){
        delete[] buffer;
    }
    void  deposit(int  data){
        std::unique_lock<std::mutex> l(lock);
        not_full.wait(l, [&count, &capacity](){return  count != capacity; });
        buffer[rear] = data;
        rear = (rear + 1) % capacity;
        ++count;
        not_empty.notify_one();
    }
    int  fetch(){
        std::unique_lock<std::mutex> l(lock);
        not_empty.wait(l, [&count](){return  count != 0; });
        int  result = buffer[front];
        front = (front + 1) % capacity;
        --count;
        not_full.notify_one();
        return  result;
    }
};

類中互斥量由 std::unique_lock 接管,它是用於管理鎖的 Wrapper,是使用條件變量的必要條件。咱們使用 notify_one() 函數喚醒等待在條件變量上的某個線程。而函數 wait() 就有些特別了,其第一個參數是咱們的 std::unique_lock,而第二個參數是一個斷言。要想持續等待的話,這個斷言就必須返回false,這就有點像 while(!predicate()) { cv.wait(l); } 的形式。上例剩下的部分就沒什麼好說的了。

咱們可使用上例的緩衝區解決「多消費者/多生產者」問題。這是一個很是廣泛的同步問題,許多線程(消費者)在等待由其餘一些線程(生產者)生產的數據。下面就是一個使用這個緩衝區的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void  consumer(int  id, BoundedBuffer& buffer){
    for(int  i = 0; i < 50; ++i){
        int  value = buffer.fetch();
        std::cout <<  "Consumer "  << id <<  " fetched "  << value << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
    }
}
void  producer(int  id, BoundedBuffer& buffer){
    for(int  i = 0; i < 75; ++i){
        buffer.deposit(i);
        std::cout <<  "Produced "  << id <<  " produced "  << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
int  main(){
    BoundedBuffer buffer(200);
    std::thread  c1(consumer, 0, std::ref(buffer));
    std::thread  c2(consumer, 1, std::ref(buffer));
    std::thread  c3(consumer, 2, std::ref(buffer));
    std::thread  p1(producer, 0, std::ref(buffer));
    std::thread  p2(producer, 1, std::ref(buffer));
    c1.join();
    c2.join();
    c3.join();
    p1.join();
    p2.join();
    return  0;
}

三個消費者線程和兩個生產者線程被建立後就不斷地對緩衝區進行查詢。值得關注的是例子中使用std::ref 來傳遞緩衝區的引用,以避免形成對緩衝區的拷貝。


總結

這一節咱們講到了許多東西,首先,咱們看到如何使用遞歸鎖實現某個線程對同一鎖的屢次加鎖。接下來知道了如何在加鎖時設定一個超時屬性。而後咱們學習了一種調用某個函數有且只有一次的方法。最後咱們使用條件變量解決了「多生產者/多消費者」同步問題。


下篇

下一節咱們將講到 C++11同步庫中另外一個新特性 —— 原子量。

相關文章
相關標籤/搜索