Windows 系統下用 C++ 實現的多線程

摘抄自 http://blog.csdn.net/huyiyang2010/article/details/5809919

Thread.h

 1 #ifndef __THREAD_H__
 2 #define __THREAD_H__
 3 
 4 #include <string>
 5 
 6 #include   <windows.h>
 7 #include   <process.h>
 8 
 9 class Runnable
10 {
11 public:
12     virtual ~Runnable() {};
13     virtual void Run() = 0;
14 };
15 
16 class CThread : public Runnable
17 {
18 private:
19     explicit CThread(const CThread & rhs);
20 
21 public:
22     CThread();
23     CThread(Runnable * pRunnable);
24     CThread(const char * ThreadName, Runnable * pRunnable = NULL);
25     CThread(std::string ThreadName, Runnable * pRunnable = NULL);
26     ~CThread(void);
27 
28     /**
29       開始運行線程
30       @arg bSuspend 開始運行時是否掛起
31     **/
32     bool Start(bool bSuspend = false);
33 
34     /**
35       運行的線程函數,可使用派生類重寫此函數
36     **/
37     virtual void Run();
38 
39     /**
40       當前執行此函數線程等待線程結束
41       @arg timeout 等待超時時間,若是爲負數,等待無限時長
42     **/
43     void Join(int timeout = -1);
44     /**
45       恢復掛起的線程
46     **/
47     void Resume();
48     /**
49       掛起線程
50     **/
51     void Suspend();
52     /**
53       終止線程的執行
54     **/
55     bool Terminate(unsigned long ExitCode);
56 
57     unsigned int GetThreadID();
58     std::string GetThreadName();
59     void SetThreadName(std::string ThreadName);
60     void SetThreadName(const char * ThreadName);
61 
62 private:
63     static unsigned int WINAPI StaticThreadFunc(void * arg);
64 
65 private:
66     HANDLE m_handle;
67     Runnable * const m_pRunnable;
68     unsigned int m_ThreadID;
69     std::string m_ThreadName;
70     volatile bool m_bRun;
71 };
72 
73 #endif

Thread.cpp

#include "Thread.h"

CThread::CThread(void) : 
m_pRunnable(NULL),
m_bRun(false)
{
}

CThread::~CThread(void)
{
}

CThread::CThread(Runnable * pRunnable) : 
m_ThreadName(""),
m_pRunnable(pRunnable),
m_bRun(false)
{
}

CThread::CThread(const char * ThreadName, Runnable * pRunnable) : 
m_ThreadName(ThreadName),
m_pRunnable(pRunnable),
m_bRun(false)
{
}

CThread::CThread(std::string ThreadName, Runnable * pRunnable) : 
m_ThreadName(ThreadName),
m_pRunnable(pRunnable),
m_bRun(false)
{
}

bool CThread::Start(bool bSuspend)
{
    if(m_bRun)
    {
        return true;
    }
    if(bSuspend)
    {
        m_handle = (HANDLE)_beginthreadex(NULL, 0, StaticThreadFunc, this, CREATE_SUSPENDED, &m_ThreadID);
    }
    else
    {
        m_handle = (HANDLE)_beginthreadex(NULL, 0, StaticThreadFunc, this, 0, &m_ThreadID);
    }
    m_bRun = (NULL != m_handle);
    return m_bRun;
}

void CThread::Run()
{
    if(!m_bRun)
    {
        return;
    }
    if(NULL != m_pRunnable)
    {
        m_pRunnable->Run();
    }
    m_bRun = false;
}

void CThread::Join(int timeout)
{
    if(NULL == m_handle || !m_bRun)
    {
        return;
    }
    if(timeout <= 0)
    {
        timeout = INFINITE;
    }
    ::WaitForSingleObject(m_handle, timeout);
}

void CThread::Resume()
{
    if(NULL == m_handle || !m_bRun)
    {
        return;
    }
    ::ResumeThread(m_handle);
}

void CThread::Suspend()
{
    if(NULL == m_handle || !m_bRun)
    {
        return;
    }
    ::SuspendThread(m_handle);
}

