一套封裝的IOCP通訊模型

爲何採用IOCP(I/O Completion Port)完成端口模型?服務器

    在網絡發展飛速的今天,客戶端的數量日益倍增,如何高效地處理多個客戶端的併發,成爲服務器須要處理的關鍵問題之一。在處理併發的問題上,處理器的作法是採用「一客戶一線程」的方式,一個客戶爲其建立一個線程,若是客戶端數量不少時,線程數也隨之增長,這就須要服務器來協調各線程之間的工做,一旦處理不當,就會引發併發中的一系列問題,甚至可能致使系統癱瘓。IOCP機制能夠較好的處理各個客戶端之間的協調和併發問題,可使系統資源能夠被合理地利用,有效地解決了因多線程競爭所帶來的問題。網絡

 

 

IOCP優缺點:數據結構

優勢:
      ① 幫助維持重複使用的內存池。(與重疊I/O技術有關)
      ② 去除刪除線程建立/終結負擔。
  ③ 利於管理,分配線程,控制併發,最小化的線程上下文切換。
  ④ 優化線程調度,提升CPU和內存緩衝的命中率。
缺點:
     理解以及編碼的複雜度較高。對使用者有必定要求。
                                                        ——摘抄自百度百科
 
 
相關技術:
阻塞與非阻塞:

阻塞調用是指調用結果返回以前,當前線程會被掛起(線程進入非可執行狀態,在這個狀態下,cpu不會給線程分配時間片,即線程暫停運行)。函數只有在獲得結果以後纔會返回。多線程

非阻塞和阻塞的概念相對應,指在不能馬上獲得結果以前,該函數不會阻塞當前線程,而會馬上返回。併發

 

同步異步:

若是使用同步的方式來通訊的話,這裏說的同步的方式就是說全部的操做都在一個線程內順序執行完成,這麼作缺點是很明顯的:由於同步的通訊操做會阻塞住來自同一個線程的任何其餘操做,只有這個操做完成了以後,後續的操做才能夠完成;一個最明顯的例子就是我們在MFC的界面代碼中,直接使用阻塞Socket調用的代碼,整個界面都會所以而阻塞住沒有響應!因此咱們不得不爲每個通訊的Socket都要創建一個線程,而異步則沒有這些缺點,它能夠同時完成兩件或兩件以上的事務異步

 

多線程:

單線程在程序執行時,所走的程序路徑按照連續順序排下來,前面的必須處理好,後面的纔會執行。多線程是爲了同步完成多項任務,不是爲了提升運行效率,而是爲了提升資源使用效率來提升系統的效率。線程是在同一時間須要完成多項任務的時候實現的。socket

 

重疊I/O技術:ide

針對每一個I/O操做綁定一個內核事件對象,並將等待事件等待函數等待該事件的受信,當I/O操做完成後系統使得與該操做綁定的事件受信,從而判斷那個操做完成。該技術解決了使一個設備內核對象變爲有信號技術中一個設備只能對應一個操做的不足。函數

 

 

IOCP模型實現流程:優化

(1) 調 CreateIoCompletionPort() 函數建立一個完成端口句柄,並且在通常狀況下,咱們須要且只須要創建這一個完成端口,該句柄直接調用API

(2) 創建工做者(Worker)線程,專門用來和客戶端進行通訊的

(3) 接收連入的Socket鏈接啓動一個獨立的線程,專門用來accept客戶端的鏈接請求

(4) 每當有客戶端連入的時候,咱們就仍是得調用CreateIoCompletionPort()函數,這裏卻不是新創建完成端口了,而是把新連入的Socket(也就是前面所謂的設備句柄),與目前的完成端口綁定在一塊兒。

(5) 前面創建的Worker線程中提交網絡請求,例如WSARecv(),讓系統執行接收數據的操做。同時線程都須要分別調用GetQueuedCompletionStatus() 函數在掃描完成端口的隊列裏是否有網絡通訊的請求存在(例如讀取數據,發送數據等),一旦有的話,就將這個請求從完成端口的隊列中取回來,繼續執行本線程中後面的處理代碼,處理完畢以後,咱們再繼續投遞下一個網絡通訊的請求

 

 

 代碼:

