客戶IO處理,是在工做線程,_WorkerThreadProc中完成的app
函數,在完成端口上調用GetQueuedCompletionStatus函數等待IO完成,並調用自定義函數HandleIO來處理IO,具體代碼以下:socket
DOWRD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam) { #ifdef _DEBUG ::OutputDebugString("Worker Thread startup...\n"); #endif //_DEBUG CIOCPServer *pThis = (CIOCPServer*)lpParam; CIOCPBuffer *pBuffer; DWORD dwKey; DWORD dwTrans; LPOVERLAPPED lpol; while(TRUE) { //在關聯到此完成端口的全部套接字上等待IO完成 BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,&dwTrans,(LPDWORD)&dwKey,(LPOVERLAPPED)&lpol,WSA_INFINITE); if(dwTrans == -1) { #ifdef _DEBUG ::OutputDebugString("Worker Thread startup...\n"); #endif //_DEBUG ::ExitThread(0); } pBuffer=CONTAINING_RECORD(lpol,CIOCPBuffer,ol); int nError = NO_ERROR; if(!bOK) { SOCKET s; if(pBuffer->nOperation == OP_ACCEPT) { s=pThis->m_sListen; } else { if(dwKey == 0) break; s=((CIOCPContext*)dwKey)->s; } DWORD dwFlags = 0; if(!::WSAGetOverlappedResult(s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags)) { nError = ::WSAGetLastError(); } } pThis->HandleIO(dwKey,pBuffer,dwTrans,nError); } #ifdef _DEBUG ::OutputDebugString("Worker Thread out...\n"); #endif //_DEBUG return 0; }
SendText成員函數用於在鏈接上發送數據,執行時先申請一個緩衝區對象,把用戶將要發送的數據複製到裏面,而後調用postSend成員函數投遞這個緩衝區對象函數
BOOL CIOCPServer::SendText(CIOCPContext *pContext,char *pszText,int nLen) { CIOCPBuffer *pBuffer = AllocateBuffer(nLen); if(pBuffer != NULL) { memcpy(pBuffer->buff,pszText,nLen); return PostSend(pContext,pBuffer); } return FALSE; }
下面的HandleIO函數是關鍵,post
處理完成的IO,投遞新的IO請求,釋放完成的緩衝區對象,關閉客戶上下文對象spa
下面是主要的實現代碼:線程
void CIOCPServer::HandleIO(DWORD dwKey,CIOCPBuffer *pBuffer,DOWRD dwTrans,int nError) { CIOCPContext *pContext = (CIOCPContext*)dwKey; #ifdef _DEBUG ::OutputDebugString("HandleIO startup..\n"); #endif //_DEBUG //減小套接字未決IO計數 if(pContext!=NULL) { ::EnterCriticalSection(&pContext->Lock); if(pBuffer->nOperation == OP_READ) pContext->nOutstandingRecv--; else if(pBuffer->nOperation == OP_WRITE) pContext->nOutstandingSend--; ::LeaveCriticalSection(&pContext->Lock); //檢查套接字是否已經打開 if(pContext->bClosing) { #ifdef _DEBUG ::OutputDebugString("HandleIO startup..\n"); #endif //_DEBUG if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0) { ReleaseContext(pContext); } //釋放已關閉套接字的未決IO ReleaseBuffer(pBuffer); return; } } else { RemovePendingAccept(pBuffer); } //檢查套接字上發生的錯誤,而後直接關閉套接字 if(nError!=NO_ERROR) { if(pBuffer->nOperation != OP_ACCEPT) { OnConnectionError(pContext,pBuffer,nError); CloseAConnection(pContext); if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0) { ReleaseContext(pContext); } #ifdef _DEBUG ::OutputDebugString("HandleIO startup..\n"); #endif //_DEBUG } else//在監聽套接字上發生錯誤 { if(pBuffer->sClient != INVALID_SOCKET) { ::closesocket(pBuffer->sClient); pBuffer->sClient = INVALID_SOCKET; } #ifdef _DEBUG ::OutputDebugString("HandleIO startup..\n"); #endif //_DEBUG } ReleaseBuffer(pBuffer); return; } //開始處理 if(pBuffer->nOperation == OP_ACCEPT) { if(dwTrans == 0) { #ifdef _DEBUG ::OutputDebugString("HandleIO startup..\n"); #endif //_DEBUG if(pBuffer->sClient != INVALID_SOCKET) { ::closesocket(pBuffer->sClient); pBuffer->sClient = INVALID_SOCKET; } } else { //爲接收新鏈接的申請客戶上下文對象 CIOCPContext *pClient = AllocateContext(pBuffer->sClient); if(pClient != NULL) { if(AddAConnection(pCliebt)) { //取得用戶地址 int nLocalLen,nRmoteLen; m_lpfnGetAcceptExSockaddrs( pBuffer->buff, pBuffer->nLen-((sizeof(sockaddr_in)+16)*2), sizeof(sockaddr_in)+16, sizeof(sockaddr_in)+16, (SOCKADDR **)&pLocalAddr, &nLocalLen, (SOCKADDR **)&pRemoteAddr, &nRmoteLen); memcpy(&pClient->addrLocal,pLocalAddr,nLocalLen); memcpy(&pClient->addrRemote,pRemoteAddr,nRmoteLen); //關聯新鏈接到完成端口對象 ::CreateIoCompletionPort((HANDLE)pClient->s,m_hCompletion,(DWORD)pClient,0); //通知用戶 pBuffer->nLen = dwTrans; OnConnectionEstablished(pClient,pBuffer); //向新鏈接投遞Read請求 for(int i=0;i<5;i++) { CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE); if(p != NULL) { if(!PostRecv(pClient,p)) { CloseAConnection(pClient); break; } } } } else { CloseAConnection(pClient); ReleaseContext(pClient); } } else { //資源不足,關閉與客戶的鏈接便可 ::closesocket(pBuffer->sClient); pBuffer->sClient = INVALID_SOCKET; } } //Accept請求完成,釋放IO緩衝區 ReleaseBuffer(pBuffer); //通知監聽線程繼續再投遞一個Accept請求 ::InterlockedDecrement(&m_nRepostCount); ::SetEvent(m_hRepostEvent); } else if(pBuffer->nOperation == OP_READ) { if(dwTrans == 0) { //先通知用戶 pBuffer->nLen = 0; OnConnectionClosing(pContext,pBuffer); //再關閉鏈接 CloseAConnection(pContext); //釋放客戶上下文和緩衝區對象 if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0) { ReleaseContext(pContext); } ReleaseBuffer(pBuffer); } else { pBuffer->nLen = dwTrans; //按照IO投遞的順序讀取接收到的數據 CIOCPBuffer *p = GetNextReadBuffer(pContext,pBuffer); while(p!=NULL) { OnReadCompleted(pContext,p); //增長要讀的序列號的值 ::InterlockedDecrement((LONG*)pContext->nCurrentReadSequence); //釋放IO ReleaseBuffer(p); p = GetNextReadBuffer(pContext,NULL); } //繼續投遞一個新的請求 pBuffer = AllocateBuffer(BUFFER_SIZE); if(pBuffer==NULL || !PostRecv(pContext,pBuffer)) { CloseAConnection(pContext); } } } else if(pBuffer->nOperation == OP_WRITE) { if(dwTrans == 0) { //先通知用戶 pBuffer->nLen = 0; OnConnectionClosing(pContext,pBuffer); //再關閉鏈接 CloseAConnection(pContext); //釋放客戶上下文和緩衝區對象 if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0) { ReleaseContext(pContext); } ReleaseBuffer(pBuffer); } else { //寫操做完成,通知用戶 pBuffer->nLen = dwTrans; OnWriteCompleted(pContext,pBuffer); //釋放SendText函數申請緩衝區 ReleaseBuffer(pBuffer); } } }