c++實現簡單線程池

 

 主要結合操做系統的基本原理和c++11特性來寫ios

首先了解一下lambda表達式,利用Lambda表達式,能夠方便的定義和建立匿名函數c++

捕獲外部變量有3種方法安全

1.值捕獲多線程

int main()
{
    int a = 123;
    auto f = [a] { cout << a << endl; }; 
    a = 321;
    f(); // 輸出:123
}

 

2.引用捕獲併發

int main()
{
    int a = 123;
    auto f = [&a] { cout << a << endl; }; 
    a = 321;
    f(); // 輸出:321
}

 3.隱式捕獲異步

int main()
{
    int a = 123;
    auto f = [=] { cout << a << endl; };    // 值捕獲
    f(); // 輸出:123
}

int main()
{
    int a = 123;
    auto f = [&] { cout << a << endl; };    // 引用捕獲
    a = 321;
    f(); // 輸出:321
}

 類型尾置函數

讓編譯器在函數定義的時候知道返回類型測試

template <typename T>
auto &getItem(T begin, T end) -> decltype(*begin) {
    return *begin; // 返回序列中一個元素的引用
}

下面就介紹一下c++11的特性ui

#include <iostream>
#include <thread>
void foo() {
    std::cout << "hello world" << std::endl;
}
int main() {
    std::thread t(foo);//建立一個線程實例
    t.join();//加入一個線程
    return 0;
}

