完成端口服務器模型

前提:編程

IOCP的總體編程模型跟上面的純重疊io 很是相似.  純重疊io使用OVERLAPPED  + APC函數完成.併發

這種模型的缺點是必須讓調用apc函數進入alterable狀態. 而IOCP解決了這個問題.IOCP讓咱們本身建立一些線程,app

而後調用GetQueuedCompletionStatus 來告訴咱們某個io操做完成, 就像是在另外一個線程中執行了APC函數同樣;socket

 

使用IOCP 的時候,通常狀況下須要本身建立額外的線程,用於等待結果完成(GetQueuedCompletionStatus)函數

使用到的函數:測試

CreateIoCompletionPort : 建立/ 關聯一個完成端口 . 操作系統

                                          第3個參數是一個自定義數據, 第4個是最多N個線程可被調用;線程

                                          注意與其關聯的HANDLE 必需要有OVERLAPPED屬性的指針

//建立一個完成端口
HANDLE hComp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)
 
 
//關聯到完成端口. 第3個參數是一個自定義數據
//在GetQueuedCompletionStatus將攜帶這些數據返回. 這個自定義數據將一直與此套接字綁定在了一塊兒
CreateIoCompletionPort((HANDLE)client_socket, hComp, (DWORD)pSockData, 0);

GetQueuedCompletionStatus :一旦相似WSARecv / WSASend 完成後 . 用此函數獲取結果,就想APC函數同樣,一旦完成io操做就調用. 此函數通常狀況都在某一個線程中使用.注意一旦在某個線程中調用了此函數,這意味着,code

該線程就像被指派給了IOCP同樣,供IOCP使用. 總之這個行爲就想APC函數在另外一個線程被調用了;

 

關於解除關聯: 一旦一個套接字關閉了 , closehandle /closesocket. 就將從IOCP的設備句柄列表中解除關聯了

 

關於線程:

CreateIoCompletionPort  最後一個參數用於指定IOCP最多執行N個線程(若是是0 則使用默認CPU的核數). 但通常狀況下,我會預留一些額外的線程.好比

個人CPU是4核即IOCP最多可以使用 4個線程 , 不過通常狀況下會建立 8 個線程,給IOCP預留 額外4個線程 . 緣由是若是IOCP

有5個任務已經完成, 最多隻有4個線程被喚醒. 若是其中某個線程調用了WaitForSingleObject 之類的函數 ,此時IOCP將喚醒額外的線程來處理第5個任務;

 

 

 

先補充一下. 對於WSARecv / WSASend 的OVERLAPPED操做,簡稱爲投遞操做.意思是讓操做系統去幹活,至於何時幹完.

GetQueuedCompletionStatus 會通知你(即返回) . 所以所以, 須要注意, 這些參數像WSABUF 和 OVERLAPPED 必定要 new / malloc在堆中;

代碼中都有註釋: 另代碼中有不少返回都沒判斷.這個例子僅僅解釋如何編寫IOCP

#include "stdafx.h"
#include <process.h>
#include "../utils.h" //包含了一些宏和一些打印錯誤信息的函數. 
 
#define BUFFSIZE 8192
#define Read 0
#define Write 1
 
//自定義數據 .  注意 結構的地址 與 第一個成員的地址相同
struct IOData
{
    WSAOVERLAPPED overlapped;  //每一個io操做都須要獨立的一個overlapped
    WSABUF wsabuf;   //讀寫各一份
    int rw_mode;         //判斷讀寫操做
    char * buf;              //真正存放數據的地方, 須要初始化
};
 
//自定義數據.保存客戶套接字和地址
struct SocketData
{
    SOCKET hClientSocket;        //客戶端套接字
    SOCKADDR_IN clientAddr;
    IOData * pRead;                    //2個指針,只是爲了在線程中方便使用添加的
    IOData * pWrite;
};
 
 
//用於交換2個buf
int swapBuf(WSABUF * a, WSABUF * b)
{
    BOOL ret = FALSE;
    if (a && b){
        char * buf = a->buf;
        a->buf = b->buf;
        b->buf = buf;
        ret = TRUE;
    }
    return ret;
}
 
