生產者消費者問題——C++ windows版 多生產者多消費者的隊列實現

  最進要寫一個多線程加載資源的資源管理器(ResourceManager)和多線程音頻解碼器(MultiThread Decoder)。由於距最近一次用到多線程放下很久了,因此今天把生產者消費者問題練一下手。windows

  爲何選擇生產者消費者問題,由於他比較接近資源管理器和多線程音頻解碼器的原型。多線程

  好比,對於音頻解碼器,音頻線程去流式的解碼一段MP3格式的內存,就相似生產者生產產品的過程;而音頻播放API(如OpenAL,OpenSL)一般須要的是PCM數據,也就是生產者生產的產品,因此播放邏輯充當消費者的角色,典型的生產者消費者問題。spa

  再對於資源管理器,加載Mesh和Texture相似生產單個Resource的過程,而相應的渲染邏輯去使用資源就至關於消費資源的過程,但不一樣的是最後當再也不使用這個資源的時候,這個資源纔會被釋放,而非使用一次。線程

  今天抽時間,寫了一個C++ windows版 多生產者多消費者的隊列實現,pthread版估計等須要作跨平臺的時候再作,若是想方便直接用OpenMP或者TBB也是能夠的,可是對於輕量級引擎,本身實現資源加載器徹底足夠了。code

  

#include <Windows.h>
#include <map>
#include <queue>

CRITICAL_SECTION g_cs;    // mutex

HANDLE    g_hEmptyBufferSemaphore;
HANDLE  g_hFullBufferSemaphore;

#define INVALID  -1

#define PRODUCER 5
#define CUSTOMER 5

#define NUM_COUNT 8
#define BUFF_SIZE 4

static std::queue<int> bufferQueue;
static std::map< DWORD,int > countMap;

bool ProducerFinish = false;

//設置控制檯輸出顏色
BOOL SetConsoleColor(WORD wAttributes)
{
    HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
    if (hConsole == INVALID_HANDLE_VALUE)
        return FALSE;    
    return SetConsoleTextAttribute(hConsole, wAttributes);
}

DWORD WINAPI ProducerFunc(LPVOID param)
{
    
    while(true)
    {
        WaitForSingleObject( g_hEmptyBufferSemaphore, INFINITE);

        EnterCriticalSection(&g_cs);

        DWORD threadId = GetCurrentThreadId();
        if(countMap.find(threadId) == countMap.end())
            countMap[threadId] = 0;

        int productID = ++countMap[threadId];

        bufferQueue.push( productID);
        printf("生產者%d , 生產%d\n", threadId, productID);

        if( productID == NUM_COUNT )
        {
            SetConsoleColor(FOREGROUND_RED);
            printf("生產者%d生產完畢\n",GetCurrentThreadId());
            SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);

            LeaveCriticalSection(&g_cs);
            ReleaseSemaphore(g_hFullBufferSemaphore, 1, NULL);

            break;
        }

        LeaveCriticalSection(&g_cs);

        ReleaseSemaphore(g_hFullBufferSemaphore, 1, NULL);
    }

    return NULL;
}

DWORD WINAPI CustomerFunc(LPVOID param)
{
    while (true)
    {
        WaitForSingleObject( g_hFullBufferSemaphore, INFINITE);

        EnterCriticalSection(&g_cs);

        int buffer = -1;
        
        if(!bufferQueue.empty())
        {
            buffer = bufferQueue.front();
            bufferQueue.pop();
        }


        if(buffer != INVALID )
            printf("消費者%d ,消費%d\n", GetCurrentThreadId() , buffer);

        if( bufferQueue.empty() && ProducerFinish)
        {
            printf("消費者%d 結束\n",GetCurrentThreadId());
            LeaveCriticalSection(&g_cs);

            // 通知其餘消費者能夠結束了
            ReleaseSemaphore( g_hFullBufferSemaphore, 1, NULL);
            break;
        }

        LeaveCriticalSection(&g_cs);

        ReleaseSemaphore( g_hEmptyBufferSemaphore, 1, NULL);
    }


    return NULL;
}

int _tmain(int argc, _TCHAR* argv[])
{
    InitializeCriticalSection(&g_cs);  

    g_hEmptyBufferSemaphore = CreateSemaphore( NULL, 4, 4, NULL);
    g_hFullBufferSemaphore = CreateSemaphore( NULL, 0, 4, NULL);
    
    HANDLE producerThreads[PRODUCER];
    HANDLE customerThreads[CUSTOMER];

    // producer
    for (int i = 0; i < PRODUCER; ++i)
    {
        producerThreads[i] = CreateThread( NULL, 0, ProducerFunc, NULL, NULL, NULL);
    }
    
    // customers
    for (int i = 0; i < CUSTOMER; ++i)
        customerThreads[i] = CreateThread( NULL, 0, CustomerFunc, NULL, NULL, NULL);


    WaitForMultipleObjects( PRODUCER, producerThreads, TRUE, INFINITE);
    ProducerFinish = true;

    WaitForMultipleObjects( CUSTOMER, customerThreads, TRUE, INFINITE);


    for (int i = 0; i < PRODUCER; ++i)
        CloseHandle(producerThreads[i]);

    for (int i = 0; i < CUSTOMER; ++i)
        CloseHandle(customerThreads[i]);


    CloseHandle(g_hEmptyBufferSemaphore);
    CloseHandle(g_hFullBufferSemaphore);

    DeleteCriticalSection(&g_cs);

    countMap.clear();

    return 0;
}
相關文章
相關標籤/搜索