相關數據結構:

 1 typedef struct _PER_HANDLE_DATA
 2 {
 3     SOCKET SockClient;
 4     SOCKADDR_IN addr;
 5 }PER_HANDLE_DATA,*PPER_HANDLE_DATA;
 6 
 7 typedef struct _PER_IO_DATA 
 8 {
 9     OVERLAPPED ol;           // 重疊I/O結構
10     char buf[BUFFER_SIZE];  // 數據緩衝區
11     int nOperationType;     //I/O操做類型
12 #define  OP_READ   1
13 #define  OP_WRITE  2
14 #define  OP_ACCEPT 3
15 }PER_IO_DATA,*PPER_IO_DATA;
View Code

 

(1)初始化Socket

#include <winsock2.h>
#pragma comment(lib, "WS2_32")    // 連接到WS2_32.lib

class CInitSock        
{
public:
    CInitSock(BYTE minorVer = 2, BYTE majorVer = 2)
    {
        // 初始化WS2_32.dll
        WSADATA wsaData;
        WORD sockVersion = MAKEWORD(minorVer, majorVer);
        if(::WSAStartup(sockVersion, &wsaData) != 0)
        {
            exit(0);
        }
    }
    ~CInitSock()
    {    
        ::WSACleanup();    
        TRACE0("WSACleanup()");
    }
};
View Code

 

(2)用Listen類封裝監聽線程:CreateIoCompletionPort()建立完成端口句柄,監聽端口,投遞WSARecv(接收數據)

BOOL CListen::Initial()
{
    m_sListen=socket(AF_INET,SOCK_STREAM,0);
    SOCKADDR_IN addrSrv;
    addrSrv.sin_family=AF_INET;
    addrSrv.sin_port=::ntohs(6666);      ///////////////
    addrSrv.sin_addr.S_un.S_addr=INADDR_ANY;
    if (::bind(m_sListen,(SOCKADDR*)&addrSrv,sizeof(addrSrv))==SOCKET_ERROR)
    {
        TRACE("綁定套接字失敗!");
        int err;
        if ((err=WSAGetLastError())!=WSA_IO_PENDING)
        {
            TRACE1("err is %d",err);
            //TRACE("投遞失敗");
        } 
        closesocket(m_sListen);
        return FALSE;
    }
    
    if (listen(m_sListen,20)==SOCKET_ERROR)
    {
        closesocket(m_sListen);
        return FALSE;
    }

    AfxBeginThread(ListenFun,this,THREAD_PRIORITY_NORMAL,0,0,NULL);


    return TRUE;
}


int CListen::Run()
{
    PPER_HANDLE_DATA pHandleData;
    PPER_IO_DATA pPerIO;
    while (TRUE)
    {
        SOCKADDR_IN addrRemote;
        int iRemoteLen=sizeof(addrRemote);
        SOCKET sNew=accept(m_sListen,(SOCKADDR*)&addrRemote,&iRemoteLen);
        //TRACE1("sNew的值爲:%d",sNew);
        // 更新界面數據
        PostMessage(m_hWnd,USER_UPDATEWINDOW,(long)sNew,(long)&addrRemote);

        //pHandleData=new PER_HANDLE_DATA;
        pHandleData=(PPER_HANDLE_DATA)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA));
        pHandleData->SockClient=sNew;
        memcpy(&pHandleData->addr,&addrRemote,iRemoteLen);

        // 將此套接字關聯到端口上
        CreateIoCompletionPort((HANDLE)pHandleData->SockClient,m_hCompletion,(DWORD)pHandleData,0);
 
        // 投遞一個請求
//        pPerIO=new PER_IO_DATA;
        pPerIO=(PPER_IO_DATA)GlobalAlloc(GPTR,sizeof(PER_IO_DATA));

//        memset(&pPerIO->ol,0,sizeof(pPerIO->ol));   /////////////////

        pPerIO->nOperationType=OP_READ;
        WSABUF WsaBuf;
        WsaBuf.buf=pPerIO->buf;
        WsaBuf.len=BUFFER_SIZE;
        DWORD dwRecv;
        DWORD dwFlag=0;        
    
        int ret;
        if((ret=::WSARecv(pHandleData->SockClient,&WsaBuf,1,&dwRecv,&dwFlag,&pPerIO->ol,NULL))!=NO_ERROR)
        {
            TRACE1("ret is %d",ret);
            int err;
            if ((err=WSAGetLastError())!=WSA_IO_PENDING)
            {
                TRACE1("err is %d",err);
                TRACE("投遞失敗");
            }
        }
    }
    return 1;
}
View Code

 

