C++11: std::condition_variable

須要特別注意的是: std::condition_variable最好配合std::unique_lock使用!!!!!!!!ios

有時候,被不一樣線程執行的task必須彼此等待。因此對於「併發操做」來講同步化除了防止data race以外還有其餘緣由。併發

你可能會爭辯說咱們已經引入了Future容許咱們停下來直到另一個線程結束提供數據,可是這樣等待一個線程結束才能提供數據要經歷一個函數的過程呀!浪費了不少時間,並且Future對象只能get()一次結果。async

Condition Variable(條件變量)的意圖:ide

bool ready_flag;
std::mutex mutex;


//等待直到ready_flag被設置爲true;
{
   std::unique_lock<std::mutex> unique_l(mutex);
   while(!ready_flag){                         
        unique_l.unlock();                           
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        unique_l.lock(); 
      }
}

可是上面的輪詢等待條件的具有一般不是最好的作法(雖然是合理的也沒有問題的)。函數

等待中的線程消耗寶貴的CPU時間重複檢驗ready_flag,且當它鎖住mutex時候「負責設置ready_flag爲true"的那個線程會被阻塞(blocking)...並且上面的代碼讓咱們很難找到適當的sleep週期。this

#include <iostream>
#include <future>
#include <condition_variable>
#include <thread>
#include <mutex>

std::mutex mutex;
bool ready_flag = false;
std::condition_variable ready_condition;

void thread_one()
{
	std::cout << " <retun> " << std::endl;
	std::cin.get();

	{
		std::lock_guard<std::mutex> lock_g(mutex);
		ready_flag = true;
	}

	ready_condition.notify_one(); //注意這裏.
}

void thread_two()
{
	{
		std::unique_lock<std::mutex> unique_l(mutex);
		ready_condition.wait(unique_l, []()->bool { return ready_flag; });
	}

	std::cout << "done(完成)" << std::endl;
}

int main()
{
	std::future<void> result_one = std::async(std::launch::async, thread_one);
	std::future<void> result_two = std::async(std::launch::async, thread_two);

	return 0;
}

在包含必要的頭文件以後,咱們還須要三個條件以保證在線程之間通信.atom

1,一個用以表示條件真的知足了的flag(也就是此處的ready_flag).spa

2,一個mutex(也就是上面代碼中的std::mutex mutex);線程

3,一個condition variable(也就是上面代碼中的ready_condition);code

再來一個例子:

#include <iostream>
#include <future>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>

std::condition_variable ready_condition;
std::mutex mutex;
std::queue<int> queue;

void provider(const int& val)
{
	std::cout << "test1" << std::endl;
	//把不一樣的值放到std::queue中去.
	for (int i = 0; i < 6; ++i) {
		{
			std::lock_guard<std::mutex> lock_g(mutex); //這裏確保了queue的讀寫是atomic(不可分割的).
			queue.push(val + i);
		}

		ready_condition.notify_one(); //每次quque中加入新元素都會喚醒一個線程輸出該元素.
		//還有注意上面這條並不在保護區內,也不須要在保護區內.雖然std::queue是的讀寫操做不能同時進行
		//可是當保護區結束的時候push()操做已經完成了.而對std::queue執行讀操做在另外線程的時候也是被上鎖的所以不可能會產生data race.

		std::this_thread::sleep_for(std::chrono::milliseconds(val));//休眠一段時間保證consumer運行.否則notify完了沒有wait的什麼事也不會發生.
		//由於咱們的consumer()在main()函數中是稍微落後一點provider()的.
	}
}

void consumer(const int& number)
{
	std::cout << "test2" << std::endl;
	while (true) {
		int val;
		{
			std::unique_lock<std::mutex> unique_l(mutex);
			ready_condition.wait(unique_l, []()->bool { return !queue.empty(); });
			val = queue.front();
			queue.pop();
		}

		std::cout << "consumer " << number << ": " << val << std::endl;
	}
}

int main()
{
	std::future<void> result_one = std::async(std::launch::async, provider, 100);
	std::future<void> result_two = std::async(std::launch::async, provider, 200);
	std::future<void> result_three = std::async(std::launch::async, provider, 300);

	std::future<void> result_four = std::async(std::launch::async, consumer, 1);
	std::future<void> result_five = std::async(std::launch::async, consumer, 2);

	return 0;
}

 

class std::condition_variable

構造函數:

condition_variable();
	
condition_variable (const condition_variable&) = delete;

構造一個std::condition_variable對象,該對象不能被copy,也不能被移動.

成員函數部分:

std::condition_variable::wait

這裏須要指出的是wait()和notify_one()或者notify_all()必須成對使用.

void wait (unique_lock<mutex>& lck);
	
