基於.net的Socket異步編程總結

     最近在爲公司的分佈式服務框架作支持異步調用的開發,這種新特性的上線須要進行各類嚴格的測試。在併發性能測試時,性能一直很是差,並且很是的不穩定。通過不斷的分析調優,發現Socket通訊和多線程異步回調存在較爲嚴重的性能問題。通過多方優化,性能終於達標。下面是原版本、支持異步最第一版本和優化後版本的性能比較。差別仍是很是巨大的。另外說明一下,總耗時是指10000次請求累計執行時間。html

Image

     從上圖能夠看到,支持異步的版本,在單線程模式下,性能的表現與老版本差別並不明顯,可是10線程下差別就很是巨大,而100線程的測試結果反而有所好轉。經過分析,兩個版本的性能差別如此巨大,主要是由於:web

  1. 同步模式會阻塞客戶端請求,說白了,在線程內就是串行請求的。可是在異步模式中,線程內的請求再也不阻塞,網絡流量、後臺計算壓力瞬間暴漲,峯值是同步模式的100倍。網絡傳輸變成瓶頸點。
  2. 在壓力暴漲的狀況下,CPU資源佔用也會突變, 而且ThreadPool、Task、異步調用的執行都將變慢。

     在網絡通訊方面,把原先半異步的模式調整爲了SocketAsyncEventArgs 模式。下面是Socket通訊的幾種模型的介紹和示例,總結一下,與你們分享。下次再與你們分享,併發下異步調用的性能優化方案。編程

APM方式: Asynchronous Programming Model緩存

    異步編程模型是一種模式,該模式容許用更少的線程去作更多的操做,.NET Framework不少類也實現了該模式,同時咱們也能夠自定義類來實現該模式。NET Framework中的APM也稱爲Begin/End模式。此種模式下,調用BeginXXX方法來啓動異步操做,而後返回一個IAsyncResult 對象。當操做執行完成後,系統會觸發IAsyncResult 對象的執行。 具體可參考: https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/asynchronous-programming-model-apm性能優化

     .net中的Socket異步模式也支持APM,與同步模式或Blocking模式相比,能夠更好的利用網絡帶寬和系統資源編寫出具備更高性能的程序。參考具體代碼以下:服務器

服務端監聽:
    
Socket serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
//本機預使用的IP和端口
IPEndPoint serverIP = new IPEndPoint(IPAddress.Any, 9050);
//綁定服務端設置的IP
serverSocket.Bind(serverIP);
//設置監聽個數
serverSocket.Listen(1);
//異步接收鏈接請求
serverSocket.BeginAccept(ar =>
{
    base.communicateSocket = serverSocket.EndAccept(ar);
   AccessAciton();
 }, null);
客戶端鏈接:
var communicateSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
   communicateSocket.Bind(new IPEndPoint(IPAddress.Any, 9051));
             
        //服務器的IP和端口
        IPEndPoint serverIP;
        try
        {
            serverIP = new IPEndPoint(IPAddress.Parse(IP), 9050);
        }
        catch
        {
            throw new Exception(String.Format("{0}不是一個有效的IP地址!", IP));
        }
             
        //客戶端只用來向指定的服務器發送信息,不須要綁定本機的IP和端口,不須要監聽
        try
        {
           communicateSocket.BeginConnect(serverIP, ar =>
            {
                AccessAciton();
            }, null);
        }
        catch
        {
            throw new Exception(string.Format("嘗試鏈接{0}不成功!", IP));
        }
客戶端請求:
    
if (communicateSocket.Connected == false)
        {
            throw new Exception("尚未創建鏈接, 不能發送消息");
        }
        Byte[] msg = Encoding.UTF8.GetBytes(message);
        communicateSocket.BeginSend(msg,0, msg.Length, SocketFlags.None,
            ar => {
                 
            }, null);

 

服務端響應:
Byte[] msg = new byte[1024];
        //異步的接受消息
        communicateSocket.BeginReceive(msg, 0, msg.Length, SocketFlags.None,
            ar => {
                //對方斷開鏈接時, 這裏拋出Socket Exception              
                    communicateSocket.EndReceive(ar);
                ReceiveAction(Encoding.UTF8.GetString(msg).Trim('\0',' '));
                Receive(ReceiveAction);
            }, null);

 

      注意:異步模式雖好,可是若是進行大量異步套接字操做,是要付出很高代價的。針對每次操做,都必須建立一個IAsyncResult對象,並且該對象不能被重複使用。因爲大量使用對象分配和垃圾收集,這會影響系統性能。如須要更好的理解APM模式,最瞭解EAP模式:Event-based Asynchronous Pattern:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/event-based-asynchronous-pattern-eap網絡

 

