C++ 併發編程之互斥鎖和條件變量的性能比較

介紹

本文以最簡單生產者消費者模型,經過運行程序,觀察該進程的cpu使用率,來對比使用互斥鎖互斥鎖+條件變量的性能比較。ios

本例子的生產者消費者模型,1個生產者,5個消費者。
生產者線程往隊列裏放入數據,5個消費者線程從隊列取數據,取數據前須要判斷一下隊列中是否有數據,這個隊列是全局隊列,是線程間共享的數據,因此須要使用互斥鎖進行保護。即生產者在往隊列裏放入數據時,其他消費者不能取,反之亦然。c++


互斥鎖實現的代碼

#include <iostream> // std::cout
#include <deque>    // std::deque
#include <thread>   // std::thread
#include <chrono>   // std::chrono
#include <mutex>    // std::mutex


// 全局隊列
std::deque<int> g_deque;

// 全局鎖
std::mutex g_mutex;

// 生產者運行標記
bool producer_is_running = true;

// 生產者線程函數
void Producer()
{
    // 庫存個數
    int count = 8;
    
    do
    {
        // 智能鎖,初始化後即加鎖,保護的範圍是代碼花括號內,花括號退出即會自動解鎖
        // 能夠手動解鎖,從而控制互斥鎖的細粒度
        std::unique_lock<std::mutex> locker( g_mutex );
        // 入隊一個數據
        g_deque.push_front( count );
        // 提早解鎖,縮小互斥鎖的細粒度,只針對共享的隊列數據進行同步保護
        locker.unlock(); 

        std::cout << "生產者    :我如今庫存有 :" << count << std::endl;
            
        // 放慢生產者生產速度,睡1秒
        std::this_thread::sleep_for( std::chrono::seconds( 1 ) );

        // 庫存自減小
        count--;
    } while( count > 0 );
    
    // 標記生產者打樣了
    producer_is_running = false;

    std::cout << "生產者    : 個人庫存沒有了,我要打樣了!"  << std::endl;
}

// 消費者線程函數
void Consumer(int id)
{
    int data = 0;

    do
    {
        std::unique_lock<std::mutex> locker( g_mutex );
        if( !g_deque.empty() )
        {
            data = g_deque.back();
            g_deque.pop_back();
            locker.unlock();

            std::cout << "消費者[" << id << "] : 我搶到貨的編號是 :" << data << std::endl;
        }
        else
        {
            locker.unlock();
        }
    } while( producer_is_running );
    
    std::cout << "消費者[" << id << "] :賣家沒有貨打樣了,真惋惜,下次再來搶!"  << std::endl;
}

int main(void)
{
    std::cout << "1 producer start ..." << std::endl;
    std::thread producer( Producer );

    std::cout << "5 consumer start ..." << std::endl;
    std::thread consumer[ 5 ];
    for(int i = 0; i < 5; i++)
    {
        consumer[i] = std::thread(Consumer, i + 1);
    }

    producer.join();

    for(int i = 0; i < 5; i++)
    {
        consumer[i].join();
    }

    std::cout << "All threads joined." << std::endl;

    return 0;
}

互斥鎖實現運行結果:

結果輸出shell

