IOCP (關於WSASend,WSARecv調用)

最近都在作windows socket相關的東西,使用IOCP其中仍是遇到了一些問題,固然遇到問題就要嘗試解決問題,這也是一個學習的過程。php

IOCP能夠說是windows 上性能最好的網絡模型了,具體IOCP,就不介紹了,Google,baidu一下,你就知道了。ios

玉哥在用個人網絡接口,發大量數據時,發現,數據對不上,即收包量會增多,內容是不會丟的,因爲我是嚴格控制服務器端的收包大小,和收包量,因此也感受很奇怪。windows

先是懷疑WSASend連續調用,會有問題,但大體看了看這篇文章(http://bbs.pediy.com/showthread.php?p=826108),這文章寫的很給力,因此又以爲不會有問題,因而我就本身開始寫了一個測試程序。緩存

固然也不是寫了就立刻能發現問題,調了一下午加一個晚上,發現了問題所在。服務器

問題在WSARecv,這個東西它竟然沒填滿我要求的緩存,就給完成端口發了通知,致使客戶端發過來的數據,被截開了,因此包的數量就增長了,可是因爲是TCP,因此內容,順序都沒問題。網絡

接下來講說如何解決問題:多線程

BOOL bIORet = GetQueuedCompletionStatus(CompPort,app

                                    &dwIoSize,socket

                                    (LPDWORD)&pCompletionKey,性能

                                    &lpOverlapped,

                                    INFINITE);

關鍵看第二個參數,dwIoSize,它告訴你,你實際接收到了多少數據,經過它,你就應該知道還要繼續接受多少數據了吧,明白人,應該就知道該如何作了,不明白的看個人代碼吧。

 

注意:WSARecv,最好不要連續調用,特別是在多線程裏連續調用,由於後果可能會沒法預料,這個應該仍是根據需求而定,也許有的狀況下能夠。

 

剩下的就是貼代碼了,時間緊任務重,加上本人比較水,因此代碼質量不高,多多包涵,同時還請各位朋友給出建議,提出批評,你們一塊兒學習,共同進步。

client:

#include<iostream>  
#include<fstream>  
#include<WinSock2.h>  
#include<Windows.h>  
#include<tchar.h>  
using namespace std;  
#define BUF_TIMES 10  
struct CompletionKey{SOCKET s;};  
typedef struct io_operation_data  
{  
    WSAOVERLAPPED   overlapped;             //重疊結構  
    WSABUF      wsaBuf;                  //發送接收緩衝區  
}IO_OPERATION_DATA, *PIO_OPERATION_DATA;  
SOCKET sClient;  
//char (*data)[1024];  
DWORD WINAPI ServiceThreadProc(PVOID pParam);  
int main()  
{  
    HRESULT  hr=CoInitializeEx(NULL,COINIT_MULTITHREADED);  
    if(FAILED(hr))  
    {  
        MessageBox(NULL,_T("Initialize COM failed!"),_T("error"),MB_OK);  
        //cout<<"Initialize COM failed!"<<endl;  
        //return false;  
    }  
    WORD wVersionRequest;  
    WSADATA wsaData;  
    wVersionRequest=MAKEWORD(2,2);  
    int nErrCode=WSAStartup(wVersionRequest,&wsaData);  
    if(nErrCode!=0)  
    {  
        //cout<<" start up error!"<<endl;  
        MessageBox(NULL,_T("start up error!"),_T("error"),MB_OK);  
    }  
    if(LOBYTE(wsaData.wVersion)!=2||HIBYTE(wsaData.wVersion)!=2)  
    {  
        //cout<<" lib is not 2.2!"<<endl;  
        MessageBox(NULL,_T("lib is not 2.2!"),_T("error"),MB_OK);  
        WSACleanup();  
    }  
    sClient=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);//socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
    if(INVALID_SOCKET==sClient)  
    {  
        MessageBox(NULL,_T("socket error!"),_T("error"),MB_OK);  
    }  
  
    //獲取系統默認的發送數據緩衝區大小  
    unsigned int uiRcvBuf;  
    int uiRcvBufLen = sizeof(uiRcvBuf);  
    nErrCode= getsockopt(sClient, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, &uiRcvBufLen);  
    if (SOCKET_ERROR == nErrCode)  
    {  
        //cout<<"獲取系統默認的發送數據緩衝區大小failed!"<<endl;  
        MessageBox(NULL,_T("獲取系統默認的發送數據緩衝區大小failed!"),_T("error"),MB_OK);  
        //return false;  
    }  
    //設置系統發送數據緩衝區爲默認值的BUF_TIMES倍  
    uiRcvBuf *= BUF_TIMES;  
    nErrCode = setsockopt(sClient, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, uiRcvBufLen);  
    if (SOCKET_ERROR == nErrCode)  
    {  
        //cout<<"修改系統發送數據緩衝區失敗!"<<endl;  
        MessageBox(NULL,_T("修改系統發送數據緩衝區失敗!"),_T("error"),MB_OK);  
    }  
  
    //檢查設置系統發送數據緩衝區是否成功  
    unsigned int uiNewRcvBuf;  
    nErrCode=getsockopt(sClient, SOL_SOCKET, SO_SNDBUF,(char*)&uiNewRcvBuf, &uiRcvBufLen);  
    if (SOCKET_ERROR == nErrCode || uiNewRcvBuf != uiRcvBuf)  
    {  
//      cout<<"修改系統發送數據緩衝區失敗!"<<endl;  
        MessageBox(NULL,_T("修改系統發送數據緩衝區失敗!"),_T("error"),MB_OK);  
    }  
    HANDLE CompPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);  
    if (CompPort == NULL)  
    {  
        MessageBox(NULL,_T("建立完成端口失敗!"),_T("error"),MB_OK);  
        //WSACleanup();  
        //return ;  
    }  
    CompletionKey iCompletionKey;  
    iCompletionKey.s=sClient;  
    if (CreateIoCompletionPort((HANDLE)sClient,CompPort,(DWORD)&iCompletionKey,0) == NULL)  
    {  
        //出錯處理。。  
        MessageBox(NULL,_T("關聯完成端口失敗!"),_T("error"),MB_OK);  
    }  
    //addrHost    
    SOCKADDR_IN addrHost;  
    addrHost.sin_family=AF_INET;  
    addrHost.sin_port=htons(8800);  
    addrHost.sin_addr.S_un.S_addr=inet_addr("192.168.0.137");  
  
    int retVal=connect(sClient,(sockaddr*)&addrHost,sizeof(sockaddr));  
    if(retVal==SOCKET_ERROR)  
    {  
        MessageBox(NULL,_T("connect error!"),_T("error"),MB_OK);  
        //return false;  
    }  
    cout<<"connect success"<<endl;  
  
    HANDLE hThread=CreateThread(NULL,0,ServiceThreadProc,NULL,0,NULL);  
      
    DWORD           dwIoSize=-1;        //傳輸字節數  
    LPOVERLAPPED    lpOverlapped=NULL;  //重疊結構指針  
    CompletionKey*  pCompletionKey=NULL;  
    PIO_OPERATION_DATA pIO=NULL;  
    int count=0;  
    ofstream fos;  
    fos.open("c.txt");  
  
    while(1)  
    {  
        BOOL bIORet = GetQueuedCompletionStatus(CompPort,  
                                                &dwIoSize,  
                                                (LPDWORD)&pCompletionKey,  
                                                &lpOverlapped,  
                                                INFINITE);  
        //失敗的操做完成  
        if (FALSE == bIORet && NULL != pCompletionKey)  
        {     
            //客戶端斷開  
        }  
        //成功的操做完成  
        if(bIORet && lpOverlapped && pCompletionKey)  
        {  
            cout<<"count: "<<count+1<<"  "  
                <<"number: "<<dwIoSize<<endl;  
            pIO = CONTAINING_RECORD(lpOverlapped,IO_OPERATION_DATA,overlapped);  
            fos.write(pIO->wsaBuf.buf,10240*5);  
            fos<<endl;  
            if(count==99)  
                break;  
            count++;  
        }  
  
    }  
      
    WaitForSingleObject(hThread,INFINITE);  
    cout<<"main end: "<<count+1<<endl;  
    fos.close();  
    Sleep(5000);  
    CoUninitialize();  
    WSACleanup();  
    return 0;  
}  
DWORD WINAPI ServiceThreadProc(PVOID pParam)  
{  
    char (*a)[10240*5]=new char[100][10240*5];  
    for(int i=0;i<100;i++)  
    {  
        if(i%3==0)  
            memset(a[i],'a',10240*5);  
        else if(i%3==1)  
            memset(a[i],'b',10240*5);  
        else if(i%3==2)  
            memset(a[i],'c',10240*5);  
    }  
    DWORD   flags = 0;          //標誌  
    DWORD   sendBytes =0;       //發送字節數  
    int count=0;  
    while(count<100)  
    {  
        PIO_OPERATION_DATA pIO = new IO_OPERATION_DATA;   
        ZeroMemory(pIO,sizeof(IO_OPERATION_DATA));  
        pIO->wsaBuf.buf=a[count];  
        pIO->wsaBuf.len=10240*5;  
          
        //Sleep(30);  
        if(WSASend(sClient,&(pIO->wsaBuf),1,&sendBytes,flags,&(pIO->overlapped),NULL)== SOCKET_ERROR)  
        {  
            if(ERROR_IO_PENDING != WSAGetLastError())//發起重疊操做失敗  
            {  
                cout<<"send failed"<<endl;  
            }  
        }  
        count++;  
    }  
      
    cout<<"thread func:  "<<count<<endl;  
    Sleep(10000);  
      
    return 0;  
}

