前提:編程
IOCP的總體編程模型跟上面的純重疊io 很是相似. 純重疊io使用OVERLAPPED + APC函數完成.併發
這種模型的缺點是必須讓調用apc函數進入alterable狀態. 而IOCP解決了這個問題.IOCP讓咱們本身建立一些線程,app
而後調用GetQueuedCompletionStatus 來告訴咱們某個io操做完成, 就像是在另外一個線程中執行了APC函數同樣;socket
使用IOCP 的時候,通常狀況下須要本身建立額外的線程,用於等待結果完成(GetQueuedCompletionStatus)函數
使用到的函數:測試
CreateIoCompletionPort : 建立/ 關聯一個完成端口 . 操作系統
第3個參數是一個自定義數據, 第4個是最多N個線程可被調用;線程
注意與其關聯的HANDLE 必需要有OVERLAPPED屬性的指針
//建立一個完成端口 HANDLE hComp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0) //關聯到完成端口. 第3個參數是一個自定義數據 //在GetQueuedCompletionStatus將攜帶這些數據返回. 這個自定義數據將一直與此套接字綁定在了一塊兒 CreateIoCompletionPort((HANDLE)client_socket, hComp, (DWORD)pSockData, 0);
GetQueuedCompletionStatus :一旦相似WSARecv / WSASend 完成後 . 用此函數獲取結果,就想APC函數同樣,一旦完成io操做就調用. 此函數通常狀況都在某一個線程中使用.注意一旦在某個線程中調用了此函數,這意味着,code
該線程就像被指派給了IOCP同樣,供IOCP使用. 總之這個行爲就想APC函數在另外一個線程被調用了;
關於解除關聯: 一旦一個套接字關閉了 , closehandle /closesocket. 就將從IOCP的設備句柄列表中解除關聯了
關於線程:
CreateIoCompletionPort 最後一個參數用於指定IOCP最多執行N個線程(若是是0 則使用默認CPU的核數). 但通常狀況下,我會預留一些額外的線程.好比
個人CPU是4核即IOCP最多可以使用 4個線程 , 不過通常狀況下會建立 8 個線程,給IOCP預留 額外4個線程 . 緣由是若是IOCP
有5個任務已經完成, 最多隻有4個線程被喚醒. 若是其中某個線程調用了WaitForSingleObject 之類的函數 ,此時IOCP將喚醒額外的線程來處理第5個任務;
先補充一下. 對於WSARecv / WSASend 的OVERLAPPED操做,簡稱爲投遞操做.意思是讓操做系統去幹活,至於何時幹完.
GetQueuedCompletionStatus 會通知你(即返回) . 所以所以, 須要注意, 這些參數像WSABUF 和 OVERLAPPED 必定要 new / malloc在堆中;
代碼中都有註釋: 另代碼中有不少返回都沒判斷.這個例子僅僅解釋如何編寫IOCP
#include "stdafx.h" #include <process.h> #include "../utils.h" //包含了一些宏和一些打印錯誤信息的函數. #define BUFFSIZE 8192 #define Read 0 #define Write 1 //自定義數據 . 注意 結構的地址 與 第一個成員的地址相同 struct IOData { WSAOVERLAPPED overlapped; //每一個io操做都須要獨立的一個overlapped WSABUF wsabuf; //讀寫各一份 int rw_mode; //判斷讀寫操做 char * buf; //真正存放數據的地方, 須要初始化 }; //自定義數據.保存客戶套接字和地址 struct SocketData { SOCKET hClientSocket; //客戶端套接字 SOCKADDR_IN clientAddr; IOData * pRead; //2個指針,只是爲了在線程中方便使用添加的 IOData * pWrite; }; //用於交換2個buf int swapBuf(WSABUF * a, WSABUF * b) { BOOL ret = FALSE; if (a && b){ char * buf = a->buf; a->buf = b->buf; b->buf = buf; ret = TRUE; } return ret; } //釋放內存,解除關聯 void freeMem(SocketData * pSockData) { closesocket(pSockData->hClientSocket); free(pSockData->pRead->buf); free(pSockData->pWrite->buf); free(pSockData->pWrite); free(pSockData->pRead); free(pSockData); } unsigned int WINAPI completeRoutine(void * param) { //完成端口 HANDLE hCom = (HANDLE)param; SocketData * pSockData = NULL; IOData * pIOData = NULL; DWORD flags = 0, bytes = 0; BOOL ret = 0; SOCKET hClientSocket = NULL; printf("tid:%ld start!\n", GetCurrentThreadId()); while (1) { flags = 0; //直到有任務完成即返回 ret = GetQueuedCompletionStatus(hCom, &bytes, (PULONG_PTR)&pSockData, (LPOVERLAPPED * )&pIOData, INFINITE); printf("GetQueuedCompletionStatus : %d , diy key : %p , pIOData:%p,mode:%d\n", ret, pSockData, pIOData,pIOData->rw_mode); //若是成功了 if (ret) { hClientSocket = pSockData->hClientSocket; //若是是WSARecv的 if (Read == pIOData->rw_mode) { printf("READ - > bytesRecved:%ld, high:%ld\n", bytes, pIOData->overlapped.InternalHigh); //對端關閉 if (0 == bytes) { printf("peer closed\n"); freeMem(pSockData); //釋放內存 continue; } //測試數據 pSockData->pRead->buf[bytes] = 0; printf("Read buf:%s\n", pSockData->pRead->buf); //交換指針, 把recv的buf 給 write的buf; //把write的buf交換給recv . 若是併發量不大的時候能夠這麼作 swapBuf(&pIOData->wsabuf, &pSockData->pWrite->wsabuf); //回傳操做.清空write OVERLAPPED memset(&pSockData->pWrite->overlapped, 0, sizeof(WSAOVERLAPPED)); pSockData->pWrite->wsabuf.len = bytes; WSASend(hClientSocket, &pSockData->pWrite->wsabuf, 1, NULL, 0, &pSockData->pWrite->overlapped, NULL); //再次投遞一個recv操做,等待下次客戶端發送 memset(&pSockData->pRead->overlapped, 0, sizeof(WSAOVERLAPPED)); pSockData->pRead->wsabuf.len = BUFFSIZE; WSARecv(hClientSocket, &pSockData->pRead->wsabuf, 1, NULL, &flags, &pSockData->pRead->overlapped, NULL); } else { // send 完成. printf("Send finsished - > bytes:%ld, high:%ld\n", bytes, pIOData->overlapped.InternalHigh); memset(&pIOData->overlapped, 0, sizeof(WSAOVERLAPPED)); } } else{ //一旦出錯, 解除綁定即刪除內存 print_error(GetLastError()); freeMem(pSockData); } } return 0; } int _tmain(int argc, _TCHAR* argv[]) { WSADATA wsadata; if (WSAStartup(MAKEWORD(2, 2), &wsadata) != 0){ print_error(WSAGetLastError()); return 0; } SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); //指定線程數量. 通常 processors * 2 const DWORD nThreads = sysinfo.dwNumberOfProcessors * 2; //建立一個完成端口 , 前3個參數保證了建立一個獨立的完成端口, 最後一個參數指定了完成 //端口可以使用的線程數. 0 使用當前cpu核數 HANDLE hCom = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); //準備一些線程供完成端口調用, 把完成端口同時傳入 HANDLE * arr_threads = new HANDLE[nThreads]; for (int i = 0; i < sysinfo.dwNumberOfProcessors; ++i) arr_threads[i] = (HANDLE)_beginthreadex(NULL, 0, completeRoutine, (void*)hCom, 0, NULL); //建立一個支持OVERLAPPED的socket.這樣的屬性將被 accept 返回的socket所繼承 SOCKET hListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); SOCKADDR_IN serv_addr, client_addr; memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(PORT); serv_addr.sin_addr.s_addr = INADDR_ANY; bind(hListenSocket, (SOCKADDR*)&serv_addr, sizeof(serv_addr)); listen(hListenSocket, BACKLOG); SOCKET client_socket; int client_addr_size = 0; DWORD flags = 0; while (1){ client_addr_size = sizeof(client_addr); flags = 0; client_socket = accept(hListenSocket, (SOCKADDR*)&client_addr, &client_addr_size); puts("accepted"); //準備一份數據, 用於保存clientsocket, addr, 以及讀寫指針; SocketData * pSockData = (SocketData *)malloc(sizeof(SocketData)); pSockData->pRead = NULL; pSockData->pWrite = NULL; pSockData->hClientSocket = client_socket; memcpy(&pSockData->clientAddr, &client_addr, client_addr_size); //準備數據 IOData * pRead = (IOData *)malloc(sizeof(IOData)); //對於OVERLAPPED,須要額外注意,清0 memset(&pRead->overlapped, 0, sizeof(WSAOVERLAPPED)); pRead->buf = (char *)malloc(BUFFSIZE); pRead->rw_mode = Read; pRead->wsabuf.buf = pRead->buf; pRead->wsabuf.len = BUFFSIZE; pSockData->pRead = pRead; IOData *pWrite = (IOData *)malloc(sizeof(IOData)); pWrite->buf = (char *)malloc(BUFFSIZE); memset(&pWrite->overlapped, 0, sizeof(WSAOVERLAPPED)); pWrite->rw_mode = Write; pWrite->wsabuf.buf = pWrite->buf; pWrite->wsabuf.len = BUFFSIZE; pSockData->pWrite = pWrite; //與iocp關聯在一塊兒. 注意第3個參數, 把自定義數據一塊兒傳遞過去 CreateIoCompletionPort((HANDLE)client_socket, hCom, (DWORD)pSockData, 0); WSARecv(client_socket, &pRead->wsabuf, 1, NULL, &flags, &pRead->overlapped, NULL); } return 0; }