前面八章介紹了 C++11 併發編程的基礎(抱歉哈,第五章-第八章還在草稿中),本文將綜合運用 C++11 中的新的基礎設施(主要是多線程、鎖、條件變量)來闡述一個經典問題——生產者消費者模型,並給出完整的解決方案。ios
生產者消費者問題是多線程併發中一個很是經典的問題,相信學過操做系統課程的同窗都清楚這個問題的根源。本文將就四種狀況分析並介紹生產者和消費者問題,它們分別是:單生產者-單消費者模型,單生產者-多消費者模型,多生產者-單消費者模型,多生產者-多消費者模型,我會給出四種狀況下的 C++11 併發解決方案,若是文中出現了錯誤或者你對代碼有異議,歡迎交流 ;-)。git
顧名思義,單生產者-單消費者模型中只有一個生產者和一個消費者,生產者不停地往產品庫中放入產品,消費者則從產品庫中取走產品,產品庫容積有限制,只能容納必定數目的產品,若是生產者生產產品的速度過快,則須要等待消費者取走產品以後,產品庫不爲空才能繼續往產品庫中放置新的產品,相反,若是消費者取走產品的速度過快,則可能面臨產品庫中沒有產品可以使用的狀況,此時須要等待生產者放入一個產品後,消費者才能繼續工做。C++11實現單生產者單消費者模型的代碼以下:github
#include <unistd.h> #include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> static const int kItemRepositorySize = 10; // Item buffer size. static const int kItemsToProduce = 1000; // How many items we plan to produce. struct ItemRepository { int item_buffer[kItemRepositorySize]; // 產品緩衝區, 配合 read_position 和 write_position 模型環形隊列. size_t read_position; // 消費者讀取產品位置. size_t write_position; // 生產者寫入產品位置. std::mutex mtx; // 互斥量,保護產品緩衝區 std::condition_variable repo_not_full; // 條件變量, 指示產品緩衝區不爲滿. std::condition_variable repo_not_empty; // 條件變量, 指示產品緩衝區不爲空. } gItemRepository; // 產品庫全局變量, 生產者和消費者操做該變量. typedef struct ItemRepository ItemRepository; void ProduceItem(ItemRepository *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while(((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) { // item buffer is full, just wait here. std::cout << "Producer is waiting for an empty slot...\n"; (ir->repo_not_full).wait(lock); // 生產者等待"產品庫緩衝區不爲滿"這一條件發生. } (ir->item_buffer)[ir->write_position] = item; // 寫入產品. (ir->write_position)++; // 寫入位置後移. if (ir->write_position == kItemRepositorySize) // 寫入位置如果在隊列最後則從新設置爲初始位置. ir->write_position = 0; (ir->repo_not_empty).notify_all(); // 通知消費者產品庫不爲空. lock.unlock(); // 解鎖. } int ConsumeItem(ItemRepository *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); // item buffer is empty, just wait here. while(ir->write_position == ir->read_position) { std::cout << "Consumer is waiting for items...\n"; (ir->repo_not_empty).wait(lock); // 消費者等待"產品庫緩衝區不爲空"這一條件發生. } data = (ir->item_buffer)[ir->read_position]; // 讀取某一產品 (ir->read_position)++; // 讀取位置後移 if (ir->read_position >= kItemRepositorySize) // 讀取位置若移到最後,則從新置位. ir->read_position = 0; (ir->repo_not_full).notify_all(); // 通知消費者產品庫不爲滿. lock.unlock(); // 解鎖. return data; // 返回產品. } void ProducerTask() // 生產者任務 { for (int i = 1; i <= kItemsToProduce; ++i) { // sleep(1); std::cout << "Produce the " << i << "^th item..." << std::endl; ProduceItem(&gItemRepository, i); // 循環生產 kItemsToProduce 個產品. } } void ConsumerTask() // 消費者任務 { static int cnt = 0; while(1) { sleep(1); int item = ConsumeItem(&gItemRepository); // 消費一個產品. std::cout << "Consume the " << item << "^th item" << std::endl; if (++cnt == kItemsToProduce) break; // 若是產品消費個數爲 kItemsToProduce, 則退出. } } void InitItemRepository(ItemRepository *ir) { ir->write_position = 0; // 初始化產品寫入位置. ir->read_position = 0; // 初始化產品讀取位置. } int main() { InitItemRepository(&gItemRepository); std::thread producer(ProducerTask); // 建立生產者線程. std::thread consumer(ConsumerTask); // 建立消費之線程. producer.join(); consumer.join(); }
與單生產者和單消費者模型不一樣的是,單生產者-多消費者模型中能夠容許多個消費者同時從產品庫中取走產品。因此除了保護產品庫在多個讀寫線程下互斥以外,還須要維護消費者取走產品的計數器,代碼以下:編程
#include <unistd.h> #include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> static const int kItemRepositorySize = 4; // Item buffer size. static const int kItemsToProduce = 10; // How many items we plan to produce. struct ItemRepository { int item_buffer[kItemRepositorySize]; size_t read_position; size_t write_position; size_t item_counter; std::mutex mtx; std::mutex item_counter_mtx; std::condition_variable repo_not_full; std::condition_variable repo_not_empty; } gItemRepository; typedef struct ItemRepository ItemRepository; void ProduceItem(ItemRepository *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while(((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) { // item buffer is full, just wait here. std::cout << "Producer is waiting for an empty slot...\n"; (ir->repo_not_full).wait(lock); } (ir->item_buffer)[ir->write_position] = item; (ir->write_position)++; if (ir->write_position == kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); lock.unlock(); } int ConsumeItem(ItemRepository *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); // item buffer is empty, just wait here. while(ir->write_position == ir->read_position) { std::cout << "Consumer is waiting for items...\n"; (ir->repo_not_empty).wait(lock); } data = (ir->item_buffer)[ir->read_position]; (ir->read_position)++; if (ir->read_position >= kItemRepositorySize) ir->read_position = 0; (ir->repo_not_full).notify_all(); lock.unlock(); return data; } void ProducerTask() { for (int i = 1; i <= kItemsToProduce; ++i) { // sleep(1); std::cout << "Producer thread " << std::this_thread::get_id() << " producing the " << i << "^th item..." << std::endl; ProduceItem(&gItemRepository, i); } std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void ConsumerTask() { bool ready_to_exit = false; while(1) { sleep(1); std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx); if (gItemRepository.item_counter < kItemsToProduce) { int item = ConsumeItem(&gItemRepository); ++(gItemRepository.item_counter); std::cout << "Consumer thread " << std::this_thread::get_id() << " is consuming the " << item << "^th item" << std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) break; } std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void InitItemRepository(ItemRepository *ir) { ir->write_position = 0; ir->read_position = 0; ir->item_counter = 0; } int main() { InitItemRepository(&gItemRepository); std::thread producer(ProducerTask); std::thread consumer1(ConsumerTask); std::thread consumer2(ConsumerTask); std::thread consumer3(ConsumerTask); std::thread consumer4(ConsumerTask); producer.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join(); }
與單生產者和單消費者模型不一樣的是,多生產者-單消費者模型中能夠容許多個生產者同時向產品庫中放入產品。因此除了保護產品庫在多個讀寫線程下互斥以外,還須要維護生產者放入產品的計數器,代碼以下:多線程
#include <unistd.h> #include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> static const int kItemRepositorySize = 4; // Item buffer size. static const int kItemsToProduce = 10; // How many items we plan to produce. struct ItemRepository { int item_buffer[kItemRepositorySize]; size_t read_position; size_t write_position; size_t item_counter; std::mutex mtx; std::mutex item_counter_mtx; std::condition_variable repo_not_full; std::condition_variable repo_not_empty; } gItemRepository; typedef struct ItemRepository ItemRepository; void ProduceItem(ItemRepository *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while(((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) { // item buffer is full, just wait here. std::cout << "Producer is waiting for an empty slot...\n"; (ir->repo_not_full).wait(lock); } (ir->item_buffer)[ir->write_position] = item; (ir->write_position)++; if (ir->write_position == kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); lock.unlock(); } int ConsumeItem(ItemRepository *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); // item buffer is empty, just wait here. while(ir->write_position == ir->read_position) { std::cout << "Consumer is waiting for items...\n"; (ir->repo_not_empty).wait(lock); } data = (ir->item_buffer)[ir->read_position]; (ir->read_position)++; if (ir->read_position >= kItemRepositorySize) ir->read_position = 0; (ir->repo_not_full).notify_all(); lock.unlock(); return data; } void ProducerTask() { bool ready_to_exit = false; while(1) { sleep(1); std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx); if (gItemRepository.item_counter < kItemsToProduce) { ++(gItemRepository.item_counter); ProduceItem(&gItemRepository, gItemRepository.item_counter); std::cout << "Producer thread " << std::this_thread::get_id() << " is producing the " << gItemRepository.item_counter << "^th item" << std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) break; } std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void ConsumerTask() { static int item_consumed = 0; while(1) { sleep(1); ++item_consumed; if (item_consumed <= kItemsToProduce) { int item = ConsumeItem(&gItemRepository); std::cout << "Consumer thread " << std::this_thread::get_id() << " is consuming the " << item << "^th item" << std::endl; } else break; } std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void InitItemRepository(ItemRepository *ir) { ir->write_position = 0; ir->read_position = 0; ir->item_counter = 0; } int main() { InitItemRepository(&gItemRepository); std::thread producer1(ProducerTask); std::thread producer2(ProducerTask); std::thread producer3(ProducerTask); std::thread producer4(ProducerTask); std::thread consumer(ConsumerTask); producer1.join(); producer2.join(); producer3.join(); producer4.join(); consumer.join(); }
該模型能夠說是前面兩種模型的綜合,程序須要維護兩個計數器,分別是生產者已生產產品的數目和消費者已取走產品的數目。另外也須要保護產品庫在多個生產者和多個消費者互斥地訪問。併發
代碼以下:app
#include <unistd.h> #include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> static const int kItemRepositorySize = 4; // Item buffer size. static const int kItemsToProduce = 10; // How many items we plan to produce. struct ItemRepository { int item_buffer[kItemRepositorySize]; size_t read_position; size_t write_position; size_t produced_item_counter; size_t consumed_item_counter; std::mutex mtx; std::mutex produced_item_counter_mtx; std::mutex consumed_item_counter_mtx; std::condition_variable repo_not_full; std::condition_variable repo_not_empty; } gItemRepository; typedef struct ItemRepository ItemRepository; void ProduceItem(ItemRepository *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while(((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) { // item buffer is full, just wait here. std::cout << "Producer is waiting for an empty slot...\n"; (ir->repo_not_full).wait(lock); } (ir->item_buffer)[ir->write_position] = item; (ir->write_position)++; if (ir->write_position == kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); lock.unlock(); } int ConsumeItem(ItemRepository *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); // item buffer is empty, just wait here. while(ir->write_position == ir->read_position) { std::cout << "Consumer is waiting for items...\n"; (ir->repo_not_empty).wait(lock); } data = (ir->item_buffer)[ir->read_position]; (ir->read_position)++; if (ir->read_position >= kItemRepositorySize) ir->read_position = 0; (ir->repo_not_full).notify_all(); lock.unlock(); return data; } void ProducerTask() { bool ready_to_exit = false; while(1) { sleep(1); std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx); if (gItemRepository.produced_item_counter < kItemsToProduce) { ++(gItemRepository.produced_item_counter); ProduceItem(&gItemRepository, gItemRepository.produced_item_counter); std::cout << "Producer thread " << std::this_thread::get_id() << " is producing the " << gItemRepository.produced_item_counter << "^th item" << std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) break; } std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void ConsumerTask() { bool ready_to_exit = false; while(1) { sleep(1); std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx); if (gItemRepository.consumed_item_counter < kItemsToProduce) { int item = ConsumeItem(&gItemRepository); ++(gItemRepository.consumed_item_counter); std::cout << "Consumer thread " << std::this_thread::get_id() << " is consuming the " << item << "^th item" << std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) break; } std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void InitItemRepository(ItemRepository *ir) { ir->write_position = 0; ir->read_position = 0; ir->produced_item_counter = 0; ir->consumed_item_counter = 0; } int main() { InitItemRepository(&gItemRepository); std::thread producer1(ProducerTask); std::thread producer2(ProducerTask); std::thread producer3(ProducerTask); std::thread producer4(ProducerTask); std::thread consumer1(ConsumerTask); std::thread consumer2(ConsumerTask); std::thread consumer3(ConsumerTask); std::thread consumer4(ConsumerTask); producer1.join(); producer2.join(); producer3.join(); producer4.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join(); }
另外,全部例子的代碼(包括前面一些指南的代碼均放在github上),但願對你們學習 C++11 多線程併發有所幫助。學習