server:

#include<iostream>  
#include<vector>  
#include<fstream>  
#include<WinSock2.h>  
#include<Windows.h>  
#include<tchar.h>  
using namespace std;  
#define BUF_TIMES 10  
struct CompletionKey{SOCKET s;};  
typedef struct io_operation_data  
{  
    WSAOVERLAPPED   overlapped;             //重疊結構  
    WSABUF      wsaBuf;                  //發送接收緩衝區  
}IO_OPERATION_DATA, *PIO_OPERATION_DATA;  
int main()  
{  
    HRESULT  hr=CoInitializeEx(NULL,COINIT_MULTITHREADED);  
    if(FAILED(hr))  
    {  
        MessageBox(NULL,_T("Initialize COM failed!"),_T("error"),MB_OK);  
        //cout<<"Initialize COM failed!"<<endl;  
        //return false;  
    }  
    WORD wVersionRequest;  
    WSADATA wsaData;  
    wVersionRequest=MAKEWORD(2,2);  
    int nErrCode=WSAStartup(wVersionRequest,&wsaData);  
    if(nErrCode!=0)  
    {  
        //cout<<" start up error!"<<endl;  
        MessageBox(NULL,_T("start up error!"),_T("error"),MB_OK);  
    }  
    if(LOBYTE(wsaData.wVersion)!=2||HIBYTE(wsaData.wVersion)!=2)  
    {  
        //cout<<" lib is not 2.2!"<<endl;  
        MessageBox(NULL,_T("lib is not 2.2!"),_T("error"),MB_OK);  
        WSACleanup();  
    }  
    SOCKET sListen=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);//socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
    if(INVALID_SOCKET==sListen)  
    {  
        MessageBox(NULL,_T("socket error!"),_T("error"),MB_OK);  
    }  
      
    //獲取系統默認的發送數據緩衝區大小  
    unsigned int uiRcvBuf;  
    int uiRcvBufLen = sizeof(uiRcvBuf);  
    nErrCode= getsockopt(sListen, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, &uiRcvBufLen);  
    if (SOCKET_ERROR == nErrCode)  
    {  
        //cout<<"獲取系統默認的發送數據緩衝區大小failed!"<<endl;  
        MessageBox(NULL,_T("獲取系統默認的發送數據緩衝區大小failed!"),_T("error"),MB_OK);  
        //return false;  
    }  
    //設置系統發送數據緩衝區爲默認值的BUF_TIMES倍  
    uiRcvBuf *= BUF_TIMES;  
    nErrCode = setsockopt(sListen, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, uiRcvBufLen);  
    if (SOCKET_ERROR == nErrCode)  
    {  
        //cout<<"修改系統發送數據緩衝區失敗!"<<endl;  
        MessageBox(NULL,_T("修改系統發送數據緩衝區失敗!"),_T("error"),MB_OK);  
    }  
    //檢查設置系統發送數據緩衝區是否成功  
    unsigned int uiNewRcvBuf;  
    nErrCode=getsockopt(sListen, SOL_SOCKET, SO_SNDBUF,(char*)&uiNewRcvBuf, &uiRcvBufLen);  
    if (SOCKET_ERROR == nErrCode || uiNewRcvBuf != uiRcvBuf)  
    {  
//      cout<<"修改系統發送數據緩衝區失敗!"<<endl;  
        MessageBox(NULL,_T("修改系統發送數據緩衝區失敗!"),_T("error"),MB_OK);  
    }  
    //cout<<uiNewRcvBuf<<endl;  
    HANDLE CompPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);  
    if (CompPort == NULL)  
    {  
        MessageBox(NULL,_T("建立完成端口失敗!"),_T("error"),MB_OK);  
        //WSACleanup();  
        //return ;  
    }  
    SOCKADDR_IN addrHost;  
    addrHost.sin_family=AF_INET;  
    addrHost.sin_port=htons(8800);  
    addrHost.sin_addr.S_un.S_addr=INADDR_ANY;  
    int retVal=bind(sListen,(LPSOCKADDR)&addrHost, sizeof(SOCKADDR_IN));  
    if(SOCKET_ERROR==retVal)  
    {  
        //cout<<"bind error!"<<endl;  
        MessageBox(NULL,_T("bind error!"),_T("error"),MB_OK);  
        //closesocket(this->m_sListen);  
        //return false;  
    }  
    retVal=listen(sListen,5);  
    if(SOCKET_ERROR==retVal)  
    {  
        MessageBox(NULL,_T("listen error!"),_T("error"),MB_OK);  
          
    }  
    sockaddr_in addrClient;  
    int addrLen=sizeof(sockaddr_in);  
    SOCKET sClient=accept(sListen,(sockaddr*)&addrClient,&addrLen);  
      
    cout<<"come in"<<endl;  
    CompletionKey iCompletionKey;  
    iCompletionKey.s=sClient;  
    //關聯完成端口  
    if (CreateIoCompletionPort((HANDLE)sClient,CompPort,(DWORD)(&iCompletionKey),0) == NULL)  
    {  
        //出錯處理。。  
        MessageBox(NULL,_T("關聯完成端口失敗!"),_T("error"),MB_OK);  
    }  
    DWORD           dwIoSize=-1;        //傳輸字節數  
    LPOVERLAPPED    lpOverlapped=NULL;  //重疊結構指針  
    CompletionKey*  pCompletionKey=NULL;  
    DWORD   flags = 0;      //標誌  
    DWORD   recvBytes =0;   //接收字節數  
    PIO_OPERATION_DATA pIO=NULL;  
    int count=0;  
    pIO=new IO_OPERATION_DATA;  
    ZeroMemory(pIO, sizeof(IO_OPERATION_DATA));  
    pIO->wsaBuf.buf=new char[10240*5];  
    pIO->wsaBuf.len=10240*5;  
    if (WSARecv(sClient,&(pIO->wsaBuf),1,&recvBytes,&flags,&(pIO->overlapped),NULL) == SOCKET_ERROR)  
    {  
        if(ERROR_IO_PENDING != WSAGetLastError())  
        {  
            cout<<"recv failed"<<endl;  
        }  
    }  
    vector<IO_OPERATION_DATA*> vecCache;  
    ofstream fos;  
    fos.open("s.txt");  
    while(1)  
    {  
        BOOL bIORet = GetQueuedCompletionStatus(CompPort,  
                            &dwIoSize,  
                            (LPDWORD)&pCompletionKey,  
                            &lpOverlapped,  
                            INFINITE);  
        //失敗的操做完成  
        if (FALSE == bIORet && NULL != pCompletionKey)  
        {     
            //客戶端斷開   
        }  
        //成功的操做完成  
        if(bIORet && lpOverlapped && pCompletionKey)  
        {  
            pIO = CONTAINING_RECORD(lpOverlapped,IO_OPERATION_DATA,overlapped);  
            if(dwIoSize<pIO->wsaBuf.len)  
            {  
                vecCache.push_back(pIO);  
                int size=pIO->wsaBuf.len;  
                pIO->wsaBuf.len=dwIoSize;  
                pIO=new IO_OPERATION_DATA;  
                ZeroMemory(pIO, sizeof(IO_OPERATION_DATA));  
                pIO->wsaBuf.buf=new char[size-dwIoSize];  
                pIO->wsaBuf.len=size-dwIoSize;  
                if (WSARecv(sClient,&(pIO->wsaBuf),1,&recvBytes,&flags,&(pIO->overlapped),NULL) == SOCKET_ERROR)  
                {  
                    if(ERROR_IO_PENDING != WSAGetLastError())  
                    {             
                        cout<<"recv failed"<<endl;  
                    }  
                }  
            }  
            else  
            {  
                if(vecCache.size()!=0)  
                {  
                    char*p=new char[10240*5];  
                    int size=0;  
                    for(vector<IO_OPERATION_DATA*>::iterator it=vecCache.begin();  
                        it!=vecCache.end();it++)  
                    {  
                        memcpy(p+size,(*it)->wsaBuf.buf,(*it)->wsaBuf.len);  
                        size+=(*it)->wsaBuf.len;  
                        //清理資源  
                        delete []((*it)->wsaBuf.buf);  
                        delete (*it);  
                    }  
                    memcpy(p+size,pIO->wsaBuf.buf,pIO->wsaBuf.len);  
                    count++;  
                    fos.write(p,10240*5);  
                    fos<<endl;  
                    vecCache.clear();  
                    cout<<"count: "<<count<<endl;  
                    //清理資源  
                    delete [](pIO->wsaBuf.buf);  
                    delete pIO;  
                    delete []p;  
                }  
                else  
                {  
                    count++;  
                    fos.write(pIO->wsaBuf.buf,10240*5);  
                    fos<<endl;  
                    cout<<"count: "<<count  
                        <<"  number: "<<dwIoSize<<endl;  
                    //清理資源  
                    delete [](pIO->wsaBuf.buf);  
                    delete pIO;  
                }  
                    pIO=new IO_OPERATION_DATA;  
                    ZeroMemory(pIO, sizeof(IO_OPERATION_DATA));  
                    pIO->wsaBuf.buf=new char[10240*5];  
                    pIO->wsaBuf.len=10240*5;  
                    if (WSARecv(sClient,&(pIO->wsaBuf),1,&recvBytes,&flags,&(pIO->overlapped),NULL) == SOCKET_ERROR)  
                    {  
                        if(ERROR_IO_PENDING != WSAGetLastError())  
                        {             
                            cout<<"recv failed"<<endl;  
                        }  
                    }  
                  
            }  
            if(count==100)  
                break;  
        }  
    }  
      
    cout<<"main end: "<<count<<endl;  
    fos.close();  
    Sleep(10000);  
    CoUninitialize();  
    WSACleanup();  
    return 0;  
}
相關文章
相關標籤/搜索