詳談高性能UDP服務器的開發


上一篇文章我詳細介紹瞭如何開發一款高性能的TCP服務器的網絡傳輸層.本章我將談談如何開發一個高性能的UDP服務器的網絡層.UDP服務器的網絡層開發相對與TCP服務器來講要容易和簡單的多,UDP服務器的大體流程爲建立一個socket而後將其綁定到完成端口上並投遞必定數量的recv操做.當有數據到來時從完成隊列中取出數據發送到接收隊列中便可。
  測試結果以下:
    WindowsXP Professional,Intel Core Duo E4600 雙核2.4G , 2G內存。同時30K個用戶和該UDP服務器進行交互其CPU使用率爲10%左右,內存佔用7M左右。
  下面詳細介紹該服務器的架構及流程:  
1. 首先介紹服務器的接收和發送緩存 UDP_CONTEXT。
 1      class  UDP_CONTEXT :  protected  NET_CONTEXT
 2      {
 3        friend class UdpSer;
 4    protected:
 5        IP_ADDR m_RemoteAddr;            //對端地址
 6
 7        enum
 8        {
 9            HEAP_SIZE = 1024 * 1024 * 5,
10            MAX_IDL_DATA = 10000,
11        }
;
12
13    public:
14        UDP_CONTEXT() {}
15        virtual ~UDP_CONTEXT() {}
16
17        void* operator new(size_t nSize);
18        void operator delete(void* p);
19
20    private:
21        static vector<UDP_CONTEXT* > s_IDLQue;
22        static CRITICAL_SECTION s_IDLQueLock;
23        static HANDLE s_hHeap;    
24    }
;
UDP_CONTEXT的實現流程和TCP_CONTEXT的實現流程大體相同,此處就不進行詳細介紹。

2. UDP_RCV_DATA,當服務器收到客戶端發來的數據時會將數據以UDP_RCV_DATA的形式放入到數據接收隊列中,其聲明以下:
 1      class  DLLENTRY UDP_RCV_DATA
 2      {
 3        friend class UdpSer;
 4    public:
 5        CHAR* m_pData;                //數據緩衝區
 6        INT m_nLen;                    //數據的長度
 7        IP_ADDR m_PeerAddr;            //發送報文的地址
 8
 9        UDP_RCV_DATA(const CHAR* szBuf, int nLen, const IP_ADDR& PeerAddr);
10        ~UDP_RCV_DATA();
11
12        void* operator new(size_t nSize);
13        void operator delete(void* p);
14
15        enum
16        {
17            RCV_HEAP_SIZE = 1024 * 1024 *50,        //s_Heap堆的大小
18            DATA_HEAP_SIZE = 100 * 1024* 1024,    //s_DataHeap堆的大小
19            MAX_IDL_DATA = 250000,
20        }
;
21
22    private:
23        static vector<UDP_RCV_DATA* > s_IDLQue;
24        static CRITICAL_SECTION s_IDLQueLock;
25        static HANDLE s_DataHeap;        //數據緩衝區的堆
26        static HANDLE s_Heap;            //RCV_DATA的堆
27    }
;
UDP_RCV_DATA的實現和TCP_RCV_DATA大體相同, 此處不在詳細介紹.

下面將主要介紹UdpSer類, 該類主要用來管理UDP服務.其定義以下:
 1      class  DLLENTRY UdpSer
 2      {
 3    public:
 4        UdpSer();
 5        ~UdpSer();
 6
 7        /************************************************************************
 8        * Desc : 初始化靜態資源,在申請UDP實例對象以前應先調用該函數, 不然程序沒法正常運行
 9        ************************************************************************/

10        static void InitReource();
11
12        /************************************************************************
13        * Desc : 在釋放UDP實例之後, 掉用該函數釋放相關靜態資源
14        ************************************************************************/

15        static void ReleaseReource();
16
17        //用指定本地地址和端口進行初始化
18        BOOL StartServer(const CHAR* szIp = "0.0.0.0", INT nPort = 0);
19
20        //從數據隊列的頭部獲取一個接收數據, pCount不爲null時返回隊列的長度
21        UDP_RCV_DATA* GetRcvData(DWORD* pCount);
22
23        //向對端發送數據
24        BOOL SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen);
25
26        /****************************************************
27        * Name : CloseServer()
28        * Desc : 關閉服務器
29        ****************************************************/

30        void CloseServer();
31
32    protected:
33        SOCKET m_hSock;
34        vector<UDP_RCV_DATA* > m_RcvDataQue;                //接收數據隊列
35        CRITICAL_SECTION m_RcvDataLock;                        //訪問m_RcvDataQue的互斥鎖
36        long volatile m_bThreadRun;                                //是否容許後臺線程繼續運行
37        BOOL m_bSerRun;                                            //服務器是否正在運行
38
39        HANDLE *m_pThreads;                //線程數組
40        HANDLE m_hCompletion;                    //完成端口句柄
41
42        void ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);
43
44        /****************************************************
45        * Name : WorkThread()
46        * Desc : I/O 後臺管理線程
47        ****************************************************/

