【C++併發與多線程】 8_condition_variable、wait、notify_one、notify_all

std::condition_variable

  • 條件變量是一個對象,該對象可以阻塞調用線程,直到被通知恢復;
  • 當調用其等待函數(wait,wait_for,wait_until)之一時,它使用 unique_lock (經過互斥鎖)來鎖定線程,該線程將保持阻塞狀態,直到被另外一個同在 condition_variable 對象上調用通知功能的線程喚醒爲止;
  • condition_variable 類型的對象始終使用 unique_lock<mutex> 等待(有關可與任何類型的可鎖定類型一塊兒使用的替代方法,可參見 condition_variable_any)。
// condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
  std::unique_lock<std::mutex> lck(mtx);
  while (!ready) cv.wait(lck);
  // ...
  std::cout << "thread " << id << '\n';
}

void go() {
  std::unique_lock<std::mutex> lck(mtx);
  ready = true;
  cv.notify_all();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_id,i);

  std::cout << "10 threads ready to race...\n";
  go();                       // go!

  for (auto& th : threads) th.join();

  return 0;
}

輸出:ios

10 threads ready to race...
thread 5
thread 6
thread 0
thread 9
thread 4
thread 1
thread 3
thread 2
thread 8
thread 7

std::condition_variable::notify_one

void notify_one() noexcept;併發

  • 解鎖當前正在等待此條件的其中一個線程;
  • 若是沒有線程在等待,則該函數將不執行任何操做(不產生任何影響);
  • 若是超過一個線程在等待,則未指定選擇哪一個線程。
// condition_variable::notify_one
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable produce,consume;

int cargo = 0;     // shared value by producers and consumers

void consumer () {
  std::unique_lock<std::mutex> lck(mtx);
  while (cargo==0) consume.wait(lck);
  std::cout << cargo << '\n';
  cargo=0;
  produce.notify_one();
}

void producer (int id) {
  std::unique_lock<std::mutex> lck(mtx);
  while (cargo!=0) produce.wait(lck);
  cargo = id;
  consume.notify_one();
}

int main ()
{
  std::thread consumers[10],producers[10];
  // spawn 10 consumers and 10 producers:
  for (int i=0; i<10; ++i) {
    consumers[i] = std::thread(consumer);
    producers[i] = std::thread(producer,i+1);
  }

  // join them back:
  for (int i=0; i<10; ++i) {
    producers[i].join();
    consumers[i].join();
  }

  return 0;
}

輸出:函數

1
2
3
4
5
7
6
9
10
8

std::condition_variable::notify_all

void notify_all() noexcept;this

  • 解鎖當前正在等待此條件的全部線程;
  • 若是沒有線程在等待,則該函數不執行任何操做(不產生任何影響)。
// condition_variable::notify_all
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
  std::unique_lock<std::mutex> lck(mtx);
  while (!ready) cv.wait(lck);
  // ...
  std::cout << "thread " << id << '\n';
}

void go() {
  std::unique_lock<std::mutex> lck(mtx);
  ready = true;
  cv.notify_all();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_id,i);

  std::cout << "10 threads ready to race...\n";
  go();                       // go!

  for (auto& th : threads) th.join();

  return 0;
}

輸出:atom

10 threads ready to race...
thread 7
thread 4
thread 3
thread 2
thread 0
thread 1
thread 6
thread 5
thread 8
thread 9

std::condition_variable::wait

unconditional (1)
void wait (unique_lock<mutex>& lck);

predicate (2)
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);
  • 當前線程(應以鎖定lck的互斥對象)的執行被阻塞,直到獲得通知;
  • 在阻塞線程的時刻,該函數自動調用 lck.unlock(), 從而容許其它鎖定的線程繼續執行;
  • 一旦獲得通知(明確的由其它線程通知),該函數將取消阻塞並調用 lck.lock(), 使 lck 處於與調用該函數時相同的狀態。而後函數返回(注意,最後一次互斥鎖可能會在返回以前再次阻塞線程);
  • 一般,經過另外一個線程對成員 notify_one 或 notify_all 的調用來通知該函數喚醒。可是某些實現可能會產生虛假的喚醒調用,而不會調用這些函數中的任何一個。所以,使用此功能的用戶應確保知足其恢復條件;
  • 若是指定了 pred(2), 則該函數僅在 pred 返回 false 時調用 wait 纔會阻塞當前線程,而且通知只能在線程變爲 true 時才取消阻塞線程(這對檢查虛假喚醒調用特別有用)spa

    • 此版本 (2) 的行爲就像是實現爲:while (!pred()) wait(lck);
參數說明
  • lck線程

    • 一個 unique_lock 對象,其互斥對象當前已被該線程鎖定;
    • 該對象的全部等待成員函數的全部併發調用均應使用相同的基礎互斥對象(由 lck.mutex()返回)。
  • predcode

    • 可調用的對象或函數,不帶任何參數,並返回能夠做爲 bool 值評估的值;

    反覆調用它,直到評估值爲 true。對象

// condition_variable::wait (with predicate)
#include <iostream>           // std::cout
#include <thread>             // std::thread, std::this_thread::yield
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;