/*
std::mutex mutex 建立一個互斥量
std::lock_guard<std::mutex> lock(mutex); 對互斥量上鎖
std::unique_lock 也是上鎖,但更靈活
std::packaged_task<int()> task([](){return 7;}); 用來封裝任何能夠調用的目標,從而用於實現異步的調用,異步即主線程A想獲取某個計算結果而調用線程B
std::future<int> result = task.get_future();   用來獲取異步任務的結果
std::thread(std::move(task)).detach(); 一個線程中執行 task
std::this_thread::sleep_for 當前線程休眠一段時間,休眠期間不與其餘線程競爭CPU,根據線程需求,等待若干時間
std::condition_variable 喚醒等待線程從而避免死鎖

std::bind 將實參綁定到調用函數上
std::placeholders::_1 佔用符
std::shared_ptr 一種智能指針,它可以記錄多少個 shared_ptr 共同指向一個對象
std::make_shared 分配建立傳入參數中的對象,並返回這個對象類型的指針
std::move 將本身的參數轉換爲右值
std::forward 會把參數被綁定到一個右值的時候將其轉化爲右值
std::result_of 在編譯的時候推導出一個函數調用表達式的返回值類型
std::shared_ptr

 看一個操做系統中生產者與消費者問題this

假設存在一個緩衝區,生產者往裏面存數據,消費者從裏面取數據,若是緩衝區滿了,生產者就不能再往裏面添加數據。若是緩衝區沒有數據,消費者不能從裏面取

下面利用c++11來寫一個簡單的模型,要理解多線程併發,一個程序可能由多個線程來執行,所以程序上的順序並不一樣於多線程中的執行順序,代碼中先寫5個生產者,再寫5個消費者,可是線程中的順序並非這樣,可能先執行一個生產者,再執行一個消費者。

#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
#include <queue>
#include <chrono>
int main()
{
// 生產者數量
std::queue<int> produced_nums;
// 互斥鎖
std::mutex m;
// 條件變量
std::condition_variable cond_var;
// 結束標誌
bool done = false;
// 通知標誌
bool notified = false;

// 生產者線程
std::thread producer([&]() {
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::seconds(1));//當前線程休眠
// 建立互斥鎖
std::unique_lock<std::mutex> lock(m);
std::cout << "producing " << i << '\n';
produced_nums.push(i);
notified = true;
// 通知一個線程
cond_var.notify_one();
}
done = true;//生產結束
cond_var.notify_one();//通知休眠的線程執行,完成全部的進程
});

// 消費者線程
std::thread consumer([&]() {
std::unique_lock<std::mutex> lock(m);
while (!done) {
while (!notified) { // 循環避免虛假喚醒,執行完生產者的進程,notified爲true,不然停掉消費者線程
cond_var.wait(lock);//停掉當前線程

}
while (!produced_nums.empty()) {
std::cout << "consuming " << produced_nums.front() << '\n';
produced_nums.pop();
}
notified = false;
}
});
producer.join();
consumer.join();
}

 程序的結果是生產一個,消費一個,這樣就引起思考了,爲何不多是生產幾個,後消費.發如今生產線程中,先讓當前線程休眠,因此生產一個,消費一個。

 線程池類代碼

#ifndef ThreadPool_hpp
#define ThreadPool_hpp
#include <vector>               // std::vector
#include <queue>                // std::queue
#include <memory>               // std::make_shared
#include <stdexcept>            // std::runtime_error
#include <thread>               // std::thread
#include <mutex>                // std::mutex,        std::unique_lock
#include <condition_variable>   // std::condition_variable
#include <future>               // std::future,       std::packaged_task
#include <functional>           // std::function,     std::bind
#include <utility>              // std::move,         std::forward

class ThreadPool {
public:
    inline ThreadPool(size_t threads) : stop(false) { //構造函數,且把stop變量賦值爲false
        for(size_t i = 0;i<threads;++i)//創造線程實例
            workers.emplace_back([this] {//使用lambda表達式返回this
                for(;;)
                {
                    std::function<void()> task;//function函數對象類,可調用實體的一種類型安全的包裹
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);//互斥量上鎖
                        //std::cout<<"thread"<<std::this_thread::get_id()<<"begin work"<<std::endl;
                        this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });//若是線程池沒有銷燬且任務隊列爲空,返回false,該線程休眠
                        if(this->stop && this->tasks.empty())//線程池銷燬且任務隊列爲空,返回
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();

                    }
                    task();//執行任務
                }

                std::cout<<"thread"<<std::this_thread::get_id()<<"begin work"<<std::endl;
            });


    }
    inline ~ThreadPool() {
        {
            //std::unique_lock<std::mutex> lock(queue_mutex);//互斥量上鎖,避免
            stop = true;
        }
        condition.notify_all();//通知全部的休眠線程
        for(std::thread &worker: workers)
            worker.join();
    }
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)//可變參數的模板,Args 是一個模板參數包。而在後面的函數參數表中,args 則是函數參數包,用來表示零個或多個參數。
    -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;//獲取函數返回類型
        auto task = std::make_shared< std::packaged_task<return_type()>>(
                std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<return_type> res = task->get_future();//得到 std::future 對象以供實施線程同步
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]{ (*task)(); });//把任務加入隊列
        }
        condition.notify_one();//喚醒一個休眠的線程
        return res;
    }
private:
    std::vector<std::thread> workers;//線程池
    std::queue<std::function<void()>> tasks;//任務隊列
    std::mutex queue_mutex;//互斥量
    std::condition_variable condition;//條件變量
    bool stop;//結束標誌
};
#endif /* ThreadPool_hpp */

測試代碼

#include <iostream> // std::cout, std::endl
#include <vector>   // std::vector
#include <string>   // std::string
#include <future>   // std::future
#include <thread>   // std::this_thread::sleep_for
#include <chrono>   // std::chrono::seconds
#include "ThreadPool.h"

int main()
{


    // 建立一個可以併發執行四個線程的線程池
    //std::cout<<std::this_thread::get_id()<<std::endl;
    ThreadPool pool(4);
    // 建立併發執行線程的結果列表

    std::vector< std::future<std::string> > results;

    // 啓動八個須要執行的線程任務
    for(int i = 0; i < 8; ++i) {
        // 將併發執行任務的返回值添加到結果列表中
        results.emplace_back(
                // 將下面的打印任務添加到線程池中併發執行
                pool.enqueue([i] {
                    std::cout << "hello " << i << std::endl;
                    // 上一行輸出後, 該線程會等待1秒鐘
                   std::this_thread::sleep_for(std::chrono::seconds(1));
                    // 而後再繼續輸出並返回執行狀況
                    std::cout << "world " << i << std::endl;

                    return std::string("---thread ") + std::to_string(i) + std::string(" finished.---");
                })
        );
    }

    // 輸出線程任務的結果
    for(auto && result: results)
        std::cout << result.get() << '\n';
    std::cout << std::endl;


    return 0;
}
相關文章
相關標籤/搜索