基於c++11 thread的半同步半異步線程池

c++11標準給廣大c++程序員提供了很是強大的武器庫。今天咱們就利用c++11中的std::thread 實現一個半同步半異步的線程池。ios

首先簡單介紹一下線程池。c++

線程池是爲了解決大量的短時任務頻繁的建立銷燬線程形成性能開銷的一個技術。在程序開始時建立必定量的線程。當有任務到達時,從池中選出空閒線程來處理任務,當任務處理完成再將線程返回到池中。程序員

線程池實現有兩種模型:1.半同步半異步模型。2.領導者跟隨者模型。異步

半同步半異步模型原理圖:函數

原理性能

同步層:經過線程將處理任務添加到隊列中,這個過程是同步執行的。this

隊列層:全部的任務都會放到這裏,上層聽任務到這裏,下層從這裏取任務。atom

異步層:事先建立好線程,讓線程不斷的去處理隊列層的任務。上層不關心這些,只負責把任務放入隊列,全部對上層來講這裏是異步的。spa

 

代碼實現:線程

任務隊列:TaskQueue.h

#pragma once

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>

//隊列層
template<typename T>
class TaskQueue
{
public:
    TaskQueue(int maxsize) : m_maxSize(maxsize), m_needStop(false) {}
    ~TaskQueue() {}

    //添加事件,左值拷貝,右值移動
    void Put(const T& x)
    {
        Add(x);
    }
    void Put(T && x)
    {
        Add(x);
    }

    //從隊列中取事件,取全部事件 
    void Take(std::list<T> &list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        //知足條件則喚醒 不知足就阻塞
        m_notEmpty.wait(locker, [this](){
            return m_needStop || NotEmpty();
        });
        if (m_needStop)
        {
            return;
        }
        list = std::move(m_queue);
        //喚醒其餘阻塞的線程
        m_notFull.notify_one();
    }

    void Take(T &t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this](){
            return m_needStop || NotEmpty();
        });
        if (m_needStop)
        {
            return;
        }

        t = m_queue.front();
        m_queue.pop_front();
        m_notFull.notify_one();
        t();
    }

    //中止全部線程在隊列中讀取
    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    //隊列爲空
    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    //隊列爲滿 
    bool Full()
    {
        std::try_guard<std::mutex> locker(m_mutex);
        return m_queue.size() == m_maxSize;
    }

    //隊列大小
    size_t Size()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

private:
    //往隊列裏添加事件,事件是泛型的,經過std::function封裝爲對象
    template<typename F>
    void Add(F &&x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this](){
            return m_needStop || NotFull();
        });
        if (m_needStop)
        {
            return;
        }
        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

    //隊列未滿
    bool NotFull() const
    {
        bool full = m_queue.size() >= m_maxSize;
        if (full)
        {
            std::cout << "queue is full" << std::endl;
        }
        return !full;
    }

    //隊列未空
    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
        {
            std::cout << "queue is empty...wait" << std::endl;
            std::cout << "thread id:" << std::this_thread::get_id() << std::endl;
        }
        return !empty;
    }


private:
    std::mutex m_mutex;   // 互斥鎖
    std::list<T> m_queue; //任務隊列
    std::condition_variable m_notEmpty; //隊列不爲空條件變量
    std::condition_variable m_notFull;  //隊列不爲滿條件變量
    int m_maxSize; //任務最大長度
    bool m_needStop; //終止標識
};

線程池:ThreadPool.h

#pragma once
#include <functional>
#include <thread>
#include <memory>
#include <atomic>
#include "TaskQueue.h"

const int MAX_TASK_COUNT = 100;

class ThreadPool
{
public:
    //規定任務類型爲void(),咱們能夠經過c++11不定參數模板來實現一個可接受任何參數的範式函數模板,
    //這樣就是一個能夠接收任何任務的任務隊列了
    using Task = std::function<void()>;
    //hardware_concurrency 檢測硬件性能,給出默認線程數
    ThreadPool(int numThreads = std::thread::hardware_concurrency());
    //銷燬線程池
    ~ThreadPool();