int cargo = 0;
bool shipment_available() {return cargo!=0;}

void consume (int n) {
  for (int i=0; i<n; ++i) {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck,shipment_available);
    // consume:
    std::cout << cargo << '\n';
    cargo=0;
  }
}

int main ()
{
  std::thread consumer_thread (consume,10);

  // produce 10 items when needed:
  for (int i=0; i<10; ++i) {
    while (shipment_available()) std::this_thread::yield();
    std::unique_lock<std::mutex> lck(mtx);
    cargo = i+1;
    cv.notify_one();
  }

  consumer_thread.join();

  return 0;
}

輸出:ip

1
2
3
4
5
6
7
8
9
10

std::condition_variable::wait_for

unconditional (1)
template <class Rep, class Period>
  cv_status wait_for (unique_lock<mutex>& lck,
                      const chrono::duration<Rep,Period>& rel_time);

predicate (2)
template <class Rep, class Period, class Predicate>
       bool wait_for (unique_lock<mutex>& lck,
                      const chrono::duration<Rep,Period>& rel_time, Predicate pred);
  • 與 std:condition_variable::wait()相似,不過 wait_for 能夠指定一個時間段,在當前線程收到通知或者指定的時間 rel_time 超時以前,該線程都會處於阻塞狀態。而一旦超時或者收到了其它線程的通知, wait_for 返回,剩下的處理步驟和 wait() 相似;
  • wait_for 的重載版本 predicate (2) 的最後一個參數 pred 表示 wait_for 的預測條件,只有當 pred 條件爲 false 時調用 wait() 纔會阻塞當前線程,而且收到其它線程通知後只有當 pred 爲 true 時纔會解除阻塞,所以至關於以下代碼:

return wait_until (lck, chrono::steady_clock::now() + rel_time, std::move(pred));

// condition_variable::wait_for example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <chrono>             // std::chrono::seconds
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status

std::condition_variable cv;

int value;

void read_value() {
  std::cin >> value;
  cv.notify_one();
}

int main ()
{
  std::cout << "Please, enter an integer (I'll be printing dots): \n";
  std::thread th (read_value);

  std::mutex mtx;
  std::unique_lock<std::mutex> lck(mtx);
  while (cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout) {
    std::cout << '.' << std::endl;
  }
  std::cout << "You entered: " << value << '\n';

  th.join();

  return 0;
}

輸出:

Please, enter an integer (I'll be priniting dots):
.
.
2
You entered: 2

std::condition_variable::wait_until

unconditional (1)
template <class Clock, class Duration>
  cv_status wait_until (unique_lock<mutex>& lck,
                        const chrono::time_point<Clock,Duration>& abs_time);

predicate (2)
template <class Clock, class Duration, class Predicate>
       bool wait_until (unique_lock<mutex>& lck,
                        const chrono::time_point<Clock,Duration>& abs_time,
                        Predicate pred);
  • 與 std::condition_variable::wait_for 相似,可是 wait_until 能夠指定一個時間點,在當前線程收到通知或者指定的時間點 abs_time 超時以前,該線程都會處於阻塞狀態。而一旦超時或者收到了其餘線程的通知,wait_until 返回,剩下的處理步驟和 wait_until() 相似;
  • 另外,wait_until 的重載版本predicte(2)的最後一個參數 pred 表示 wait_until 的預測條件,只有當 pred 條件爲 false 時調用 wait() 纔會阻塞當前線程,而且在收到其餘線程的通知後只有當 pred 爲 true 時纔會被解除阻塞,所以至關於以下代碼:
while (!pred())
  if ( wait_until(lck,abs_time) == cv_status::timeout)
    return pred();
return true;
參數:abs_time
  • 線程將中止阻塞的時間點,以容許函數返回;
  • time_point是表明特定絕對時間的對象。
#include <iostream>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
using namespace std::chrono_literals;
 
std::condition_variable cv;
std::mutex cv_m;
std::atomic<int> i{0};
 
void waits(int idx)
{
    std::unique_lock<std::mutex> lk(cv_m);
    auto now = std::chrono::system_clock::now();
    if(cv.wait_until(lk, now + idx*100ms, [](){return i == 1;}))
        std::cerr << "Thread " << idx << " finished waiting. i == " << i << '\n';
    else
        std::cerr << "Thread " << idx << " timed out. i == " << i << '\n';
}
 
void signals()
{
    std::this_thread::sleep_for(120ms);
    std::cerr << "Notifying...\n";
    cv.notify_all();
    std::this_thread::sleep_for(100ms);
    i = 1;
    std::cerr << "Notifying again...\n";
    cv.notify_all();
}
 
int main()
{
    std::thread t1(waits, 1), t2(waits, 2), t3(waits, 3), t4(signals);
    t1.join(); 
    t2.join();
    t3.join();
    t4.join();
}

輸出:

Thread 1 timed out. i == 0
Notifying...
Thread 2 timed out. i == 0
Notifying again...
Thread 3 finished waiting. i == 1
相關文章
相關標籤/搜索