48        static UINT WINAPI WorkThread(LPVOID lpParam);
49    }
;

1. InitReource() 主要對相關的靜態資源進行初始化.其實大體和TcpServer::InitReource()大體相同.在UdpSer實例使用以前必須調用該函數進行靜態資源的初始化, 不然服務器沒法正常使用.

2.ReleaseReource() 主要對相關靜態資源進行釋放.只有在應用程序結束時才能調用該函數進行靜態資源的釋放.

3. StartServer() 
該函數的主要功能啓動一個UDP服務.其大體流程爲先建立服務器UDP socket, 將其綁定到完成端口上而後投遞必定數量的recv操做以接收客戶端的數據.其實現以下:
 1     BOOL UdpSer::StartServer( const  CHAR *  szIp  /* =  */ , INT nPort  /* = 0 */ )
 2      {
 3        BOOL bRet = TRUE;
 4        const int RECV_COUNT = 500;
 5        WSABUF RcvBuf = { NULL, 0 };
 6        DWORD dwBytes = 0;
 7        DWORD dwFlag = 0;
 8        INT nAddrLen = sizeof(IP_ADDR);
 9        INT iErrCode = 0;
10
11        try
12        {
13            if (m_bSerRun)
14            {
15                THROW_LINE;
16            }

17
18            m_bSerRun = TRUE;
19            m_hSock = WSASocket(AF_INET, SOCK_DGRAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
20            if (INVALID_SOCKET == m_hSock)
21            {
22                THROW_LINE;
23            }

24            ULONG ul = 1;
25            ioctlsocket(m_hSock, FIONBIO, &ul);
26
27            //設置爲地址重用,優勢在於服務器關閉後能夠當即啓用
28            int nOpt = 1;
29            setsockopt(m_hSock, SOL_SOCKET, SO_REUSEADDR, (char*)&nOpt, sizeof(nOpt));
30
31            //關閉系統緩存,使用本身的緩存以防止數據的複製操做
32            INT nZero = 0;
33            setsockopt(m_hSock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero));
34            setsockopt(m_hSock, SOL_SOCKET, SO_RCVBUF, (CHAR*)&nZero, sizeof(nZero));
35
36            IP_ADDR addr(szIp, nPort);
37            if (SOCKET_ERROR == bind(m_hSock, (sockaddr*)&addr, sizeof(addr)))
38            {
39                closesocket(m_hSock);
40                THROW_LINE;
41            }

42
43            //將SOCKET綁定到完成端口上
44            CreateIoCompletionPort((HANDLE)m_hSock, m_hCompletion, 00);
45
46            //投遞讀操做
47            for (int nIndex = 0; nIndex < RECV_COUNT; nIndex++)
48            {
49                UDP_CONTEXT* pRcvContext = new UDP_CONTEXT();
50                if (pRcvContext && pRcvContext->m_pBuf)
51                {
52                    dwFlag = 0;
53                    dwBytes = 0;
54                    nAddrLen = sizeof(IP_ADDR);
55                    RcvBuf.buf = pRcvContext->m_pBuf;
56                    RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;
57
58                    pRcvContext->m_hSock = m_hSock;
59                    pRcvContext->m_nOperation = OP_READ;            
60                    iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1&dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)
61                        , &nAddrLen, &(pRcvContext->m_ol), NULL);
62                    if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
63                    {
64                        delete pRcvContext;
65                        pRcvContext = NULL;
66                    }

67                }

68                else
69                {
70                    delete pRcvContext;
71                }

72            }

73        }

74        catch (const long &lErrLine)
75        {            
76            bRet = FALSE;
77            _TRACE("Exp : %s -- %ld ", __FILE__, lErrLine);            
78        }

79
80        return bRet;
81    }
4. GetRcvData(), 從接收隊列中取出一個數據包.
 1     UDP_RCV_DATA  * UdpSer::GetRcvData(DWORD *  pCount)
 2      {
 3        UDP_RCV_DATA* pRcvData = NULL;
 4
 5        EnterCriticalSection(&m_RcvDataLock);
 6        vector<UDP_RCV_DATA* >::iterator iterRcv = m_RcvDataQue.begin();
 7        if (iterRcv != m_RcvDataQue.end())
 8        {
 9            pRcvData = *iterRcv;
10            m_RcvDataQue.erase(iterRcv);
11        }

12
13        if (pCount)
14        {
15            *pCount = (DWORD)(m_RcvDataQue.size());
16        }

17        LeaveCriticalSection(&m_RcvDataLock);
18
19        return pRcvData;
20    }