(3)用Server類封裝工做線程:GetQueuedCompletionStatus() 監控完成端口,提交請求

 1 int CServer::Run()
 2 {
 3     DWORD dwTrans;
 4     while (TRUE)
 5     {
 6         TRACE("進入工做線程處理函數");
 7         BOOL bOK=GetQueuedCompletionStatus(m_hCompletion,&dwTrans,(LPDWORD)&m_spPerHandle,
 8             (LPOVERLAPPED*)&m_spPerIO,WSA_INFINITE);
 9         if (!bOK && dwTrans!=0)        // 在此套接字上發生錯誤
10         {
11             TRACE("取包出錯");
12             int err=WSAGetLastError();
13             if ((err=WSAGetLastError())!=WSA_IO_PENDING)
14             {
15                 TRACE1("err is %d",err);
16             }  
17             closesocket(m_spPerHandle->SockClient);
18             GlobalFree(m_spPerHandle);
19             GlobalFree(m_spPerIO);
20             //delete m_spPerHandle;
21             //delete m_spPerIO;
22             continue;
23         }
24         if (bOK==ERROR_SUCCESS && dwTrans==0 && (m_spPerIO->nOperationType==OP_READ || m_spPerIO->nOperationType==OP_WRITE))
25         {
26             TRACE("對方關閉鏈接");
27             SendMessage(m_hWnd,USER_UPDATEWINDOW,(long)m_spPerHandle->SockClient,(long)1);
28             closesocket(m_spPerHandle->SockClient);
29             //delete m_spPerHandle;
30             //delete m_spPerIO;
31             GlobalFree(m_spPerHandle);
32             GlobalFree(m_spPerIO);
33             continue;
34         }
35         switch (m_spPerIO->nOperationType)
36         {
37         case OP_READ:
38             {
39                 TRACE("收到讀");
40                 TRACE1("dwTrans的值爲:%d",dwTrans);
41                 m_spPerIO->buf[dwTrans]='\0';
42                 CString str(m_spPerIO->buf+6,dwTrans-6);
43                 /*
44                 TRACE1("str的值爲:%s",str);
45                 TRACE1("dwTrans的值爲:%d",dwTrans);
46                 TRACE1("dwTrans的值爲:%d",m_spPerIO->buf[dwTrans]+6);
47                 */
48 
49                 char* p=new char[dwTrans-6+1];
50                 strcpy(p,m_spPerIO->buf+6);
51                 
52                 SendMessage(m_hWnd,USER_SHOWMSG,(long)m_spPerHandle->SockClient,(long)p);
53 
54                 WSABUF wsaBuf;
55                 wsaBuf.buf=m_spPerIO->buf;
56                 wsaBuf.len=BUFFER_SIZE;
57                 m_spPerIO->nOperationType=OP_READ; 
58                 DWORD nFlag=0;
59                 //WSARecv(m_spPerHandle->SockClient,&wsaBuf,1,&dwTrans,&nFlag,&m_spPerIO->ol,NULL); 
60                 if(::WSARecv(m_spPerHandle->SockClient,&wsaBuf,1,&dwTrans,&nFlag,&m_spPerIO->ol,NULL)!=NO_ERROR)
61                 {
62                     int err;
63                     if ((err=WSAGetLastError())!=WSA_IO_PENDING)
64                     {
65                         TRACE1("33333333333333err is %d",err);
66                         TRACE("投遞失敗");
67                     }
68                 } 
69             }
70             break;
71         case OP_WRITE:
72         case OP_ACCEPT:
73             TRACE("收到接收鏈接");
74             break;
75         }
76     }
77     return 1;
78 }
View Code

 

(4)開啓服務器:建立端口句柄,建立Listen和Server類對象

 1 if (strTemp.Compare("啓動服務器")==0)
 2     {
 3         SetDlgItemText(IDC_CONNECT,"中止服務器");
 4         
 5         // 建立一個完成端口
 6         m_hCompletion=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
 7         m_pServer=new CServer(m_hCompletion,m_hWnd);
 8         m_pListen=new CListen(m_hCompletion,m_hWnd);
 9 
10         if (!m_pListen->Initial())
11         {
12             ::MessageBox(m_hWnd,"\n初始化失敗!\n","提示",MB_OK);
13             delete m_pListen;
14             delete m_pServer;
15         }
16         //MessageBox("服務器開啓成功!");
17     }
View Code

 

基於MFC實現:

服務器端IOCP模型,客戶端異步選擇模型

 

 

 

 關於IOCP原理詳細內容——http://blog.csdn.net/beyond_cn/article/details/9336043
相關文章
相關標籤/搜索