C++併發編程 條件變量 condition_variable,線程安全隊列示例

1. 背景

c++11中提供了對線程與條件變量的更好支持,對於寫多線程程序方便了不少。
再看c++併發編程,記一下學習筆記。ios

2. c++11 提供的相關api

3.1 wait

wait用於無條件等待,其中Predicate表示校驗條件,能夠避免假喚醒。c++

unconditional (1)   
void wait (unique_lock<mutex>& lck);
predicate (2)   
template <class Predicate>
  void wait (unique_lock<mutex>& lck, Predicate pred);

3.2 wait for

wait_for能夠指定超時時間,其中Predicate表示校驗條件,能夠避免假喚醒。編程

unconditional (1)   
template <class Rep, class Period>
  cv_status wait_for (unique_lock<mutex>& lck,
                      const chrono::duration<Rep,Period>& rel_time);
predicate (2)   
template <class Rep, class Period, class Predicate>
       bool wait_for (unique_lock<mutex>& lck,
                      const chrono::duration<Rep,Period>& rel_time, Predicate pred);

3. 線程安全隊列示例(生產者與消費者模型)

一個生產者向隊列中添加數據;多個消費者從隊列中讀取任務。api

#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <thread>
#include <iostream>

template<typename T>
class threadsafe_queue
{
private:
    std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue(){ }

    void push(T new_value) {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    } 

    //無限等待
    int pop(T& value) {
        std::unique_lock<std::mutex> lk(mut);
    
        // (1) 帶判斷條件的wait, 條件不知足則繼續等待; 知足則繼續後續代碼 
        data_cond.wait(lk,[this]{return !data_queue.empty();}); 

        // (2)wait喚醒後須要再次判斷, 避免假喚醒
        //while(true){
        //  data_cond.wait(lk);
        //  if (data_queue.empty())
        //      continue;
        //  else
        //      break;
        //} 
        value=data_queue.front();
        data_queue.pop();
        return 0;
    }

    //有限等待
    int pop_with_timeout(T& value, int timeout) {
        if (timeout < 0){
            return this->pop(value);
        }

        std::unique_lock<std::mutex> lk(mut);
        //帶超時, 帶判斷條件的wait
        data_cond.wait_for(lk, std::chrono::milliseconds(timeout), [this] {
                std::cout << "thread id: " << std::this_thread::get_id() << " wait..." << std::endl;
                return !data_queue.empty();}
        );
        if (!data_queue.empty()){
            value=data_queue.front();
            data_queue.pop();
            return 0;
        }
        return -1;
    }

    bool is_empty(){
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

template<typename T>
void consume(threadsafe_queue<T> &queue, bool &stop){
    while(true){
        if (stop && queue.is_empty()){  //結束條件
            break;
        }

        int job_id = 0;
        if (0 == queue.pop_with_timeout(job_id, 3)){
            std::cout << "thread id: " << std::this_thread::get_id() << ", job:" << job_id << std::endl;
        }
        std::this_thread::sleep_for (std::chrono::milliseconds(5));
    }
}

template<typename T>
void product(threadsafe_queue<T> &queue, bool &stop){
    for (auto i = 0; i < 100; ++i){
        queue.push(i);
        std::this_thread::sleep_for (std::chrono::milliseconds(1));
    }
    stop = true;    //設置結束條件
}
int main(){
    threadsafe_queue<int> my_queue;
    bool stop_flag = false;

    //生產者
    std::thread prod(product<int>, std::ref(my_queue), std::ref(stop_flag));
    //消費者
    std::vector<std::thread> cons;
    for(auto i = 0; i < 5; ++i){
        std::thread tmp = std::thread(consume<int>, std::ref(my_queue), std::ref(stop_flag));
        cons.emplace_back(std::move(tmp));
    }

    prod.join();
    for(auto &t : cons){
        t.join();
    }
    return 0;
}
相關文章
相關標籤/搜索