TAP 方式: Task-based Asynchronous Pattern 多線程

      基於任務的異步模式,該模式主要使用System.Threading.Tasks.Task和Task<T>類來完成異步編程,相對於APM 模式來說,TAP使異步編程模式更加簡單(由於這裏咱們只須要關注Task這個類的使用),同時TAP也是微軟推薦使用的異步編程模式。APM與TAP的本質區別,請參考個人一篇歷史博客:http://www.cnblogs.com/vveiliang/p/7943003.html併發

     TAP模式與APM模式是兩種異步模式的實現,從性能上看沒有本質的差異。TAP的資料可參考:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap 。參考具體代碼以下:app

服務端:

publicclassStateContext
{
   // Client socket.   
   publicSocketWorkSocket =null;
   // Size of receive buffer.   
   publicconstintBufferSize = 1024;
   // Receive buffer.   
   publicbyte[] buffer =newbyte[BufferSize];
   // Received data string.   
   publicStringBuildersb =newStringBuilder(100);
}
publicclassAsynchronousSocketListener
{
   // Thread signal.   
   publicstaticManualResetEventreSetEvent =newManualResetEvent(false);
   publicAsynchronousSocketListener()
    {
    }
   publicstaticvoidStartListening()
    {
       // Data buffer for incoming data.   
       byte[] bytes =newByte[1024];
       // Establish the local endpoint for the socket.   
       IPAddressipAddress =IPAddress.Parse("127.0.0.1");
       IPEndPointlocalEndPoint =newIPEndPoint(ipAddress, 11000);
       // Create a TCP/IP socket.   
       Socketlistener =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
       // Bind the socket to the local   
       try
        {
            listener.Bind(localEndPoint);
            listener.Listen(100);
           while(true)
            {
               // Set the event to nonsignaled state.   
                reSetEvent.Reset();
               // Start an asynchronous socket to listen for connections.   
               Console.WriteLine("Waiting for a connection...");
                listener.BeginAccept(newAsyncCallback(AcceptCallback), listener);
               // Wait until a connection is made before continuing.   
                reSetEvent.WaitOne();
            }
        }
       catch(Exceptione)
        {
           Console.WriteLine(e.ToString());
        }
       Console.WriteLine("\nPress ENTER to continue...");
       Console.Read();
    }
   publicstaticvoidAcceptCallback(IAsyncResultar)
    {
       // Signal the main thread to continue.   
        reSetEvent.Set();
       // Get the socket that handles the client request.   
       Socketlistener = (Socket)ar.AsyncState;
       Sockethandler = listener.EndAccept(ar);
       // Create the state object.   
       StateContextstate =newStateContext();
        state.WorkSocket = handler;
        handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);
    }
   publicstaticvoidReadCallback(IAsyncResultar)
    {
       Stringcontent =String.Empty;
       StateContextstate = (StateContext)ar.AsyncState;
       Sockethandler = state.WorkSocket;
       // Read data from the client socket.   
       intbytesRead = handler.EndReceive(ar);
       if(bytesRead > 0)
        {
           // There might be more data, so store the data received so far.   
            state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
           // Check for end-of-file tag. If it is not there, read   
           // more data.   
            content = state.sb.ToString();
           if(content.IndexOf("<EOF>") > -1)
            {
               Console.WriteLine("讀取 {0} bytes. \n 數據: {1}", content.Length, content);
                Send(handler, content);
            }
           else
            {
                handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);
            }
        }
    }
   privatestaticvoidSend(Sockethandler,Stringdata)
    {
       byte[] byteData =Encoding.ASCII.GetBytes(data);
        handler.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), handler);
    }
   privatestaticvoidSendCallback(IAsyncResultar)
    {
       try
        {
           Sockethandler = (Socket)ar.AsyncState;
           intbytesSent = handler.EndSend(ar);
           Console.WriteLine("發送 {0} bytes.", bytesSent);
            handler.Shutdown(SocketShutdown.Both);
            handler.Close();
        }
       catch(Exceptione)
        {
           Console.WriteLine(e.ToString());
        }
    }
   publicstaticintMain(String[] args)
    {
        StartListening();
       return0;
    }

客戶端:

