目錄git
不久前寫過一篇線程池,那時候剛用C++寫東西不久,不少C++標準庫裏面的東西沒怎麼用,今天基於C++11從新實現了一個線程池。
線程池(thread pool)
:一種線程的使用模式,線程過多會帶來調度開銷,進而影響緩存局部性和總體性能。而線程池維護着多個線程,等待着監督管理者分配可併發執行的任務。這避免了在處理短期任務時建立與銷燬線程的代價。線程池不只可以保證內核的充分利用,還能防止過度調度。可用線程數量應該取決於可用的併發處理器、處理器內核、內存、網絡sockets等的數量。github
建立必定數量的線程,啓動線程,調配任務,管理着線程池。
本篇線程池目前只須要啓動(start()),中止方法(stop()),及任務添加方法(addTask).
start()建立必定數量的線程池,進行線程循環.
stop()中止全部線程循環,回收全部資源.
addTask()添加任務.緩存
線程池中線程,在線程池中等待並執行分配的任務.
本篇選用條件變量實現等待與通知機制.網絡
添加任務的接口,以供工做線程調度任務的執行。併發
用於存放沒有處理的任務。提供一種緩衝機制
同時任務隊列具備調度功能,高優先級的任務放在任務隊列前面;本篇選用priority_queue 與pair的結合用做任務優先隊列的結構.dom
假設咱們的線程池大小爲3,任務隊列目前不作大小限制.socket
此狀況下全部工做線程處於空閒的等待狀態,任務緩衝隊列爲空.oop
此狀況基於情形1,全部工做線程已處在等待狀態,主線程開始添加三個任務,添加後通知(notif())喚醒線程池中的線程開始取(take())任務執行. 此時的任務緩衝隊列仍是空。性能
此狀況發生情形2後面,全部工做線程都在工做中,主線程開始添加第四個任務,添加後發現如今線程池中的線程用完了,因而存入任務緩衝隊列。工做線程空閒後主動從任務隊列取任務執行.
測試
此狀況發生情形3且設置了任務緩衝隊列大小後面,主程序添加第N個任務,添加後發現池子中的線程用完了,任務緩衝隊列也滿了,因而進入等待狀態、等待任務緩衝隊列中的任務騰空通知。
可是要注意這種情形會阻塞主線程,本篇暫不限制任務隊列大小,必要時再來優化.
等待通知機制經過條件變量實現,Logger和CurrentThread,用於調試,能夠無視.
#ifndef _THREADPOOL_HH #define _THREADPOOL_HH #include <vector> #include <utility> #include <queue> #include <thread> #include <functional> #include <mutex> #include "Condition.hh" class ThreadPool{ public: static const int kInitThreadsSize = 3; enum taskPriorityE { level0, level1, level2, }; typedef std::function<void()> Task; typedef std::pair<taskPriorityE, Task> TaskPair; ThreadPool(); ~ThreadPool(); void start(); void stop(); void addTask(const Task&); void addTask(const TaskPair&); private: ThreadPool(const ThreadPool&);//禁止複製拷貝. const ThreadPool& operator=(const ThreadPool&); struct TaskPriorityCmp { bool operator()(const ThreadPool::TaskPair p1, const ThreadPool::TaskPair p2) { return p1.first > p2.first; //first的小值優先 } }; void threadLoop(); Task take(); typedef std::vector<std::thread*> Threads; typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks; Threads m_threads; Tasks m_tasks; std::mutex m_mutex; Condition m_cond; bool m_isStarted; }; #endif //Cpp #include <assert.h> #include "Logger.hh" // debug #include "CurrentThread.hh" // debug #include "ThreadPool.hh" ThreadPool::ThreadPool() :m_mutex(), m_cond(m_mutex), m_isStarted(false) { } ThreadPool::~ThreadPool() { if(m_isStarted) { stop(); } } void ThreadPool::start() { assert(m_threads.empty()); m_isStarted = true; m_threads.reserve(kInitThreadsSize); for (int i = 0; i < kInitThreadsSize; ++i) { m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop, this))); } } void ThreadPool::stop() { LOG_TRACE << "ThreadPool::stop() stop."; { std::unique_lock<std::mutex> lock(m_mutex); m_isStarted = false; m_cond.notifyAll(); LOG_TRACE << "ThreadPool::stop() notifyAll()."; } for (Threads::iterator it = m_threads.begin(); it != m_threads.end() ; ++it) { (*it)->join(); delete *it; } m_threads.clear(); } void ThreadPool::threadLoop() { LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " start."; while(m_isStarted) { Task task = take(); if(task) { task(); } } LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " exit."; } void ThreadPool::addTask(const Task& task) { std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isFull()) {//when m_tasks have maxsize cond2.wait(); } */ TaskPair taskPair(level2, task); m_tasks.push(taskPair); m_cond.notify(); } void ThreadPool::addTask(const TaskPair& taskPair) { std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isFull()) {//when m_tasks have maxsize cond2.wait(); } */ m_tasks.push(taskPair); m_cond.notify(); } ThreadPool::Task ThreadPool::take() { std::unique_lock<std::mutex> lock(m_mutex); //always use a while-loop, due to spurious wakeup while(m_tasks.empty() && m_isStarted) { LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wait."; m_cond.wait(lock); } LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wakeup."; Task task; Tasks::size_type size = m_tasks.size(); if(!m_tasks.empty() && m_isStarted) { task = m_tasks.top().second; m_tasks.pop(); assert(size - 1 == m_tasks.size()); /*if (TaskQueueSize_ > 0) { cond2.notify(); }*/ } return task; }
測試線程池基本的建立退出工做,及檢測資源是否正常回收.
int main() { { ThreadPool threadPool; threadPool.start(); getchar(); } getchar(); return 0; }
./test.out 2018-11-25 16:50:36.054805 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3680 start. 2018-11-25 16:50:36.054855 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3680 wait. 2018-11-25 16:50:36.055633 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3679 start. 2018-11-25 16:50:36.055676 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3679 wait. 2018-11-25 16:50:36.055641 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3681 start. 2018-11-25 16:50:36.055701 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3681 wait. 2018-11-25 16:50:36.055736 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3682 start. 2018-11-25 16:50:36.055746 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3682 wait. 2018-11-25 16:51:01.411792 [TRACE] [ThreadPool.cpp:36] [stop] ThreadPool::stop() stop. 2018-11-25 16:51:01.411863 [TRACE] [ThreadPool.cpp:39] [stop] ThreadPool::stop() notifyAll(). 2018-11-25 16:51:01.411877 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3680 wakeup. 2018-11-25 16:51:01.411883 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3680 exit. 2018-11-25 16:51:01.412062 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3682 wakeup. 2018-11-25 16:51:01.412110 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3682 exit. 2018-11-25 16:51:01.413052 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3679 wakeup. 2018-11-25 16:51:01.413098 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3679 exit. 2018-11-25 16:51:01.413112 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3681 wakeup. 2018-11-25 16:51:01.413141 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3681 exit.
測試添加任務接口,及優先任務隊列.
主線程首先添加了5個普通任務、 1s後添加一個高優先級任務,當前3個線程中的最早一個空閒後,會最早執行後面添加的priorityFunc().
std::mutex g_mutex; void priorityFunc() { for (int i = 1; i < 4; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::lock_guard<std::mutex> lock(g_mutex); LOG_DEBUG << "priorityFunc() [" << i << "at thread [ " << CurrentThread::tid() << "] output";// << std::endl; } } void testFunc() { // loop to print character after a random period of time for (int i = 1; i < 4; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::lock_guard<std::mutex> lock(g_mutex); LOG_DEBUG << "testFunc() [" << i << "] at thread [ " << CurrentThread::tid() << "] output";// << std::endl; } } int main() { ThreadPool threadPool; threadPool.start(); for(int i = 0; i < 5 ; i++) threadPool.addTask(testFunc); std::this_thread::sleep_for(std::chrono::seconds(1)); threadPool.addTask(ThreadPool::TaskPair(ThreadPool::level0, priorityFunc)); getchar(); return 0; }
./test.out 2018-11-25 18:24:20.886837 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4121 start. 2018-11-25 18:24:20.886893 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup. 2018-11-25 18:24:20.887580 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4120 start. 2018-11-25 18:24:20.887606 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup. 2018-11-25 18:24:20.887610 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4122 start. 2018-11-25 18:24:20.887620 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup. 2018-11-25 18:24:21.887779 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4120] output 2018-11-25 18:24:21.887813 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output 2018-11-25 18:24:21.888909 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output 2018-11-25 18:24:22.888049 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4120] output 2018-11-25 18:24:22.888288 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output 2018-11-25 18:24:22.889978 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output 2018-11-25 18:24:23.888467 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4120] output 2018-11-25 18:24:23.888724 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup. 2018-11-25 18:24:23.888778 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output 2018-11-25 18:24:23.888806 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup. 2018-11-25 18:24:23.890413 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output 2018-11-25 18:24:23.890437 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup. 2018-11-25 18:24:24.889247 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [1at thread [ 4120] output 2018-11-25 18:24:24.891187 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output 2018-11-25 18:24:24.893163 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output 2018-11-25 18:24:25.889567 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [2at thread [ 4120] output 2018-11-25 18:24:25.891477 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output 2018-11-25 18:24:25.893450 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output 2018-11-25 18:24:26.890295 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [3at thread [ 4120] output 2018-11-25 18:24:26.890335 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4120 wait. 2018-11-25 18:24:26.892265 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output 2018-11-25 18:24:26.892294 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4121 wait. 2018-11-25 18:24:26.894274 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output 2018-11-25 18:24:26.894299 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4122 wait. 2018-11-25 18:24:35.359003 [TRACE] [ThreadPool.cpp:37] [stop] ThreadPool::stop() stop. 2018-11-25 18:24:35.359043 [TRACE] [ThreadPool.cpp:42] [stop] ThreadPool::stop() notifyAll(). 2018-11-25 18:24:35.359061 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup. 2018-11-25 18:24:35.359067 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4120 exit. 2018-11-25 18:24:35.359080 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup. 2018-11-25 18:24:35.359090 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4122 exit. 2018-11-25 18:24:35.359123 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup. 2018-11-25 18:24:35.359130 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4121 exit.
若是有須要,能夠訪問個人GitHub進行下載
: https://github.com/BethlyRoseDaisley/ThreadPool