This is my translation of an article from CodeProject, titled: Thread Pool
1. Introduction
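My application has to back up data to several destinations (CD, USB, network share). Doing this with a single thread took more than two hours. Improving the performance was a challenge for me. In the end I solved it with multiple threads, and the result was impressive: the job finished in only 30 minutes.
When development started, the backup ran once a day. Later I was forced to increase the frequency to once an hour. That exposed a problem: with backups running this often, threads were created and destroyed over and over. How can this be solved? Why can't the threads be reused? That is where the thread pool comes in.
If your application is in the same situation, a thread pool is a good fit. Your program will be loosely coupled and easy to implement as a result.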
2. Background
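As a developer you have surely worked with threads at least once, and I am no exception. This time I needed to create a thread pool in C++, so I searched Google and downloaded many examples, but none of them worked for me. There were plenty of examples, yet none fit my needs, so I wrote this article.
Why do I need a thread pool? Most I/O operations (file, disk, and so on) are slow, so with a single thread many system resources (memory, processor, and so on) sit idle while waiting, which is clearly wasteful. Multiple threads can put that idle time to use and improve memory and processor utilization.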
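Creating the thread pool
The thread pool creates the specified number of threads and waits for requests. As soon as a request arrives, the pool wakes one thread, which starts processing it. When processing is finished, the thread returns to the waiting state and waits for the next request. When the pool is asked to shut down, all threads exit.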
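The lifecycle described above (create workers, wait, wake one per request, return to waiting, exit on shutdown) can be sketched in portable C++11. This is only an illustration under stated assumptions: `MiniPool`, `Post` and `RunDemo` are hypothetical names, and the sketch uses `std::thread` and `std::condition_variable` rather than the Win32 threads and semaphore used by the article's ThreadPool. Unlike the article's pool, which discards queued requests on Destroy(), this sketch finishes queued requests before its workers exit.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, portable stand-in for the article's Win32 pool.
class MiniPool
{
public:
    // Create the requested number of worker threads; each starts out waiting.
    explicit MiniPool( std::size_t threadCount )
    {
        for( std::size_t i = 0; i < threadCount; ++i )
        {
            m_workers.emplace_back( [this] { WorkerLoop(); } );
        }
    }

    // Ask all workers to exit, then wait for them.
    ~MiniPool()
    {
        {
            std::lock_guard<std::mutex> lock( m_mutex );
            m_stopping = true;
        }
        m_wakeUp.notify_all();
        for( std::thread& worker : m_workers )
        {
            worker.join();
        }
    }

    // Queue a request and wake one waiting worker.
    void Post( std::function<void()> request )
    {
        {
            std::lock_guard<std::mutex> lock( m_mutex );
            m_requests.push( std::move( request ) );
        }
        m_wakeUp.notify_one();
    }

private:
    void WorkerLoop()
    {
        for( ;; )
        {
            std::function<void()> request;
            {
                std::unique_lock<std::mutex> lock( m_mutex );
                // Waiting state: sleep until there is work or a shutdown request.
                m_wakeUp.wait( lock, [this] { return m_stopping || !m_requests.empty(); } );
                if( m_requests.empty() )
                {
                    return; // Only reachable when shutting down with nothing left to do.
                }
                request = std::move( m_requests.front() );
                m_requests.pop();
            }
            request(); // Run outside the lock so other workers can proceed.
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_wakeUp;
    std::queue<std::function<void()>> m_requests;
    bool m_stopping = false;
    std::vector<std::thread> m_workers;
};

// Post `count` small requests to a pool of `threads` workers and return how many ran.
int RunDemo( std::size_t threads, int count )
{
    std::atomic<int> done( 0 );
    {
        MiniPool pool( threads );
        for( int i = 0; i < count; ++i )
        {
            pool.Post( [&done] { ++done; } );
        }
    } // The destructor lets queued requests finish and joins the workers.
    return done.load();
}
```

Calling `RunDemo( 4, 100 )` from a test or from `main` runs one hundred trivial requests on four reused threads instead of creating one hundred threads.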
The class diagram is shown below:
The classes it contains are described here:
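ThreadPool: This class creates, manages, and destroys the thread pool. To use it, create a ThreadPool object and specify the number of threads to create. The pool supports at most 64 threads. The count can also be kept to the minimum you need, to avoid excessive thread switching and system resource consumption.
AbstractRequest: This class represents a request to be processed by the thread pool. The client application should derive from this class and override its Execute() function. Use the critical section object it provides to keep the processing thread-safe. Inside Execute(), cancellation can be detected by calling IsAborted(). Post the derived request to the thread pool for processing. It can be cancelled with Abort().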
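A derived request with cooperative cancellation might look like the sketch below. To keep the example self-contained, `Request` is a hypothetical, simplified stand-in for the article's AbstractRequest (it uses `std::atomic` instead of the critical section), and `CopyRequest` is an invented example class. The pattern carries over directly: override Execute(), poll IsAborted() during long work, and return non-zero when the request did not complete.

```cpp
#include <atomic>

// Hypothetical, simplified stand-in for the article's AbstractRequest.
class Request
{
public:
    Request() : m_aborted( false ) {}
    virtual ~Request() {}

    // Work to be done on a pool thread. Returns 0 on success.
    virtual long Execute() = 0;

    // May be called from any thread to ask the request to stop.
    void Abort() { m_aborted = true; }

protected:
    // Derived classes poll this during time-consuming work.
    bool IsAborted() const { return m_aborted; }

private:
    std::atomic<bool> m_aborted;
};

// Invented example: "copies" items one by one and stops early when aborted.
class CopyRequest : public Request
{
public:
    explicit CopyRequest( int itemCount ) : m_itemCount( itemCount ), m_copied( 0 ) {}

    long Execute() override
    {
        for( int i = 0; i < m_itemCount; ++i )
        {
            if( IsAborted() )
            {
                return 1; // Non-zero tells the caller the request did not complete.
            }
            ++m_copied;   // Placeholder for the real, slow copy step.
        }
        return 0;
    }

    int Copied() const { return m_copied; }

private:
    int m_itemCount;
    int m_copied;
};
```

Checking the abort flag once per item keeps cancellation latency bounded by the cost of one item. In the article's pool, a non-zero return value from Execute() is logged as "Request execution failed".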
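Logger: The thread pool supports error and information logging. By default the log goes to the debugger output window. To change that, create your own class derived from Logger and override its LogError() and LogInfo() functions, then pass an instance of that class to the thread pool when the pool is created.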
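As an illustration of a custom logger, the sketch below collects messages in memory. `Logger` here is a hypothetical stand-in that only mirrors the two virtual hooks of the article's class, and `MemoryLogger` is an invented name. Because the pool logs from several worker threads, the sketch protects its storage with a mutex, which is an assumption about how a real replacement should behave rather than something the article prescribes.

```cpp
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in with the same two virtual hooks as the article's Logger.
class Logger
{
public:
    virtual ~Logger() {}
    virtual void LogError( const std::wstring& ) {}
    virtual void LogInfo( const std::wstring& ) {}
};

// Invented example: keeps log lines in memory, for instance to show them in a UI later.
class MemoryLogger : public Logger
{
public:
    void LogError( const std::wstring& text ) override
    {
        Append( L"ERROR: " + text );
    }

    void LogInfo( const std::wstring& text ) override
    {
        Append( L"INFO: " + text );
    }

    // Returns a copy so the caller never reads the vector while a worker writes to it.
    std::vector<std::wstring> Lines() const
    {
        std::lock_guard<std::mutex> lock( m_mutex );
        return m_lines;
    }

private:
    void Append( const std::wstring& line )
    {
        std::lock_guard<std::mutex> lock( m_mutex );
        m_lines.push_back( line );
    }

    mutable std::mutex m_mutex;
    std::vector<std::wstring> m_lines;
};
```

With the article's classes, an object like this would be passed as the second argument of Create(), and it has to outlive the pool because the pool only stores the pointer.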
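The following diagram shows how to use the thread pool in your application:
The application holds one ThreadPool instance. When creation is requested, this class creates the specified number of threads, and each thread waits to be called. As soon as a request arrives, one of the threads leaves the waiting state and starts processing the request. When processing is finished, the thread returns to the waiting state. The user can call Abort() on the AbstractRequest at any time to stop the request. The thread pool does not delete a request after processing it.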
3. Example
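// Create thread pool with specified number of threads.
bool Create( const unsigned short usThreadCount_i, Logger* pLogger_io = NULL );
This function creates the specified number of threads, which then enter the waiting state. If a Logger is specified, it is used for error and information logging. The function returns false on failure. The maximum number of threads is 64.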
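Since Create() rejects a count of 0 and any count above 64, a caller may want to clamp its desired thread count first. The helpers below are hypothetical (they are not part of the article's code) and use only the standard library. The upper bound of 64 most likely comes from WaitForMultipleObjects, which the pool uses during destruction and which can wait on at most MAXIMUM_WAIT_OBJECTS (64) handles.

```cpp
#include <algorithm>
#include <thread>

// Limits taken from the checks in the article's Create(): at least 1, at most 64.
const unsigned kMinPoolThreads = 1;
const unsigned kMaxPoolThreads = 64;

// Hypothetical helper: force a requested count into the range Create() accepts.
unsigned short ClampThreadCount( unsigned requested )
{
    return static_cast<unsigned short>(
        std::min( std::max( requested, kMinPoolThreads ), kMaxPoolThreads ) );
}

// Hypothetical helper: one thread per logical processor, clamped to the valid range.
// hardware_concurrency() may return 0 when the value is unknown; clamping turns that into 1.
unsigned short DefaultThreadCount()
{
    return ClampThreadCount( std::thread::hardware_concurrency() );
}
```

One thread per processor is only a starting point. For I/O-bound work such as the backup described in the introduction, a higher count may pay off because threads spend much of their time waiting.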
// Destroy the existing thread pool.
bool Destroy();
This function cancels all pending requests and destroys the thread pool. It returns false on failure.
// Post request to thread pool for processing.
bool PostRequest( AbstractRequest* pRequest_io );
This function posts the specified request to the thread pool. It returns false on failure.
Dependencies
ThreadPool.h includes <windows.h>, <list> (STL), and <string> (STL).
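How to use the thread pool in your application
ThreadPoolDemo is a demo application that uses the ThreadPool class. This thread pool targets the Windows operating system, but it can be ported to Linux and iOS. It is available from the download link at the bottom of this article.
Here is the ThreadPool code: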
// ThreadPool.h
/**
 * @author : Suresh
 */

#ifndef _THREAD_POOL_MGR_H_
#define _THREAD_POOL_MGR_H_

#include <windows.h>
#include <list>
#include <string>   // std::wstring is used by Logger and AbstractRequest.

namespace TP
{

/**
 * Logger - This is base class for the error logger class and it is polymorphic.
 *          The users of the ThreadPool create a class which derived from this
 *          and override LogError() and LogInfo() for their own error logging mechanism.
 *          The default error logging will be in output window.
 */
class Logger
{
public:
    // Constructor
    Logger() {}
    // Destructor
    virtual ~Logger() {}
    // Log error description.
    void LogError( const long lActiveReq_i, const std::wstring& wstrError_i );
    // Log information.
    void LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i );
    // Override this function to log errors. Default log will be in output window.
    virtual void LogError( const std::wstring& wstrError_i );
    // Override this function to log informations. Default log will be in output window.
    virtual void LogInfo( const std::wstring& wstrInfo_i );

private:
    // Log thread ID, Active thread count and last error.
    void PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io );
};

/**
 * SyncObject - The class is a wrapper of Critical section object to provide
 *              synchronization for thread pool.
 */
class SyncObject
{
public:
    // Constructor
    SyncObject()
    {
        ::InitializeCriticalSection( &m_stCriticalSection );
    }

    // Destructor
    ~SyncObject()
    {
        ::DeleteCriticalSection( &m_stCriticalSection );
    }

    // Lock critical section.
    bool Lock()
    {
        ::EnterCriticalSection( &m_stCriticalSection );
        return true;
    }

    // Unlock critical section.
    bool Unlock()
    {
        ::LeaveCriticalSection( &m_stCriticalSection );
        return true;
    }

private:
    // Not copyable.
    SyncObject( const SyncObject& );
    SyncObject& operator = ( const SyncObject& );

private:
    // Critical section object.
    CRITICAL_SECTION m_stCriticalSection;
};

/**
 * AutoLock - This class own synchronization object during construction and
 *            release the ownership during the destruction.
 */
class AutoLock
{
public:
    /**
     * Parameterized constructor
     *
     * @param LockObj_i - Synchronization object.
     * @since 1.0
     */
    AutoLock( SyncObject& LockObj_i ) : m_pSyncObject( &LockObj_i )
    {
        if( NULL != m_pSyncObject )
        {
            m_pSyncObject->Lock();
        }
    }

    /**
     * Destructor.
     *
     * @since 1.0
     */
    ~AutoLock()
    {
        if( NULL != m_pSyncObject )
        {
            m_pSyncObject->Unlock();
            m_pSyncObject = NULL;
        }
    }

private:
    SyncObject* m_pSyncObject;
};

/**
 * AbstractRequest - This is abstract base class for the request to be processed in thread pool.
 *                   and it is polymorphic. The users of the ThreadPool must create a class
 *                   which derived from this and override Execute() function.
 */
class AbstractRequest
{
public:
    // Constructor
    AbstractRequest() : m_bAborted( false ), m_usRequestID( 0u ) {}
    // Destructor
    virtual ~AbstractRequest() {}
    // Thread procedure to be override in derived class. This function should return if request aborted.
    // Abort request can check by calling IsAborted() function during time consuming operation.
    virtual long Execute() = 0;
    // Set request ID.
    void SetRequestID( unsigned short uRequestID_i )
    {
        AutoLock LockRequest( m_LockWorkerThread );
        m_usRequestID = uRequestID_i;
    }
    // Get request ID.
    unsigned short GetRequestID()
    {
        AutoLock LockRequest( m_LockWorkerThread );
        return m_usRequestID;
    }
    // Abort the processing of the request.
    void Abort()
    {
        AutoLock LockRequest( m_LockWorkerThread );
        m_bAborted = true;
    }
    // Clear abort flag for re-posting the same request.
    void ClearAbortFlag()
    {
        AutoLock LockRequest( m_LockWorkerThread );
        m_bAborted = false;
    }

protected:
    // Check for the abort request
    bool IsAborted()
    {
        AutoLock LockRequest( m_LockWorkerThread );
        return m_bAborted;
    }
    // Prepare error or information log.
    void PrepareLog( std::wstring& wstrLog_io );

protected:
    // Synchronization object for resource locking.
    SyncObject m_LockWorkerThread;

private:
    // Abort flag.
    bool m_bAborted;
    // Request Identifier.
    unsigned short m_usRequestID;
};

/**
 * AutoCounter - Increment and decrement counter
 */
class AutoCounter
{
public:
    // Constructor.
    AutoCounter( unsigned short& usCount_io,
                 SyncObject& Lock_io ) :
        m_usCount( usCount_io ), m_LockThread( Lock_io )
    {
        AutoLock Lock( m_LockThread );
        m_usCount++;
    }

    // Destructor.
    ~AutoCounter()
    {
        AutoLock Lock( m_LockThread );
        m_usCount--;
    }

private:
    // Counter variable.
    unsigned short& m_usCount;
    // Synchronization object for resource locking.
    SyncObject& m_LockThread;
};

typedef std::list<AbstractRequest*> REQUEST_QUEUE;

/**
 * ThreadPool - This class create and destroy thread pool based on the request.
 *              The requested to be processed can be post to pool as derived object of
 *              AbstractRequest. Also a class can be derive from Logger to error and
 *              information logging.
 */
class ThreadPool
{
public:
    // Constructor.
    ThreadPool();
    // Destructor.
    ~ThreadPool();

    // Create thread pool with specified number of threads.
    bool Create( const unsigned short usThreadCount_i, Logger* pLogger_io = NULL );
    // Destroy the existing thread pool.
    bool Destroy();
    // Post request to thread pool for processing.
    bool PostRequest( AbstractRequest* pRequest_io );

private:
    AbstractRequest* PopRequest( REQUEST_QUEUE& RequestQueue_io );
    bool AddThreads();
    bool NotifyThread();
    bool ProcessRequests();
    bool WaitForRequest();
    bool DestroyPool();
    bool IsDestroyed();
    void SetDestroyFlag( const bool bFlag_i );
    void CancelRequests();
    void LogError( const std::wstring& wstrError_i );
    void LogInfo( const std::wstring& wstrInfo_i );
    static UINT WINAPI ThreadProc( LPVOID pParam_i );

private:
    // Not copyable.
    ThreadPool( const ThreadPool& );
    ThreadPool& operator = ( const ThreadPool& );

private:
    // Used for thread pool destruction.
    bool m_bDestroyed;
    // Hold thread count in the pool.
    unsigned short m_usThreadCount;
    // Released semaphore count.
    unsigned short m_usSemaphoreCount;
    // Active thread count.
    unsigned short m_lActiveThread;
    // Pending request count (queued requests that no thread has been notified about yet).
    unsigned short m_usPendingReqCount;
    // Manage active thread count in pool.
    HANDLE m_hSemaphore;
    // Hold thread handles.
    HANDLE* m_phThreadList;
    // Request queue.
    REQUEST_QUEUE m_RequestQueue;
    // Synchronization object for resource locking.
    SyncObject m_LockWorkerThread;
    // User defined error and information logger class.
    Logger* m_pLogger;
    // Default error and information logger.
    Logger m_Logger;
};

} // namespace TP

#endif // #ifndef _THREAD_POOL_MGR_H_
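AutoLock and AutoCounter in the header are scope guards: they acquire something in the constructor and release it in the destructor, so every exit path from a block (return, break, continue, exception) cleans up. A portable counterpart can be sketched as follows, assuming `std::mutex` in place of the critical section. `ScopedCounter` and `ActiveDuringWork` are hypothetical names. One difference to keep in mind: a Windows critical section may be re-entered by the thread that owns it, while `std::mutex` may not, so code that takes the lock twice on one thread would need `std::recursive_mutex`.

```cpp
#include <mutex>

// Hypothetical portable counterpart of AutoCounter: increments a shared counter
// on construction and decrements it on destruction, both under a mutex.
class ScopedCounter
{
public:
    ScopedCounter( unsigned short& count, std::mutex& mutex )
        : m_count( count ), m_mutex( mutex )
    {
        std::lock_guard<std::mutex> lock( m_mutex );
        ++m_count;
    }

    ~ScopedCounter()
    {
        std::lock_guard<std::mutex> lock( m_mutex );
        --m_count;
    }

    ScopedCounter( const ScopedCounter& ) = delete;
    ScopedCounter& operator=( const ScopedCounter& ) = delete;

private:
    unsigned short& m_count;
    std::mutex& m_mutex;
};

// Returns the counter value observed while a "request" is being processed.
unsigned short ActiveDuringWork( unsigned short& active, std::mutex& mutex )
{
    ScopedCounter counter( active, mutex );      // active is incremented here
    std::lock_guard<std::mutex> lock( mutex );   // protect the read below
    return active;
    // `lock` is released first, then `counter` decrements active again.
}
```

The guard objects are destroyed in reverse order of construction, which is why the read lock is released before the counter's destructor takes the mutex again.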
// ThreadPool.cpp
/**
 * @author : Suresh
 */

#include "ThreadPool.h"
#include <sstream>
#include <iomanip>

namespace TP
{

/**
 * Log error description.
 *
 * @param lActiveReq_i - Count of active requests.
 * @param wstrError_i  - Error message.
 */
void Logger::LogError( const long lActiveReq_i, const std::wstring& wstrError_i )
{
    std::wstring wstrLog( wstrError_i );
    PrepareLog( lActiveReq_i, wstrLog );
    LogError( wstrLog );
}

/**
 * Log information.
 *
 * @param lActiveReq_i - Count of active requests.
 * @param wstrInfo_i   - Information message.
 */
void Logger::LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i )
{
    std::wstring wstrLog( wstrInfo_i );
    PrepareLog( lActiveReq_i, wstrLog );
    LogInfo( wstrLog );
}

/**
 * Override this function to log errors. Default log will be in output window.
 *
 * @param wstrError_i - Error description
 */
void Logger::LogError( const std::wstring& wstrError_i )
{
    // Use the wide-character API explicitly so this compiles without UNICODE defined.
    ::OutputDebugStringW( wstrError_i.c_str());
}

/**
 * Override this function to log informations. Default log will be in output window.
 *
 * @param wstrInfo_i - Information description.
 */
void Logger::LogInfo( const std::wstring& wstrInfo_i )
{
    ::OutputDebugStringW( wstrInfo_i.c_str());
}

/**
 * Log thread ID, Active thread count and last error.
 *
 * @param lActiveReq_i - Active thread count.
 * @param wstrLog_io   - Error or information description
 */
void Logger::PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io )
{
    std::wstringstream wstrmLog;
    wstrmLog << L"##TP## [TID=" << std::setfill( L'0' ) << std::setw(8) << ::GetCurrentThreadId()
             << L"] [ACTIVE REQUEST=" << std::setw(4) << lActiveReq_i
             << L"] [LAST ERROR=" << std::setw(4) << ::GetLastError()
             << L"] " << wstrLog_io.c_str() << L"]";
    wstrLog_io = wstrmLog.str();
}

/**
 * Prepare error or information log.
 *
 * @param wstrLog_io - Log information
 */
void AbstractRequest::PrepareLog( std::wstring& wstrLog_io )
{
    std::wstringstream wstrmLog;
    wstrmLog << std::setfill( L'0' );
    wstrmLog << L"##RQ## [RID=" << std::setw(8) << GetRequestID()
             << L"] [Desc=" << wstrLog_io.c_str() << L"]";
    wstrLog_io = wstrmLog.str();
}

/**
 * Constructor
 */
ThreadPool::ThreadPool() : m_bDestroyed( false ),
                           m_usThreadCount( 0u ),
                           m_usSemaphoreCount( 0u ),
                           m_lActiveThread( 0u ),
                           m_usPendingReqCount( 0u ),
                           m_hSemaphore( NULL ),
                           m_phThreadList( NULL ),
                           m_pLogger( &m_Logger )
{
}

/**
 * Destructor
 */
ThreadPool::~ThreadPool()
{
    if( NULL != m_phThreadList )
    {
        if( !Destroy())
        {
            LogError( L"Destroy() failed" );
        }
    }
}

/**
 * Create thread pool with specified number of threads.
 *
 * @param usThreadCount_i - Thread count.
 * @param pLogger_i       - Logger instance to log errors and informations
 */
bool ThreadPool::Create( const unsigned short usThreadCount_i, Logger* pLogger_i )
{
    try
    {
        // Assign logger object. If user not provided then use existing and
        // error will be logged in output window.
        m_pLogger = ( NULL != pLogger_i ) ? pLogger_i : &m_Logger;
        // Check thread pool is initialized already.
        if( NULL != m_phThreadList )
        {
            LogError( L"ThreadPool already created" );
            return false;
        }
        // Validate thread count.
        if( 0 == usThreadCount_i )
        {
            LogError( L"Minimum allowed thread count is one" );
            return false;
        }
        if( usThreadCount_i > 64 )
        {
            LogError( L"Maximum allowed thread count is 64" );
            return false;
        }
        LogInfo( L"Thread pool creation requested" );

        // Initialize values.
        m_lActiveThread = 0u;
        m_usSemaphoreCount = 0u;
        m_usPendingReqCount = 0u;
        m_usThreadCount = usThreadCount_i;
        // Create semaphore for thread count management.
        m_hSemaphore = CreateSemaphore( NULL, 0, m_usThreadCount, NULL );
        if( NULL == m_hSemaphore )
        {
            LogError( L"Semaphore creation failed" );
            m_usThreadCount = 0u;
            return false;
        }
        // Create worker threads and make pool active
        if( !AddThreads())
        {
            LogError( L"Threads creation failed" );
            Destroy();
            return false;
        }
        SetDestroyFlag( false );
        LogInfo( L"Thread pool created successfully" );
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in Create()" );
        return false;
    }
}

/**
 * Destroy thread pool.
 */
bool ThreadPool::Destroy()
{
    try
    {
        // Check whether thread pool already destroyed.
        if( NULL == m_phThreadList )
        {
            LogError( L"ThreadPool is already destroyed or not created yet" );
            return false;
        }
        // Cancel all requests.
        CancelRequests();
        // Set destroyed flag to true for exiting threads.
        SetDestroyFlag( true );
        // Release remaining semaphores to exit thread.
        {
            AutoLock LockThread( m_LockWorkerThread );
            if( m_lActiveThread < m_usThreadCount )
            {
                // ReleaseSemaphore returns a BOOL, so test it as one.
                if( !ReleaseSemaphore( m_hSemaphore, m_usThreadCount - m_lActiveThread, NULL ))
                {
                    LogError( L"Failed to release Semaphore" );
                    return false;
                }
            }
        }
        // Wait for destroy completion and clean the thread pool.
        if( !DestroyPool())
        {
            LogError( L"Thread pool destruction failed" );
            return false;
        }
        LogInfo( L"Thread Pool destroyed successfully" );
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in Destroy()" );
        return false;
    }
}

/**
 * Post request to thread pool for processing
 *
 * @param pRequest_io - Request to be processed.
 */
bool ThreadPool::PostRequest( AbstractRequest* pRequest_io )
{
    try
    {
        AutoLock LockThread( m_LockWorkerThread );
        if( NULL == m_phThreadList )
        {
            LogError( L"ThreadPool is destroyed or not created yet" );
            return false;
        }
        m_RequestQueue.push_back( pRequest_io );
        if( m_usSemaphoreCount < m_usThreadCount )
        {
            // Thread available to process, so notify thread.
            if( !NotifyThread())
            {
                LogError( L"NotifyThread failed" );
                // Request notification failed. Try after some time.
                m_usPendingReqCount++;
                return false;
            }
        }
        else
        {
            // Thread not available to process.
            m_usPendingReqCount++;
        }
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in PostRequest()" );
        return false;
    }
}

/**
 * Pop request from queue for processing.
 *
 * @param RequestQueue_io   - Request queue.
 * @return AbstractRequest* - Request pointer.
 */
AbstractRequest* ThreadPool::PopRequest( REQUEST_QUEUE& RequestQueue_io )
{
    AutoLock LockThread( m_LockWorkerThread );
    if( !RequestQueue_io.empty())
    {
        AbstractRequest* pRequest = RequestQueue_io.front();
        // Remove only the front entry. list::remove( pRequest ) would also drop
        // any later entries for the same request pointer.
        RequestQueue_io.pop_front();
        return pRequest;
    }
    return NULL;
}

/**
 * Create specified number of threads. Initial status of threads will be waiting.
 */
bool ThreadPool::AddThreads()
{
    try
    {
        // Allocate memory for all threads.
        m_phThreadList = new HANDLE[m_usThreadCount];
        if( NULL == m_phThreadList )
        {
            LogError( L"Memory allocation for thread handle failed" );
            return false;
        }
        // Create worker threads.
        DWORD dwThreadID = 0;
        for( unsigned short usIdx = 0u; usIdx < m_usThreadCount; usIdx++ )
        {
            // Create worker thread
            m_phThreadList[usIdx] = CreateThread( 0, 0,
                reinterpret_cast<LPTHREAD_START_ROUTINE>( ThreadPool::ThreadProc ),
                this, 0, &dwThreadID );
            if( NULL == m_phThreadList[usIdx] )
            {
                LogError( L"CreateThread failed" );
                return false;
            }
        }
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in AddThreads()" );
        return false;
    }
}

/**
 * Add request to queue and release semaphore by one.
 */
bool ThreadPool::NotifyThread()
{
    try
    {
        AutoLock LockThread( m_LockWorkerThread );
        // Release semaphore by one to process this request.
        if( !ReleaseSemaphore( m_hSemaphore, 1, NULL ))
        {
            LogError( L"ReleaseSemaphore failed" );
            return false;
        }
        m_usSemaphoreCount++;
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in NotifyThread()" );
        m_RequestQueue.pop_back();
        return false;
    }
}

/**
 * Process request in queue.
 */
bool ThreadPool::ProcessRequests()
{
    bool bContinue( true );
    do
    {
        try
        {
            LogInfo( L"Thread WAITING" );
            // Wait for request.
            if( !WaitForRequest())
            {
                LogError( L"WaitForRequest() failed" );
                continue;
            }
            // Thread counter.
            AutoCounter Counter( m_lActiveThread, m_LockWorkerThread );
            LogInfo( L"Thread ACTIVE" );
            // Check thread pool destroy request.
            if( IsDestroyed())
            {
                LogInfo( L"Thread EXITING" );
                break;
            }
            // Get request from request queue.
            AbstractRequest* pRequest = PopRequest( m_RequestQueue );
            if( NULL == pRequest )
            {
                LogError( L"PopRequest failed" );
                continue;
            }
            // Execute the request. A non-zero return value means failure.
            long lReturn = pRequest->Execute();
            if( 0 != lReturn )
            {
                LogError( L"Request execution failed" );
                continue;
            }
            // Check thread pool destroy request.
            if( IsDestroyed())
            {
                LogInfo( L"Thread EXITING" );
                break;
            }
            AutoLock LockThread( m_LockWorkerThread );
            // Inform thread if any pending request.
            if( m_usPendingReqCount > 0 )
            {
                if( m_usSemaphoreCount < m_usThreadCount )
                {
                    // Thread available to process, so notify thread.
                    if( !NotifyThread())
                    {
                        LogError( L"NotifyThread failed" );
                        continue;
                    }
                    m_usPendingReqCount--;
                }
            }
        }
        catch( ... )
        {
            LogError( L"Exception occurred in ProcessRequests()" );
            continue;
        }
    }
    while( bContinue );
    return true;
}

/**
 * Wait for request queuing to thread pool.
 */
bool ThreadPool::WaitForRequest()
{
    try
    {
        // Wait released when requested queued.
        DWORD dwReturn = WaitForSingleObject( m_hSemaphore, INFINITE );
        if( WAIT_OBJECT_0 != dwReturn )
        {
            LogError( L"WaitForSingleObject failed" );
            return false;
        }
        AutoLock LockThread( m_LockWorkerThread );
        m_usSemaphoreCount--;
        // Clear previous error.
        ::SetLastError( 0 );
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in WaitForRequest()" );
        return false;
    }
}

/**
 * Destroy and clean up thread pool.
 */
bool ThreadPool::DestroyPool()
{
    try
    {
        // Wait for the exit of threads.
        DWORD dwReturn = WaitForMultipleObjects( m_usThreadCount, m_phThreadList, TRUE, INFINITE );
        if( WAIT_OBJECT_0 != dwReturn )
        {
            LogError( L"WaitForMultipleObjects failed" );
            return false;
        }
        // Close all threads.
        for( USHORT uIdx = 0u; uIdx < m_usThreadCount; uIdx++ )
        {
            if( TRUE != CloseHandle( m_phThreadList[uIdx] ))
            {
                LogError( L"CloseHandle failed for threads" );
                return false;
            }
        }
        // Clear memory allocated for threads.
        delete[] m_phThreadList;
        m_phThreadList = 0;
        // Close the semaphore
        if( TRUE != CloseHandle( m_hSemaphore ))
        {
            LogError( L"CloseHandle failed for semaphore" );
            return false;
        }
        // Clear request queue.
        m_RequestQueue.clear();
        return true;
    }
    catch( ... )
    {
        LogError( L"Exception occurred in DestroyPool()" );
        return false;
    }
}

/**
 * Check for destroy request.
 */
inline bool ThreadPool::IsDestroyed()
{
    // Avoid synchronization issues if destroy requested after validation.
    AutoLock LockThread( m_LockWorkerThread );
    // During thread pool destruction all semaphores are released
    // to exit all threads.
    return m_bDestroyed;
}

/**
 * Set destroy flag
 */
inline void ThreadPool::SetDestroyFlag( const bool bFlag_i )
{
    AutoLock LockThread( m_LockWorkerThread );
    m_bDestroyed = bFlag_i;
}

/**
 * Cancel all processing request in pool.
 */
void ThreadPool::CancelRequests()
{
    try
    {
        // Avoid synchronization issues if destroy requested after validation.
        AutoLock LockThread( m_LockWorkerThread );
        LogInfo( L"Thread pool destroy requested" );
        // Clear main queue.
        m_RequestQueue.clear();
    }
    catch( ... )
    {
        LogError( L"Exception occurred in CancelRequests()" );
    }
}

/**
 * Log error in thread pool.
 *
 * @param wstrError_i - Error description.
 */
void ThreadPool::LogError( const std::wstring& wstrError_i )
{
    if( NULL != m_pLogger )
    {
        m_pLogger->LogError( m_lActiveThread, wstrError_i );
    }
}

/**
 * Log information in thread pool.
 *
 * @param wstrInfo_i - Information description.
 */
void ThreadPool::LogInfo( const std::wstring& wstrInfo_i )
{
    if( NULL != m_pLogger )
    {
        m_pLogger->LogInfo( m_lActiveThread, wstrInfo_i );
    }
}

/**
 * worker thread procedure.
 *
 * @param pParam_i - ThreadPool instance.
 * @return UINT    - Return 0 on success.
 */
UINT WINAPI ThreadPool::ThreadProc( LPVOID pParam_i )
{
    ThreadPool* pThreadPool = NULL;
    try
    {
        // Assign to the outer variable. Declaring a second pThreadPool here would
        // shadow it and leave the pointer in the catch block always NULL.
        pThreadPool = reinterpret_cast<ThreadPool*>( pParam_i );
        if( NULL == pThreadPool )
        {
            return 1;
        }
        if( !pThreadPool->ProcessRequests())
        {
            pThreadPool->LogError( L"ProcessRequests() failed" );
            return 1;
        }
        return 0;
    }
    catch( ... )
    {
        if( NULL != pThreadPool )
        {
            pThreadPool->LogError( L"Exception occurred in ThreadProc()" );
        }
        return 1;
    }
}

} // namespace TP
This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)
Original English article: http://www.codeproject.com/Articles/637708/Thread-Pool
Source code: http://files.cnblogs.com/wb-DarkHorse/ThreadPool.zip