生產者 - 消費者(Producer-Consumer),也叫有限緩衝(Bounded-Buffer),是多線程同步的經典問題之一。詳見 Wikipedia。ios
代碼改寫自 Boost.Thread 自帶的示例(libs/thread/example/condition.cpp
),以「條件變量」實現同步。多線程
#include <condition_variable> #include <iostream> #include <mutex> #include <thread> #include <vector>
class BoundedBuffer { public: BoundedBuffer(const BoundedBuffer& rhs) = delete; BoundedBuffer& operator=(const BoundedBuffer& rhs) = delete; BoundedBuffer(std::size_t size) : begin_(0), end_(0), buffered_(0), circular_buffer_(size) { } void Produce(int n) { { std::unique_lock<std::mutex> lock(mutex_); // 等待緩衝不爲滿。 not_full_cv_.wait(lock, [=] { return buffered_ < circular_buffer_.size(); }); // 插入新的元素,更新下標。 circular_buffer_[end_] = n; end_ = (end_ + 1) % circular_buffer_.size(); ++buffered_; } // 通知前,自動解鎖。 // 通知消費者。 not_empty_cv_.notify_one(); } int Consume() { std::unique_lock<std::mutex> lock(mutex_); // 等待緩衝不爲空。 not_empty_cv_.wait(lock, [=] { return buffered_ > 0; }); // 移除一個元素。 int n = circular_buffer_[begin_]; begin_ = (begin_ + 1) % circular_buffer_.size(); --buffered_; // 通知前,手動解鎖。 lock.unlock(); // 通知生產者。 not_full_cv_.notify_one(); return n; } private: std::size_t begin_; std::size_t end_; std::size_t buffered_; std::vector<int> circular_buffer_; std::condition_variable not_full_cv_; std::condition_variable not_empty_cv_; std::mutex mutex_; };
生產者與消費者線程共享的緩衝。g_io_mutex
是用來同步輸出的。this
BoundedBuffer g_buffer(2); boost::mutex g_io_mutex;
生產 100000 個元素,每 10000 個打印一次。線程
void Producer() { int n = 0; while (n < 100000) { g_buffer.Produce(n); if ((n % 10000) == 0) { std::unique_lock<std::mutex> lock(g_io_mutex); std::cout << "Produce: " << n << std::endl; } ++n; } g_buffer.Produce(-1); }
每消費到 10000 的倍數,打印一次。code
void Consumer() { std::thread::id thread_id = std::this_thread::get_id(); int n = 0; do { n = g_buffer.Consume(); if ((n % 10000) == 0) { std::unique_lock<std::mutex> lock(g_io_mutex); std::cout << "Consume: " << n << " (" << thread_id << ")" << std::endl; } } while (n != -1); // -1 表示緩衝已達末尾。 // 往緩衝裏再放一個 -1,這樣其餘消費者才能結束。 g_buffer.Produce(-1); }
一個生產者線程,三個消費者線程。ip
int main() { std::vector<std::thread> threads; threads.push_back(std::thread(&Producer)); threads.push_back(std::thread(&Consumer)); threads.push_back(std::thread(&Consumer)); threads.push_back(std::thread(&Consumer)); for (auto& t : threads) { t.join(); } return 0; }
輸出(括號中爲線程 ID):ci
Produce: 0 Consume: 0 (13c0) Produce: 10000 Consume: 10000 (15fc) Produce: 20000 Consume: 20000 (2558) Produce: 30000 Consume: 30000 (13c0) Produce: 40000 Consume: 40000 (15fc) Produce: 50000 Consume: 50000 (13c0) Produce: 60000 Consume: 60000 (15fc) Produce: 70000 Consume: 70000 (13c0) Produce: 80000 Consume: 80000 (15fc) Produce: 90000 Consume: 90000 (15fc)
考慮一個生產者和一個消費者的情形,假定緩衝的大小爲 2,來看看三個成員變量如何變化。element
buffered_ begin_ end_ 初始 0 0 0 生產 1 0 1 消費 0 1 1 消費 等待 buffered_ > 0 ... 生產 1 1 0 ...
參考:get