publicclassAsynchronousClient
{
   // The port number for the remote device.   
   privateconstintport = 11000;
   // ManualResetEvent instances signal completion.   
   privatestaticManualResetEventconnectResetEvent =newManualResetEvent(false);
   privatestaticManualResetEventsendResetEvent =newManualResetEvent(false);
   privatestaticManualResetEventreceiveResetEvent =newManualResetEvent(false);
   privatestaticStringresponse =String.Empty;
   privatestaticvoidStartClient()
    {
       try
        {
         
           IPAddressipAddress =IPAddress.Parse("127.0.0.1");
           IPEndPointremoteEP =newIPEndPoint(ipAddress, port);
           // Create a TCP/IP socket.   
           Socketclient =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
           // Connect to the remote endpoint.   
            client.BeginConnect(remoteEP,newAsyncCallback(ConnectCallback), client);
            connectResetEvent.WaitOne();
            Send(client,"This is a test<EOF>");
            sendResetEvent.WaitOne();
            Receive(client);
            receiveResetEvent.WaitOne();
           Console.WriteLine("Response received : {0}", response);
           // Release the socket.   
            client.Shutdown(SocketShutdown.Both);
            client.Close();
           Console.ReadLine();
        }
       catch(Exceptione)
        {
           Console.WriteLine(e.ToString());
        }
    }
   privatestaticvoidConnectCallback(IAsyncResultar)
    {
       try
        {
           Socketclient = (Socket)ar.AsyncState;
            client.EndConnect(ar);
           Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint.ToString());
            connectResetEvent.Set();
        }
       catch(Exceptione)
        {
           Console.WriteLine(e.ToString());
        }
    }
   privatestaticvoidReceive(Socketclient)
    {
       try
        {
           StateContextstate =newStateContext();
            state.WorkSocket = client;
            client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);
        }
       catch(Exceptione)
        {
           Console.WriteLine(e.ToString());
        }
    }
   privatestaticvoidReceiveCallback(IAsyncResultar)
    {
       try
        {
           StateContextstate = (StateContext)ar.AsyncState;
           Socketclient = state.WorkSocket;
           intbytesRead = client.EndReceive(ar);
           if(bytesRead > 0)
            {
                state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
                client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);
            }
           else
            {
               if(state.sb.Length > 1)
                {
                    response = state.sb.ToString();
                }
                receiveResetEvent.Set();
            }
        }
       catch(Exceptione)
        {
           Console.WriteLine(e.ToString());
        }
    }
   privatestaticvoidSend(Socketclient,Stringdata)
    {
       byte[] byteData =Encoding.ASCII.GetBytes(data);
        client.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), client);
    }
   privatestaticvoidSendCallback(IAsyncResultar)
    {
       try
        {
           Socketclient = (Socket)ar.AsyncState;
           intbytesSent = client.EndSend(ar);
           Console.WriteLine("Sent {0} bytes to server.", bytesSent);
            sendResetEvent.Set();
        }
       catch(Exceptione)
        {
           Console.WriteLine(e.ToString());
        }
    }
   publicstaticintMain(String[] args)
    {
        StartClient();
       return0;
    }
}

SAEA方式: SocketAsyncEventArgs

      APM模式、TAP模式雖然解決了Socket的併發問題,可是在大併發下仍是有較大性能問題的。這主要是由於上述兩種模式都會生產 IAsyncResult 等對象 ,而大量垃圾對象的回收會很是影響系統的性能。爲此,微軟推出了 SocketAsyncEventArgs 。SocketAsyncEventArgs 是 .NET Framework 3.5 開始支持的一種支持高性能 Socket 通訊的實現。SocketAsyncEventArgs 相比於 APM 方式的主要優勢能夠描述以下,無需每次調用都生成 IAsyncResult 等對象,向原生 Socket 更靠近一些。這是官方的解釋:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

      SocketAsyncEventArgs主要爲高性能網絡服務器應用程序而設計,避免了在異步套接字 I/O 量很是大時,大量垃圾對象建立與回收。使用此類執行異步套接字操做的模式包含如下步驟,具體說明可參考:https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx

  1. 分配一個新的 SocketAsyncEventArgs 上下文對象,或者從應用程序池中獲取一個空閒的此類對象。
  2. 將該上下文對象的屬性設置爲要執行的操做(例如,完成回調方法、數據緩衝區、緩衝區偏移量以及要傳輸的最大數據量)。
  3. 調用適當的套接字方法 (xxxAsync) 以啓動異步操做。
  4. 若是異步套接字方法 (xxxAsync) 返回 true,則在回調中查詢上下文屬性來獲取完成狀態。
  5. 若是異步套接字方法 (xxxAsync) 返回 false,則說明操做是同步完成的。 能夠查詢上下文屬性來獲取操做結果。
  6. 將該上下文重用於另外一個操做,將它放回到應用程序池中,或者將它丟棄。

    下面是封裝的一個組件代碼:

