[c++11]多線程編程(六)——條件變量(Condition Variable)

互斥鎖std::mutex是一種最多見的線程間同步的手段,可是在有些狀況下不過高效。ios

假設想實現一個簡單的消費者生產者模型,一個線程往隊列中放入數據,一個線程往隊列中取數據,取數據前須要判斷一下隊列中確實有數據,因爲這個隊列是線程間共享的,因此,須要使用互斥鎖進行保護,一個線程在往隊列添加數據的時候,另外一個線程不能取,反之亦然。用互斥鎖實現以下:c++

#include <iostream>
#include <deque>
#include <thread>
#include <mutex>

std::deque<int> q;
std::mutex mu;

void function_1() {
    int count = 10;
    while (count > 0) {
        std::unique_lock<std::mutex> locker(mu);
        q.push_front(count);
        locker.unlock();
        std::this_thread::sleep_for(std::chrono::seconds(1));
        count--;
    }
}

void function_2() {
    int data = 0;
    while ( data != 1) {
        std::unique_lock<std::mutex> locker(mu);
        if (!q.empty()) {
            data = q.back();
            q.pop_back();
            locker.unlock();
            std::cout << "t2 got a value from t1: " << data << std::endl;
        } else {
            locker.unlock();
        }
    }
}
int main() {
    std::thread t1(function_1);
    std::thread t2(function_2);
    t1.join();
    t2.join();
    return 0;
}

//輸出結果
//t2 got a value from t1: 10
//t2 got a value from t1: 9
//t2 got a value from t1: 8
//t2 got a value from t1: 7
//t2 got a value from t1: 6
//t2 got a value from t1: 5
//t2 got a value from t1: 4
//t2 got a value from t1: 3
//t2 got a value from t1: 2
//t2 got a value from t1: 1

能夠看到,互斥鎖其實能夠完成這個任務,可是卻存在着性能問題。編程

首先,function_1函數是生產者,在生產過程當中,std::this_thread::sleep_for(std::chrono::seconds(1));表示延時1s,因此這個生產的過程是很慢的;function_2函數是消費者,存在着一個while循環,只有在接收到表示結束的數據的時候,纔會中止,每次循環內部,都是先加鎖,判斷隊列不空,而後就取出一個數,最後解鎖。因此說,在1s內,作了不少無用功!這樣的話,CPU佔用率會很高,可能達到100%(單核)。如圖:併發

CPU佔用率.png

解決辦法之一是給消費者也加一個小延時,若是一次判斷後,發現隊列是空的,就懲罰一下本身,延時500ms,這樣能夠減少CPU的佔用率。函數

void function_2() {
    int data = 0;
    while ( data != 1) {
        std::unique_lock<std::mutex> locker(mu);
        if (!q.empty()) {
            data = q.back();
            q.pop_back();
            locker.unlock();
            std::cout << "t2 got a value from t1: " << data << std::endl;
        } else {
            locker.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    }
}

如圖:性能

使用延時的CPU佔用率.png

而後困難之處在於,如何肯定這個延時時間呢,假如生產者生產的很快,消費者卻延時500ms,也不是很好,若是生產者生產的更慢,那麼消費者延時500ms,仍是沒必要要的佔用了CPU。this

這就引出了條件變量(condition variable),c++11中提供了#include <condition_variable>頭文件,其中的std::condition_variable能夠和std::mutex結合一塊兒使用,其中有兩個重要的接口,notify_one()wait()wait()可讓線程陷入休眠狀態,在消費者生產者模型中,若是生產者發現隊列中沒有東西,就可讓本身休眠,可是不能一直不幹活啊,notify_one()就是喚醒處於wait中的其中一個條件變量(可能當時有不少條件變量都處於wait狀態)。那什麼時刻使用notify_one()比較好呢,固然是在生產者往隊列中放數據的時候了,隊列中有數據,就能夠趕忙叫醒等待中的線程起來幹活了。spa

使用條件變量修改後以下:線程

#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>

std::deque<int> q;
std::mutex mu;
std::condition_variable cond;

void function_1() {
    int count = 10;
    while (count > 0) {
        std::unique_lock<std::mutex> locker(mu);
        q.push_front(count);
        locker.unlock();
        cond.notify_one();  // Notify one waiting thread, if there is one.
        std::this_thread::sleep_for(std::chrono::seconds(1));
        count--;
    }
}

void function_2() {
    int data = 0;
    while ( data != 1) {
        std::unique_lock<std::mutex> locker(mu);
        while(q.empty())
            cond.wait(locker); // Unlock mu and wait to be notified
        data = q.back();
        q.pop_back();
        locker.unlock();
        std::cout << "t2 got a value from t1: " << data << std::endl;
    }
}
int main() {
    std::thread t1(function_1);
    std::thread t2(function_2);
    t1.join();
    t2.join();
    return 0;
}

此時CPU的佔用率也很低。3d

使用條件變量時的CPU佔用率.png

上面的代碼有三個注意事項:

  1. function_2中,在判斷隊列是否爲空的時候,使用的是while(q.empty()),而不是if(q.empty()),這是由於wait()從阻塞到返回,不必定就是因爲notify_one()函數形成的,還有可能因爲系統的不肯定緣由喚醒(可能和條件變量的實現機制有關),這個的時機和頻率都是不肯定的,被稱做僞喚醒,若是在錯誤的時候被喚醒了,執行後面的語句就會錯誤,因此須要再次判斷隊列是否爲空,若是仍是爲空,就繼續wait()阻塞。
  2. 在管理互斥鎖的時候,使用的是std::unique_lock而不是std::lock_guard,並且事實上也不能使用std::lock_guard,這須要先解釋下wait()函數所作的事情。能夠看到,在wait()函數以前,使用互斥鎖保護了,若是wait的時候什麼都沒作,豈不是一直持有互斥鎖?那生產者也會一直卡住,不可以將數據放入隊列中了。因此,wait()函數會先調用互斥鎖的unlock()函數,而後再將本身睡眠,在被喚醒後,又會繼續持有鎖,保護後面的隊列操做。lock_guard沒有lockunlock接口,而unique_lock提供了。這就是必須使用unique_lock的緣由。
  3. 使用細粒度鎖,儘可能減少鎖的範圍,在notify_one()的時候,不須要處於互斥鎖的保護範圍內,因此在喚醒條件變量以前能夠將鎖unlock()

還能夠將cond.wait(locker);換一種寫法,wait()的第二個參數能夠傳入一個函數表示檢查條件,這裏使用lambda函數最爲簡單,若是這個函數返回的是truewait()函數不會阻塞會直接返回,若是這個函數返回的是falsewait()函數就會阻塞着等待喚醒,若是被僞喚醒,會繼續判斷函數返回值。

void function_2() {
    int data = 0;
    while ( data != 1) {
        std::unique_lock<std::mutex> locker(mu);
        cond.wait(locker, [](){ return !q.empty();} );  // Unlock mu and wait to be notified
        data = q.back();
        q.pop_back();
        locker.unlock();
        std::cout << "t2 got a value from t1: " << data << std::endl;
    }
}

除了notify_one()函數,c++還提供了notify_all()函數,能夠同時喚醒全部處於wait狀態的條件變量。

參考

  1. C++併發編程實戰
  2. C++ Threading #6: Condition Variable
相關文章
相關標籤/搜索