一次簡單的Socket探索之旅,分別對Socket服務端的兩種方式進行了測試和解析。c#
實現一個簡單的Socket服務,基本功能就是接收消息而後加上結束消息時間返回給客戶端。api
/// <summary> /// 簡單服務,收發消息 /// </summary> class FirstSimpleServer { public static void Run(string m_ip, int m_port) { var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); var ip = IPAddress.Parse(m_ip); var endpoint = new IPEndPoint(ip, m_port); socket.Bind(endpoint); socket.Listen(0); socket.ReceiveTimeout = -1; Task.Run(() => { while (true) { var acceptSocket = socket.Accept(); if (acceptSocket != null && acceptSocket.Connected) { Task.Run(() => { byte[] receiveBuffer = new byte[256]; int result = 0; do { if (acceptSocket.Connected) { result = acceptSocket.Receive(receiveBuffer, 0, receiveBuffer.Length, SocketFlags.None, out SocketError error); if (error == SocketError.Success && result > 0) { var recestr = Encoding.UTF8.GetString(receiveBuffer, 0, result); var Replaystr = $"Server收到消息:{recestr};Server收到消息的時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}"; Console.WriteLine(Replaystr); var strbytes = Encoding.UTF8.GetBytes(Replaystr); acceptSocket.Send(strbytes, 0, strbytes.Length, SocketFlags.None); if (recestr.Contains("stop")) { break; } } } else { break; } } while (result > 0); }).ContinueWith((t) => { System.Threading.Thread.Sleep(1000); acceptSocket.Disconnect(false); acceptSocket.Dispose(); }); } } }).Wait(); }
測試:一個客戶端,發送10次數據,每次間隔50ms,緩存
結果:客戶端的顯示以下,客戶端發送消息,再接收到,十次中最長的耗時10ms。服務器
ClientReceiceServer:{Server收到消息:{Client:1:MessageID:0;Client發送時間:2020-04-11 13:21:22:974};Server收到消息的時間:2020-04-11 13:21:22:981;ClientReceiceServer時間:2020-04-11 13:21:22:984} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:1;Client發送時間:2020-04-11 13:21:23:032};Server收到消息的時間:2020-04-11 13:21:23:032;ClientReceiceServer時間:2020-04-11 13:21:23:032} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:2;Client發送時間:2020-04-11 13:21:23:082};Server收到消息的時間:2020-04-11 13:21:23:082;ClientReceiceServer時間:2020-04-11 13:21:23:083} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:3;Client發送時間:2020-04-11 13:21:23:133};Server收到消息的時間:2020-04-11 13:21:23:133;ClientReceiceServer時間:2020-04-11 13:21:23:133} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:4;Client發送時間:2020-04-11 13:21:23:184};Server收到消息的時間:2020-04-11 13:21:23:184;ClientReceiceServer時間:2020-04-11 13:21:23:190} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:5;Client發送時間:2020-04-11 13:21:23:235};Server收到消息的時間:2020-04-11 13:21:23:235;ClientReceiceServer時間:2020-04-11 13:21:23:235} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:6;Client發送時間:2020-04-11 13:21:23:286};Server收到消息的時間:2020-04-11 13:21:23:286;ClientReceiceServer時間:2020-04-11 13:21:23:286} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:7;Client發送時間:2020-04-11 13:21:23:336};Server收到消息的時間:2020-04-11 13:21:23:336;ClientReceiceServer時間:2020-04-11 13:21:23:336} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:8;Client發送時間:2020-04-11 13:21:23:387};Server收到消息的時間:2020-04-11 13:21:23:387;ClientReceiceServer時間:2020-04-11 13:21:23:388} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:9;Client發送時間:2020-04-11 13:21:23:438};Server收到消息的時間:2020-04-11 13:21:23:438;ClientReceiceServer時間:2020-04-11 13:21:23:438}
假如客戶端發送消息速度加快,對服務端會有什麼影響?測試將客戶端發送消息的間隔修改成1ms網絡
System.Threading.Thread.Sleep(1);併發
結果以下,並無發現問題。socket
ClientReceiceServer:{Server收到消息:{Client:1:MessageID:0;Client發送時間:2020-04-11 13:48:57:193};Server收到消息的時間:2020-04-11 13:48:57:196;ClientReceiceServer時間:2020-04-11 13:48:57:197} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:1;Client發送時間:2020-04-11 13:48:57:198};Server收到消息的時間:2020-04-11 13:48:57:198;ClientReceiceServer時間:2020-04-11 13:48:57:201} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:2;Client發送時間:2020-04-11 13:48:57:200};Server收到消息的時間:2020-04-11 13:48:57:201;ClientReceiceServer時間:2020-04-11 13:48:57:202} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:3;Client發送時間:2020-04-11 13:48:57:202};Server收到消息的時間:2020-04-11 13:48:57:202;ClientReceiceServer時間:2020-04-11 13:48:57:203} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:4;Client發送時間:2020-04-11 13:48:57:204};Server收到消息的時間:2020-04-11 13:48:57:204;ClientReceiceServer時間:2020-04-11 13:48:57:204} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:5;Client發送時間:2020-04-11 13:48:57:206};Server收到消息的時間:2020-04-11 13:48:57:206;ClientReceiceServer時間:2020-04-11 13:48:57:207} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:6;Client發送時間:2020-04-11 13:48:57:208};Server收到消息的時間:2020-04-11 13:48:57:208;ClientReceiceServer時間:2020-04-11 13:48:57:208} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:7;Client發送時間:2020-04-11 13:48:57:209};Server收到消息的時間:2020-04-11 13:48:57:209;ClientReceiceServer時間:2020-04-11 13:48:57:211} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:8;Client發送時間:2020-04-11 13:48:57:211};Server收到消息的時間:2020-04-11 13:48:57:211;ClientReceiceServer時間:2020-04-11 13:48:57:212} ClientReceiceServer:{Server收到消息:{Client:1:MessageID:9;Client發送時間:2020-04-11 13:48:57:213};Server收到消息的時間:2020-04-11 13:48:57:213;ClientReceiceServer時間:2020-04-11 13:48:57:213}
再極致一點,將客戶端的發送間隔取消,循環發送。看到下面服務端接收消息的結果,可看到消息包很混亂。仔細分析一下,發現其實服務器其實就收到3次消息,前兩次接受256個字節,最後一次接收138字節。這是因爲設置服務端接收緩存的大小爲256個字節。說明發送比較快或並行發送的時候,服務端會很快將接收的緩存塊填滿,一旦填滿,Receive方法就會返回,否則就處於阻塞狀態。async
Server收到消息:{Client:1:MessageID:1;Client發送時間:2020-04-11 13:51:18:723}{Client:1:MessageID:2;Client發送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:3;Client發送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:4;Client發送時間:2020-04-11 13:51:18:;Server收到消息的時間:2020-04-11 13:51:18:724 Server收到消息:724}{Client:1:MessageID:5;Client發送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:6;Client發送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:7;Client發送時間:2020-04-11 13:51:18:724}{Client:1:MessageID:8;Client發送時間:2020-04-11 13:51;Server收到消息的時間:2020-04-11 13:51:18:729 Server收到消息::18:724}{Client:1:MessageID:9;Client發送時間:2020-04-11 13:51:18:724};Server收到消息的時間:2020-04-11 13:51:18:732
經過一個簡單的方法解決這個問題,每次客戶端發送固定長度的消息,服務端接收固定長度的消息。如今客戶端發送的消息是65個字節,設置服務端接收數據的緩存塊爲65字節。測試
{Client:1:MessageID:1;Client發送時間:2020-04-11 13:51:18:723} byte[] receiveBuffer = new byte[65];
再連續發送10條消息,下面爲服務端測試的結果,結果顯示正常:.net
Server收到消息:{Client:1:MessageID:0;Client發送時間:2020-04-11 14:19:44:774};Server收到消息的時間:2020-04-11 14:19:44:778 Server收到消息:{Client:1:MessageID:1;Client發送時間:2020-04-11 14:19:44:776};Server收到消息的時間:2020-04-11 14:19:44:781 Server收到消息:{Client:1:MessageID:2;Client發送時間:2020-04-11 14:19:44:776};Server收到消息的時間:2020-04-11 14:19:44:781 Server收到消息:{Client:1:MessageID:3;Client發送時間:2020-04-11 14:19:44:776};Server收到消息的時間:2020-04-11 14:19:44:782 Server收到消息:{Client:1:MessageID:4;Client發送時間:2020-04-11 14:19:44:776};Server收到消息的時間:2020-04-11 14:19:44:782 Server收到消息:{Client:1:MessageID:5;Client發送時間:2020-04-11 14:19:44:776};Server收到消息的時間:2020-04-11 14:19:44:782 Server收到消息:{Client:1:MessageID:6;Client發送時間:2020-04-11 14:19:44:776};Server收到消息的時間:2020-04-11 14:19:44:783 Server收到消息:{Client:1:MessageID:7;Client發送時間:2020-04-11 14:19:44:776};Server收到消息的時間:2020-04-11 14:19:44:783 Server收到消息:{Client:1:MessageID:8;Client發送時間:2020-04-11 14:19:44:776};Server收到消息的時間:2020-04-11 14:19:44:784 Server收到消息:{Client:1:MessageID:9;Client發送時間:2020-04-11 14:19:44:776};Server收到消息的時間:2020-04-11 14:19:44:784
若是並行發送消息,同時有兩個消息到服務端,消息內容會混亂嗎?客戶端進行並行消息發送測試。下面爲測試結果,發現並無問題,說明一個消息可能沒有被拆分,或則即便被拆分了在網絡通信底層也會恢復原來的消息結構。
Parallel.For(1, 10, (i) => { var Replaystr = $"{{Client:1:MessageID:{i};Client發送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}}}"; var strbytes = Encoding.UTF8.GetBytes(Replaystr); socket.Send(strbytes, 0, strbytes.Length, SocketFlags.None); });
Server收到消息:{Client:1:MessageID:2;Client發送時間:2020-04-11 17:11:44:568};Server收到消息的時間:2020-04-11 17:11:44:572 Server收到消息:{Client:1:MessageID:1;Client發送時間:2020-04-11 17:11:44:568};Server收到消息的時間:2020-04-11 17:11:44:575 Server收到消息:{Client:1:MessageID:4;Client發送時間:2020-04-11 17:11:44:572};Server收到消息的時間:2020-04-11 17:11:44:576 Server收到消息:{Client:1:MessageID:5;Client發送時間:2020-04-11 17:11:44:572};Server收到消息的時間:2020-04-11 17:11:44:576 Server收到消息:{Client:1:MessageID:6;Client發送時間:2020-04-11 17:11:44:572};Server收到消息的時間:2020-04-11 17:11:44:576 Server收到消息:{Client:1:MessageID:7;Client發送時間:2020-04-11 17:11:44:572};Server收到消息的時間:2020-04-11 17:11:44:576 Server收到消息:{Client:1:MessageID:8;Client發送時間:2020-04-11 17:11:44:572};Server收到消息的時間:2020-04-11 17:11:44:577 Server收到消息:{Client:1:MessageID:9;Client發送時間:2020-04-11 17:11:44:572};Server收到消息的時間:2020-04-11 17:11:44:577 Server收到消息:{Client:1:MessageID:3;Client發送時間:2020-04-11 17:11:44:571};Server收到消息的時間:2020-04-11 17:11:44:577
再進一步測試,假設有多個客戶端同時鏈接,並行發送消息。
Parallel.For(0, 9, (Clienti) => { var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); var ip = IPAddress.Parse(m_ip); var endpoint = new IPEndPoint(ip, m_port); socket.ReceiveTimeout = -1; Task.Run(() => { socket.Connect(endpoint); ... ... } });
結果:這個測試是放在虛擬機中,使用的是NAT網絡模式,同一個子網內客戶端從發出消息接收服務端返回消息最長耗時有6秒,仍是比較誇張的。
服務端結果:
客戶端結果:
這個Socket服務在少許客戶端鏈接的時候好像沒什麼問題,它能抗住大量客戶端的鏈接併發測試嗎?我想答案確定是否認的,爲何呢?由於每一個客戶端鏈接都須要消耗1個線程,線程是很昂貴的資源,每一個線程自生就要消耗1M內存,100客戶端鏈接什麼都不作就消耗了100M,更不用說線程之間的上下文切換須要消耗更寶貴的CPU資源,因此這個服務端根本應對不了大量客戶端的鏈接。
那麼最理想的Socket服務端是什麼樣子的呢?在我看來就是隻有與CPU核數相同的線程量在運行,若是4核那麼就4個線程在運行,而後每一個線程處理超級多的客戶端,最好沒有阻塞,不休息。怎樣才能實現這個目標呢?微軟給了一個簡單的例子,已經極大程度的實現了這個想法,一塊兒來看看吧
我仿照微軟提供的這個實例擼了個簡單的Socket服務端
public class SocketArgsServer { private static int m_numConnections; private static int m_receiveBufferSize; private static int m_sendBufferSize; private static byte[] m_receivebuffer; private static Stack<int> m_freeReceiveIndexPool; private static int m_currentReceiveIndex; private static byte[] m_sendbuffer; private static Stack<int> m_freeSendIndexPool; private static int m_currentSendIndex; private static Stack<SocketAsyncEventArgs> m_ReadPool; private static Stack<SocketAsyncEventArgs> m_WritePool; private static Semaphore m_maxNumberAcceptedClients; private static int m_numConnectedSockets; private static int m_totalBytesRead; private static Socket listenSocket; public static void Run(string m_ip, int m_port, int numConnections, int m_receiveBuffer, int m_sentBuffer) { m_numConnections = numConnections; m_receiveBufferSize = m_receiveBuffer; m_sendBufferSize = m_sentBuffer; m_receivebuffer = new byte[m_receiveBufferSize * m_numConnections]; m_freeReceiveIndexPool = new Stack<int>(); m_currentReceiveIndex = 0; m_sendbuffer = new byte[m_sendBufferSize * m_numConnections]; m_freeSendIndexPool = new Stack<int>(); m_currentSendIndex = 0; m_ReadPool = new Stack<SocketAsyncEventArgs>(m_numConnections); m_WritePool = new Stack<SocketAsyncEventArgs>(m_numConnections); m_maxNumberAcceptedClients = new Semaphore(m_numConnections, m_numConnections); m_numConnectedSockets = 0; m_totalBytesRead = 0; for (int i = 0; i < m_numConnections; i++) { var readEventArg = new SocketAsyncEventArgs(); readEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ReadWriteIOComleted); readEventArg.UserToken = new AsyncUserToken(); if (m_freeReceiveIndexPool.Count > 0) { readEventArg.SetBuffer(m_receivebuffer, m_freeReceiveIndexPool.Pop(), m_receiveBufferSize); } else { if ((m_receiveBufferSize * m_numConnections - m_receiveBufferSize) < m_currentReceiveIndex) { new ArgumentException("接收緩存設置異常"); } readEventArg.SetBuffer(m_receivebuffer, m_currentReceiveIndex, m_receiveBufferSize); m_currentReceiveIndex += m_receiveBufferSize; } m_ReadPool.Push(readEventArg); var writeEventArg = new SocketAsyncEventArgs(); writeEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ReadWriteIOComleted); writeEventArg.UserToken = new AsyncUserToken(); if (m_freeSendIndexPool.Count > 0) { writeEventArg.SetBuffer(m_sendbuffer, m_freeSendIndexPool.Pop(), m_sendBufferSize); } else { if ((m_sendBufferSize * m_numConnections - m_sendBufferSize) < m_currentSendIndex) { new ArgumentException("發送緩存設置異常"); } writeEventArg.SetBuffer(m_sendbuffer, m_currentSendIndex, m_sendBufferSize); m_currentSendIndex += m_sendBufferSize; } m_WritePool.Push(writeEventArg); } listenSocket = new Socket(new IPEndPoint(IPAddress.Parse(m_ip), m_port).AddressFamily, SocketType.Stream, ProtocolType.Tcp); listenSocket.Bind(new IPEndPoint(IPAddress.Parse(m_ip), m_port)); listenSocket.Listen(100); StartAccept(null); Console.WriteLine("Press any key to terminate the server process...."); Console.ReadKey(); } public static void ReadWriteIOComleted(object sender, SocketAsyncEventArgs e) { switch (e.LastOperation) { case SocketAsyncOperation.Receive: ProcessReceive(e); break; case SocketAsyncOperation.Send: ProcessSend(e); break; default: throw new ArgumentException("The last operation completed on the socket was not a receive or send"); } } public static void ProcessSend(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success) { AsyncUserToken token = (AsyncUserToken)e.UserToken; bool willRaiseEvent = token.Socket.ReceiveAsync(token.readEventArgs); if (!willRaiseEvent) { ProcessReceive(token.readEventArgs); } } else { CloseClientSocket(e); } } public static void CloseClientSocket(SocketAsyncEventArgs e) { AsyncUserToken token = e.UserToken as AsyncUserToken; try { token.Socket.Shutdown(SocketShutdown.Send); } catch (Exception exception) { Console.WriteLine(exception); } token.Socket.Close(); Interlocked.Decrement(ref m_numConnectedSockets); m_ReadPool.Push(token.readEventArgs); m_WritePool.Push(token.writeEventArgs); token.Socket = null; token.readEventArgs = null; token.writeEventArgs = null; m_maxNumberAcceptedClients.Release(); } public static void ProcessReceive(SocketAsyncEventArgs e) { AsyncUserToken token = (AsyncUserToken)e.UserToken; if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) { Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred); byte[] data = new byte[e.BytesTransferred]; Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred); var recestr = Encoding.UTF8.GetString(data); var Replaystr = $"Server收到消息:{recestr};Server收到消息的時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}"; Console.WriteLine(Replaystr); var strbytes = Encoding.UTF8.GetBytes(Replaystr); Array.Copy(strbytes, 0, token.writeEventArgs.Buffer, token.writeEventArgs.Offset, strbytes.Length); bool willRaiseEvent = token.Socket.SendAsync(token.writeEventArgs); if (!willRaiseEvent) { ProcessSend(token.writeEventArgs); } } else { CloseClientSocket(e); } } public static void ProcessAccept(SocketAsyncEventArgs e) { Interlocked.Increment(ref m_numConnectedSockets); SocketAsyncEventArgs readEventArgs = m_ReadPool.Pop(); SocketAsyncEventArgs writeEventArgs = m_WritePool.Pop(); ((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket; ((AsyncUserToken)readEventArgs.UserToken).readEventArgs = readEventArgs; ((AsyncUserToken)readEventArgs.UserToken).writeEventArgs = writeEventArgs; ((AsyncUserToken)writeEventArgs.UserToken).Socket = e.AcceptSocket; ((AsyncUserToken)writeEventArgs.UserToken).readEventArgs = readEventArgs; ((AsyncUserToken)writeEventArgs.UserToken).writeEventArgs = writeEventArgs; bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs); if (!willRaiseEvent) { ProcessReceive(readEventArgs); } StartAccept(e); } public static void StartAccept(SocketAsyncEventArgs listenEventArg) { if (listenEventArg == null) { listenEventArg = new SocketAsyncEventArgs(); listenEventArg.Completed += new EventHandler<SocketAsyncEventArgs>((sender, e) => ProcessAccept(e)); } else { listenEventArg.AcceptSocket = null; } m_maxNumberAcceptedClients.WaitOne(); bool willRaiseEvent = listenSocket.AcceptAsync(listenEventArg); if (!willRaiseEvent) { ProcessAccept(listenEventArg); } } } class AsyncUserToken { public Socket Socket { get; set; } public SocketAsyncEventArgs readEventArgs { set; get; } public SocketAsyncEventArgs writeEventArgs { set; get; } }
先直接上測試結果,該測試環境仍是在虛擬機中,忽略一下服務端收到消息時間,由於虛擬機時間和主機時間不是同步的。能夠看到服務端發送消息到接收到消息最長耗時2s。
這個Socket服務端直接丟棄了線程的概念,經過SocketAsyncEventArgs來實現了以前線程實現的全部功能。一個SocketAsyncEventArgs來監測鏈接,客戶端鏈接的時候從SocketAsyncEventArgsPool中分配兩個SocketAsyncEventArgs分別負責讀寫消息。讀寫消息的緩存塊也進行了統一管理,共同組成一個大的緩存塊進行重複使用。當客戶端失去鏈接的時候將分配的讀寫SocketAsyncEventArgs返還給SocketAsyncEventArgsPool進行重複使用。
在本文中探索了兩種socket服務端的實現,並對這兩種socket服務端進行了簡單的剖析,我看了SuperSocket的底層實現思想採用的是第二種方式。目前這種方式的弊端我還沒想到,歡迎你們一塊兒探討。