線程池淺析及C++代碼實現


  (1)什麼是線程池

  線程池是一種多線程處理技術。線程池先建立好若干線程,並管理這些線程。當有新的任務到來時,將任務交給一個已建立的空閒線程執行。線程池所建立的線程優先級都是同樣的,因此須要使用特定線程優先級的任務不宜使用線程池。

  (2)線程池的優勢和應用

  線程池統一管理線程的方式減小了頻繁建立和銷燬線程的系統調度開銷,很大程度上提升了服務器處理併發任務的性能。

  線程池適用於頻繁的任務調度,如處理HTTP請求:任務多,而且每個任務的執行週期短。

  (3)C++代碼實現

  #include "stdafx.h"

  #include "stdafx.h"

  #include <windows.h>  // Win32 threads, critical sections, I/O completion ports

  #include <cassert>    // assert

  #include <map>        // std::map (ThreadInfoMap)

  #include <vector>     // std::vector (worker handle list in Stop)

  using namespace std;

  // Interface for a unit of work that the thread pool can execute.
  // Implementations override ProcessTask(); the pool refers to tasks only
  // through ITask pointers.
  class ITask
  {
  public:
      // Virtual so that a concrete task can be destroyed safely through an
      // ITask pointer.
      virtual ~ITask() {}

      // Performs the work of the task.
      // pUser - opaque, caller-supplied argument passed through unchanged.
      virtual void ProcessTask(void* pUser) = 0;
  };

  //線程池

  class CThreadPool

  {

  public:

  class ThreadInfo

  {

  public:

  ThreadInfo() { m_hThread=0;m_bBusyWorking=false;}

  ThreadInfo(HANDLEhandle, boolbBusy) { m_hThread=handle; m_bBusyWorking=bBusy; }

  ThreadInfo(const ThreadInfo& info){ m_hThread=info.m_hThread; m_bBusyWorking=info.m_bBusyWorking;}

  HANDLE m_hThread;

  bool m_bBusyWorking;

  };

  typedef mapThreadInfoMap;

  typedef ThreadInfoMap::iterator Iterator_ThreadInfoMap;

  enum ThreadPoolStatus{ STATUS_BUSY, STATUS_IDLE,STATUS_NORMAL };

  public:

  CThreadPool()

  {

  InitializeCriticalSection(&m_CS);

  }

  // Releases the critical section created by the constructor.
  // Does not stop any threads itself.
  virtual ~CThreadPool()
  {
      ::DeleteCriticalSection(&m_CS);
  }

  // Starts the pool: creates the manager completion port and manager thread,
  // then the worker completion port and nStatic worker threads, recording
  // each worker in m_threadMap under its thread id.
  //
  // nStatic - number of worker threads created up front.
  // nMax    - stored as m_nNumberOfTotalThreads; must not be smaller than
  //           nStatic.
  //
  // Returns false (after asserting in debug builds) when nMax < nStatic,
  // true otherwise.
  bool Start(unsigned short nStatic, unsigned short nMax)
  {
      if (nMax < nStatic)
      {
          assert(0);
          return false;
      }

      HANDLE hThread;
      DWORD nThreadId;

      m_nNumberOfStaticThreads = nStatic;
      m_nNumberOfTotalThreads = nMax;

      // lock the resource
      EnterCriticalSection(&m_CS);

      // create an IO port
      m_hMgrIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0);

      hThread = CreateThread(
          NULL,                                 // SD
          0,                                    // initial stack size
          (LPTHREAD_START_ROUTINE)ManagerProc,  // thread function
          (LPVOID)this,                         // thread argument
          0,                                    // creation option
          &nThreadId);                          // thread identifier
      m_hMgrThread = hThread;

      // now we start these worker threads
      m_hWorkerIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0);

      for (long n = 0; n < nStatic; n++)
      {
          hThread = CreateThread(
              NULL,                                // SD
              0,                                   // initial stack size
              (LPTHREAD_START_ROUTINE)WorkerProc,  // thread function
              (LPVOID)this,                        // thread argument
              0,                                   // creation option
              &nThreadId);

          // A failed CreateThread returns NULL and leaves nThreadId stale;
          // recording it would put a NULL handle (possibly under an existing
          // key) into the map that Stop() later waits on.
          if (hThread == NULL)
          {
              continue;
          }

          m_threadMap.insert(m_threadMap.end(),
                             ThreadInfoMap::value_type(nThreadId, ThreadInfo(hThread, false)));
      }

      LeaveCriticalSection(&m_CS);

      return true;
  }

  void Stop(bool bHash = false)

  {

  EnterCriticalSection(&m_CS);

  ::PostQueuedCompletionStatus(m_hMgrIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);

  WaitForSingleObject(m_hMgrThread,INFINITE);

  CloseHandle(m_hMgrThread);

  CloseHandle(m_hMgrIoPort);

  //shut down all the worker threads

  UINT nCount=m_threadMap.size();

  HANDLE* pThread= new HANDLE[nCount];

  long n=0;

  ThreadInfo info;

  Iterator_ThreadInfoMap i=m_threadMap.begin();

  while(i!=m_threadMap.end())

  {

  ::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);

  info=i->second;

  pThread[n++]=info.m_hThread;

  i++;

  }

  DWORD rc=WaitForMultipleObjects(nCount,pThread, TRUE,30000);//wait for 0.5 minutes, then start to killthreads

  CloseHandle(m_hWorkerIoPort);

  if(rc>=WAIT_OBJECT_0 && rc

  {

  for(unsigned int n=0;n

  {

  CloseHandle(pThread[n]);

  }

  }

  else if(rc==WAIT_TIMEOUT&&bHash)

  {

  //some threadsnot terminated, we have to stop them.

  DWORD exitCode;

  for(unsigned int i=0; i

  {

  if (::GetExitCodeThread(pThread[i],&exitCode)==STILL_ACTIVE)

  {

  TerminateThread(pThread[i], 99);

  }

  CloseHandle(pThread[i]);

  }

  }

  delete[] pThread;

  LeaveCriticalSection(&m_CS);

  }

  void AddTask(void* pUser, ITask* pWorker)const

  {

  ::PostQueuedCompletionStatus(m_hWorkerIoPort, \

  reinterpret_cast(pWorker), \

  reinterpret_cast(pUser),\

  NULL);

  }

  protected:

  // Returns the manager I/O completion port handle (m_hMgrIoPort).
  HANDLE GetMgrIoPort() const { return m_hMgrIoPort; }

  // Returns the constant 1000.
  // NOTE(review): presumably the manager thread's wait timeout in
  // milliseconds - confirm against ManagerProc.
  UINT GetMgrWaitTime() const { return 1000; }

  // Returns the worker I/O completion port handle (m_hWorkerIoPort).
  HANDLE GetWorkerIoPort() const { return m_hWorkerIoPort; }

  private:

  static DWORD WINAPI WorkerProc(void* p)

  {

  //convert the parameter to the server pointer.

  CThreadPool* pServer=(CThreadPool*)p;

  HANDLE IoPort = pServer->GetWorkerIoPort();

  unsigned long pN1,pN2;

  OVERLAPPED* pOverLapped;

  DWORD threadId=::GetCurrentThreadId();

  while(::GetQueuedCompletionStatus(IoPort, &pN1,&pN2,

  &pOverLapped, INFINITE))

  {

  if(pOverLapped ==(OVERLAPPED*)0xFFFFFFFE)

  {

  pServer->RemoveThread(threadId);

  break;

  }

相關文章
相關標籤/搜索