最近都在作windows socket相關的東西,使用IOCP其中仍是遇到了一些問題,固然遇到問題就要嘗試解決問題,這也是一個學習的過程。php
IOCP能夠說是windows 上性能最好的網絡模型了,具體IOCP,就不介紹了,Google,baidu一下,你就知道了。ios
玉哥在用個人網絡接口,發大量數據時,發現,數據對不上,即收包量會增多,內容是不會丟的,因爲我是嚴格控制服務器端的收包大小,和收包量,因此也感受很奇怪。windows
先是懷疑WSASend連續調用,會有問題,但大體看了看這篇文章(http://bbs.pediy.com/showthread.php?p=826108),這文章寫的很給力,因此又以爲不會有問題,因而我就本身開始寫了一個測試程序。緩存
固然也不是寫了就立刻能發現問題,調了一下午加一個晚上,發現了問題所在。服務器
問題在WSARecv,這個東西它竟然沒填滿我要求的緩存,就給完成端口發了通知,致使客戶端發過來的數據,被截開了,因此包的數量就增長了,可是因爲是TCP,因此內容,順序都沒問題。網絡
接下來講說如何解決問題:多線程
BOOL bIORet = GetQueuedCompletionStatus(CompPort,app
&dwIoSize,socket
(LPDWORD)&pCompletionKey,性能
&lpOverlapped,
INFINITE);
關鍵看第二個參數,dwIoSize,它告訴你,你實際接收到了多少數據,經過它,你就應該知道還要繼續接受多少數據了吧,明白人,應該就知道該如何作了,不明白的看個人代碼吧。
注意:WSARecv,最好不要連續調用,特別是在多線程裏連續調用,由於後果可能會沒法預料,這個應該仍是根據需求而定,也許有的狀況下能夠。
剩下的就是貼代碼了,時間緊任務重,加上本人比較水,因此代碼質量不高,多多包涵,同時還請各位朋友給出建議,提出批評,你們一塊兒學習,共同進步。
client:
#include<iostream> #include<fstream> #include<WinSock2.h> #include<Windows.h> #include<tchar.h> using namespace std; #define BUF_TIMES 10 struct CompletionKey{SOCKET s;}; typedef struct io_operation_data { WSAOVERLAPPED overlapped; //重疊結構 WSABUF wsaBuf; //發送接收緩衝區 }IO_OPERATION_DATA, *PIO_OPERATION_DATA; SOCKET sClient; //char (*data)[1024]; DWORD WINAPI ServiceThreadProc(PVOID pParam); int main() { HRESULT hr=CoInitializeEx(NULL,COINIT_MULTITHREADED); if(FAILED(hr)) { MessageBox(NULL,_T("Initialize COM failed!"),_T("error"),MB_OK); //cout<<"Initialize COM failed!"<<endl; //return false; } WORD wVersionRequest; WSADATA wsaData; wVersionRequest=MAKEWORD(2,2); int nErrCode=WSAStartup(wVersionRequest,&wsaData); if(nErrCode!=0) { //cout<<" start up error!"<<endl; MessageBox(NULL,_T("start up error!"),_T("error"),MB_OK); } if(LOBYTE(wsaData.wVersion)!=2||HIBYTE(wsaData.wVersion)!=2) { //cout<<" lib is not 2.2!"<<endl; MessageBox(NULL,_T("lib is not 2.2!"),_T("error"),MB_OK); WSACleanup(); } sClient=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);//socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(INVALID_SOCKET==sClient) { MessageBox(NULL,_T("socket error!"),_T("error"),MB_OK); } //獲取系統默認的發送數據緩衝區大小 unsigned int uiRcvBuf; int uiRcvBufLen = sizeof(uiRcvBuf); nErrCode= getsockopt(sClient, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, &uiRcvBufLen); if (SOCKET_ERROR == nErrCode) { //cout<<"獲取系統默認的發送數據緩衝區大小failed!"<<endl; MessageBox(NULL,_T("獲取系統默認的發送數據緩衝區大小failed!"),_T("error"),MB_OK); //return false; } //設置系統發送數據緩衝區爲默認值的BUF_TIMES倍 uiRcvBuf *= BUF_TIMES; nErrCode = setsockopt(sClient, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, uiRcvBufLen); if (SOCKET_ERROR == nErrCode) { //cout<<"修改系統發送數據緩衝區失敗!"<<endl; MessageBox(NULL,_T("修改系統發送數據緩衝區失敗!"),_T("error"),MB_OK); } //檢查設置系統發送數據緩衝區是否成功 unsigned int uiNewRcvBuf; nErrCode=getsockopt(sClient, SOL_SOCKET, SO_SNDBUF,(char*)&uiNewRcvBuf, &uiRcvBufLen); if (SOCKET_ERROR == nErrCode || uiNewRcvBuf != uiRcvBuf) { // cout<<"修改系統發送數據緩衝區失敗!"<<endl; MessageBox(NULL,_T("修改系統發送數據緩衝區失敗!"),_T("error"),MB_OK); } HANDLE CompPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (CompPort == NULL) { MessageBox(NULL,_T("建立完成端口失敗!"),_T("error"),MB_OK); //WSACleanup(); //return ; } CompletionKey iCompletionKey; iCompletionKey.s=sClient; if (CreateIoCompletionPort((HANDLE)sClient,CompPort,(DWORD)&iCompletionKey,0) == NULL) { //出錯處理。。 MessageBox(NULL,_T("關聯完成端口失敗!"),_T("error"),MB_OK); } //addrHost SOCKADDR_IN addrHost; addrHost.sin_family=AF_INET; addrHost.sin_port=htons(8800); addrHost.sin_addr.S_un.S_addr=inet_addr("192.168.0.137"); int retVal=connect(sClient,(sockaddr*)&addrHost,sizeof(sockaddr)); if(retVal==SOCKET_ERROR) { MessageBox(NULL,_T("connect error!"),_T("error"),MB_OK); //return false; } cout<<"connect success"<<endl; HANDLE hThread=CreateThread(NULL,0,ServiceThreadProc,NULL,0,NULL); DWORD dwIoSize=-1; //傳輸字節數 LPOVERLAPPED lpOverlapped=NULL; //重疊結構指針 CompletionKey* pCompletionKey=NULL; PIO_OPERATION_DATA pIO=NULL; int count=0; ofstream fos; fos.open("c.txt"); while(1) { BOOL bIORet = GetQueuedCompletionStatus(CompPort, &dwIoSize, (LPDWORD)&pCompletionKey, &lpOverlapped, INFINITE); //失敗的操做完成 if (FALSE == bIORet && NULL != pCompletionKey) { //客戶端斷開 } //成功的操做完成 if(bIORet && lpOverlapped && pCompletionKey) { cout<<"count: "<<count+1<<" " <<"number: "<<dwIoSize<<endl; pIO = CONTAINING_RECORD(lpOverlapped,IO_OPERATION_DATA,overlapped); fos.write(pIO->wsaBuf.buf,10240*5); fos<<endl; if(count==99) break; count++; } } WaitForSingleObject(hThread,INFINITE); cout<<"main end: "<<count+1<<endl; fos.close(); Sleep(5000); CoUninitialize(); WSACleanup(); return 0; } DWORD WINAPI ServiceThreadProc(PVOID pParam) { char (*a)[10240*5]=new char[100][10240*5]; for(int i=0;i<100;i++) { if(i%3==0) memset(a[i],'a',10240*5); else if(i%3==1) memset(a[i],'b',10240*5); else if(i%3==2) memset(a[i],'c',10240*5); } DWORD flags = 0; //標誌 DWORD sendBytes =0; //發送字節數 int count=0; while(count<100) { PIO_OPERATION_DATA pIO = new IO_OPERATION_DATA; ZeroMemory(pIO,sizeof(IO_OPERATION_DATA)); pIO->wsaBuf.buf=a[count]; pIO->wsaBuf.len=10240*5; //Sleep(30); if(WSASend(sClient,&(pIO->wsaBuf),1,&sendBytes,flags,&(pIO->overlapped),NULL)== SOCKET_ERROR) { if(ERROR_IO_PENDING != WSAGetLastError())//發起重疊操做失敗 { cout<<"send failed"<<endl; } } count++; } cout<<"thread func: "<<count<<endl; Sleep(10000); return 0; }
server:
#include<iostream> #include<vector> #include<fstream> #include<WinSock2.h> #include<Windows.h> #include<tchar.h> using namespace std; #define BUF_TIMES 10 struct CompletionKey{SOCKET s;}; typedef struct io_operation_data { WSAOVERLAPPED overlapped; //重疊結構 WSABUF wsaBuf; //發送接收緩衝區 }IO_OPERATION_DATA, *PIO_OPERATION_DATA; int main() { HRESULT hr=CoInitializeEx(NULL,COINIT_MULTITHREADED); if(FAILED(hr)) { MessageBox(NULL,_T("Initialize COM failed!"),_T("error"),MB_OK); //cout<<"Initialize COM failed!"<<endl; //return false; } WORD wVersionRequest; WSADATA wsaData; wVersionRequest=MAKEWORD(2,2); int nErrCode=WSAStartup(wVersionRequest,&wsaData); if(nErrCode!=0) { //cout<<" start up error!"<<endl; MessageBox(NULL,_T("start up error!"),_T("error"),MB_OK); } if(LOBYTE(wsaData.wVersion)!=2||HIBYTE(wsaData.wVersion)!=2) { //cout<<" lib is not 2.2!"<<endl; MessageBox(NULL,_T("lib is not 2.2!"),_T("error"),MB_OK); WSACleanup(); } SOCKET sListen=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);//socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(INVALID_SOCKET==sListen) { MessageBox(NULL,_T("socket error!"),_T("error"),MB_OK); } //獲取系統默認的發送數據緩衝區大小 unsigned int uiRcvBuf; int uiRcvBufLen = sizeof(uiRcvBuf); nErrCode= getsockopt(sListen, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, &uiRcvBufLen); if (SOCKET_ERROR == nErrCode) { //cout<<"獲取系統默認的發送數據緩衝區大小failed!"<<endl; MessageBox(NULL,_T("獲取系統默認的發送數據緩衝區大小failed!"),_T("error"),MB_OK); //return false; } //設置系統發送數據緩衝區爲默認值的BUF_TIMES倍 uiRcvBuf *= BUF_TIMES; nErrCode = setsockopt(sListen, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, uiRcvBufLen); if (SOCKET_ERROR == nErrCode) { //cout<<"修改系統發送數據緩衝區失敗!"<<endl; MessageBox(NULL,_T("修改系統發送數據緩衝區失敗!"),_T("error"),MB_OK); } //檢查設置系統發送數據緩衝區是否成功 unsigned int uiNewRcvBuf; nErrCode=getsockopt(sListen, SOL_SOCKET, SO_SNDBUF,(char*)&uiNewRcvBuf, &uiRcvBufLen); if (SOCKET_ERROR == nErrCode || uiNewRcvBuf != uiRcvBuf) { // cout<<"修改系統發送數據緩衝區失敗!"<<endl; MessageBox(NULL,_T("修改系統發送數據緩衝區失敗!"),_T("error"),MB_OK); } //cout<<uiNewRcvBuf<<endl; HANDLE CompPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (CompPort == NULL) { MessageBox(NULL,_T("建立完成端口失敗!"),_T("error"),MB_OK); //WSACleanup(); //return ; } SOCKADDR_IN addrHost; addrHost.sin_family=AF_INET; addrHost.sin_port=htons(8800); addrHost.sin_addr.S_un.S_addr=INADDR_ANY; int retVal=bind(sListen,(LPSOCKADDR)&addrHost, sizeof(SOCKADDR_IN)); if(SOCKET_ERROR==retVal) { //cout<<"bind error!"<<endl; MessageBox(NULL,_T("bind error!"),_T("error"),MB_OK); //closesocket(this->m_sListen); //return false; } retVal=listen(sListen,5); if(SOCKET_ERROR==retVal) { MessageBox(NULL,_T("listen error!"),_T("error"),MB_OK); } sockaddr_in addrClient; int addrLen=sizeof(sockaddr_in); SOCKET sClient=accept(sListen,(sockaddr*)&addrClient,&addrLen); cout<<"come in"<<endl; CompletionKey iCompletionKey; iCompletionKey.s=sClient; //關聯完成端口 if (CreateIoCompletionPort((HANDLE)sClient,CompPort,(DWORD)(&iCompletionKey),0) == NULL) { //出錯處理。。 MessageBox(NULL,_T("關聯完成端口失敗!"),_T("error"),MB_OK); } DWORD dwIoSize=-1; //傳輸字節數 LPOVERLAPPED lpOverlapped=NULL; //重疊結構指針 CompletionKey* pCompletionKey=NULL; DWORD flags = 0; //標誌 DWORD recvBytes =0; //接收字節數 PIO_OPERATION_DATA pIO=NULL; int count=0; pIO=new IO_OPERATION_DATA; ZeroMemory(pIO, sizeof(IO_OPERATION_DATA)); pIO->wsaBuf.buf=new char[10240*5]; pIO->wsaBuf.len=10240*5; if (WSARecv(sClient,&(pIO->wsaBuf),1,&recvBytes,&flags,&(pIO->overlapped),NULL) == SOCKET_ERROR) { if(ERROR_IO_PENDING != WSAGetLastError()) { cout<<"recv failed"<<endl; } } vector<IO_OPERATION_DATA*> vecCache; ofstream fos; fos.open("s.txt"); while(1) { BOOL bIORet = GetQueuedCompletionStatus(CompPort, &dwIoSize, (LPDWORD)&pCompletionKey, &lpOverlapped, INFINITE); //失敗的操做完成 if (FALSE == bIORet && NULL != pCompletionKey) { //客戶端斷開 } //成功的操做完成 if(bIORet && lpOverlapped && pCompletionKey) { pIO = CONTAINING_RECORD(lpOverlapped,IO_OPERATION_DATA,overlapped); if(dwIoSize<pIO->wsaBuf.len) { vecCache.push_back(pIO); int size=pIO->wsaBuf.len; pIO->wsaBuf.len=dwIoSize; pIO=new IO_OPERATION_DATA; ZeroMemory(pIO, sizeof(IO_OPERATION_DATA)); pIO->wsaBuf.buf=new char[size-dwIoSize]; pIO->wsaBuf.len=size-dwIoSize; if (WSARecv(sClient,&(pIO->wsaBuf),1,&recvBytes,&flags,&(pIO->overlapped),NULL) == SOCKET_ERROR) { if(ERROR_IO_PENDING != WSAGetLastError()) { cout<<"recv failed"<<endl; } } } else { if(vecCache.size()!=0) { char*p=new char[10240*5]; int size=0; for(vector<IO_OPERATION_DATA*>::iterator it=vecCache.begin(); it!=vecCache.end();it++) { memcpy(p+size,(*it)->wsaBuf.buf,(*it)->wsaBuf.len); size+=(*it)->wsaBuf.len; //清理資源 delete []((*it)->wsaBuf.buf); delete (*it); } memcpy(p+size,pIO->wsaBuf.buf,pIO->wsaBuf.len); count++; fos.write(p,10240*5); fos<<endl; vecCache.clear(); cout<<"count: "<<count<<endl; //清理資源 delete [](pIO->wsaBuf.buf); delete pIO; delete []p; } else { count++; fos.write(pIO->wsaBuf.buf,10240*5); fos<<endl; cout<<"count: "<<count <<" number: "<<dwIoSize<<endl; //清理資源 delete [](pIO->wsaBuf.buf); delete pIO; } pIO=new IO_OPERATION_DATA; ZeroMemory(pIO, sizeof(IO_OPERATION_DATA)); pIO->wsaBuf.buf=new char[10240*5]; pIO->wsaBuf.len=10240*5; if (WSARecv(sClient,&(pIO->wsaBuf),1,&recvBytes,&flags,&(pIO->overlapped),NULL) == SOCKET_ERROR) { if(ERROR_IO_PENDING != WSAGetLastError()) { cout<<"recv failed"<<endl; } } } if(count==100) break; } } cout<<"main end: "<<count<<endl; fos.close(); Sleep(10000); CoUninitialize(); WSACleanup(); return 0; }