classBufferManager
    {
       intm_numBytes;                // the total number of bytes controlled by the buffer pool
       byte[] m_buffer;               // the underlying byte array maintained by the Buffer Manager
       Stack<int> m_freeIndexPool;    //
       intm_currentIndex;
       intm_bufferSize;
       publicBufferManager(inttotalBytes,intbufferSize)
        {
            m_numBytes = totalBytes;
            m_currentIndex = 0;
            m_bufferSize = bufferSize;
            m_freeIndexPool =newStack<int>();
        }
       // Allocates buffer space used by the buffer pool
       publicvoidInitBuffer()
        {
           // create one big large buffer and divide that
           // out to each SocketAsyncEventArg object
            m_buffer =newbyte[m_numBytes];
        }
       // Assigns a buffer from the buffer pool to the
       // specified SocketAsyncEventArgs object
       //
       // <returns>true if the buffer was successfully set, else false</returns>
       publicboolSetBuffer(SocketAsyncEventArgsargs)
        {
           if(m_freeIndexPool.Count > 0)
            {
                args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
            }
           else
            {
               if((m_numBytes - m_bufferSize) < m_currentIndex)
                {
                   returnfalse;
                }
                args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
                m_currentIndex += m_bufferSize;
            }
           returntrue;
        }
       // Removes the buffer from a SocketAsyncEventArg object.
       // This frees the buffer back to the buffer pool
       publicvoidFreeBuffer(SocketAsyncEventArgsargs)
        {
            m_freeIndexPool.Push(args.Offset);
            args.SetBuffer(null, 0, 0);
        }
    }
   ///<summary>
   ///This class is used to communicate with a remote application over TCP/IP protocol.
   ///</summary>
   classTcpCommunicationChannel
    {
      
       #regionPrivate fields
       ///<summary>
       ///Size of the buffer that is used to receive bytes from TCP socket.
       ///</summary>
       privateconstintReceiveBufferSize = 8 * 1024;//4KB
       ///<summary>
       ///This buffer is used to receive bytes
       ///</summary>
       privatereadonlybyte[] _buffer;
       ///<summary>
       ///Socket object to send/reveice messages.
       ///</summary>
       privatereadonlySocket_clientSocket;
       ///<summary>
       ///A flag to control thread's running
       ///</summary>
       privatevolatilebool_running;
       ///<summary>
       ///This object is just used for thread synchronizing (locking).
       ///</summary>
       privatereadonlyobject_syncLock;
       privateBufferManagerreceiveBufferManager;
       privateSocketAsyncEventArgsreceiveBuff =null;
       #endregion
       #regionConstructor
       ///<summary>
       ///Creates a new TcpCommunicationChannel object.
       ///</summary>
       ///<param name="clientSocket">A connected Socket object that is
       ///used to communicate over network</param>
       publicTcpCommunicationChannel(SocketclientSocket)
        {
            _clientSocket = clientSocket;
            _clientSocket.Blocking =false;
            _buffer =newbyte[ReceiveBufferSize];
            _syncLock =newobject();
            Init();
        }
       privatevoidInit()
        {
           //初始化接收Socket緩存數據
            receiveBufferManager =newBufferManager(ReceiveBufferSize*2, ReceiveBufferSize);
            receiveBufferManager.InitBuffer();
            receiveBuff =newSocketAsyncEventArgs();
            receiveBuff.Completed += ReceiveIO_Completed;
            receiveBufferManager.SetBuffer(receiveBuff);
           //初始化發送Socket緩存數據
        }
       #endregion
       #regionPublic methods
       ///<summary>
       ///Disconnects from remote application and closes channel.
       ///</summary>
       publicvoidDisconnect()
        {
            _running =false;
            receiveBuff.Completed -= ReceiveIO_Completed;
            receiveBuff.Dispose();
           if(_clientSocket.Connected)
            {
                _clientSocket.Close();
            }
            _clientSocket.Dispose();
        }
       #endregion
     
       publicvoidStartReceive()
        {
            _running =true;
           boolresult = _clientSocket.ReceiveAsync(receiveBuff);
        }
       privatevoidReceiveIO_Completed(objectsender,SocketAsyncEventArgse)
        {
           if(e.BytesTransferred > 0 && e.SocketError ==SocketError.Success && _clientSocket.Connected ==true&& e.LastOperation ==SocketAsyncOperation.Receive)
            {
               if(!_running)
                {
                   return;
                }
               //Get received bytes count
               DateTimereceiveTime =DateTime.Now;
               //Copy received bytes to a new byte array
               varreceivedBytes =newbyte[e.BytesTransferred];
               Array.Copy(e.Buffer, 0, receivedBytes, 0, e.BytesTransferred);
               //處理消息....
               if(_running)
                {
                    StartReceive();
                }
            }
        }
       ///<summary>
       ///Sends a message to the remote application.
       ///</summary>
       ///<param name="message">Message to be sent</param>
       publicvoidSendMessage(byte[] messageBytes)
        {
           //Send message
           if(_clientSocket.Connected)
            {
               SocketAsyncEventArgsdata =newSocketAsyncEventArgs();
                data.SocketFlags =SocketFlags.None;
                data.Completed += (s, e) =>
                {
                    e.Dispose();
                };
                data.SetBuffer(messageBytes, 0, messageBytes.Length);
               //Console.WriteLine("發送:" + messageBytes.LongLength);
                _clientSocket.SendAsync(data);
            }
        }
    }
相關文章
相關標籤/搜索