bool CThread::Terminate(unsigned long ExitCode)
{
    if(NULL == m_handle || !m_bRun)
    {
        return true;
    }
    if(::TerminateThread(m_handle, ExitCode))
    {
        ::CloseHandle(m_handle);
        return true;
    }
    return false;
}

unsigned int CThread::GetThreadID()
{
    return m_ThreadID;
}

std::string CThread::GetThreadName()
{
    return m_ThreadName;
}

/**
  Replaces the descriptive name of this thread object.
**/
void CThread::SetThreadName(std::string ThreadName)
{
    m_ThreadName.assign(ThreadName);
}

/**
  Replaces the descriptive name of this thread object.
  A NULL pointer clears the name instead of being dereferenced.
**/
void CThread::SetThreadName(const char * ThreadName)
{
    const char * const szName = (NULL != ThreadName) ? ThreadName : "";
    m_ThreadName = szName;
}

unsigned int CThread::StaticThreadFunc(void * arg)
{
    CThread * pThread = (CThread *)arg;
    pThread->Run();
    return 0;
}

ThreadPoolExecutor.h

#ifndef __THREAD_POOL_EXECUTOR__
#define __THREAD_POOL_EXECUTOR__

#include "Thread.h"
#include <set>
#include <list>
#include <windows.h>

/**
  Thread pool that runs Runnable tasks on a set of worker threads.
  Tasks wait in a bounded list; when the list is full the pool may grow
  up to a maximum number of workers, and idle workers above the minimum
  retire themselves.
**/
class CThreadPoolExecutor
{
public:
    CThreadPoolExecutor(void);
    ~CThreadPoolExecutor(void);

    /**
      Initialises the pool and creates workers until it holds minThreads.
      @return false if minThreads is 0, if maxThreads is smaller than
              minThreads, or if a worker could not be allocated
    **/
    bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks);

    /**
      Submits a task.
      If the task list is not full, the task is appended to it and true is returned.
      If the list is full but there are fewer than the maximum number of
      threads, a new thread is created to run the task and true is returned.
      If the list is full and the maximum number of threads is reached,
      the task is dropped and false is returned.
      The pool does not take ownership of pRunnable.
    **/
    bool Execute(Runnable * pRunnable);

    /**
      Shuts the pool down: stops accepting tasks, waits until the task
      list is empty, sets the minimum thread count to 0, waits until no
      worker is left in the pool, then joins and deletes the retired workers.
    **/
    void Terminate();

    /**
      Returns the number of workers currently in the pool.
    **/
    unsigned int GetThreadPoolSize();

private:
    // Not copyable: the pool owns critical sections and raw worker
    // pointers. Declared but intentionally never defined.
    CThreadPoolExecutor(const CThreadPoolExecutor & rhs);
    CThreadPoolExecutor & operator=(const CThreadPoolExecutor & rhs);

    /**
      Removes and returns the first task of the task list,
      or NULL if the list is empty.
    **/
    Runnable * GetTask();

    // NOTE(review): no definition of this function is visible in the
    // accompanying ThreadPoolExecutor.cpp listing - confirm whether it is
    // still needed.
    static unsigned int WINAPI StaticThreadFunc(void * arg);

private:
    /**
      Worker thread of the pool: runs its optional first task and then
      keeps taking tasks from the owning pool.
    **/
    class CWorker : public CThread
    {
    public:
        CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL);
        ~CWorker();
        virtual void Run();

    private:
        CThreadPoolExecutor * m_pThreadPool;  // owning pool
        Runnable * m_pFirstTask;              // task to run before polling the list; may be NULL
        volatile bool m_bRun;                 // loop flag of Run(); distinct from the private CThread::m_bRun
    };

    typedef std::set<CWorker *> ThreadPool;
    typedef std::list<Runnable *> Tasks;
    typedef Tasks::iterator TasksItr;
    typedef ThreadPool::iterator ThreadPoolItr;

    ThreadPool m_ThreadPool;    // active workers
    ThreadPool m_TrashThread;   // retired workers waiting to be joined and deleted
    Tasks m_Tasks;              // pending tasks

    CRITICAL_SECTION m_csTasksLock;       // guards m_Tasks
    CRITICAL_SECTION m_csThreadPoolLock;  // guards m_ThreadPool and m_TrashThread

    volatile bool m_bRun;
    volatile bool m_bEnableInsertTask;    // false makes Execute() reject every task
    volatile unsigned int m_minThreads;
    volatile unsigned int m_maxThreads;
    volatile unsigned int m_maxPendingTasks;
};

