[轉]使用VC/MFC建立一個線程池

許多應用程序建立的線程花費了大量時間在睡眠狀態來等待事件的發生。還有一些線程進入睡眠狀態後按期被喚醒以輪詢工做方式來改變或者更新狀態信息。線程池可讓你更有效地使用線程,它爲你的應用程序提供一個由系統管理的工做者線程池。至少會有一個線程來監聽放到線程池的全部等待操做,當等待操做完成後,線程池中將會有一個工做者線程來執行相應的回調函數。
你也能夠把沒有等待操做的工做項目放到線程池中,用QueueUserWorkItem函數來完成這個工做,把要執行的工做項目函數經過一個參數傳遞給線程池。工做項目被放到線程池中後,就不能再取消了。
Timer-queue timers和Registered wait operations也使用線程池來實現。他們的回調函數也放在線程池中。你也能夠用BindIOCompletionCallback函數來投遞一個異步IO操做,在IO完成端口上,回調函數也是由線程池線程來執行。
當第一次調用QueueUserWorkItem函數或者BindIOCompletionCallback函數的時候,線程池被自動建立,或者Timer-queue timers或者Registered wait operations放入回調函數的時候,線程池也能夠被建立。線程池能夠建立的線程數量不限,僅受限於可用的內存,每個線程使用默認的初始堆棧大小,運行在默認的優先級上。
線程池中有兩種類型的線程:IO線程和非IO線程。IO線程等待在可告警狀態,工做項目做爲APC放到IO線程中。若是你的工做項目須要線程執行在可警告狀態,你應該將它放到IO線程。
非IO工做者線程等待在IO完成端口上,使用非IO線程比IO線程效率更高,也就是說,只要有可能的話,儘可能使用非IO線程。IO線程和非IO線程在異步IO操做沒有完成以前都不會退出。然而,不要在非IO線程中發出須要很長時間才能完成的異步IO請求。
正確使用線程池的方法是,工做項目函數以及它將會調用到的全部函數都必須是線程池安全的。安全的函數不該該假設線程是一次性線程的或者是永久線程。通常來講,應該避免使用線程本地存儲和發出須要永久線程的異步IO調用,好比說RegNotifyChangeKeyValue函數。若是須要在永久線程中執行這樣的函數的話,能夠給QueueUserWorkItem傳遞一個選項WT_EXECUTEINPERSISTENTTHREAD。
注意,線程池不能兼容COM的單線程套間(STA)模型。
爲了更深刻地講解操做系統實現的線程池的優越性,咱們首先嚐試着本身實現一個簡單的線程池模型。
代碼以下:


  1 /**//************************************************************************/
  2    /**//* Test Our own thread pool.                                             */
  3    /**//************************************************************************/
  4   
  5    typedef struct _THREAD_POOL
  6    {
  7        HANDLE QuitEvent;
  8        HANDLE WorkItemSemaphore;
  9   
 10        LONG WorkItemCount;
 11        LIST_ENTRY WorkItemHeader;
 12        CRITICAL_SECTION WorkItemLock;
 13   
 14        LONG ThreadNum;
 15        HANDLE *ThreadsArray;
 16   
 17    }THREAD_POOL, *PTHREAD_POOL;
 18   
 19    typedef VOID (*WORK_ITEM_PROC)(PVOID Param);
 20   
 21    typedef struct _WORK_ITEM
 22    {
 23        LIST_ENTRY List;
 24   
 25        WORK_ITEM_PROC UserProc;
 26        PVOID UserParam;
 27       
 28    }WORK_ITEM, *PWORK_ITEM;
 29   
 30   
 31    DWORD WINAPI WorkerThread(PVOID pParam)
 32    {
 33        PTHREAD_POOL pThreadPool = (PTHREAD_POOL)pParam;
 34        HANDLE Events[2];
 35       
 36        Events[0] = pThreadPool->QuitEvent;
 37        Events[1] = pThreadPool->WorkItemSemaphore;
 38   
 39        for(;;)
 40        {
 41            DWORD dwRet = WaitForMultipleObjects(2, Events, FALSE, INFINITE);
 42   
 43            if(dwRet == WAIT_OBJECT_0)
 44                break;
 45   
 46            //
 47            // execute user's proc.
 48            //
 49   
 50            else if(dwRet == WAIT_OBJECT_0 +1)
 51            {
 52                PWORK_ITEM pWorkItem;
 53                PLIST_ENTRY pList;
 54   
 55                EnterCriticalSection(&pThreadPool->WorkItemLock);
 56                _ASSERT(!IsListEmpty(&pThreadPool->WorkItemHeader));
 57                pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
 58                LeaveCriticalSection(&pThreadPool->WorkItemLock);
 59   
 60                pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
 61                pWorkItem->UserProc(pWorkItem->UserParam);
 62   
 63                InterlockedDecrement(&pThreadPool->WorkItemCount);
 64                free(pWorkItem);
 65            }
 66   
 67            else
 68            {
 69                _ASSERT(0);
 70                break;
 71            }
 72        }
 73   
 74        return 0;
 75    }
 76   
 77    BOOL InitializeThreadPool(PTHREAD_POOL pThreadPool, LONG ThreadNum)
 78    {
 79        pThreadPool->QuitEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
 80        pThreadPool->WorkItemSemaphore = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
 81        pThreadPool->WorkItemCount = 0;
 82        InitializeListHead(&pThreadPool->WorkItemHeader);
 83        InitializeCriticalSection(&pThreadPool->WorkItemLock);
 84        pThreadPool->ThreadNum = ThreadNum;
 85        pThreadPool->ThreadsArray = (HANDLE*)malloc(sizeof(HANDLE) * ThreadNum);
 86   
 87        for(int i=0; i<ThreadNum; i++)
 88        {
 89            pThreadPool->ThreadsArray[i] = CreateThread(NULL, 0, WorkerThread, pThreadPool, 0, NULL);
 90        }
 91   
 92        return TRUE;
 93    }
 94   
 95    VOID DestroyThreadPool(PTHREAD_POOL pThreadPool)
 96    {
 97        SetEvent(pThreadPool->QuitEvent);
 98   
 99        for(int i=0; i<pThreadPool->ThreadNum; i++)
100        {
101            WaitForSingleObject(pThreadPool->ThreadsArray[i], INFINITE);
102            CloseHandle(pThreadPool->ThreadsArray[i]);
103        }
104   
105        free(pThreadPool->ThreadsArray);
106   
107        CloseHandle(pThreadPool->QuitEvent);
108        CloseHandle(pThreadPool->WorkItemSemaphore);
109        DeleteCriticalSection(&pThreadPool->WorkItemLock);
110   
111        while(!IsListEmpty(&pThreadPool->WorkItemHeader))
112        {
113            PWORK_ITEM pWorkItem;
114            PLIST_ENTRY pList;
115           
116            pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
117            pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
118           
119            free(pWorkItem);
120        }
121    }
122   
123    BOOL PostWorkItem(PTHREAD_POOL pThreadPool, WORK_ITEM_PROC UserProc, PVOID UserParam)
124    {
125        PWORK_ITEM pWorkItem = (PWORK_ITEM)malloc(sizeof(WORK_ITEM));
126        if(pWorkItem == NULL)
127            return FALSE;
128   
129        pWorkItem->UserProc = UserProc;
130        pWorkItem->UserParam = UserParam;
131   
132        EnterCriticalSection(&pThreadPool->WorkItemLock);
133        InsertTailList(&pThreadPool->WorkItemHeader, &pWorkItem->List);
134        LeaveCriticalSection(&pThreadPool->WorkItemLock);
135   
136        InterlockedIncrement(&pThreadPool->WorkItemCount);
137         ReleaseSemaphore(pThreadPool->WorkItemSemaphore, 1, NULL);
138   
139        return TRUE;
140    }
141   
142    VOID UserProc1(PVOID dwParam)
143    {
144        WorkItem(dwParam);
145    }
146   
147    void TestSimpleThreadPool(BOOL bWaitMode, LONG ThreadNum)
148    {
149        THREAD_POOL ThreadPool;    
150        InitializeThreadPool(&ThreadPool, ThreadNum);
151       
152        CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
153        BeginTime = GetTickCount();
154        ItemCount = 20;
155   
156        for(int i=0; i<20; i++)
157        {
158            PostWorkItem(&ThreadPool, UserProc1, (PVOID)bWaitMode);
159        }
160       
161        WaitForSingleObject(CompleteEvent, INFINITE);
162        CloseHandle(CompleteEvent);
163   
164        DestroyThreadPool(&ThreadPool);
165    }
166 安全

 

