C++筆記--thread pool【轉】

版權聲明:轉載著名出處 https://blog.csdn.net/gcola007/article/details/78750220html

背景

剛粗略看完一遍c++ primer第五版,一直在找一些c++小項目練手,實驗樓裏面有不少項目,可是會員太貴了,學生黨就只能google+github自行搜索完成項目了。注:本文純提供本身的理解,代碼徹底照抄,有想法的歡迎評論留言一塊兒討論。ios

本文參考:

涉及到的c++11的特性:

  • std::vector
  • std::thread
  • std::mutex
  • std::future
  • std::condition_variable

線程池原理介紹

線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。線程池線程都是後臺線程。每一個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。c++

線程池的組成部分:

  • 線程池管理器(ThreadPoolManager):用於建立並管理線程池
  • 工做線程(WorkThread): 線程池中線程
  • 任務接口(Task):每一個任務必須實現的接口,以供工做線程調度任務的執行。
  • 任務隊列:用於存放沒有處理的任務。提供一種緩衝機制。

代碼

#ifndef ThreadPool_h #define ThreadPool_h #include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional>

class ThreadPool { public: ThreadPool(size_t); //構造函數,size_t n 表示鏈接數
 template<class F, class... Args> auto enqueue(F&& f, Args&&... args)   //任務管道函數
    -> std::future<typename std::result_of<F(Args...)>::type>;  //利用尾置限定符 std future用來獲取異步任務的結果

    ~ThreadPool(); private: // need to keep track of threads so we can join them
    std::vector< std::thread > workers;   //追蹤線程 // the task queue
    std::queue< std::function<void()> > tasks;    //任務隊列,用於存放沒有處理的任務。提供緩衝機制 // synchronization 同步?
    std::mutex queue_mutex;   //互斥鎖
    std::condition_variable condition;   //條件變量?
    bool stop; }; // the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false) { for(size_t i = 0;i<threads;++i) workers.emplace_back( //如下爲構造一個任務,即構造一個線程
            [this] { for(;;) { std::function<void()> task;   //線程中的函數對象
                    {//大括號做用:臨時變量的生存期,即控制lock的時間
                    std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); //當stop==false&&tasks.empty(),該線程被阻塞 !this->stop&&this->tasks.empty()
                    if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); //調用函數,運行函數
 } } ); } // add new work item to the pool
template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args)  //&& 引用限定符,參數的右值引用, 此處表示參數傳入一個函數
-> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; //packaged_task是對任務的一個抽象,咱們能夠給其傳遞一個函數來完成其構造。以後將任務投遞給任何線程去完成,經過 //packaged_task.get_future()方法獲取的future來獲取任務完成後的產出值
    auto task = std::make_shared<std::packaged_task<return_type()> >(  //指向F函數的智能指針
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)  //傳遞函數進行構造
 ); //future爲指望,get_future獲取任務完成後的產出值
    std::future<return_type> res = task->get_future();   //獲取future對象,若是task的狀態不爲ready,會阻塞當前調用者
 { std::unique_lock<std::mutex> lock(queue_mutex);  //保持互斥性,避免多個線程同時運行一個任務 // don't allow enqueueing after stopping the pool
        if(stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task](){ (*task)(); });  
//將task投遞給線程去完成,vector尾部壓入,std::packaged_task 重載了 operator(),重載後的operator()執行function。所以能夠(*task)()能夠壓入vector<function<void()>> } condition.notify_one(); //選擇一個wait狀態的線程進行喚醒,並使他得到對象上的鎖來完成任務(即其餘線程沒法訪問對象) return res; }//notify_one不能保證得到鎖的線程真正須要鎖,而且所以可能產生死鎖 // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); //通知全部wait狀態的線程競爭對象的控制權,喚醒全部線程執行 for(std::thread &worker: workers) worker.join(); //由於線程都開始競爭了,因此必定會執行完,join可等待線程執行完 } #endif /* ThreadPool_h */

 

線程池大約100行,下面是運行代碼git

#include <iostream> #include <vector> #include <chrono> #include "ThreadPool.h"

int main() { ThreadPool pool(4); std::vector< std::future<int> > results; for(int i = 0; i < 8; ++i) { results.emplace_back( pool.enqueue([i] { std::cout << "hello " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "world " << i << std::endl; return i*i; }) ); } for(auto && result: results)    //經過future.get()獲取返回值
        std::cout << result.get() << ' '; std::cout << std::endl; return 0; }


代碼剖析

經過新建一個線程池類,以類來管理資源(《c++ effective》資源管理一章有提到)。該類包含3個公有成員函數與5個私有成員:構造函數與析構函數即知足(RAII:Resource Acquisition Is Initialization)。github

  • 構造函數接受一個size_t類型的數,表示鏈接數
  • enqueue表示線程池部分中的任務管道,是一個模板函數
  • workers是一個成員爲thread的vector,用來監視線程狀態
  • tasks表示線程池部分中的任務隊列,提供緩衝機制
  • queue_mutex表示互斥鎖
  • condition表示條件變量(互斥鎖,條件變量以及stop將在後面經過例子說明)

queue_mutex、condition與stop這三個成員讓初次接觸多線程的我很是的迷惑,互斥究竟是什麼意思?爲何須要一個bool量來控制?條件變量condition又是什麼?
不懂的能夠搜索:多線程的生產者與消費者模型
同時附上condition_variable詳解markdown

構造函數ThreadPOOL(size_t):

  • 省略了參數
  • emplace_back至關於push_back但比push_back更爲高效
  • wokers壓入了一個lambda表達式(即一個匿名函數),表示一個任務(線程),使用for的無限循環,task表示函數對象,線程池中的函數接口在enqueue傳入的參數之中,condition.wait(lock,bool),當bool爲false的時候,線程將會被堵塞掛起,被堵塞時須要notify_one來喚醒線程才能繼續執行

任務隊列函數enqueue(F&& f, Args&&… args)

  • 這類多參數模板的格式就是如此
  • -> 尾置限定符,語法就是如此,用來推斷auto類型
  • typename與class的區別
  • result_of用來獲得返回類型的對象,它有一個成員::type

析構函數~ThreadPool()

  • 經過notify_all能夠喚醒線程競爭任務的執行,從而使全部任務不被遺漏
相關文章
相關標籤/搜索