#endif

ThreadPoolExecutor.cpp

#include "ThreadPoolExecutor.h"

/**
  Creates a worker that belongs to pThreadPool.
  pFirstTask, when not NULL, is the task Run() executes before it starts
  taking tasks from the pool.
**/
CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask)
    : m_pThreadPool(pThreadPool)
    , m_pFirstTask(pFirstTask)
    , m_bRun(true)
{
}

/**
  Nothing to release: the worker owns neither the pool nor its tasks.
**/
CThreadPoolExecutor::CWorker::~CWorker()
{
}

/**
  執行任務的工做線程。
  當前沒有任務時,
  若是當前線程數量大於最小線程數量,減小線程,
  不然,執行清理程序,將線程類給釋放掉
**/
void CThreadPoolExecutor::CWorker::Run()
{
    Runnable * pTask = NULL;
    while(m_bRun)
    {
        if(NULL == m_pFirstTask)
        {
            pTask = m_pThreadPool->GetTask();
        }
        else
        {
            pTask = m_pFirstTask;
            m_pFirstTask = NULL;
        }

        if(NULL == pTask)
        {
            EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
            if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)
            {
                ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);
                if(itr != m_pThreadPool->m_ThreadPool.end())
                {
                    m_pThreadPool->m_ThreadPool.erase(itr);
                    m_pThreadPool->m_TrashThread.insert(this);
                }
                m_bRun = false;
            }
            else
            {
                ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();
                while(itr != m_pThreadPool->m_TrashThread.end())
                {
                    (*itr)->Join();
                    delete (*itr);
                    m_pThreadPool->m_TrashThread.erase(itr);
                    itr = m_pThreadPool->m_TrashThread.begin();
                }
            }
            LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
            continue;
        }
        else
        {
            pTask->Run();
            pTask = NULL;
        }
    }
}

/////////////////////////////////////////////////////////////////////////////////////////////

/**
  Creates an uninitialised pool that rejects every task until Init()
  succeeds. The limits start at 0 rather than being left indeterminate.
**/
CThreadPoolExecutor::CThreadPoolExecutor(void) :
m_bRun(false),
m_bEnableInsertTask(false),
m_minThreads(0),
m_maxThreads(0),
m_maxPendingTasks(0)
{
    InitializeCriticalSection(&m_csTasksLock);
    InitializeCriticalSection(&m_csThreadPoolLock);
}

/**
  Shuts the pool down through Terminate() and then releases both
  critical sections.
**/
CThreadPoolExecutor::~CThreadPoolExecutor(void)
{
    this->Terminate();

    ::DeleteCriticalSection(&m_csTasksLock);
    ::DeleteCriticalSection(&m_csThreadPoolLock);
}

bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)
{
    if(minThreads == 0)
    {
        return false;
    }
    if(maxThreads < minThreads)
    {
        return false;
    }
    m_minThreads = minThreads;
    m_maxThreads = maxThreads;
    m_maxPendingTasks = maxPendingTasks;
    unsigned int i = m_ThreadPool.size();
    for(; i<minThreads; i++)
    {
        //建立線程
        CWorker * pWorker = new CWorker(this);
        if(NULL == pWorker)
        {
            return false;
        }
        EnterCriticalSection(&m_csThreadPoolLock);
        m_ThreadPool.insert(pWorker);
        LeaveCriticalSection(&m_csThreadPoolLock);
        pWorker->Start();
    }
    m_bRun = true;
    m_bEnableInsertTask = true;
    return true;
}