//釋放內存,解除關聯
void freeMem(SocketData * pSockData)
{
    closesocket(pSockData->hClientSocket);
    free(pSockData->pRead->buf);
    free(pSockData->pWrite->buf);
    free(pSockData->pWrite);
    free(pSockData->pRead);
    free(pSockData);
}
 
 
unsigned int WINAPI completeRoutine(void * param)
{
    //完成端口
    HANDLE hCom = (HANDLE)param;
 
    SocketData * pSockData = NULL;
    IOData * pIOData = NULL;
    DWORD flags = 0, bytes = 0; 
    BOOL ret = 0;
    SOCKET hClientSocket = NULL;
    printf("tid:%ld start!\n", GetCurrentThreadId());
 
    while (1)
    {
        flags = 0;
        
        //直到有任務完成即返回
        ret = GetQueuedCompletionStatus(hCom, &bytes,
            (PULONG_PTR)&pSockData,
            (LPOVERLAPPED * )&pIOData,
            INFINITE);
        printf("GetQueuedCompletionStatus : %d , diy key : %p , pIOData:%p,mode:%d\n", ret, pSockData,
            pIOData,pIOData->rw_mode);
 
        //若是成功了
        if (ret)
        {
            hClientSocket = pSockData->hClientSocket;
 
            //若是是WSARecv的
            if (Read == pIOData->rw_mode)
            {
                printf("READ - > bytesRecved:%ld, high:%ld\n", bytes, pIOData->overlapped.InternalHigh);
 
                //對端關閉
                if (0 == bytes)
                {
                    printf("peer closed\n");
                    freeMem(pSockData);  //釋放內存
                    continue;
                } 
 
                //測試數據 
                pSockData->pRead->buf[bytes] = 0;
                printf("Read buf:%s\n", pSockData->pRead->buf);
 
                //交換指針, 把recv的buf 給 write的buf;
                //把write的buf交換給recv . 若是併發量不大的時候能夠這麼作
                swapBuf(&pIOData->wsabuf, &pSockData->pWrite->wsabuf);
 
                //回傳操做.清空write OVERLAPPED
                memset(&pSockData->pWrite->overlapped, 0, sizeof(WSAOVERLAPPED));
                pSockData->pWrite->wsabuf.len = bytes;
                WSASend(hClientSocket, &pSockData->pWrite->wsabuf,
                    1, NULL, 0, &pSockData->pWrite->overlapped, NULL);
 
                //再次投遞一個recv操做,等待下次客戶端發送
                memset(&pSockData->pRead->overlapped, 0, sizeof(WSAOVERLAPPED));
                pSockData->pRead->wsabuf.len = BUFFSIZE;
                WSARecv(hClientSocket, &pSockData->pRead->wsabuf, 1, NULL, &flags,
                    &pSockData->pRead->overlapped, NULL);
            }
            else {
 
                 // send 完成.
                printf("Send finsished - > bytes:%ld, high:%ld\n", bytes, pIOData->overlapped.InternalHigh);
                memset(&pIOData->overlapped, 0, sizeof(WSAOVERLAPPED));
            }
        }
        else{
            //一旦出錯, 解除綁定即刪除內存
            print_error(GetLastError());
            freeMem(pSockData);
        }
    }
 
    return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
    WSADATA wsadata;
    if (WSAStartup(MAKEWORD(2, 2), &wsadata) != 0){
        print_error(WSAGetLastError());
        return 0;
    }
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
 
    //指定線程數量. 通常 processors * 2
    const DWORD nThreads = sysinfo.dwNumberOfProcessors * 2;
 
    //建立一個完成端口 ,  前3個參數保證了建立一個獨立的完成端口, 最後一個參數指定了完成
    //端口可以使用的線程數. 0 使用當前cpu核數
    HANDLE hCom = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
 
    //準備一些線程供完成端口調用, 把完成端口同時傳入
    HANDLE  * arr_threads = new HANDLE[nThreads];
    for (int i = 0; i < sysinfo.dwNumberOfProcessors; ++i)
        arr_threads[i] = (HANDLE)_beginthreadex(NULL, 0, completeRoutine, (void*)hCom, 0, NULL);
 
 
    //建立一個支持OVERLAPPED的socket.這樣的屬性將被 accept 返回的socket所繼承
    SOCKET hListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    SOCKADDR_IN serv_addr, client_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(PORT);
    serv_addr.sin_addr.s_addr = INADDR_ANY;
 
    bind(hListenSocket, (SOCKADDR*)&serv_addr, sizeof(serv_addr));
    listen(hListenSocket, BACKLOG);
 
    SOCKET client_socket;
    int client_addr_size = 0;
    DWORD flags = 0;
    while (1){
        client_addr_size = sizeof(client_addr);
        flags = 0;
        client_socket = accept(hListenSocket, (SOCKADDR*)&client_addr, &client_addr_size);
        puts("accepted");
 
        //準備一份數據, 用於保存clientsocket, addr, 以及讀寫指針;
        SocketData * pSockData = (SocketData *)malloc(sizeof(SocketData));
        pSockData->pRead = NULL;
        pSockData->pWrite = NULL;
        pSockData->hClientSocket = client_socket;
        memcpy(&pSockData->clientAddr, &client_addr, client_addr_size);
 
        
        //準備數據
        IOData *  pRead = (IOData *)malloc(sizeof(IOData));
        //對於OVERLAPPED,須要額外注意,清0
        memset(&pRead->overlapped, 0, sizeof(WSAOVERLAPPED));
        pRead->buf = (char *)malloc(BUFFSIZE);
        pRead->rw_mode = Read;
        pRead->wsabuf.buf = pRead->buf;
        pRead->wsabuf.len = BUFFSIZE;
        pSockData->pRead = pRead;
 
        IOData *pWrite = (IOData *)malloc(sizeof(IOData));
        pWrite->buf = (char *)malloc(BUFFSIZE);
        memset(&pWrite->overlapped, 0, sizeof(WSAOVERLAPPED));
        pWrite->rw_mode = Write;
        pWrite->wsabuf.buf = pWrite->buf;
        pWrite->wsabuf.len = BUFFSIZE;
        pSockData->pWrite = pWrite;
 
 
        //與iocp關聯在一塊兒. 注意第3個參數, 把自定義數據一塊兒傳遞過去
        CreateIoCompletionPort((HANDLE)client_socket, hCom, (DWORD)pSockData, 0);
        WSARecv(client_socket, &pRead->wsabuf, 1, NULL, &flags, &pRead->overlapped, NULL);
    }
 
    return 0;
}
相關文章
相關標籤/搜索