5. SendData() 發送指定長度的數據包.
 1     BOOL UdpSer::SendData( const  IP_ADDR &  PeerAddr,  const  CHAR *  szData, INT nLen)
 2      {
 3        BOOL bRet = TRUE;
 4        try
 5        {
 6            if (nLen >= 1500)
 7            {
 8                THROW_LINE;
 9            }

10
11            UDP_CONTEXT* pSendContext = new UDP_CONTEXT();
12            if (pSendContext && pSendContext->m_pBuf)
13            {
14                pSendContext->m_nOperation = OP_WRITE;
15                pSendContext->m_RemoteAddr = PeerAddr;        
16
17                memcpy(pSendContext->m_pBuf, szData, nLen);
18
19                WSABUF SendBuf = { NULL, 0 };
20                DWORD dwBytes = 0;
21                SendBuf.buf = pSendContext->m_pBuf;
22                SendBuf.len = nLen;
23
24                INT iErrCode = WSASendTo(m_hSock, &SendBuf, 1&dwBytes, 0, (sockaddr*)&PeerAddr, sizeof(PeerAddr), &(pSendContext->m_ol), NULL);
25                if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
26                {
27                    delete pSendContext;
28                    THROW_LINE;
29                }

30            }

31            else
32            {
33                delete pSendContext;
34                THROW_LINE;
35            }

36        }

37        catch (const long &lErrLine)
38        {
39            bRet = FALSE;
40            _TRACE("Exp : %s -- %ld ", __FILE__, lErrLine);            
41        }

42
43        return bRet;
44    }

6. CloseServer() 關閉服務
1      void  UdpSer::CloseServer()
2      {
3        m_bSerRun = FALSE;
4        closesocket(m_hSock);
5    }

7. WorkThread() 在完成端口上工做的後臺線程
 1     UINT WINAPI UdpSer::WorkThread(LPVOID lpParam)
 2      {
 3        UdpSer *pThis = (UdpSer *)lpParam;
 4        DWORD dwTrans = 0, dwKey = 0;
 5        LPOVERLAPPED pOl = NULL;
 6        UDP_CONTEXT *pContext = NULL;
 7
 8        while (TRUE)
 9        {
10            BOOL bOk = GetQueuedCompletionStatus(pThis->m_hCompletion, &dwTrans, &dwKey, (LPOVERLAPPED *)&pOl, WSA_INFINITE);
11
12            pContext = CONTAINING_RECORD(pOl, UDP_CONTEXT, m_ol);
13            if (pContext)
14            {
15                switch (pContext->m_nOperation)
16                {
17                case OP_READ:
18                    pThis->ReadCompletion(bOk, dwTrans, pOl);
19                    break;
20                case OP_WRITE:
21                    delete pContext;
22                    pContext = NULL;
23                    break;
24                }

25            }

26
27            if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0))
28            {
29                break;
30            }

31        }

32
33        return 0;
34    }

8.ReadCompletion(), 接收操做完成後的回調函數
 1      void  UdpSer::ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
 2      {
 3        UDP_CONTEXT* pRcvContext = CONTAINING_RECORD(lpOverlapped, UDP_CONTEXT, m_ol);
 4        WSABUF RcvBuf = { NULL, 0 };
 5        DWORD dwBytes = 0;
 6        DWORD dwFlag = 0;
 7        INT nAddrLen = sizeof(IP_ADDR);
 8        INT iErrCode = 0;
 9
10        if (TRUE == bSuccess && dwNumberOfBytesTransfered <= UDP_CONTEXT::S_PAGE_SIZE)
11        {
12#ifdef _XML_NET_
13            EnterCriticalSection(&m_RcvDataLock);
14
15            UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered, pRcvContext->m_RemoteAddr);
16            if (pRcvData && pRcvData->m_pData)
17            {
18                m_RcvDataQue.push_back(pRcvData);
19            }
    
20            else
21            {
22                delete pRcvData;
23            }

24
25            LeaveCriticalSection(&m_RcvDataLock);
26#else
27            if (dwNumberOfBytesTransfered >= sizeof(PACKET_HEAD))
28            {
29                EnterCriticalSection(&m_RcvDataLock);
30
31                UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered, pRcvContext->m_RemoteAddr);
32                if (pRcvData && pRcvData->m_pData)
33                {
34                    m_RcvDataQue.push_back(pRcvData);
35                }
    
36                else
37                {
38                    delete pRcvData;
39                }

40
41                LeaveCriticalSection(&m_RcvDataLock);
42            }

43#endif
44
45            //投遞下一個接收操做
46            RcvBuf.buf = pRcvContext->m_pBuf;
47            RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;
48
49            iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1&dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)
50                , &nAddrLen, &(pRcvContext->m_ol), NULL);
51            if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
52            {
53                ATLTRACE("\r\n%s -- %ld dwNumberOfBytesTransfered = %ld, LAST_ERR = %ld"
54                    , __FILE__, __LINE__, dwNumberOfBytesTransfered, WSAGetLastError());
55                delete pRcvContext;
56                pRcvContext = NULL;
57            }

58        }

59        else
60        {
61            delete pRcvContext;
62        }

63    }
相關文章
相關標籤/搜索