/**
  Submits a task.
  The task is queued while the task list has room. When the list is full
  and the pool holds fewer than the maximum number of workers, a new
  worker is created with the task as its first task. Otherwise the task
  is rejected.
  Each size check and the insertion that depends on it are made under the
  same lock, so concurrent callers cannot exceed the limits.
  @return true if the task was accepted; false if the pool is not
          accepting tasks, pRunnable is NULL, the pool is saturated, or
          the new worker could not be started
**/
bool CThreadPoolExecutor::Execute(Runnable * pRunnable)
{
    if(!m_bEnableInsertTask || NULL == pRunnable)
    {
        return false;
    }

    // First choice: queue the task.
    bool bQueued = false;
    EnterCriticalSection(&m_csTasksLock);
    if(m_Tasks.size() < m_maxPendingTasks)
    {
        m_Tasks.push_back(pRunnable);
        bQueued = true;
    }
    LeaveCriticalSection(&m_csTasksLock);
    if(bQueued)
    {
        return true;
    }

    // List is full: grow the pool if that is still allowed.
    CWorker * pWorker = NULL;
    EnterCriticalSection(&m_csThreadPoolLock);
    if(m_ThreadPool.size() < m_maxThreads)
    {
        pWorker = new CWorker(this, pRunnable);
        if(NULL != pWorker)
        {
            m_ThreadPool.insert(pWorker);
        }
    }
    LeaveCriticalSection(&m_csThreadPoolLock);
    if(NULL == pWorker)
    {
        return false;
    }

    if(!pWorker->Start())
    {
        // Do not leave a worker without a thread in the pool; report the
        // task as not accepted so the caller can retry.
        EnterCriticalSection(&m_csThreadPoolLock);
        m_ThreadPool.erase(pWorker);
        LeaveCriticalSection(&m_csThreadPoolLock);
        delete pWorker;
        return false;
    }
    return true;
}

/**
  Takes the oldest pending task off the task list.
  @return the removed task, or NULL when the list is empty
**/
Runnable * CThreadPoolExecutor::GetTask()
{
    EnterCriticalSection(&m_csTasksLock);
    const bool bHasTask = !m_Tasks.empty();
    Runnable * pNext = bHasTask ? m_Tasks.front() : static_cast<Runnable *>(NULL);
    if(bHasTask)
    {
        m_Tasks.pop_front();
    }
    LeaveCriticalSection(&m_csTasksLock);
    return pNext;
}

unsigned int CThreadPoolExecutor::GetThreadPoolSize()
{
    return m_ThreadPool.size();
}

void CThreadPoolExecutor::Terminate()
{
    m_bEnableInsertTask = false;
    while(m_Tasks.size() > 0)
    {
        Sleep(1);
    }
    m_bRun = false;
    m_minThreads = 0;
    m_maxThreads = 0;
    m_maxPendingTasks = 0;
    while(m_ThreadPool.size() > 0)
    {
        Sleep(1);
    }
    EnterCriticalSection(&m_csThreadPoolLock);
    ThreadPoolItr itr = m_TrashThread.begin();
    while(itr != m_TrashThread.end())
    {
        (*itr)->Join();
        delete (*itr);
        m_TrashThread.erase(itr);
        itr = m_TrashThread.begin();
    }
    LeaveCriticalSection(&m_csThreadPoolLock);
}

main.cpp

#include <cstdio>

#include "Thread.h"
#include "ThreadPoolExecutor.h"

/**
  Demo task: prints a greeting each time it is run and a marker when it
  is destroyed.
**/
class R : public Runnable
{
public:
    virtual ~R() { printf("~R\n"); }

    virtual void Run() { printf("Hello World\n"); }
};

int main()
{
    CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();
    pExecutor->Init(1, 10, 50);
    R r;
    for(int i=0;i<100;i++)
    {
        while(!pExecutor->Execute(&r))
        {
        }
    }
    pExecutor->Terminate();
    delete pExecutor;
    getchar();
     return 0;
}
相關文章
相關標籤/搜索