[C++11]半同步半異步線程池實現與分析

線程池介紹

服務器完成一項任務的時間可分爲:T1:建立線程或進程時間;T2:執行任務時間;T3:銷燬進程或線程時間。一般T1+T3的時間大於T2,線程池正是關注如何縮短T1和T3的時間。
線程池經過在系統中預先建立必定數量的線程,當任務請求到來時從線程池中分一個預先建立的線程去處理任務,線程在處理完任務後還能夠重用,不會銷燬,繼續等待下次任務的到開。這樣能避免大量的線程建立和銷燬操做,從而節省系統資源;同時有不少任務時,也會減小建立線程的數量。
用C++11的線程相關特性,好比線程、條件變量、互斥量,讓咱們編寫併發程序更簡單。

linux

半同步半異步線程池實現的關鍵技術分析

線程池又三層組成:
1. 同步服務層:不斷的將新任務添加到同步隊列中,能夠用多路複用或者多線程來完成。一開始沒看懂任務是什麼,其實一個函數就是一個任務,C++11經過std::function將函數封裝爲類模板對象,能夠將這些任務(函數)放到容器中保存起來,以進行添加讀取任務操做。
2. 排隊層:就是一個同步隊列,處於核心地位。全部待處理的任務都存在這裏,要保證隊列中共享數據線程安全(加鎖),還要控制任務的數量,上層服務層往隊列添加任務,下層從這裏取任務去執行。
3. 異步服務層: 預先建立好線程,來並行處理隊列中的任務。


ios

本身畫了一張圖:安全

這裏寫圖片描述

代碼實現與分析

用到了不少C++11的特性,裏面寫了不少註釋,是根據本身理解分析的。服務器

同步隊列:多線程

#include <iostream>
#include <string>
#include <stack>
#include <vector>
#include <algorithm>
#include <cstdio>
#include <list>
#include <thread>
#include <mutex>
#include <condition_variable>

using namespace std;

template<typename T>
class Syncqueue
{
public:
    //初始化,隊列的最大元素個數,開始不終止
    Syncqueue( int maxsize ) : m_maxsize(maxsize),m_needStop(false){}

    //往隊列中添加任務,重載兩個版本,左值和右值引用
    void Put( const T& x )
    {
        Add(x);
    }
    void Put(T&& x)
    {
        Add(forward<T>(x));
    }

    //Take和Add相似
    void Take(list<T>& list)
    {
        unique_lock<mutex> locker(m_mtx);
        m_notEmpty.wait(locker, [this] { return m_needStop|| NotEmpty(); });//中止或者不空就繼續執行,不用wait

        if(m_needStop)  return ;

        //一次加鎖,一下取出隊列中的全部數據
        list  = move(m_queue);  //經過移動,將 m_queue 轉移到 list,而不是拷貝
        m_notFull.notify_one(); //喚醒線程去添加任務
    }

    //每次獲取一個數據,效率較低
    void Take(T& t)
    {
        unique_lock<mutex> locker(m_mtx);
        m_notFull.wait(locker, [this] { return m_needStop || NotEmpty(); });

        if(m_needStop)  return ;

        t = m_queue.front();  //取出一個
        m_queue.pop_front();
        m_notFull.notify_one();    
    }

    //方便讓用戶能終止任務
    void Stop()
    {
        {
            lock_guard<mutex> locker(m_mtx);
            m_needStop = true;      //將須要中止標誌 置爲 true
            //執行到這,lock_guard釋放鎖
        }
        //喚醒全部等待的線程,到if(m_needStop)時爲真,而後相繼退出
        m_notEmpty.notify_all();    //被喚醒的線程直接獲取鎖
        m_notFull.notify_all();
    }

    //判斷隊列是否爲空
    bool Empty()
    {
        lock_guard<mutex> locker(m_mtx);
        return m_queue.empty();
    }
    //判斷隊列滿了
    bool Full()
    {
        lock_guard<mutex> locker(m_mtx);
        return m_queue.size() == m_maxsize;
    }

    //隊列大小
    size_t Size()
    {
        lock_guard<mutex> locker(m_mtx);
        return m_queue.size();
    }

private:
    //隊列未滿
    bool NotFull() const
    {
        bool full = m_queue.size() >= m_maxsize;
        if(full)
            cout << "緩衝區滿了,須要等待…… " << endl;
        return !full;
    }

    //隊列不空
    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if(empty)
        cout << "緩衝區空了,須要等待…… 異步層的線程id: " << this_thread::get_id() << endl;
        return !empty;
    }

    //範型事件函數
    template<typename F>
    void  Add(F&& x)
    {
        unique_lock<mutex> locker(m_mtx);
        m_notFull.wait(locker,[this]{ return m_needStop|| NotFull(); });  //須要中止 或者 不滿則繼續往下執行,不然wait
        if(m_needStop)  return;             //若是須要終止就 return 
        m_queue.push_back(forward<F>(x));   //不終止,把任務添加到同步隊列
        m_notEmpty.notify_one();            //提醒線程隊列不爲空,喚醒線程去取數據
    }

