Socket Exploration, Part 1: Two Socket Server Implementations

Introduction

A quick exploration of sockets, testing and analyzing two ways to implement a socket server.

CommonSocket

Code Implementation

Implement a simple socket service whose basic job is to receive a message, append the time it was received, and send the result back to the client.

    // Requires: using System; using System.Net; using System.Net.Sockets;
    //           using System.Text; using System.Threading.Tasks;
    /// <summary>
    /// A simple server that receives messages and echoes them back
    /// </summary>
    class FirstSimpleServer
    {
        public static void Run(string m_ip, int m_port)
        {
            var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            var ip = IPAddress.Parse(m_ip);
            var endpoint = new IPEndPoint(ip, m_port);
            socket.Bind(endpoint);
            socket.Listen(0);
            socket.ReceiveTimeout = -1;
            Task.Run(() =>
            {
                while (true)
                {
                    var acceptSocket = socket.Accept();
                    if (acceptSocket != null && acceptSocket.Connected)
                    {
                        Task.Run(() =>
                        {
                            byte[] receiveBuffer = new byte[256];

                            int result = 0;
                            do
                            {
                                if (acceptSocket.Connected)
                                {

                                    result = acceptSocket.Receive(receiveBuffer, 0, receiveBuffer.Length,
                                        SocketFlags.None,
                                        out SocketError error);
                                    if (error == SocketError.Success && result > 0)
                                    {
                                        var recestr = Encoding.UTF8.GetString(receiveBuffer, 0, result);
                                        var replyStr =
                                            $"Server received:{recestr};Server receive time:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}";
                                        Console.WriteLine(replyStr);
                                        var strbytes = Encoding.UTF8.GetBytes(replyStr);
                                        acceptSocket.Send(strbytes, 0, strbytes.Length, SocketFlags.None);

                                        if (recestr.Contains("stop"))
                                        {
                                            break;
                                        }
                                    }
                                }
                                else
                                {
                                    break;
                                }
                            } while (result > 0);
                        }).ContinueWith((t) =>
                        {
                            System.Threading.Thread.Sleep(1000);
                            acceptSocket.Disconnect(false);
                            acceptSocket.Dispose();
                        });
                    }
                }
            }).Wait();
        }
    }

A Simple Test

Test: one client sends 10 messages, 50 ms apart.

Result: the client output is shown below. Measured from sending a message to receiving the reply, the longest of the ten round trips was 10 ms.

ClientReceivedServer:{Server received:{Client:1:MessageID:0;Client send time:2020-04-11 13:21:22:974};Server receive time:2020-04-11 13:21:22:981;ClientReceivedServer time:2020-04-11 13:21:22:984}
ClientReceivedServer:{Server received:{Client:1:MessageID:1;Client send time:2020-04-11 13:21:23:032};Server receive time:2020-04-11 13:21:23:032;ClientReceivedServer time:2020-04-11 13:21:23:032}
ClientReceivedServer:{Server received:{Client:1:MessageID:2;Client send time:2020-04-11 13:21:23:082};Server receive time:2020-04-11 13:21:23:082;ClientReceivedServer time:2020-04-11 13:21:23:083}
ClientReceivedServer:{Server received:{Client:1:MessageID:3;Client send time:2020-04-11 13:21:23:133};Server receive time:2020-04-11 13:21:23:133;ClientReceivedServer time:2020-04-11 13:21:23:133}
ClientReceivedServer:{Server received:{Client:1:MessageID:4;Client send time:2020-04-11 13:21:23:184};Server receive time:2020-04-11 13:21:23:184;ClientReceivedServer time:2020-04-11 13:21:23:190}
ClientReceivedServer:{Server received:{Client:1:MessageID:5;Client send time:2020-04-11 13:21:23:235};Server receive time:2020-04-11 13:21:23:235;ClientReceivedServer time:2020-04-11 13:21:23:235}
ClientReceivedServer:{Server received:{Client:1:MessageID:6;Client send time:2020-04-11 13:21:23:286};Server receive time:2020-04-11 13:21:23:286;ClientReceivedServer time:2020-04-11 13:21:23:286}
ClientReceivedServer:{Server received:{Client:1:MessageID:7;Client send time:2020-04-11 13:21:23:336};Server receive time:2020-04-11 13:21:23:336;ClientReceivedServer time:2020-04-11 13:21:23:336}
ClientReceivedServer:{Server received:{Client:1:MessageID:8;Client send time:2020-04-11 13:21:23:387};Server receive time:2020-04-11 13:21:23:387;ClientReceivedServer time:2020-04-11 13:21:23:388}
ClientReceivedServer:{Server received:{Client:1:MessageID:9;Client send time:2020-04-11 13:21:23:438};Server receive time:2020-04-11 13:21:23:438;ClientReceivedServer time:2020-04-11 13:21:23:438}

What happens to the server if the client sends faster? Change the client's send interval to 1 ms:

System.Threading.Thread.Sleep(1);

The results below show no problems.

ClientReceivedServer:{Server received:{Client:1:MessageID:0;Client send time:2020-04-11 13:48:57:193};Server receive time:2020-04-11 13:48:57:196;ClientReceivedServer time:2020-04-11 13:48:57:197}
ClientReceivedServer:{Server received:{Client:1:MessageID:1;Client send time:2020-04-11 13:48:57:198};Server receive time:2020-04-11 13:48:57:198;ClientReceivedServer time:2020-04-11 13:48:57:201}
ClientReceivedServer:{Server received:{Client:1:MessageID:2;Client send time:2020-04-11 13:48:57:200};Server receive time:2020-04-11 13:48:57:201;ClientReceivedServer time:2020-04-11 13:48:57:202}
ClientReceivedServer:{Server received:{Client:1:MessageID:3;Client send time:2020-04-11 13:48:57:202};Server receive time:2020-04-11 13:48:57:202;ClientReceivedServer time:2020-04-11 13:48:57:203}
ClientReceivedServer:{Server received:{Client:1:MessageID:4;Client send time:2020-04-11 13:48:57:204};Server receive time:2020-04-11 13:48:57:204;ClientReceivedServer time:2020-04-11 13:48:57:204}
ClientReceivedServer:{Server received:{Client:1:MessageID:5;Client send time:2020-04-11 13:48:57:206};Server receive time:2020-04-11 13:48:57:206;ClientReceivedServer time:2020-04-11 13:48:57:207}
ClientReceivedServer:{Server received:{Client:1:MessageID:6;Client send time:2020-04-11 13:48:57:208};Server receive time:2020-04-11 13:48:57:208;ClientReceivedServer time:2020-04-11 13:48:57:208}
ClientReceivedServer:{Server received:{Client:1:MessageID:7;Client send time:2020-04-11 13:48:57:209};Server receive time:2020-04-11 13:48:57:209;ClientReceivedServer time:2020-04-11 13:48:57:211}
ClientReceivedServer:{Server received:{Client:1:MessageID:8;Client send time:2020-04-11 13:48:57:211};Server receive time:2020-04-11 13:48:57:211;ClientReceivedServer time:2020-04-11 13:48:57:212}
ClientReceivedServer:{Server received:{Client:1:MessageID:9;Client send time:2020-04-11 13:48:57:213};Server receive time:2020-04-11 13:48:57:213;ClientReceivedServer time:2020-04-11 13:48:57:213}

Pushing further, remove the client's send delay entirely and send in a loop. The server output below is clearly jumbled. A closer look shows the server actually performed only three receives: the first two returned 256 bytes each and the last returned 138 bytes (the 10 messages total 650 bytes: 256 + 256 + 138), because the server's receive buffer was set to 256 bytes. When the client sends quickly or in parallel, data accumulates faster than the server reads it; Receive then returns as much buffered data as fits in the buffer, with no regard for message boundaries, and blocks only while no data is available.

Server received:{Client:1:MessageID:1;Client send time:2020-04-11 13:51:18:723}{Client:1:MessageID:2;Client send time:2020-04-11 13:51:18:724}{Client:1:MessageID:3;Client send time:2020-04-11 13:51:18:724}{Client:1:MessageID:4;Client send time:2020-04-11 13:51:18:;Server receive time:2020-04-11 13:51:18:724
Server received:724}{Client:1:MessageID:5;Client send time:2020-04-11 13:51:18:724}{Client:1:MessageID:6;Client send time:2020-04-11 13:51:18:724}{Client:1:MessageID:7;Client send time:2020-04-11 13:51:18:724}{Client:1:MessageID:8;Client send time:2020-04-11 13:51;Server receive time:2020-04-11 13:51:18:729
Server received::18:724}{Client:1:MessageID:9;Client send time:2020-04-11 13:51:18:724};Server receive time:2020-04-11 13:51:18:732
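The underlying cause is that TCP is a byte stream with no built-in message boundaries. Besides fixed-length messages, the usual remedy is explicit framing. The sketch below is illustrative code of my own (not part of the server above, and the names are mine): each message is written as a 4-byte length prefix followed by the payload, and the reader pulls back exactly that many bytes regardless of how the transport chunked them.

```csharp
using System;
using System.IO;
using System.Text;

static class LengthPrefixFraming
{
    // Write a 4-byte little-endian length, then the UTF-8 payload.
    public static void WriteMessage(Stream stream, string message)
    {
        byte[] payload = Encoding.UTF8.GetBytes(message);
        stream.Write(BitConverter.GetBytes(payload.Length), 0, 4);
        stream.Write(payload, 0, payload.Length);
    }

    // Read the 4-byte length, then exactly that many payload bytes.
    public static string ReadMessage(Stream stream)
    {
        byte[] header = ReadExactly(stream, 4);
        int length = BitConverter.ToInt32(header, 0);
        byte[] payload = ReadExactly(stream, length);
        return Encoding.UTF8.GetString(payload);
    }

    // Stream.Read may return fewer bytes than requested, so loop until done.
    private static byte[] ReadExactly(Stream stream, int count)
    {
        byte[] buffer = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int n = stream.Read(buffer, offset, count - offset);
            if (n == 0) throw new EndOfStreamException();
            offset += n;
        }
        return buffer;
    }
}
```

With framing in place, message boundaries survive no matter how fast the client sends.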

There is a simple way to avoid this: have the client always send messages of a fixed length, and have the server receive into a buffer of exactly that length. The client messages here are 65 bytes each, so set the server's receive buffer to 65 bytes.

{Client:1:MessageID:1;Client send time:2020-04-11 13:51:18:723}
byte[] receiveBuffer = new byte[65];

Send 10 messages in a row again; the server output below is back to normal:

Server received:{Client:1:MessageID:0;Client send time:2020-04-11 14:19:44:774};Server receive time:2020-04-11 14:19:44:778
Server received:{Client:1:MessageID:1;Client send time:2020-04-11 14:19:44:776};Server receive time:2020-04-11 14:19:44:781
Server received:{Client:1:MessageID:2;Client send time:2020-04-11 14:19:44:776};Server receive time:2020-04-11 14:19:44:781
Server received:{Client:1:MessageID:3;Client send time:2020-04-11 14:19:44:776};Server receive time:2020-04-11 14:19:44:782
Server received:{Client:1:MessageID:4;Client send time:2020-04-11 14:19:44:776};Server receive time:2020-04-11 14:19:44:782
Server received:{Client:1:MessageID:5;Client send time:2020-04-11 14:19:44:776};Server receive time:2020-04-11 14:19:44:782
Server received:{Client:1:MessageID:6;Client send time:2020-04-11 14:19:44:776};Server receive time:2020-04-11 14:19:44:783
Server received:{Client:1:MessageID:7;Client send time:2020-04-11 14:19:44:776};Server receive time:2020-04-11 14:19:44:783
Server received:{Client:1:MessageID:8;Client send time:2020-04-11 14:19:44:776};Server receive time:2020-04-11 14:19:44:784
Server received:{Client:1:MessageID:9;Client send time:2020-04-11 14:19:44:776};Server receive time:2020-04-11 14:19:44:784

Concurrent Message Test

If messages are sent in parallel and two reach the server at the same moment, will their contents get mixed up? The client runs a parallel send test. The results below show no problem: the arrival order changes, but every message arrives intact. This suggests that each Send call's bytes enter the stream contiguously; concurrent sends on a single socket are serialized per call rather than interleaved mid-message.

Parallel.For(1, 10, (i) =>    // sends MessageID 1..9 in parallel
             {
                 var sendStr =
                     $"{{Client:1:MessageID:{i};Client send time:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}}}";
                 var strbytes = Encoding.UTF8.GetBytes(sendStr);
                 socket.Send(strbytes, 0, strbytes.Length, SocketFlags.None);
             });
Server received:{Client:1:MessageID:2;Client send time:2020-04-11 17:11:44:568};Server receive time:2020-04-11 17:11:44:572
Server received:{Client:1:MessageID:1;Client send time:2020-04-11 17:11:44:568};Server receive time:2020-04-11 17:11:44:575
Server received:{Client:1:MessageID:4;Client send time:2020-04-11 17:11:44:572};Server receive time:2020-04-11 17:11:44:576
Server received:{Client:1:MessageID:5;Client send time:2020-04-11 17:11:44:572};Server receive time:2020-04-11 17:11:44:576
Server received:{Client:1:MessageID:6;Client send time:2020-04-11 17:11:44:572};Server receive time:2020-04-11 17:11:44:576
Server received:{Client:1:MessageID:7;Client send time:2020-04-11 17:11:44:572};Server receive time:2020-04-11 17:11:44:576
Server received:{Client:1:MessageID:8;Client send time:2020-04-11 17:11:44:572};Server receive time:2020-04-11 17:11:44:577
Server received:{Client:1:MessageID:9;Client send time:2020-04-11 17:11:44:572};Server receive time:2020-04-11 17:11:44:577
Server received:{Client:1:MessageID:3;Client send time:2020-04-11 17:11:44:571};Server receive time:2020-04-11 17:11:44:577

Concurrent Client Test

Going a step further, suppose multiple clients connect at the same time and send messages in parallel.

Parallel.For(0, 9, (Clienti) =>
            {
                var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                var ip = IPAddress.Parse(m_ip);
                var endpoint = new IPEndPoint(ip, m_port);
                socket.ReceiveTimeout = -1;
                Task.Run(() =>
                {
                    socket.Connect(endpoint);
                    ...
                    ...
                });
            });

Result: this test ran in a virtual machine using NAT networking. Even within the same subnet, the longest round trip from a client sending a message to receiving the server's reply was about 6 seconds, which is quite extreme.

Server output: (screenshot omitted)

Client output: (screenshot omitted)

Summary

This socket server seems fine with a small number of client connections, but can it withstand a large number of concurrent connections? The answer is surely no. Why? Because every client connection consumes a thread, and threads are an expensive resource: each one reserves about 1 MB of memory for its stack, so 100 connections doing nothing already cost about 100 MB, not to mention the even more precious CPU time burned on context switching between threads. This server simply cannot handle a large number of client connections.

So what would the ideal socket server look like? In my view, it would run only as many threads as there are CPU cores (four threads on a four-core machine), with each thread serving a very large number of clients, ideally never blocking and never idle. How can we get there? Microsoft provides a simple sample that largely realizes this idea; let's take a look.
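As a point of reference (this is my own sketch, not the Microsoft sample discussed next): modern .NET can approach the same goal with async/await, where awaited socket operations release their thread during I/O, so a small pool of thread-pool threads can serve many connections. The `AsyncEchoServer` name and structure below are illustrative, using `TcpListener` for brevity.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

class AsyncEchoServer
{
    public static async Task RunAsync(TcpListener listener)
    {
        while (true)
        {
            // AcceptTcpClientAsync does not tie up a thread while waiting.
            TcpClient client = await listener.AcceptTcpClientAsync();
            _ = HandleClientAsync(client); // one async state machine per connection
        }
    }

    private static async Task HandleClientAsync(TcpClient client)
    {
        using (client)
        {
            var stream = client.GetStream();
            var buffer = new byte[256];
            int n;
            // Echo whatever arrives until the peer closes the connection.
            while ((n = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                await stream.WriteAsync(buffer, 0, n);
            }
        }
    }
}
```

Each connection then costs an async state machine on the heap instead of a dedicated thread with its 1 MB stack.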

SocketAsyncEventArgs

SocketAsyncEventArgs on MSDN

Code Implementation

Following Microsoft's sample, I put together a simple socket server of my own:

public class SocketArgsServer
    {
        private static int m_numConnections;
        private static int m_receiveBufferSize;
        private static int m_sendBufferSize;
        private static byte[] m_receivebuffer;
        private static Stack<int> m_freeReceiveIndexPool;
        private static int m_currentReceiveIndex;
        private static byte[] m_sendbuffer;
        private static Stack<int> m_freeSendIndexPool;
        private static int m_currentSendIndex;
        // NOTE: Stack<T> is not thread-safe; these pools are also accessed from IO
        // completion threads, so production code should lock around Push/Pop.
        private static Stack<SocketAsyncEventArgs> m_ReadPool;
        private static Stack<SocketAsyncEventArgs> m_WritePool;
        private static Semaphore m_maxNumberAcceptedClients;
        private static int m_numConnectedSockets;
        private static int m_totalBytesRead;
        private static Socket listenSocket;
        public static void Run(string m_ip, int m_port, int numConnections, int m_receiveBuffer, int m_sentBuffer)
        {
            m_numConnections = numConnections;
            m_receiveBufferSize = m_receiveBuffer;
            m_sendBufferSize = m_sentBuffer;
            m_receivebuffer = new byte[m_receiveBufferSize * m_numConnections];
            m_freeReceiveIndexPool = new Stack<int>();
            m_currentReceiveIndex = 0;
            m_sendbuffer = new byte[m_sendBufferSize * m_numConnections];
            m_freeSendIndexPool = new Stack<int>();
            m_currentSendIndex = 0;
            m_ReadPool = new Stack<SocketAsyncEventArgs>(m_numConnections);
            m_WritePool = new Stack<SocketAsyncEventArgs>(m_numConnections);
            m_maxNumberAcceptedClients = new Semaphore(m_numConnections, m_numConnections);
            m_numConnectedSockets = 0;
            m_totalBytesRead = 0;
            for (int i = 0; i < m_numConnections; i++)
            {
                var readEventArg = new SocketAsyncEventArgs();
                readEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ReadWriteIOComleted);
                readEventArg.UserToken = new AsyncUserToken();
                if (m_freeReceiveIndexPool.Count > 0)
                {
                    readEventArg.SetBuffer(m_receivebuffer, m_freeReceiveIndexPool.Pop(), m_receiveBufferSize);
                }
                else
                {
                    if ((m_receiveBufferSize * m_numConnections - m_receiveBufferSize) < m_currentReceiveIndex)
                    {
                        throw new ArgumentException("Receive buffer configuration error");
                    }
                    readEventArg.SetBuffer(m_receivebuffer, m_currentReceiveIndex, m_receiveBufferSize);
                    m_currentReceiveIndex += m_receiveBufferSize;
                }
                m_ReadPool.Push(readEventArg);
                var writeEventArg = new SocketAsyncEventArgs();
                writeEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ReadWriteIOComleted);
                writeEventArg.UserToken = new AsyncUserToken();
                if (m_freeSendIndexPool.Count > 0)
                {
                    writeEventArg.SetBuffer(m_sendbuffer, m_freeSendIndexPool.Pop(), m_sendBufferSize);
                }
                else
                {
                    if ((m_sendBufferSize * m_numConnections - m_sendBufferSize) < m_currentSendIndex)
                    {
                        throw new ArgumentException("Send buffer configuration error");
                    }
                    writeEventArg.SetBuffer(m_sendbuffer, m_currentSendIndex, m_sendBufferSize);
                    m_currentSendIndex += m_sendBufferSize;
                }
                m_WritePool.Push(writeEventArg);
            }

            listenSocket = new Socket(new IPEndPoint(IPAddress.Parse(m_ip), m_port).AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            listenSocket.Bind(new IPEndPoint(IPAddress.Parse(m_ip), m_port));
            listenSocket.Listen(100);

            StartAccept(null);
            Console.WriteLine("Press any key to terminate the server process....");
            Console.ReadKey();
        }
        public static void ReadWriteIOComleted(object sender, SocketAsyncEventArgs e)
        {
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
                default:
                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
            }
        }
        public static void ProcessSend(SocketAsyncEventArgs e)
        {
            if (e.SocketError == SocketError.Success)
            {
                AsyncUserToken token = (AsyncUserToken)e.UserToken;
                bool willRaiseEvent = token.Socket.ReceiveAsync(token.readEventArgs);
                if (!willRaiseEvent)
                {
                    ProcessReceive(token.readEventArgs);
                }
            }
            else
            {
                CloseClientSocket(e);
            }
        }
        public static void CloseClientSocket(SocketAsyncEventArgs e)
        {
            AsyncUserToken token = e.UserToken as AsyncUserToken;
            try
            {
                token.Socket.Shutdown(SocketShutdown.Send);
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
            }
            token.Socket.Close();
            Interlocked.Decrement(ref m_numConnectedSockets);
            m_ReadPool.Push(token.readEventArgs);
            m_WritePool.Push(token.writeEventArgs);
            token.Socket = null;
            token.readEventArgs = null;
            token.writeEventArgs = null;
            m_maxNumberAcceptedClients.Release();
        }
        public static void ProcessReceive(SocketAsyncEventArgs e)
        {
            AsyncUserToken token = (AsyncUserToken)e.UserToken;
            if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
            {
                Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred);
                byte[] data = new byte[e.BytesTransferred];
                Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
                var recestr = Encoding.UTF8.GetString(data);
                var replyStr =
                    $"Server received:{recestr};Server receive time:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}";
                Console.WriteLine(replyStr);
                var strbytes = Encoding.UTF8.GetBytes(replyStr);
                Array.Copy(strbytes, 0, token.writeEventArgs.Buffer, token.writeEventArgs.Offset,
                    strbytes.Length);

                bool willRaiseEvent = token.Socket.SendAsync(token.writeEventArgs);
                if (!willRaiseEvent)
                {
                    ProcessSend(token.writeEventArgs);
                }
            }
            else
            {
                CloseClientSocket(e);
            }
        }

        public static void ProcessAccept(SocketAsyncEventArgs e)
        {
            Interlocked.Increment(ref m_numConnectedSockets);
            SocketAsyncEventArgs readEventArgs = m_ReadPool.Pop();
            SocketAsyncEventArgs writeEventArgs = m_WritePool.Pop();
            ((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket;
            ((AsyncUserToken)readEventArgs.UserToken).readEventArgs = readEventArgs;
            ((AsyncUserToken)readEventArgs.UserToken).writeEventArgs = writeEventArgs;

            ((AsyncUserToken)writeEventArgs.UserToken).Socket = e.AcceptSocket;
            ((AsyncUserToken)writeEventArgs.UserToken).readEventArgs = readEventArgs;
            ((AsyncUserToken)writeEventArgs.UserToken).writeEventArgs = writeEventArgs;
            bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
            if (!willRaiseEvent)
            {
                ProcessReceive(readEventArgs);
            }
            StartAccept(e);
        }

        public static void StartAccept(SocketAsyncEventArgs listenEventArg)
        {
            if (listenEventArg == null)
            {
                listenEventArg = new SocketAsyncEventArgs();
                listenEventArg.Completed += new EventHandler<SocketAsyncEventArgs>((sender, e) => ProcessAccept(e));
            }
            else
            {
                listenEventArg.AcceptSocket = null;
            }

            m_maxNumberAcceptedClients.WaitOne();
            bool willRaiseEvent = listenSocket.AcceptAsync(listenEventArg);

            if (!willRaiseEvent)
            {
                ProcessAccept(listenEventArg);
            }
        }

    }
    class AsyncUserToken
    {
 
        public Socket Socket { get; set; }

        public SocketAsyncEventArgs readEventArgs { set; get; }

        public SocketAsyncEventArgs writeEventArgs { set; get; }

    }

Concurrency Test

Straight to the test results. This test again ran in a virtual machine, so ignore the server-side receive timestamps, since the VM clock is not synchronized with the host. The longest round trip from sending a message to receiving the reply was about 2 s.

(screenshot omitted)

Summary

This socket server drops the one-thread-per-client model entirely and implements everything the threads used to do through SocketAsyncEventArgs. One SocketAsyncEventArgs listens for connections; when a client connects, two SocketAsyncEventArgs instances are taken from the pool, one for reading and one for writing messages. The read/write buffers are also centrally managed: they are slices of one large shared buffer that gets reused. When a client disconnects, its read and write SocketAsyncEventArgs are returned to the pool for reuse.
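The pooling just described can be made explicit. Microsoft's sample wraps the stack in a small pool class that locks on every Push/Pop, because completions arrive on multiple IO threads; the version below is a simplified sketch of that idea.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Sockets;

// A minimal pool in the spirit of Microsoft's SocketAsyncEventArgsPool sample:
// a stack guarded by a lock, since completions arrive on multiple IO threads.
class SocketAsyncEventArgsPool
{
    private readonly Stack<SocketAsyncEventArgs> m_pool = new Stack<SocketAsyncEventArgs>();

    public void Push(SocketAsyncEventArgs item)
    {
        if (item == null) throw new ArgumentNullException(nameof(item));
        lock (m_pool) { m_pool.Push(item); }
    }

    public SocketAsyncEventArgs Pop()
    {
        lock (m_pool) { return m_pool.Pop(); }
    }

    public int Count
    {
        get { lock (m_pool) { return m_pool.Count; } }
    }
}
```

The server above uses bare Stack<SocketAsyncEventArgs> fields; routing all access through a pool like this keeps the reuse logic thread-safe in one place.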

Finally

This article explored two socket server implementations and gave a brief analysis of each. Looking at SuperSocket's internals, its underlying design follows the second approach. I have not yet found the drawbacks of this approach; discussion is welcome.
