C++ 併發編程(五):生產者 - 消費者

生產者 - 消費者(Producer-Consumer),也叫有限緩衝(Bounded-Buffer),是多線程同步的經典問題之一。詳見 Wikipediaios

代碼改寫自 Boost.Thread 自帶的示例(libs/thread/example/condition.cpp),以「條件變量」實現同步。多線程

頭文件

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

有限緩衝類

class BoundedBuffer {
public:
  BoundedBuffer(const BoundedBuffer& rhs) = delete;
  BoundedBuffer& operator=(const BoundedBuffer& rhs) = delete;

  BoundedBuffer(std::size_t size)
      : begin_(0), end_(0), buffered_(0), circular_buffer_(size) {
  }

  void Produce(int n) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      // 等待緩衝不爲滿。
      not_full_cv_.wait(lock, [=] { return buffered_ < circular_buffer_.size(); });

      // 插入新的元素,更新下標。
      circular_buffer_[end_] = n;
      end_ = (end_ + 1) % circular_buffer_.size();

      ++buffered_;
    }  // 通知前,自動解鎖。

    // 通知消費者。
    not_empty_cv_.notify_one();
  }

  int Consume() {
    std::unique_lock<std::mutex> lock(mutex_);
    // 等待緩衝不爲空。
    not_empty_cv_.wait(lock, [=] { return buffered_ > 0; });

    // 移除一個元素。
    int n = circular_buffer_[begin_];
    begin_ = (begin_ + 1) % circular_buffer_.size();

    --buffered_;

    // 通知前,手動解鎖。
    lock.unlock();
    // 通知生產者。
    not_full_cv_.notify_one();
    return n;
  }

private:
  std::size_t begin_;
  std::size_t end_;
  std::size_t buffered_;
  std::vector<int> circular_buffer_;
  std::condition_variable not_full_cv_;
  std::condition_variable not_empty_cv_;
  std::mutex mutex_;
};

生產者與消費者線程共享的緩衝。g_io_mutex 是用來同步輸出的。this

BoundedBuffer g_buffer(2);
boost::mutex g_io_mutex;

生產者

生產 100000 個元素,每 10000 個打印一次。線程

void Producer() {
  int n = 0;
  while (n < 100000) {
    g_buffer.Produce(n);
    if ((n % 10000) == 0) {
      std::unique_lock<std::mutex> lock(g_io_mutex);
      std::cout << "Produce: " << n << std::endl;
    }
    ++n;
  }

  g_buffer.Produce(-1);
}

消費者

每消費到 10000 的倍數,打印一次。code

void Consumer() {
  std::thread::id thread_id = std::this_thread::get_id();

  int n = 0;
  do {
    n = g_buffer.Consume();
    if ((n % 10000) == 0) {
      std::unique_lock<std::mutex> lock(g_io_mutex);
      std::cout << "Consume: " << n << " (" << thread_id << ")" << std::endl;
    }
  } while (n != -1);  // -1 表示緩衝已達末尾。

  // 往緩衝裏再放一個 -1,這樣其餘消費者才能結束。
  g_buffer.Produce(-1);
}

主程序

一個生產者線程,三個消費者線程。ip

int main() {
  std::vector<std::thread> threads;

  threads.push_back(std::thread(&Producer));
  threads.push_back(std::thread(&Consumer));
  threads.push_back(std::thread(&Consumer));
  threads.push_back(std::thread(&Consumer));

  for (auto& t : threads) {
    t.join();
  }

  return 0;
}

輸出(括號中爲線程 ID):ci

Produce: 0
Consume: 0 (13c0)
Produce: 10000
Consume: 10000 (15fc)
Produce: 20000
Consume: 20000 (2558)
Produce: 30000
Consume: 30000 (13c0)
Produce: 40000
Consume: 40000 (15fc)
Produce: 50000
Consume: 50000 (13c0)
Produce: 60000
Consume: 60000 (15fc)
Produce: 70000
Consume: 70000 (13c0)
Produce: 80000
Consume: 80000 (15fc)
Produce: 90000
Consume: 90000 (15fc)

分析

考慮一個生產者和一個消費者的情形,假定緩衝的大小爲 2,來看看三個成員變量如何變化。element

buffered_    begin_      end_
 初始           0          0          0
 生產           1          0          1
 消費           0          1          1
 消費          等待 buffered_ > 0 ...
 生產           1          1          0
 ...

參考:get

相關文章
相關標籤/搜索