咱們把工做項目放到一個隊列中,用一個信號量通知線程池,線程池中任意一個線程取出工做項目來執行,執行完畢以後,線程返回線程池,繼續等待新的工做項目。
線程池中線程的數量是固定的,預先建立好的,永久的線程,直到銷燬線程池的時候,這些線程纔會被銷燬。
線程池中線程得到工做項目的機會是均等的,隨機的,並無特別的方式保證哪個線程具備特殊的優先得到工做項目的機會。
並且,同一時刻能夠併發運行的線程數目沒有任何限定。事實上,在咱們的執行計算任務的演示代碼中,全部的線程都併發執行。
下面,咱們再來看一下,完成一樣的任務,系統提供的線程池是如何運做的。

 

 


 1  /**//************************************************************************/
 2    /**//* QueueWorkItem Test.                                                   */
 3    /**//************************************************************************/
 4   
 5    DWORD BeginTime;
 6    LONG   ItemCount;
 7    HANDLE CompleteEvent;
 8   
 9    int compute()
10    {
11        srand(BeginTime);
12   
13        for(int i=0; i<20 *1000 * 1000; i++)
14            rand();
15   
16        return rand();
17    }
18   
19    DWORD WINAPI WorkItem(LPVOID lpParameter)
20    {
21        BOOL bWaitMode = (BOOL)lpParameter;
22   
23        if(bWaitMode)
24            Sleep(1000);
25        else
26            compute();
27   
28        if(InterlockedDecrement(&ItemCount) == 0)
29        {
30            printf("Time total %d second.\n", GetTickCount() - BeginTime);
31            SetEvent(CompleteEvent);
32        }
33   
34        return 0;
35    }
36   
37    void TestWorkItem(BOOL bWaitMode, DWORD Flag)
38    {
39        CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
40        BeginTime = GetTickCount();
41        ItemCount = 20;
42       
43        for(int i=0; i<20; i++)
44        {
45            QueueUserWorkItem(WorkItem, (PVOID)bWaitMode, Flag);
46        }    
47   
48        WaitForSingleObject(CompleteEvent, INFINITE);
49        CloseHandle(CompleteEvent);
50    }
51 併發

 

很簡單,是吧?咱們僅須要關注於咱們的回調函數便可。可是與咱們的簡單模擬來比,系統提供的線程池有着更多的優勢。       首先,線程池中線程的數目是動態調整的,其次,線程池利用IO完成端口的特性,它能夠限制併發運行的線程數目,默認狀況下,將會限制爲CPU的數目,這能夠減小線程切換。它挑選最近執行過的線程再次投入執行,從而避免了沒必要要的線程切換。 
相關文章
相關標籤/搜索