最近在爲公司的分佈式服務框架作支持異步調用的開發,這種新特性的上線須要進行各類嚴格的測試。在併發性能測試時,性能一直很是差,並且很是的不穩定。通過不斷的分析調優,發現Socket通訊和多線程異步回調存在較爲嚴重的性能問題。通過多方優化,性能終於達標。下面是原版本、支持異步最第一版本和優化後版本的性能比較。差別仍是很是巨大的。另外說明一下,總耗時是指10000次請求累計執行時間。html
從上圖能夠看到,支持異步的版本,在單線程模式下,性能的表現與老版本差別並不明顯,可是10線程下差別就很是巨大,而100線程的測試結果反而有所好轉。經過分析,兩個版本的性能差別如此巨大,主要是由於:web
在網絡通訊方面,把原先半異步的模式調整爲了SocketAsyncEventArgs 模式。下面是Socket通訊的幾種模型的介紹和示例,總結一下,與你們分享。下次再與你們分享,併發下異步調用的性能優化方案。編程
APM方式: Asynchronous Programming Model緩存
異步編程模型是一種模式,該模式容許用更少的線程去作更多的操做,.NET Framework不少類也實現了該模式,同時咱們也能夠自定義類來實現該模式。NET Framework中的APM也稱爲Begin/End模式。此種模式下,調用BeginXXX方法來啓動異步操做,而後返回一個IAsyncResult 對象。當操做執行完成後,系統會觸發IAsyncResult 對象的執行。 具體可參考: https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/asynchronous-programming-model-apm性能優化
.net中的Socket異步模式也支持APM,與同步模式或Blocking模式相比,能夠更好的利用網絡帶寬和系統資源編寫出具備更高性能的程序。參考具體代碼以下:服務器
服務端監聽:Socket serverSocket =
new
Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
//本機預使用的IP和端口
IPEndPoint serverIP =
new
IPEndPoint(IPAddress.Any, 9050);
//綁定服務端設置的IP
serverSocket.Bind(serverIP);
//設置監聽個數
serverSocket.Listen(1);
//異步接收鏈接請求
serverSocket.BeginAccept(ar =>
{
base
.communicateSocket = serverSocket.EndAccept(ar);
AccessAciton();
},
null
);
客戶端鏈接:var communicateSocket =
new
Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
communicateSocket.Bind(
new
IPEndPoint(IPAddress.Any, 9051));
//服務器的IP和端口
IPEndPoint serverIP;
try
{
serverIP =
new
IPEndPoint(IPAddress.Parse(IP), 9050);
}
catch
{
throw
new
Exception(String.Format(
"{0}不是一個有效的IP地址!"
, IP));
}
//客戶端只用來向指定的服務器發送信息,不須要綁定本機的IP和端口,不須要監聽
try
{
c
ommunicateSocket.BeginConnect(serverIP, ar =>
{
AccessAciton();
},
null
);
}
catch
{
throw
new
Exception(
string
.Format(
"嘗試鏈接{0}不成功!"
, IP));
}
客戶端請求:if
(communicateSocket.Connected ==
false
)
{
throw
new
Exception(
"尚未創建鏈接, 不能發送消息"
);
}
Byte[] msg = Encoding.UTF8.GetBytes(message);
communicateSocket.BeginSend(msg,0, msg.Length, SocketFlags.None,
ar => {
},
null
);
服務端響應:Byte[] msg =
new
byte
[1024];
//異步的接受消息
communicateSocket.BeginReceive(msg, 0, msg.Length, SocketFlags.None,
ar => {
//對方斷開鏈接時, 這裏拋出Socket Exception
communicateSocket.EndReceive(ar);
ReceiveAction(Encoding.UTF8.GetString(msg).Trim(
'\0'
,
' '
));
Receive(ReceiveAction);
},
null
);
注意:異步模式雖好,可是若是進行大量異步套接字操做,是要付出很高代價的。針對每次操做,都必須建立一個IAsyncResult對象,並且該對象不能被重複使用。因爲大量使用對象分配和垃圾收集,這會影響系統性能。如須要更好的理解APM模式,最瞭解EAP模式:Event-based Asynchronous Pattern:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/event-based-asynchronous-pattern-eap 。網絡
TAP 方式: Task-based Asynchronous Pattern 多線程
基於任務的異步模式,該模式主要使用System.Threading.Tasks.Task和Task<T>類來完成異步編程,相對於APM 模式來說,TAP使異步編程模式更加簡單(由於這裏咱們只須要關注Task這個類的使用),同時TAP也是微軟推薦使用的異步編程模式。APM與TAP的本質區別,請參考個人一篇歷史博客:http://www.cnblogs.com/vveiliang/p/7943003.html併發
TAP模式與APM模式是兩種異步模式的實現,從性能上看沒有本質的差異。TAP的資料可參考:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap 。參考具體代碼以下:app
服務端:
publicclassStateContext{// Client socket.publicSocketWorkSocket =null;// Size of receive buffer.publicconstintBufferSize = 1024;// Receive buffer.publicbyte[] buffer =newbyte[BufferSize];// Received data string.publicStringBuildersb =newStringBuilder(100);}publicclassAsynchronousSocketListener{// Thread signal.publicstaticManualResetEventreSetEvent =newManualResetEvent(false);publicAsynchronousSocketListener(){}publicstaticvoidStartListening(){// Data buffer for incoming data.byte[] bytes =newByte[1024];// Establish the local endpoint for the socket.IPAddressipAddress =IPAddress.Parse("127.0.0.1");IPEndPointlocalEndPoint =newIPEndPoint(ipAddress, 11000);// Create a TCP/IP socket.Socketlistener =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);// Bind the socket to the localtry{listener.Bind(localEndPoint);listener.Listen(100);while(true){// Set the event to nonsignaled state.reSetEvent.Reset();// Start an asynchronous socket to listen for connections.Console.WriteLine("Waiting for a connection...");listener.BeginAccept(newAsyncCallback(AcceptCallback), listener);// Wait until a connection is made before continuing.reSetEvent.WaitOne();}}catch(Exceptione){Console.WriteLine(e.ToString());}Console.WriteLine("\nPress ENTER to continue...");Console.Read();}publicstaticvoidAcceptCallback(IAsyncResultar){// Signal the main thread to continue.reSetEvent.Set();// Get the socket that handles the client request.Socketlistener = (Socket)ar.AsyncState;Sockethandler = listener.EndAccept(ar);// Create the state object.StateContextstate =newStateContext();state.WorkSocket = handler;handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);}publicstaticvoidReadCallback(IAsyncResultar){Stringcontent =String.Empty;StateContextstate = (StateContext)ar.AsyncState;Sockethandler = state.WorkSocket;// Read data from the client socket.intbytesRead = handler.EndReceive(ar);if(bytesRead > 0){// There might be more data, so store the data received so far.state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));// Check for end-of-file tag. If it is not there, read// more data.content = state.sb.ToString();if(content.IndexOf("<EOF>") > -1){Console.WriteLine("讀取 {0} bytes. \n 數據: {1}", content.Length, content);Send(handler, content);}else{handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);}}}privatestaticvoidSend(Sockethandler,Stringdata){byte[] byteData =Encoding.ASCII.GetBytes(data);handler.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), handler);}privatestaticvoidSendCallback(IAsyncResultar){try{Sockethandler = (Socket)ar.AsyncState;intbytesSent = handler.EndSend(ar);Console.WriteLine("發送 {0} bytes.", bytesSent);handler.Shutdown(SocketShutdown.Both);handler.Close();}catch(Exceptione){Console.WriteLine(e.ToString());}}publicstaticintMain(String[] args){StartListening();return0;}
客戶端:
publicclassAsynchronousClient{// The port number for the remote device.privateconstintport = 11000;// ManualResetEvent instances signal completion.privatestaticManualResetEventconnectResetEvent =newManualResetEvent(false);privatestaticManualResetEventsendResetEvent =newManualResetEvent(false);privatestaticManualResetEventreceiveResetEvent =newManualResetEvent(false);privatestaticStringresponse =String.Empty;privatestaticvoidStartClient(){try{IPAddressipAddress =IPAddress.Parse("127.0.0.1");IPEndPointremoteEP =newIPEndPoint(ipAddress, port);// Create a TCP/IP socket.Socketclient =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);// Connect to the remote endpoint.client.BeginConnect(remoteEP,newAsyncCallback(ConnectCallback), client);connectResetEvent.WaitOne();Send(client,"This is a test<EOF>");sendResetEvent.WaitOne();Receive(client);receiveResetEvent.WaitOne();Console.WriteLine("Response received : {0}", response);// Release the socket.client.Shutdown(SocketShutdown.Both);client.Close();Console.ReadLine();}catch(Exceptione){Console.WriteLine(e.ToString());}}privatestaticvoidConnectCallback(IAsyncResultar){try{Socketclient = (Socket)ar.AsyncState;client.EndConnect(ar);Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint.ToString());connectResetEvent.Set();}catch(Exceptione){Console.WriteLine(e.ToString());}}privatestaticvoidReceive(Socketclient){try{StateContextstate =newStateContext();state.WorkSocket = client;client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);}catch(Exceptione){Console.WriteLine(e.ToString());}}privatestaticvoidReceiveCallback(IAsyncResultar){try{StateContextstate = (StateContext)ar.AsyncState;Socketclient = state.WorkSocket;intbytesRead = client.EndReceive(ar);if(bytesRead > 0){state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);}else{if(state.sb.Length > 1){response = state.sb.ToString();}receiveResetEvent.Set();}}catch(Exceptione){Console.WriteLine(e.ToString());}}privatestaticvoidSend(Socketclient,Stringdata){byte[] byteData =Encoding.ASCII.GetBytes(data);client.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), client);}privatestaticvoidSendCallback(IAsyncResultar){try{Socketclient = (Socket)ar.AsyncState;intbytesSent = client.EndSend(ar);Console.WriteLine("Sent {0} bytes to server.", bytesSent);sendResetEvent.Set();}catch(Exceptione){Console.WriteLine(e.ToString());}}publicstaticintMain(String[] args){StartClient();return0;}}
SAEA方式: SocketAsyncEventArgs
APM模式、TAP模式雖然解決了Socket的併發問題,可是在大併發下仍是有較大性能問題的。這主要是由於上述兩種模式都會生產 IAsyncResult 等對象 ,而大量垃圾對象的回收會很是影響系統的性能。爲此,微軟推出了 SocketAsyncEventArgs 。SocketAsyncEventArgs 是 .NET Framework 3.5 開始支持的一種支持高性能 Socket 通訊的實現。SocketAsyncEventArgs 相比於 APM 方式的主要優勢能夠描述以下,無需每次調用都生成 IAsyncResult 等對象,向原生 Socket 更靠近一些。這是官方的解釋:
The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.
SocketAsyncEventArgs主要爲高性能網絡服務器應用程序而設計,避免了在異步套接字 I/O 量很是大時,大量垃圾對象建立與回收。使用此類執行異步套接字操做的模式包含如下步驟,具體說明可參考:https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx 。
下面是封裝的一個組件代碼:
classBufferManager{intm_numBytes; // the total number of bytes controlled by the buffer poolbyte[] m_buffer; // the underlying byte array maintained by the Buffer ManagerStack<int> m_freeIndexPool; //intm_currentIndex;intm_bufferSize;publicBufferManager(inttotalBytes,intbufferSize){m_numBytes = totalBytes;m_currentIndex = 0;m_bufferSize = bufferSize;m_freeIndexPool =newStack<int>();}// Allocates buffer space used by the buffer poolpublicvoidInitBuffer(){// create one big large buffer and divide that// out to each SocketAsyncEventArg objectm_buffer =newbyte[m_numBytes];}// Assigns a buffer from the buffer pool to the// specified SocketAsyncEventArgs object//// <returns>true if the buffer was successfully set, else false</returns>publicboolSetBuffer(SocketAsyncEventArgsargs){if(m_freeIndexPool.Count > 0){args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);}else{if((m_numBytes - m_bufferSize) < m_currentIndex){returnfalse;}args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);m_currentIndex += m_bufferSize;}returntrue;}// Removes the buffer from a SocketAsyncEventArg object.// This frees the buffer back to the buffer poolpublicvoidFreeBuffer(SocketAsyncEventArgsargs){m_freeIndexPool.Push(args.Offset);args.SetBuffer(null, 0, 0);}}///<summary>///This class is used to communicate with a remote application over TCP/IP protocol.///</summary>classTcpCommunicationChannel{#regionPrivate fields///<summary>///Size of the buffer that is used to receive bytes from TCP socket.///</summary>privateconstintReceiveBufferSize = 8 * 1024;//4KB///<summary>///This buffer is used to receive bytes///</summary>privatereadonlybyte[] _buffer;///<summary>///Socket object to send/reveice messages.///</summary>privatereadonlySocket_clientSocket;///<summary>///A flag to control thread's running///</summary>privatevolatilebool_running;///<summary>///This object is just used for thread synchronizing (locking).///</summary>privatereadonlyobject_syncLock;privateBufferManagerreceiveBufferManager;privateSocketAsyncEventArgsreceiveBuff =null;#endregion#regionConstructor///<summary>///Creates a new TcpCommunicationChannel object.///</summary>///<param name="clientSocket">A connected Socket object that is///used to communicate over network</param>publicTcpCommunicationChannel(SocketclientSocket){_clientSocket = clientSocket;_clientSocket.Blocking =false;_buffer =newbyte[ReceiveBufferSize];_syncLock =newobject();Init();}privatevoidInit(){//初始化接收Socket緩存數據receiveBufferManager =newBufferManager(ReceiveBufferSize*2, ReceiveBufferSize);receiveBufferManager.InitBuffer();receiveBuff =newSocketAsyncEventArgs();receiveBuff.Completed += ReceiveIO_Completed;receiveBufferManager.SetBuffer(receiveBuff);//初始化發送Socket緩存數據}#endregion#regionPublic methods///<summary>///Disconnects from remote application and closes channel.///</summary>publicvoidDisconnect(){_running =false;receiveBuff.Completed -= ReceiveIO_Completed;receiveBuff.Dispose();if(_clientSocket.Connected){_clientSocket.Close();}_clientSocket.Dispose();}#endregionpublicvoidStartReceive(){_running =true;boolresult = _clientSocket.ReceiveAsync(receiveBuff);}privatevoidReceiveIO_Completed(objectsender,SocketAsyncEventArgse){if(e.BytesTransferred > 0 && e.SocketError ==SocketError.Success && _clientSocket.Connected ==true&& e.LastOperation ==SocketAsyncOperation.Receive){if(!_running){return;}//Get received bytes countDateTimereceiveTime =DateTime.Now;//Copy received bytes to a new byte arrayvarreceivedBytes =newbyte[e.BytesTransferred];Array.Copy(e.Buffer, 0, receivedBytes, 0, e.BytesTransferred);//處理消息....if(_running){StartReceive();}}}///<summary>///Sends a message to the remote application.///</summary>///<param name="message">Message to be sent</param>publicvoidSendMessage(byte[] messageBytes){//Send messageif(_clientSocket.Connected){SocketAsyncEventArgsdata =newSocketAsyncEventArgs();data.SocketFlags =SocketFlags.None;data.Completed += (s, e) =>{e.Dispose();};data.SetBuffer(messageBytes, 0, messageBytes.Length);//Console.WriteLine("發送:" + messageBytes.LongLength);_clientSocket.SendAsync(data);}}}