Thread Pool (Translation)

This is my translation of a CodeProject article titled "Thread Pool".

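1 Introduction

  In my application, data has to be backed up to several destinations (CD, USB, network share). Doing this on a single thread took more than two hours. Improving the performance was a challenge for me. I eventually solved it with multithreading, and the result was impressive: the job finished in just 30 minutes.

  Early in development the backup ran once a day; later I was forced to increase the frequency to once an hour. Then I noticed a problem: because the backup now ran so often, threads were being created and destroyed over and over. How can this be solved? Why can't the threads be reused? That is the idea behind a thread pool.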

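  • Do you need a thread pool?
  • Do you have multiple requests that must be processed repeatedly and in parallel?
  • Can each request work independently?
  • Do you need to wait on I/O or file operations?

If your answer is yes, a thread pool is a good fit. Your program will be loosely coupled and easy to implement.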

 

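2 Background

  As a developer, you have surely worked with a single thread at least once, and I am no exception. I needed to create a thread pool in C++, so I searched Google and downloaded many examples, but none of them worked for me. There were plenty of examples, yet none fit my needs, so I wrote this article.

  Why do I need a thread pool? Most I/O operations (files, disks, and so on) are slow, so with a single thread many system resources (memory, processor, and so on) sit idle while waiting, which is clearly wasteful. Multiple threads can use that idle time and raise memory and processor utilization.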

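  A thread pool is effective for an application when:

  • it needs to avoid the time cost of creating and destroying threads;
  • it works in parallel and dispatches a large number of small tasks asynchronously;
  • it creates and destroys a large number of short-lived threads; a thread pool reduces the complexity of thread management and the overhead of thread creation and destruction;
  • it processes independent tasks in parallel in the background.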

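  Creating the thread pool

    The thread pool creates the specified number of threads and waits for requests. Once it receives a request, it activates one thread, which starts executing the request. When execution finishes, the thread returns to the waiting state and waits for the next request. When the pool is asked to clean up, all threads exit.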
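The lifecycle described above (wait, wake up, execute, wait again, exit on shutdown) can be sketched in portable standard C++. This is only an illustration under stated assumptions: `MiniPool`, `Post()` and `RunDemo()` are hypothetical names that do not appear in the article, and the sketch uses `std::thread` with a condition variable rather than the Win32 semaphore used by the article's `ThreadPool`. Unlike the article's `Destroy()`, which cancels queued requests, this sketch finishes everything already queued before its workers exit.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical portable sketch; not the article's Win32 ThreadPool class.
class MiniPool
{
public:
    // Start a fixed number of workers; each one begins in the waiting state.
    explicit MiniPool( std::size_t threadCount )
    {
        for( std::size_t i = 0; i < threadCount; ++i )
        {
            m_workers.emplace_back( [this] { WorkerLoop(); } );
        }
    }

    // Ask all workers to exit and wait for them.
    ~MiniPool()
    {
        {
            std::lock_guard<std::mutex> lock( m_mutex );
            m_stop = true;
        }
        m_wake.notify_all();
        for( std::thread& worker : m_workers )
        {
            worker.join();
        }
    }

    // Queue a request and wake one waiting worker.
    void Post( std::function<void()> request )
    {
        {
            std::lock_guard<std::mutex> lock( m_mutex );
            m_requests.push( std::move( request ) );
        }
        m_wake.notify_one();
    }

private:
    void WorkerLoop()
    {
        for( ;; )
        {
            std::function<void()> request;
            {
                std::unique_lock<std::mutex> lock( m_mutex );
                // Waiting state: sleep until a request arrives or shutdown is requested.
                m_wake.wait( lock, [this] { return m_stop || !m_requests.empty(); } );
                if( m_requests.empty() )
                {
                    return; // shutdown requested and nothing left to run
                }
                request = std::move( m_requests.front() );
                m_requests.pop();
            }
            request(); // active state; afterwards the loop returns to waiting
        }
    }

    std::vector<std::thread> m_workers;
    std::queue<std::function<void()>> m_requests;
    std::mutex m_mutex;
    std::condition_variable m_wake;
    bool m_stop = false;
};

// Usage: post eight requests to two workers and count how many ran.
int RunDemo()
{
    std::atomic<int> executed( 0 );
    {
        MiniPool pool( 2 );
        for( int i = 0; i < 8; ++i )
        {
            pool.Post( [&executed] { ++executed; } );
        }
    } // the destructor lets queued requests finish, then joins the workers
    return executed.load();
}
```

The two workers are created once and reused for all eight requests, which is the saving the article is after.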

The relationship diagram is shown below:

The classes in it are described below:

  • ThreadPool

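  This class creates, manages, and destroys the thread pool. To use it, create a ThreadPool object and specify the number of threads to create. The pool supports at most 64 threads, which matches the 64-handle limit of WaitForMultipleObjects that DestroyPool() uses to wait for the threads. The count can also be kept to a minimum to avoid excessive thread switching and system resource consumption.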

  • AbstractRequest

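  This class represents a request to the thread pool. Client applications should derive from this class and override its Execute() function. Use the critical section object it provides to keep shared data thread-safe. Inside Execute(), call IsAborted() to find out whether the request has been cancelled. Post the derived request to the thread pool for processing; it can be cancelled by calling Abort().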
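The cooperative cancellation described above might look like the following sketch. It is an assumption-laden illustration, not the article's code: `Request` is a simplified stand-in for `AbstractRequest` that uses `std::atomic` instead of the critical-section lock so that it compiles without windows.h, and `CopyRequest` with its file counter is a hypothetical example.

```cpp
#include <atomic>
#include <cstddef>

// Simplified stand-in for the article's AbstractRequest (hypothetical; it
// uses std::atomic instead of the critical-section lock).
class Request
{
public:
    virtual ~Request() {}
    // Returns 0 on success, non-zero if the request failed or was aborted.
    virtual long Execute() = 0;
    // May be called from any thread to cancel the request.
    void Abort() { m_aborted.store( true ); }

protected:
    bool IsAborted() const { return m_aborted.load(); }

private:
    std::atomic<bool> m_aborted{ false };
};

// Hypothetical request that "copies" a list of files one by one.
class CopyRequest : public Request
{
public:
    explicit CopyRequest( std::size_t fileCount )
        : m_fileCount( fileCount ), m_copied( 0 ) {}

    long Execute() override
    {
        for( std::size_t i = 0; i < m_fileCount; ++i )
        {
            // Check the flag between units of work so Abort() takes effect promptly.
            if( IsAborted() )
            {
                return 1;
            }
            ++m_copied; // placeholder for the real, time-consuming copy
        }
        return 0;
    }

    std::size_t Copied() const { return m_copied; }

private:
    std::size_t m_fileCount;
    std::size_t m_copied;
};
```

Abort() only sets a flag; the request stops at the next point where Execute() checks IsAborted(), so long operations should check it regularly.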

  • Logger

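  The thread pool can log errors and information. By default, the log goes to the debugger output window. To change this, create a class derived from Logger and override its LogError() and LogInfo() functions, then pass an instance of that class to the thread pool when creating it.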
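A custom logger of the kind described above could be sketched as follows. The `Logger` here is a simplified stand-in with the same two virtual functions (the real class lives in ThreadPool.h and needs windows.h); `MemoryLogger` and `Report()` are hypothetical names used only for illustration.

```cpp
#include <sstream>
#include <string>

// Simplified stand-in for the article's Logger (hypothetical; the real class
// writes to the debugger output window by default).
class Logger
{
public:
    virtual ~Logger() {}
    virtual void LogError( const std::wstring& error ) { (void)error; }
    virtual void LogInfo( const std::wstring& info ) { (void)info; }
};

// Hypothetical logger that collects messages in memory instead.
class MemoryLogger : public Logger
{
public:
    void LogError( const std::wstring& error ) override
    {
        m_log << L"[ERROR] " << error << L'\n';
    }
    void LogInfo( const std::wstring& info ) override
    {
        m_log << L"[INFO] " << info << L'\n';
    }
    std::wstring Text() const { return m_log.str(); }

private:
    std::wostringstream m_log;
};

// Code that only knows the base class still reaches the overrides.
void Report( Logger& logger )
{
    logger.LogInfo( L"Thread pool created successfully" );
    logger.LogError( L"NotifyThread failed" );
}
```

The pool logs from several worker threads at once, so a logger that keeps shared state, like this one, would need its own lock in real use.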

The figure below shows how to use the thread pool in your application.

 

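   The application holds an instance of ThreadPool. When asked to create the pool, the class creates the specified number of threads, each of which waits to be invoked. Once a request arrives, one of the threads leaves the waiting state and starts processing it. When processing finishes, the thread returns to the waiting state. The user can call Abort() on an AbstractRequest at any time to stop the request. The thread pool does not delete a request after processing it.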

 

3 Example

  • Create()
// Create thread pool with specified number of threads.

bool Create( const unsigned short usThreadCount_i, Logger* pLogger_io = NULL );

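This function creates the specified number of threads, which then enter the waiting state. If a Logger is specified, it is used to log errors. The function returns false on failure. The maximum number of threads is 64.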

  • Destroy()
// Destroy the existing thread pool.

bool Destroy();

This function cancels all requests and destroys the thread pool. It returns false on failure.

  • PostRequest()
// Post request to thread pool for processing.

bool PostRequest( AbstractRequest* pRequest_io );

This function posts the specified request to the thread pool. It returns false on failure.

 

Dependencies

  ThreadPool.h depends on windows.h and on the STL headers <list> and <string>.

 

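How do you use the thread pool in your application?

  1. Add ThreadPool.h and ThreadPool.cpp to your project.
  2. By default, the thread pool prints error messages to the debugger output window. If you need different behavior, derive a class from Logger and override LogError() and LogInfo().
  3. Create the thread pool with Create(), passing your Logger if you have one.
  4. Create a class derived from AbstractRequest and implement Execute() as the thread function.
  5. Create an instance of the class derived from AbstractRequest and hand it to the thread pool with PostRequest(). You can post any number of requests, but the number of requests active at one time never exceeds the number of threads.
  6. When processing is complete, call Destroy() to destroy the thread pool.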
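The limit mentioned in step 5 follows from the fixed number of workers: each worker handles one request at a time, so no more requests can be active than there are threads. The sketch below measures this with plain standard C++ threads. It is not the article's implementation, and `PeakActiveRequests` is a hypothetical helper; the 2 ms sleep stands in for real work.

```cpp
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>

// Run `requestCount` dummy requests on `threadCount` workers and return the
// highest number of requests that were ever executing at the same moment.
int PeakActiveRequests( int threadCount, int requestCount )
{
    std::atomic<int> next( 0 );   // index of the next pending request
    std::atomic<int> active( 0 ); // requests executing right now
    std::atomic<int> peak( 0 );   // highest value of `active` seen so far

    auto worker = [&]
    {
        // Each worker keeps taking the next pending request until none remain.
        while( next.fetch_add( 1 ) < requestCount )
        {
            const int now = active.fetch_add( 1 ) + 1;
            int seen = peak.load();
            while( now > seen && !peak.compare_exchange_weak( seen, now ) )
            {
                // `seen` was refreshed by compare_exchange_weak; retry.
            }
            std::this_thread::sleep_for( std::chrono::milliseconds( 2 ) ); // simulated work
            active.fetch_sub( 1 );
        }
    };

    std::vector<std::thread> workers;
    for( int i = 0; i < threadCount; ++i )
    {
        workers.emplace_back( worker );
    }
    for( std::thread& t : workers )
    {
        t.join();
    }
    return peak.load();
}
```

With 3 workers and 20 requests the returned peak is always between 1 and 3, however the threads happen to be scheduled.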

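ThreadPoolDemo is a demo application that uses the ThreadPool class. This thread pool targets Windows, but it can also be ported to Linux and iOS. It is available from the download link at the bottom of this article.

Here is the ThreadPool code: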

/**
 * @author :    Suresh
 */

#ifndef _THREAD_POOL_MGR_H_
#define _THREAD_POOL_MGR_H_

#include <windows.h>
#include <list>
#include <string>   // std::wstring is used throughout this header

namespace TP
{

    /**
     * Logger - This is base class for the error logger class and it is polymorphic.
     *          The users of the ThreadPool create a class which derived from this 
     *          and override LogError() and LogInfo() for their own error logging mechanism.
     *          The default error logging will be in output window.
     */
    class Logger
    {

    public:

        // Constructor
        Logger(){}
        // Destructor
        virtual ~Logger(){}
        // Log error description.
        void LogError( const long lActiveReq_i, const std::wstring& wstrError_i );
        // Log information.
        void LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i );
        // Override this function to log errors. Default log will be in output window.
        virtual void LogError( const std::wstring& wstrError_i );
        // Override this function to log information. Default log will be in output window.
        virtual void LogInfo( const std::wstring& wstrInfo_i );

    private:

        // Log thread ID, Active thread count and last error.
        void PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io );
    };

    /**
     * SyncObject - The class is a wrapper of Critical section object to provide
     *              synchronization for thread pool.
     */
    class SyncObject
    {

    public:
        // Constructor
        SyncObject()
        {
            ::InitializeCriticalSection( &m_stCriticalSection );
        }

        // Destructor
        ~SyncObject()
        {
            ::DeleteCriticalSection( &m_stCriticalSection );
        }

        // Lock critical section.
        bool Lock()
        {
            ::EnterCriticalSection( &m_stCriticalSection );
            return true;
        }

        // Unlock critical section.
        bool Unlock()
        {
            ::LeaveCriticalSection( &m_stCriticalSection );
            return true;
        }

    private:
        // Not copyable.
        SyncObject( const SyncObject& );
        SyncObject& operator = ( const SyncObject& );

    private:

        // Critical section object.
        CRITICAL_SECTION m_stCriticalSection;
    };

    /**
     * AutoLock - This class own synchronization object during construction and
     *            release the ownership during the destruction.
     */
    class AutoLock
    {

    public:

        /** 
         * Parameterized constructor
         * 
         * @param       LockObj_i - Synchronization object.
         * @return      Nil
         * @exception   Nil
         * @see         Nil
         * @since       1.0
         */
        AutoLock( SyncObject& LockObj_i ) : m_pSyncObject( &LockObj_i )
        {
            if( NULL != m_pSyncObject )
            {
                m_pSyncObject->Lock();
            }
        }

        /** 
         * Destructor.
         * 
         * @param       Nil
         * @return      Nil
         * @exception   Nil
         * @see         Nil
         * @since       1.0
         */
        ~AutoLock()
        {
            if( NULL != m_pSyncObject )
            {
                m_pSyncObject->Unlock();
                m_pSyncObject = NULL;
            }
        }

    private:
        SyncObject* m_pSyncObject;
    };


    /**
     * AbstractRequest - This is abstract base class for the request to be processed in thread pool.
     *                   and it is polymorphic. The users of the ThreadPool must create a class 
     *                   which derived from this and override Execute() function.
     */
    class AbstractRequest
    {

    public:
        // Constructor
        AbstractRequest() : m_bAborted( false ), m_usRequestID( 0u ){}
        // Destructor
        virtual ~AbstractRequest(){}
        // Thread procedure to be overridden in the derived class. This function should return if the request is aborted.
        // An abort request can be detected by calling IsAborted() during a time-consuming operation.
        virtual long Execute() = 0;
        // Set request ID.
        void SetRequestID( unsigned short uRequestID_i )
        {
            AutoLock LockRequest( m_LockWorkerThread );
            m_usRequestID = uRequestID_i;
        }
        // Get request ID.
        unsigned short GetRequestID()
        {
            AutoLock LockRequest( m_LockWorkerThread );
            return m_usRequestID;
        }
        // Abort the processing of the request.
        void Abort()
        {
            AutoLock LockRequest( m_LockWorkerThread );
            m_bAborted = true;
        }
        // Clear abort flag for re-posting the same request.
        void ClearAbortFlag()
        {
            AutoLock LockRequest( m_LockWorkerThread );
            m_bAborted = false;
        }

    protected:
        // Check for the abort request
        bool IsAborted()
        {
            AutoLock LockRequest( m_LockWorkerThread );
            return m_bAborted;
        }
        // Prepare error or information log.
        void PrepareLog( std::wstring& wstrLog_io );

    protected:
        // Synchronization object for resource locking.
        SyncObject m_LockWorkerThread;

    private:
        // Abort flag.
        bool m_bAborted;
        // Request Identifier.
        unsigned short m_usRequestID;

    };

    /**
     * AutoCounter - Increment and decrement counter
     */
    class AutoCounter
    {

    public:
        // Constructor.
        AutoCounter( unsigned short& usCount_io,
                     SyncObject& Lock_io ) :
                     m_usCount( usCount_io ), m_LockThread( Lock_io )
        {
            AutoLock Lock( m_LockThread );
            m_usCount++;
        }

        // Destructor.
        ~AutoCounter()
        {
            AutoLock Lock( m_LockThread );
            m_usCount--;
        }

    private:
        // Counter variable.
        unsigned short& m_usCount;
        // Synchronization object for resource locking.
        SyncObject& m_LockThread;
    };


    typedef std::list<AbstractRequest*> REQUEST_QUEUE;


    /**
     * ThreadPool - This class create and destroy thread pool based on the request.
     *              The requested to be processed can be post to pool as derived object of 
     *              AbstractRequest. Also a class can be derive from Logger to error and
     *              information logging.
     */
    class ThreadPool
    {

    public:
        // Constructor.
        ThreadPool();
        // Destructor.
        ~ThreadPool();

        // Create thread pool with specified number of threads.
        bool Create( const unsigned short usThreadCount_i, Logger* pLogger_io = NULL );
        // Destroy the existing thread pool.
        bool Destroy();
        // Post request to thread pool for processing.
        bool PostRequest( AbstractRequest* pRequest_io );

    private:
        AbstractRequest* PopRequest( REQUEST_QUEUE& RequestQueue_io );
        bool AddThreads();
        bool NotifyThread();
        bool ProcessRequests();
        bool WaitForRequest();
        bool DestroyPool();
        bool IsDestroyed();
        void SetDestroyFlag( const bool bFlag_i );
        void CancelRequests();
        void LogError( const std::wstring& wstrError_i );
        void LogInfo( const std::wstring& wstrInfo_i );
        static UINT WINAPI ThreadProc( LPVOID pParam_i );

    private:
        // Not copyable.
        ThreadPool( const ThreadPool& );
        ThreadPool& operator = ( const ThreadPool& );

    private:
        // Used for thread pool destruction.
        bool m_bDestroyed;
        // Hold thread count in the pool.
        unsigned short m_usThreadCount;
        // Released semaphore count.
        unsigned short m_usSemaphoreCount;
        // Active thread count.
        unsigned short m_lActiveThread;
        // Pending request count.
        unsigned short m_usPendingReqCount;
        // Manage active thread count in pool.
        HANDLE m_hSemaphore;
        // Hold thread handles.
        HANDLE* m_phThreadList;
        // Request queue.
        REQUEST_QUEUE m_RequestQueue;
        // Synchronization object for resource locking.
        SyncObject m_LockWorkerThread;
        // User defined error and information logger class.
        Logger* m_pLogger;
        // Default error and information logger.
        Logger m_Logger;
    };
} // namespace TP

#endif // #ifndef _THREAD_POOL_MGR_H_
ThreadPool.h
/**
 * @author :    Suresh
 */

#include "ThreadPool.h"
#include <sstream>
#include <iomanip>

namespace TP
{

    /** 
     * Log error description.
     * 
     * @param       lActiveReq_i - Count of active requests.
     * @param       wstrError_i  - Error message.
     */
    void Logger::LogError( const long lActiveReq_i, const std::wstring& wstrError_i )
    {
        std::wstring wstrLog( wstrError_i );
        PrepareLog( lActiveReq_i, wstrLog );
        LogError( wstrLog );
    }


    /** 
     * Log information.
     * 
     * @param       lActiveReq_i - Count of active requests.
     * @param       wstrInfo_i   - Information message.
     */
    void Logger::LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i )
    {
        std::wstring wstrLog( wstrInfo_i );
        PrepareLog( lActiveReq_i, wstrLog );
        LogInfo( wstrLog );
    }


    /** 
     * Override this function to log errors. Default log will be in output window.
     * 
     * @param       wstrError_i  - Error description
     */
    void Logger::LogError( const std::wstring& wstrError_i )
    {
        // Use the wide-character variant explicitly so this compiles with or without UNICODE.
        OutputDebugStringW( wstrError_i.c_str());
    }


    /** 
     * Override this function to log information. Default log will be in output window.
     * 
     * @param       wstrInfo_i   - Information description.
     */
    void Logger::LogInfo( const std::wstring& wstrInfo_i )
    {
        OutputDebugStringW( wstrInfo_i.c_str());
    }


    /** 
     * Log thread ID, Active thread count and last error.
     * 
     * @param       lActiveReq_i - Active thread count.
     * @param       wstrLog_io   - Error or information description
     */
    void Logger::PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io )
    {
        std::wstringstream wstrmLog;
        wstrmLog << L"##TP## [TID=" << std::setfill( L'0' ) << std::setw(8) << ::GetCurrentThreadId()
                 << L"] [ACTIVE REQUEST=" << std::setw(4) << lActiveReq_i
                 << L"] [LAST ERROR=" << std::setw(4) << ::GetLastError()
                 << L"] " << wstrLog_io.c_str() << L"]";
        wstrLog_io = wstrmLog.str();
    }


    /** 
     * Prepare error or information log.
     * 
     * @param       wstrLog_io - Log information
     */
    void AbstractRequest::PrepareLog( std::wstring& wstrLog_io )
    {
        std::wstringstream wstrmLog;
        wstrmLog << std::setfill( L'0' );
        wstrmLog << L"##RQ## [RID=" << std::setw(8) << GetRequestID()
                 << L"] [Desc=" << wstrLog_io.c_str() << L"]";
        wstrLog_io = wstrmLog.str();
    }


    /** 
     * Constructor
     */
    ThreadPool::ThreadPool() : m_bDestroyed( false ),
                               m_usThreadCount( 0u ),
                               m_usSemaphoreCount( 0u ),
                               m_lActiveThread( 0u ),
                               m_usPendingReqCount( 0u ),
                               m_hSemaphore( NULL ),
                               m_phThreadList( NULL ),
                               m_pLogger( &m_Logger )
    {
    }


    /** 
     * Destructor
     */
    ThreadPool::~ThreadPool()
    {
        if( NULL != m_phThreadList )
        {
            if( !Destroy())
            {
                LogError( L"Destroy() failed" );
            }
        }
    }


    /** 
     * Create thread pool with specified number of threads.
     * 
     * @param       usThreadCount_i - Thread count.
     * @param       pLogger_i       - Logger instance to log errors and information
     */
    bool ThreadPool::Create( const unsigned short usThreadCount_i, Logger* pLogger_i )
    {
        try
        {
            // Assign logger object. If user not provided then use existing and
            // error will be logged in output window.
            m_pLogger = ( NULL != pLogger_i ) ? pLogger_i : &m_Logger;
            // Check thread pool is initialized already.
            if( NULL != m_phThreadList )
            {
                LogError( L"ThreadPool already created" );
                return false;
            }
            // Validate thread count.
            if( 0 == usThreadCount_i )
            {
                LogError( L"Minimum allowed thread count is one" );
                return false;
            }
            if( usThreadCount_i > 64 )
            {
                LogError( L"Maximum allowed thread count is 64" );
                return false;
            }
            LogInfo( L"Thread pool creation requested" );

            // Initialize values.
            m_lActiveThread = 0u;
            m_usSemaphoreCount = 0u;
            m_usPendingReqCount = 0u;
            m_usThreadCount = usThreadCount_i;
            // Create semaphore for thread count management.
            m_hSemaphore = CreateSemaphore( NULL, 0, m_usThreadCount, NULL );
            if( NULL == m_hSemaphore )
            {
                LogError( L"Semaphore creation failed" );
                m_usThreadCount = 0u;
                return false;
            }
            // Create worker threads and make pool active
            if( !AddThreads())
            {
                LogError( L"Threads creation failed" );
                Destroy();
                return false;
            }
            SetDestroyFlag( false );
            LogInfo( L"Thread pool created successfully" );
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in Create()" );
            return false;
        }
    }


    /** 
     * Destroy thread pool.
     */
    bool ThreadPool::Destroy()
    {
        try
        {
            // Check whether thread pool already destroyed.
            if( NULL == m_phThreadList )
            {
                LogError( L"ThreadPool is already destroyed or not created yet" );
                return false;
            }
            // Cancel all requests.
            CancelRequests();
            // Set destroyed flag to true for exiting threads.
            SetDestroyFlag( true );
            // Release remaining semaphores to exit thread.
            {
                AutoLock LockThread( m_LockWorkerThread );
                if( m_lActiveThread < m_usThreadCount )
                {
                    // ReleaseSemaphore returns a BOOL, so test it as one.
                    if( !ReleaseSemaphore( m_hSemaphore, m_usThreadCount - m_lActiveThread, NULL ))
                    {
                        LogError( L"Failed to release Semaphore" );
                        return false;
                    }
                }
            }
            // Wait for destroy completion and clean the thread pool.
            if( !DestroyPool())
            {
                LogError( L"Thread pool destruction failed" );
                return false;
            }
            LogInfo( L"Thread Pool destroyed successfully" );
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in Destroy()" );
            return false;
        }
    }


    /** 
     * Post request to thread pool for processing
     * 
     * @param       pRequest_io - Request to be processed.
     */
    bool ThreadPool::PostRequest( AbstractRequest* pRequest_io )
    {
        try
        {
            AutoLock LockThread( m_LockWorkerThread );
            if( NULL == m_phThreadList )
            {
                LogError( L"ThreadPool is destroyed or not created yet" );
                return false;
            }
            m_RequestQueue.push_back( pRequest_io );
            if( m_usSemaphoreCount < m_usThreadCount )
            {
                // Thread available to process, so notify thread.
                if( !NotifyThread())
                {
                    LogError( L"NotifyThread failed" );
                    // Request notification failed. Try after some time.
                    m_usPendingReqCount++;
                    return false;
                }
            }
            else
            {
                // Thread not available to process.
                m_usPendingReqCount++;
            }
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in PostRequest()" );
            return false;
        }
    }


    /** 
     * Pop request from queue for processing.
     * 
     * @param       RequestQueue_io  - Request queue.
     * @return      AbstractRequest* - Request pointer.
     */
    AbstractRequest* ThreadPool::PopRequest( REQUEST_QUEUE& RequestQueue_io )
    {
        AutoLock LockThread( m_LockWorkerThread );
        if( !RequestQueue_io.empty())
        {
            AbstractRequest* pRequest = RequestQueue_io.front();
            // Remove only the front entry. list::remove( value ) would also drop
            // every other queued entry that points to the same request.
            RequestQueue_io.pop_front();
            return pRequest;
        }
        return NULL;
    }


    /** 
     * Create specified number of threads. Initial status of threads will be waiting.
     */
    bool ThreadPool::AddThreads()
    {
        try
        {
            // Allocate memory for all threads.
            m_phThreadList = new HANDLE[m_usThreadCount];
            if( NULL == m_phThreadList )
            {
                LogError( L"Memory allocation for thread handle failed" );
                return false;
            }
            // Create worker threads.
            DWORD dwThreadID = 0;
            for( unsigned short usIdx = 0u; usIdx < m_usThreadCount; usIdx++ )
            {
                // Create worker thread
                m_phThreadList[usIdx] = CreateThread( 0, 0,
                                                      reinterpret_cast<LPTHREAD_START_ROUTINE>( ThreadPool::ThreadProc ),
                                                      this, 0, &dwThreadID );
                if( NULL == m_phThreadList[usIdx] )
                {
                    LogError( L"CreateThread failed" );
                    return false;
                }
            }
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in AddThreads()" );
            return false;
        }
    }


    /** 
     * Release semaphore by one to wake a waiting thread.
     */
    bool ThreadPool::NotifyThread()
    {
        try
        {
            AutoLock LockThread( m_LockWorkerThread );
            // Release semaphore by one to process this request.
            if( !ReleaseSemaphore( m_hSemaphore, 1, NULL ))
            {
                LogError( L"ReleaseSemaphore failed" );
                return false;
            }
            m_usSemaphoreCount++;
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in NotifyThread()" );
            m_RequestQueue.pop_back();
            return false;
        }
    }


    /** 
     * Process request in queue.
     */
    bool ThreadPool::ProcessRequests()
    {
        bool bContinue( true );
        do
        {
            try
            {
                LogInfo( L"Thread WAITING" );
                // Wait for request.
                if( !WaitForRequest())
                {
                    LogError( L"WaitForRequest() failed" );
                    continue;
                }
                // Thread counter.
                AutoCounter Counter( m_lActiveThread, m_LockWorkerThread );
                LogInfo( L"Thread ACTIVE" );
                // Check thread pool destroy request.
                if( IsDestroyed())
                {
                    LogInfo( L"Thread EXITING" );
                    break;
                }
                // Get request from request queue.
                AbstractRequest* pRequest = PopRequest( m_RequestQueue );
                if( NULL == pRequest )
                {
                    LogError( L"PopRequest failed" );
                    continue;
                }
                // Execute the request. A non-zero return value means failure.
                long lReturn = pRequest->Execute();
                if( 0 != lReturn )
                {
                    // Log and carry on, so that a failed request does not
                    // prevent pending requests from being dispatched below.
                    LogError( L"Request execution failed" );
                }
                // Check thread pool destroy request.
                if( IsDestroyed())
                {
                    LogInfo( L"Thread EXITING" );
                    break;
                }
                AutoLock LockThread( m_LockWorkerThread );
                // Inform thread if any pending request.
                if( m_usPendingReqCount > 0 )
                {
                    if( m_usSemaphoreCount < m_usThreadCount )
                    {
                        // Thread available to process, so notify thread.
                        if( !NotifyThread())
                        {
                            LogError( L"NotifyThread failed" );
                            continue;
                        }
                        m_usPendingReqCount--;
                    }
                }
            }
            catch( ... )
            {
                LogError( L"Exception occurred in ProcessRequests()" );
                continue;
            }
        }
        while( bContinue );
        return true;
    }


    /** 
     * Wait for request queuing to thread pool.
     */
    bool ThreadPool::WaitForRequest()
    {
        try
        {
            // The wait is released when a request is queued.
            DWORD dwReturn = WaitForSingleObject( m_hSemaphore, INFINITE );
            if( WAIT_OBJECT_0 != dwReturn )
            {
                LogError( L"WaitForSingleObject failed" );
                return false;
            }
            AutoLock LockThread( m_LockWorkerThread );
            m_usSemaphoreCount--;
            // Clear previous error.
            ::SetLastError( 0 );
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in WaitForRequest()" );
            return false;
        }
    }


    /** 
     * Destroy and clean up thread pool.
     */
    bool ThreadPool::DestroyPool()
    {
        try
        {
            // Wait for the threads to exit.
            DWORD dwReturn = WaitForMultipleObjects( m_usThreadCount, m_phThreadList, TRUE, INFINITE );
            if( WAIT_OBJECT_0 != dwReturn )
            {
                LogError( L"WaitForMultipleObjects failed" );
                return false;
            }
            // Close all threads.
            for( USHORT uIdx = 0u; uIdx < m_usThreadCount; uIdx++ )
            {
                if( TRUE != CloseHandle( m_phThreadList[uIdx] ))
                {
                    LogError( L"CloseHandle failed for threads" );
                    return false;
                }
            }
            // Clear memory allocated for threads.
            delete[] m_phThreadList;
            m_phThreadList = NULL;
            // Close the semaphore
            if( TRUE != CloseHandle( m_hSemaphore ))
            {
                LogError( L"CloseHandle failed for semaphore" );
                return false;
            }
            // Do not keep a stale handle around after closing it.
            m_hSemaphore = NULL;
            // Clear request queue.
            m_RequestQueue.clear();
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in DestroyPool()" );
            return false;
        }
    }


    /** 
     * Check for destroy request.
     */
    inline bool ThreadPool::IsDestroyed()
    {
        // Avoid synchronization issues if destroy requested after validation.
        AutoLock LockThread( m_LockWorkerThread );
        // During thread pool destruction all semaphores are released
        // to exit all threads.
        return m_bDestroyed;
    }


    /** 
     * Set destroy flag
     */
    inline void ThreadPool::SetDestroyFlag( const bool bFlag_i )
    {
        AutoLock LockThread( m_LockWorkerThread );
        m_bDestroyed = bFlag_i;
    }


    /** 
     * Cancel all processing request in pool.
     */
    void ThreadPool::CancelRequests()
    {
        try
        {
            // Avoid synchronization issues if destroy requested after validation.
            AutoLock LockThread( m_LockWorkerThread );
            LogInfo( L"Thread pool destroy requested" );
            // Clear main queue.
            m_RequestQueue.clear();
        }
        catch( ... )
        {
            LogError( L"Exception occurred in CancelRequests()" );
        }
    }


    /** 
     * Log error in thread pool.
     * 
     * @param       wstrError_i - Error description.
     */
    void ThreadPool::LogError( const std::wstring& wstrError_i )
    {
        if( NULL != m_pLogger )
        {
            m_pLogger->LogError( m_lActiveThread, wstrError_i );
        }
    }


    /** 
     * Log information in thread pool.
     * 
     * @param       wstrInfo_i - Information description.
     */
    void ThreadPool::LogInfo( const std::wstring& wstrInfo_i )
    {
        if( NULL != m_pLogger )
        {
            m_pLogger->LogInfo( m_lActiveThread, wstrInfo_i );
        }
    }


    /** 
     * worker thread procedure.
     * 
     * @param       pParam_i - ThreadPool instance.
     * @return      UINT      - Return 0 on success.
     */
    UINT WINAPI ThreadPool::ThreadProc( LPVOID pParam_i )
    {
        // Convert outside the try block so the catch handler sees the real
        // pointer. The original declared a second, shadowing variable inside
        // the try block, which left this one NULL in the handler.
        ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>( pParam_i );
        try
        {
            if( NULL == pThreadPool )
            {
                return 1;
            }
            if( !pThreadPool->ProcessRequests())
            {
                pThreadPool->LogError( L"ProcessRequests() failed" );
                return 1;
            }
            return 0;
        }
        catch( ... )
        {
            if( NULL != pThreadPool )
            {
                pThreadPool->LogError( L"Exception occurred in ThreadProc()" );
            }
            return 1;
        }
    }
} // namespace TP
ThreadPool.cpp

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)