[root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o  main
[root@lincoding condition]# ./main
1 producer start ...
5 consumer start ...
生產者    :我如今庫存有 :8
消費者[1] : 我搶到貨的編號是 :8
消費者[1] : 我搶到貨的編號是 :7
生產者    :我如今庫存有 :7
生產者    :我如今庫存有 :6
消費者[3] : 我搶到貨的編號是 :6
生產者    :我如今庫存有 :5
消費者[1] : 我搶到貨的編號是 :5
生產者    :我如今庫存有 :4
消費者[2] : 我搶到貨的編號是 :4
生產者    :我如今庫存有 :3
消費者[5] : 我搶到貨的編號是 :3
生產者    :我如今庫存有 :2
消費者[2] : 我搶到貨的編號是 :2
生產者    :我如今庫存有 :1
消費者[1] : 我搶到貨的編號是 :1
生產者    : 個人庫存沒有了,我要打樣了!消費者[
5] :賣家沒有貨打樣了,真惋惜,下次再來搶!
消費者[2] :賣家沒有貨打樣了,真惋惜,下次再來搶!
消費者[3] :賣家沒有貨打樣了,真惋惜,下次再來搶!
消費者[4] :賣家沒有貨打樣了,真惋惜,下次再來搶!
消費者[1] :賣家沒有貨打樣了,真惋惜,下次再來搶!
All threads joined.

能夠看到,互斥鎖其實能夠完成這個任務,可是卻存在着性能問題。函數

  • Producer是生產者線程,在生產者數據過程當中,會休息1秒,因此這個生產過程是很慢的;
  • Consumer是消費者線程,存在着一個while循環,只有判斷到生產者不運行了,纔會退出while循環,那麼每次在循環體內,都是會先加鎖,判斷隊列不空,而後從列隊取出一個數據,最後解鎖。因此說,在生產者休息1秒的時候,消費者線程實際上會作不少無用功,致使CPU使用率很是高!

運行的環境是4核cpu性能

[root@lincoding ~]# grep 'model name' /proc/cpuinfo | wc -l
4

top命令查看cpu使用狀況,可見使用純互斥鎖cpu的開銷是很大的,main進程的cpu使用率達到了357.5%CPU,系統開銷的cpu爲54.5%sy,用戶開銷的cpu爲18.2%usui

[root@lincoding ~]# top
top - 19:13:41 up 36 min,  3 users,  load average: 0.06, 0.05, 0.01
Tasks: 179 total,   1 running, 178 sleeping,   0 stopped,   0 zombie
Cpu(s): 18.2%us, 54.5%sy,  0.0%ni, 27.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:   1004412k total,   313492k used,   690920k free,    41424k buffers
Swap:  2031608k total,        0k used,  2031608k free,    79968k cached

   PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                                                                       
 35346 root      20   0  137m 3288 1024 S 357.5  0.3   0:05.92 main                                                                                                                          
     1 root      20   0 19232 1492 1224 S  0.0  0.1   0:02.16 init                                                                                                                           
     2 root      20   0     0    0    0 S  0.0  0.0   0:00.01 kthreadd                                                                                                                       
     3 root      RT   0     0    0    0 S  0.0  0.0   0:00.68 migration/0

解決的辦法之一就是給消費者也加一個小延時,當消費者沒取到數據時,就休息一下500毫秒,這樣能夠減小互斥鎖給cpu帶來的開銷。this

// 消費者線程函數
void Consumer(int id)
{
    int data = 0;

    do
    {
        std::unique_lock<std::mutex> locker( g_mutex );
        if( !g_deque.empty() )
        {
            data = g_deque.back();
            g_deque.pop_back();
            locker.unlock();

            std::cout << "消費者[" << id << "] : 我搶到貨的編號是 :" << data << std::endl;
        }
        else
        {
            locker.unlock();
            // 當消費者沒取到數據時,就休息一下500毫秒
            std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
        }
    } while( producer_is_running );
    
    std::cout << "消費者[" << id << "] :賣家沒有貨打樣了,真惋惜,下次再來搶!"  << std::endl;
}

從運行結果可知,cpu使用率大大下降了線程

[root@lincoding ~]# ps aux | grep -v grep  |grep main
USER        PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root      61296  0.0  0.1 141068  1244 pts/1    Sl+  19:40   0:00 ./main

條件變量+互斥鎖實現的代碼

那麼問題來了,如何肯定消費者延時(休息)多久呢?code

  • 若是生產者生產的很是快,消費者卻延時了500毫秒,也不是很好
  • 若是生產者生產的更慢,那麼消費延時500毫秒,也會有無用功,佔用了CPU

這就須要引入條件變量std::condition_variable,應用於消費者生產模型中,就是生產者生產完一個數據後,經過notify_one()喚醒正在wait()消費者線程,使得消費者從隊列取出一個數據。接口

#include <iostream> // std::cout
#include <deque>    // std::deque
#include <thread>   // std::thread
#include <chrono>   // std::chrono
#include <mutex>    // std::mutex

#include <condition_variable> // std::condition_variable


// 全局隊列
std::deque<int> g_deque;

// 全局鎖
std::mutex g_mutex;

// 全局條件變量
std::condition_variable g_cond;

// 生產者運行標記
bool producer_is_running = true;

// 生產者線程函數
void Producer()
{
    // 庫存個數
    int count = 8;
    
    do
    {
        // 智能鎖,初始化後即加鎖,保護的範圍是代碼花括號內,花括號退出即會自動解鎖
        // 能夠手動解鎖,從而控制互斥鎖的細粒度
        std::unique_lock<std::mutex> locker( g_mutex );
        // 入隊一個數據
        g_deque.push_front( count );
        // 提早解鎖,縮小互斥鎖的細粒度,只針對共享的隊列數據進行同步保護
        locker.unlock(); 

        std::cout << "生產者    :我如今庫存有 :" << count << std::endl;
        
        // 喚醒一個線程
        g_cond.notify_one();
        
        // 睡1秒
        std::this_thread::sleep_for( std::chrono::seconds( 1 ) );

        // 庫存自減小
        count--;
    } while( count > 0 );
    
    // 標記生產者打樣了
    producer_is_running = false;
    
    // 喚醒全部消費線程
    g_cond.notify_all();
    
    std::cout << "生產者    : 個人庫存沒有了,我要打樣了!"  << std::endl;
}

// 消費者線程函數
void Consumer(int id)
{
    // 購買的貨品編號
    int data = 0;

    do
    {
        // 智能鎖,初始化後即加鎖,保護的範圍是代碼花括號內,花括號退出即會自動解鎖
        // 能夠手動解鎖,從而控制互斥鎖的細粒度
        std::unique_lock<std::mutex> locker( g_mutex );
        
        // wait()函數會先調用互斥鎖的unlock()函數,而後再將本身睡眠,在被喚醒後,又會繼續持有鎖,保護後面的隊列操做
        // 必須使用unique_lock,不能使用lock_guard,由於lock_guard沒有lock和unlock接口,而unique_lock則都提供了
        g_cond.wait(locker); 
        
        // 隊列不爲空
        if( !g_deque.empty() )
        {
            // 取出隊列裏最後一個數據
            data = g_deque.back();
            
            // 刪除隊列裏最後一個數據
            g_deque.pop_back();
            
            // 提早解鎖,縮小互斥鎖的細粒度,只針對共享的隊列數據進行同步保護
            locker.unlock(); 

            std::cout << "消費者[" << id << "] : 我搶到貨的編號是 :" << data << std::endl;
        }
        // 隊列爲空
        else
        {
            locker.unlock();
        }
    
    } while( producer_is_running );
    
    std::cout << "消費者[" << id << "] :賣家沒有貨打樣了,真惋惜,下次再來搶!"  << std::endl;
}

int main(void)
{
    std::cout << "1 producer start ..." << std::endl;
    std::thread producer( Producer );

    std::cout << "5 consumer start ..." << std::endl;
    std::thread consumer[ 5 ];
    for(int i = 0; i < 5; i++)
    {
        consumer[i] = std::thread(Consumer, i + 1);
    }

    producer.join();

    for(int i = 0; i < 5; i++)
    {
        consumer[i].join();
    }

    std::cout << "All threads joined." << std::endl;

    return 0;
}

條件變量+互斥鎖運行結果

[root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o  main
[root@lincoding condition]# 
[root@lincoding condition]# ./main 
1 producer start ...
5 consumer start ...
生產者    :我如今庫存有 :8
消費者[4] : 我搶到貨的編號是 :8
生產者    :我如今庫存有 :7
消費者[2] : 我搶到貨的編號是 :7
生產者    :我如今庫存有 :6
消費者[3] : 我搶到貨的編號是 :6
生產者    :我如今庫存有 :5
消費者[5] : 我搶到貨的編號是 :5
生產者    :我如今庫存有 :4
消費者[1] : 我搶到貨的編號是 :4
生產者    :我如今庫存有 :3
消費者[4] : 我搶到貨的編號是 :3
生產者    :我如今庫存有 :2
消費者[2] : 我搶到貨的編號是 :2
生產者    :我如今庫存有 :1
消費者[3] : 我搶到貨的編號是 :1
生產者    : 個人庫存沒有了,我要打樣了!
消費者[5] :賣家沒有貨打樣了,真惋惜,下次再來搶!
消費者[1] :賣家沒有貨打樣了,真惋惜,下次再來搶!
消費者[4] :賣家沒有貨打樣了,真惋惜,下次再來搶!
消費者[2] :賣家沒有貨打樣了,真惋惜,下次再來搶!
消費者[3] :賣家沒有貨打樣了,真惋惜,下次再來搶!
All threads joined.

CPU開銷很是的小

[root@lincoding ~]# ps aux | grep -v grep  |grep main
USER        PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root      73838  0.0  0.1 141068  1256 pts/1    Sl+  19:54   0:00 ./main

總結

在不肯定生產者的生產速度是快仍是慢的場景裏,不能只使用互斥鎖保護共享的數據,這樣會對CPU的性能開銷很是大,可使用互斥鎖+條件變量的方式,當生產者線程生產了一個數據,就喚醒消費者線程進行消費,避免一些無用功的性能開銷。

相關文章
相關標籤/搜索