private:
    list<T> m_queue;    //緩衝區 用鏈表實現
    mutex m_mtx;        //互斥量 
    condition_variable m_notEmpty;  //不爲空的條件變量
    condition_variable m_notFull;   //沒有滿的條件變量

    int m_maxsize;      //同步隊列最大的size

    bool m_needStop;    //中止的標誌,開始是false
};

線程池:併發

/************************************************************************* > File Name: ThreadPool.h > Author: Tanswer > Mail: duxm@xiyoulinux.org > Created Time: 2017年08月10日 星期四 14時46分51秒 ************************************************************************/

#include <iostream>
#include <string>
#include <stack>
#include <vector>
#include <list>
#include <algorithm>
#include <cstdio>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <functional>
#include "Syncqueue.h"

using namespace std;

const int MaxTaskCount = 100;   //最大任務數量

class ThreadPool
{
public:

    using Task = function<void()>;  
    //任務類型,這裏是無參數無返回值,能夠修改成任何類型的範型函數模板

    //hardware_concurrency CPU核數 當默認線程數
    ThreadPool(int numThreads = thread::hardware_concurrency()) : m_queue(MaxTaskCount)
    {
        Start(numThreads);      //啓動
    }

    ~ThreadPool()
    {
        Stop();     //若是沒有中止時,則主動終止線程池
    }

    //終止線程池,銷燬池中全部線程
    void Stop()
    {
        //保證多線程狀況下只調用一次StopThreadGroup
        call_once(m_flag, [this] { StopThreadGroup(); });
    }

    //同步服務層:往同步隊列中添加任務,兩個版本
    void AddTask(Task&& task)
    {
        m_queue.Put(forward<Task>(task));    
    }
    void AddTask(const Task& task)
    {
        m_queue.Put(task);
    }

private:
    void Start( int numThreads ) //線程池開始,預先建立包含numThreads 個線程的線程組
    {
        m_running = true;

        //建立線程組
        for(int i=0; i<numThreads; i++)  
        {
            //智能指針管理,給出線程函數&ThreadPool::RunInThread 和對應參數this
            m_threadgroup.push_back( make_shared<thread>(&ThreadPool::RunInThread, this) );
        }
    }

    void RunInThread()
    {
        while(m_running)
        {
            //一次取出隊列中全部任務
            list<Task> list;
            m_queue.Take(list);

            for(auto& task : list)
            {
                if(!m_running)  //若是中止
                    return ;
                task();         //執行任務
            }
        }
    }

    //終止線程池,銷燬池中全部線程
    void StopThreadGroup()
    {
        m_queue.Stop();     //讓同步隊列中的線程中止
        m_running = false;  //讓內部線程跳出循環並退出

        for(auto thread : m_threadgroup)
        {
            if(thread)
                thread -> join();
        }
        m_threadgroup.clear();
    }

private:
    list<shared_ptr<thread>> m_threadgroup;     //處理任務的線程組,用list保存
    Syncqueue<Task> m_queue;    //同步隊列
    atomic_bool m_running;      //是否中止的標誌
    once_flag m_flag;           //call_once的參數
};

測試例子:異步

/************************************************************************* > File Name: TestThreadPool.cpp > Author: Tanswer > Mail: duxm@xiyoulinux.org > Created Time: 2017年08月10日 星期四 16時49分35秒 ************************************************************************/

#include <iostream>
#include <string>
#include <stack>
#include <vector>
#include <algorithm>
#include <cstdio>
#include <thread>
#include "ThreadPool.h"

using namespace std;

void TestThreadPool()
{
    ThreadPool pool(2);
    //線程池建立兩個線程,異步層此時無任務須要先等待
    //pool.Start(2);

    //建立兩個同步層的線程不斷往線程池中添加任務

    //在這任務很簡單,打印同步層線程ID,用lambda表達式表示,每一個線程處理10個任務
    thread thd1( [&pool]{
        for(int i=0; i<10; i++ )
        {
            auto thdId = this_thread::get_id();
            pool.AddTask( [thdId]{
                cout << "同步層線程1的線程ID: " << thdId << endl;
            } );
        }
    } );

    thread thd2( [&pool]{
        for( int i=0; i<10; i++ )
        {
            auto thdId = this_thread::get_id();
            pool.AddTask( [thdId]{
                cout << "同步層線程2的線程ID: " << thdId << endl;
            } );
        }
    } );

    this_thread::sleep_for(chrono::seconds(2));
    getchar();
    //中止線程池
    pool.Stop();

    //等待同步層的兩個線程執行完
    thd1.join();
    thd2.join();
}

int main()
{
    TestThreadPool();

    exit(EXIT_SUCCESS);
}

測試結果:函數

這裏寫圖片描述

線程池預先建立了兩個線程,線程ID分別爲: 140141544822528 和 140141536429824,開始時同步隊列是空的,尚未任務,因此兩個線程都等待。而後建立了兩個同步層線程1和2,線程ID分別爲:140141528037120 和 140141519644416,這兩個線程開始不斷往線程池同步隊列中添加任務。有了任務後,線程池異步層中的兩個線程開始處理任務,任務很簡單,就是打印同步層線程ID,異步層線程交替處理上層的任務。測試

相關文章
相關標籤/搜索