IOCP全稱I/O Completion Port,中文譯爲I/O完成端口。IOCP是一個異步I/O的Windows API,它能夠高效地將I/O事件通知給應用程序,相似於Linux中的Epoll,關於epoll能夠參考 linux之epoll
IOCP模型屬於一種通信模型,適用於Windows平臺下高負載服務器的一個技術。在處理大量用戶併發請求時,若是採用一個用戶一個線程的方式那將形成CPU在這成千上萬的線程間進行切換,後果是不可想象的。而IOCP完成端口模型則徹底不會如此處理,它的理論是並行的線程數量必須有一個上限-也就是說同時發出500個客戶請求,不該該容許出現500個可運行的線程。目前來講,IOCP完成端口是Windows下性能最好的I/O模型,同時它也是最複雜的內核對象。它避免了大量用戶併發時原有模型採用的方式,極大的提升了程序的並行處理能力。html
一共包括三部分:完成端口(存放重疊的I/O請求),客戶端請求的處理,等待者線程隊列(必定數量的工做者線程,通常採用CPU*2個)linux
完成端口中所謂的[端口]並非咱們在TCP/IP中所提到的端口,能夠說是徹底沒有關係。它其實就是一個通知隊列,由操做系統把已經完成的重疊I/O請求的通知放入其中。當某項I/O操做一旦完成,某個能夠對該操做結果進行處理的工做者線程就會收到一則通知。ios
一般狀況下,咱們會在建立必定數量的工做者線程來處理這些通知,也就是線程池的方法。線程數量取決於應用程序的特定須要。理想的狀況是,線程數量等於處理器的數量,不過這也要求任何線程都不該該執行諸如同步讀寫、等待事件通知等阻塞型的操做,以避免線程阻塞。每一個線程都將分到必定的CPU時間,在此期間該線程能夠運行,而後另外一個線程將分到一個時間片並開始執行。若是某個線程執行了阻塞型的操做,操做系統將剝奪其未使用的剩餘時間片並讓其它線程開始執行。也就是說,前一個線程沒有充分使用其時間片,當發生這樣的狀況時,應用程序應該準備其它線程來充分利用這些時間片。windows
基於IOCP的開發是異步IO的,決定了IOCP所實現的服務器的高吞吐量。服務器
經過引入IOCP,會大大減小Thread切換帶來的額外開銷,最小化的線程上下文切換,減小線程切換帶來的巨大開銷,讓CPU把大量的事件用於線程的運行。當與該完成端口相關聯的可運行線程的總數目達到了該併發量,系統就會阻塞,網絡
//功能:建立完成端口和關聯完成端口 HANDLE WINAPI CreateIoCompletionPort( * __in HANDLE FileHandle, // 已經打開的文件句柄或者空句柄,通常是客戶端的句柄 * __in HANDLE ExistingCompletionPort, // 已經存在的IOCP句柄 * __in ULONG_PTR CompletionKey, // 完成鍵,包含了指定I/O完成包的指定文件 * __in DWORD NumberOfConcurrentThreads // 真正併發同時執行最大線程數,通常推介是CPU核心數*2 * );
//建立完成端口句柄 HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
typedef struct{ SOCKET socket;//客戶端socket SOCKADDR_STORAGE ClientAddr;//客戶端地址 }PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
//與socket進行關聯 CreateIoCompletionPort((HANDLE)(PerHandleData -> socket),
completionPort, (DWORD)PerHandleData, 0);
//功能:獲取隊列完成狀態 /* 返回值: 調用成功,則返回非零數值,相關數據存於lpNumberOfBytes、lpCompletionKey、
lpoverlapped變量中。失敗則返回零值。 */ BOOL GetQueuedCompletionStatus( HANDLE CompletionPort, //完成端口句柄 LPDWORD lpNumberOfBytes, //一次I/O操做所傳送的字節數 PULONG_PTR lpCompletionKey, //當文件I/O操做完成後,用於存放與之關聯的CK LPOVERLAPPED *lpOverlapped, //IOCP特定的結構體 DWORD dwMilliseconds); //調用者的等待時間 /*
//用於IOCP的特定函數 typedef struct _OVERLAPPEDPLUS{ OVERLAPPED ol; //一個固定的用於處理網絡消息事件返回值的結構體變量 SOCKET s, sclient; int OpCode; //用來區分本次消息的操做類型(在完成端口的操做裏面,
是以消息通知系統,讀數據/寫數據,都是要發這樣的
消息結構體過去的) WSABUF wbuf; //讀寫緩衝區結構體變量 DWORD dwBytes, dwFlags; //一些在讀寫時用到的標誌性變量 }OVERLAPPEDPLUS;
//功能:投遞一個隊列完成狀態 BOOL PostQueuedCompletionStatus( HANDLE CompletlonPort, //指定想向其發送一個完成數據包的完成端口對象 DW0RD dwNumberOfBytesTrlansferred, //指定—個值,直接傳遞給GetQueuedCompletionStatus
函數中對應的參數 DWORD dwCompletlonKey, //指定—個值,直接傳遞給GetQueuedCompletionStatus函數中對應的參數 LPOVERLAPPED lpoverlapped, ); //指定—個值,直接傳遞給GetQueuedCompletionStatus
函數中對應的參數
#include <winsock2.h> #include <windows.h> #include <string> #include <iostream> using namespace std; #pragma comment(lib,"ws2_32.lib") #pragma comment(lib,"kernel32.lib") HANDLE g_hIOCP; enum IO_OPERATION{IO_READ,IO_WRITE}; struct IO_DATA{ OVERLAPPED Overlapped; WSABUF wsabuf; int nBytes; IO_OPERATION opCode; SOCKET client; }; char buffer[1024]; DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) { IO_DATA *lpIOContext = NULL; DWORD nBytes = 0; DWORD dwFlags = 0; int nRet = 0; DWORD dwIoSize = 0; void * lpCompletionKey = NULL; LPOVERLAPPED lpOverlapped = NULL; while(1){ GetQueuedCompletionStatus(g_hIOCP, &dwIoSize,(LPDWORD)&lpCompletionKey,(LPOVERLAPPED *)&lpOverlapped, INFINITE); lpIOContext = (IO_DATA *)lpOverlapped; if(dwIoSize == 0) { cout << "Client disconnect" << endl; closesocket(lpIOContext->client); delete lpIOContext; continue; } if(lpIOContext->opCode == IO_READ) // a read operation complete { ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped)); lpIOContext->wsabuf.buf = buffer; lpIOContext->wsabuf.len = strlen(buffer)+1; lpIOContext->opCode = IO_WRITE; lpIOContext->nBytes = strlen(buffer)+1; dwFlags = 0; nBytes = strlen(buffer)+1; nRet = WSASend( lpIOContext->client, &lpIOContext->wsabuf, 1, &nBytes, dwFlags, &(lpIOContext->Overlapped), NULL); if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { cout << "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl; closesocket(lpIOContext->client); delete lpIOContext; continue; } memset(buffer, NULL, sizeof(buffer)); } else if(lpIOContext->opCode == IO_WRITE) //a write operation complete { // Write operation completed, so post Read operation. lpIOContext->opCode = IO_READ; nBytes = 1024; dwFlags = 0; lpIOContext->wsabuf.buf = buffer; lpIOContext->wsabuf.len = nBytes; lpIOContext->nBytes = nBytes; ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped)); nRet = WSARecv( lpIOContext->client, &lpIOContext->wsabuf, 1, &nBytes, &dwFlags, &lpIOContext->Overlapped, NULL); if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { cout << "WASRecv Failed::Reason Code1::"<< WSAGetLastError() << endl; closesocket(lpIOContext->client); delete lpIOContext; continue; } cout<<lpIOContext->wsabuf.buf<<endl; } } return 0; } void main () { WSADATA wsaData; WSAStartup(MAKEWORD(2,2), &wsaData); SOCKET m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED); sockaddr_in server; server.sin_family = AF_INET; server.sin_port = htons(6000); server.sin_addr.S_un.S_addr = htonl(INADDR_ANY); bind(m_socket ,(sockaddr*)&server,sizeof(server)); listen(m_socket, 8); SYSTEM_INFO sysInfo; GetSystemInfo(&sysInfo); int g_ThreadCount = sysInfo.dwNumberOfProcessors * 2; g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,g_ThreadCount); //CreateIoCompletionPort((HANDLE)m_socket,g_hIOCP,0,0); for( int i=0;i < g_ThreadCount; ++i){ HANDLE hThread; DWORD dwThreadId; hThread = CreateThread(NULL, 0, WorkerThread, 0, 0, &dwThreadId); CloseHandle(hThread); } while(1) { SOCKET client = accept( m_socket, NULL, NULL ); cout << "Client connected." << endl; if (CreateIoCompletionPort((HANDLE)client, g_hIOCP, 0, 0) == NULL){ cout << "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl; closesocket(client); } else { //post a recv request IO_DATA * data = new IO_DATA; memset(buffer, NULL ,1024); memset(&data->Overlapped, 0 , sizeof(data->Overlapped)); data->opCode = IO_READ; data->nBytes = 0; data->wsabuf.buf = buffer; data->wsabuf.len = sizeof(buffer); data->client = client; DWORD nBytes= 1024 ,dwFlags=0; int nRet = WSARecv(client,&data->wsabuf, 1, &nBytes, &dwFlags, &data->Overlapped, NULL); if(nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())){ cout << "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl; closesocket(client); delete data; } cout<<data->wsabuf.buf<<endl; } } closesocket(m_socket); WSACleanup(); }
#include <iostream> #include <WinSock2.h> using namespace std; #pragma comment(lib,"ws2_32.lib") void main() { WSADATA wsaData; WSAStartup(MAKEWORD(2,2), &wsaData); sockaddr_in server; server.sin_family = AF_INET; server.sin_port = htons(6000); server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); SOCKET client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); int flag; flag = connect(client, (sockaddr*)&server, sizeof(server)); if(flag < 0){ cout<<"error!"<<endl; return; } while(1){ cout<<"sent hello!!!!"<<endl; char buffer[1024]; strcpy(buffer,"hello"); send(client, buffer, 1024, 0); memset(buffer, NULL, sizeof(buffer)); cout<<"recv: "<<endl; int rev = recv(client, buffer, 1024, 0); if(rev == 0) cout<<"recv nothing!"<<endl; cout<<buffer<<endl; Sleep(10000); } closesocket(client); WSACleanup(); }