    //終止全部線程,call_once保證函數只執行一次
    void Stop();

    //添加任務,普通版本
    void AddTask(const Task &task);

    //添加任務,右值引用版本
    void AddTask(Task && task);

private:
    //中止線程池
    void StopThreadGroup();
    void Start(int numThreads);
    //一次取出全部事件
    void RunInThreadList();
    //一次取出一個事件
    void RunInThread();

private:
    //線程池
    std::list<std::shared_ptr<std::thread>> m_threadGroup;
    //任務隊列
    TaskQueue<Task> m_queue;
    //原子布爾值
    std::atomic_bool m_running;
    //輔助變量->call_once
    std::once_flag m_flag;
};

ThreadPool.cpp

#include "stdafx.h"
#include "ThreadPool.h"


ThreadPool::ThreadPool(int numThreads/* = std::thread::hardware_concurrency()*/)
: m_queue(MAX_TASK_COUNT)
{
    Start(numThreads);
}

//銷燬線程池
ThreadPool::~ThreadPool()
{
    Stop();
}

//終止全部線程,call_once保證函數只執行一次
void ThreadPool::Stop()
{
    std::call_once(m_flag, [this](){ StopThreadGroup(); });
}

//添加任務,普通版本
void ThreadPool::AddTask(const Task &task)
{
    m_queue.Put(task);
}

//添加任務,右值引用版本
void ThreadPool::AddTask(Task && task)
{
    m_queue.Put(std::forward<Task>(task));
}

//中止線程池
void ThreadPool::StopThreadGroup()
{
    m_queue.Stop();
    m_running = false;
    for (auto thread : m_threadGroup)
    {
        if (thread)
        {
            thread->join();
        }
    }
    m_threadGroup.clear();
}

void ThreadPool::Start(int numThreads)
{
    m_running = true;
    for (int i = 0; i < numThreads; ++i)
    {
        //智能指針管理,並給出構建線程的參數,線程調用函數和參數
        std::cout << "create thread pool " << i << std::endl;
        m_threadGroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
    }

}

//一次取出全部事件
void ThreadPool::RunInThreadList()
{
    while (m_running)
    {
        std::list<Task> list;
        std::cout << "take" << std::endl;
        m_queue.Take(list);
        for (auto &task : list)
        {
            if (!m_running)
            {
                return;
            }
            task();
        }
    }
}

//一次取出一個事件
void ThreadPool::RunInThread()
{
    std::cout << m_queue.Size() << std::endl;
    while (m_running)
    {
        Task task;
        if (!m_running)
        {
            return;
        }
        m_queue.Take(task);
    }
}

main.cpp

#include "stdafx.h"
#include "ThreadPool.h"
#include <chrono>



int _tmain(int argc, _TCHAR* argv[])
{
    ThreadPool pool;
    //建立線程向任務隊列添加任務
    std::thread thd1([&pool](){
        for (int i = 0; i < 10; i++)
        {
            auto thid = std::this_thread::get_id();
            pool.AddTask([thid, i](){
         std::cout << "ThreadID:" << thid << " Task " << i << " Done!" << std::endl;
         std::this_thread::sleep_for(std::chrono::milliseconds(500));
     });
 } });
 thd1.join(); pool.Stop(); system("pause"); return 0; }

結果:

TheadID:5779 Task 1 Done!

TheadID:5779 Task 2 Done!

TheadID:5779 Task 3 Done!

TheadID:5780 Task 4 Done!

TheadID:5779 Task 5 Done!

TheadID:5779 Task 6 Done!

TheadID:5779 Task 7 Done!

TheadID:5779 Task 8 Done!\

TheadID:5779 Task 9 Done!

 

end;

相關文章
相關標籤/搜索