template <class Predicate>
  void wait (unique_lock<mutex>& lck, Predicate pred);

該函數在解鎖他所管理的mutex的狀況下阻塞(block)正在調用wait()的線程(此時正處於阻塞狀態)直到其餘線程中的notify_one()或者notify_all()被調用,再次lock他管理的Mutex.

須要注意的是,std::condition_variable的wait()老是在已經被上鎖的mutex的基礎上執行操做的.wait()會指向下面三個操做暫時解鎖當前線程已經鎖住的mutex:

1,解鎖(unlock)mutex而後使當前線程進入等待狀態.

2,收到notify信號後解除等待狀態.

3,再次鎖住mutex,由前面std::unique_lock來決定什麼時候解.

還有一點須要注意的是:wait系列的函數均可以再接受一個callable object,用以判斷條件是否知足,同時防止處於等待狀態的線程假醒.

好比在上面的第一個例子中:

ready_condition.wait(unique_l, []()->bool { return ready_flag; });

其實至關於:

{
  std::unique_lock<std::mutex> unique(mutex);
  while(!ready_flag){
     unique.unlock();
     ready_condition.wait(unique_l);
     unique.lock();
}

 

咱們再來看一個更加詳細的例子:

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
 
std::condition_variable cv;
std::mutex cv_m;
int i = 0;
bool done = false;
 
void waits()
{
    std::unique_lock<std::mutex> lk(cv_m);
    std::cout << "Waiting... \n";
    cv.wait(lk, []{return i == 1;});
    std::cout << "...finished waiting. i == 1\n";
    done = true;
}
 
void signals()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Notifying falsely...\n";
    cv.notify_one(); // waiting thread is notified with i == 0.
                     // cv.wait wakes up, checks i, and goes back to waiting
 
    std::unique_lock<std::mutex> lk(cv_m);
    i = 1;
    while (!done) 
    {
        std::cout << "Notifying true change...\n";
        lk.unlock();
        cv.notify_one(); // waiting thread is notified with i == 1, cv.wait returns
        std::this_thread::sleep_for(std::chrono::seconds(1));
        lk.lock();
    }
}
 
int main()
{
    std::thread t1(waits), t2(signals);
    t1.join(); 
    t2.join();
}

可能Output結果:

Waiting... 
Notifying falsely...
Notifying true change...
...finished waiting. i == 1

 

std::condition_variable::wait_for

template <class Rep, class Period>
  cv_status wait_for (unique_lock<mutex>& lck,
                      const chrono::duration<Rep,Period>& rel_time);
	
template <class Rep, class Period, class Predicate>
       bool wait_for (unique_lock<mutex>& lck,
                      const chrono::duration<Rep,Period>& rel_time, Predicate pred);

該函數在解鎖(unlock)當前線程已經上鎖(lock)mutex的狀況下阻塞(block)當前線程一段時間(duration),直到其餘線程中的notify_one()或者notify_all()被調用,或者超過了這段時間也沒有收到來自其餘線程的notify_one()(或者是notify_all())那麼該函數都會返回一個std::cv_status(稍後會介紹)類型的參數.

請注意:std::condition_variable::wait_for也是能夠接受一個callable object.其效果更wait()接受一個callable object相似只不過是在給定的時間段內.最好這麼作可以防止假醒.

std::condition_variable::wait_until

template <class Clock, class Duration>
  cv_status wait_until (unique_lock<mutex>& lck,
                        const chrono::time_point<Clock,Duration>& abs_time);

template <class Clock, class Duration, class Predicate>
       bool wait_until (unique_lock<mutex>& lck,
                        const chrono::time_point<Clock,Duration>& abs_time,
                        Predicate pred);

該函數在解鎖(unlock)當前線程已經上鎖(lock)了的mutex的狀況下阻塞(block)當前線程到某個時間點。若是其餘線程中的notify_one()(或者是notify_all())在該時間點以前到達以前被調用了那麼返回一個std::cv_status類型的對象,若是超過了該指定的時間點也沒有收到任何來自notify_one()或者notify_all()的信號也返回一個std::cv_status類型的對象.

std::condition_variable::notify_one

void notify_one() noexcept;

喚醒(weak)某個正在等待該信號的線程,若是有多個線程都在等待該信號的話具體是哪一個會被喚醒(weak)是無法指定的.若是沒有任何線程等待該型號那麼什麼事情也不作.

std::condition_variable_notify_all

void notify_all() noexcept;

喚醒(weak)全部的正在等待信號的線程,若是沒有等待者什麼都不作.

std::cv_status

enum class cv_status;
cv_status::timeout //超時的話返回這個.
cv_status::no_timeout //沒有超時.
相關文章
相關標籤/搜索