C# 高性能 TCP 服務的多種實現方式Cowboy.Sockets

本篇文章的主旨是使用 .NET/C# 實現 TCP 高性能服務的不一樣方式,包括但不限於以下內容:html

在 .NET/C# 中對於 Socket 的支持均是基於 Windows I/O Completion Ports 完成端口技術的封裝,經過不一樣的 Non-Blocking 封裝結構來知足不一樣的編程需求。以上方式均已在 Cowboy.Sockets 中有完整實現,而且 APM 和 TAP 方式已經在實際項目中應用。Cowboy.Sockets 還在不斷的進化和完善中,若有任何問題請及時指正。ios

雖然有這麼多種實現方式,但抽象的看,它們是同樣同樣的,用兩個 Loop 便可描述:Accept Loop和 Read Loop,以下圖所示。(這裏說起的 "Loop" 指的是一種循環方式,而非特指 while/for 等關鍵字。)git

  • 在任何 TCP Server 的實現中,必定存在一個 Accept Socket Loop,用於接收 Client 端的 Connect 請求以創建 TCP Connection。
  • 在任何 TCP Server 的實現中,必定存在一個 Read Socket Loop,用於接收 Client 端 Write 過來的數據。

若是 Accept 循環阻塞,則會致使沒法快速的創建鏈接,服務端 Pending Backlog 滿,進而致使 Client 端收到 Connect Timeout 的異常。若是 Read 循環阻塞,則顯然會致使沒法及時收到 Client 端發過來的數據,進而致使 Client 端 Send Buffer 滿,沒法再發送數據。github

從實現細節的角度看,可以致使服務阻塞的位置可能在:編程

  1. Accept 到新的 Socket,構建新的 Connection 須要分配各類資源,分配資源慢;
  2. Accept 到新的 Socket,沒有及時觸發下一次 Accept;
  3. Read 到新的 Buffer,斷定 Payload 消息長度,斷定過程長;
  4. Read 到新的 Buffer,發現 Payload 尚未收全,繼續 Read,則 "可能" 會致使一次 Buffer Copy;
  5. Payload 接收完畢,進行 De-Serialization 轉成可識別的 Protocol Message,反序列化慢;
  6. 由 Business Module 來處理相應的 Protocol Message,處理過程慢;

1-2 涉及到 Accept 過程和 Connection 的創建過程,3-4 涉及到 ReceiveBuffer 的處理過程,5-6 涉及到應用邏輯側的實現。windows

Java 中著名的 Netty 網絡庫從 4.0 版本開始對於 Buffer 部分作了全新的嘗試,採用了名叫 ByteBuf的設計,實現 Buffer Zero Copy 以減小高併發條件下 Buffer 拷貝帶來的性能損失和 GC 壓力。DotNettyOrleans ,Helios 等項目正在嘗試在 C# 中進行相似的 ByteBuf 的實現。網絡

APM 方式:TcpSocketServer

TcpSocketServer 的實現是基於 .NET Framework 自帶的 TcpListener 和 TcpClient 的更進一步的封裝,採用基於 APM 的 BeginXXX 和 EndXXX 接口實現。session

TcpSocketServer 中的 Accept Loop 指的就是,併發

  • BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> ...

每個創建成功的 Connection 由 TcpSocketSession 來處理,因此 TcpSocketSession 中會包含 Read Loop,app

  • BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...

TcpSocketServer 經過暴露 Event 來實現 Connection 的創建與斷開和數據接收的通知。

  event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
  event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;
  event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;

使用也是簡單直接,直接訂閱事件通知。

複製代碼
  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected += server_ClientConnected;
      _server.ClientDisconnected += server_ClientDisconnected;
      _server.ClientDataReceived += server_ClientDataReceived;
      _server.Listen();
  }
  
  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }
  
  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }
  
  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {
      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }
複製代碼

TAP 方式:AsyncTcpSocketServer

AsyncTcpSocketServer 的實現是基於 .NET Framework 自帶的 TcpListener 和 TcpClient 的更進一步的封裝,採用基於 TAP 的 async/await 的 XXXAsync 接口實現。

然而,實際上 XXXAsync 並無建立什麼神奇的效果,其內部實現只是將 APM 的方法轉換成了 TAP 的調用方式。

複製代碼
  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]
  public Task<Socket> AcceptSocketAsync()
  {
      return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }
  
  [HostProtection(ExternalThreading = true)]
  public Task<TcpClient> AcceptTcpClientAsync()
  {
      return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }
複製代碼

AsyncTcpSocketServer 中的 Accept Loop 指的就是,

  while (IsListening)
  {
      var tcpClient = await _listener.AcceptTcpClientAsync();
  }

每個創建成功的 Connection 由 AsyncTcpSocketSession 來處理,因此 AsyncTcpSocketSession 中會包含 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }

爲了將 async/await 異步到底,AsyncTcpSocketServer 所暴露的接口也一樣是 Awaitable 的。

複製代碼
  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }
複製代碼

使用時僅需將一個實現了該接口的對象注入到 AsyncTcpSocketServer 的構造函數中便可。

複製代碼
  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {
      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }
複製代碼

固然,對於接口的實現也不是強制了,也能夠在構造函數中直接注入方法的實現。

複製代碼
  public AsyncTcpSocketServer(
      IPEndPoint listenedEndPoint,
      Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
      Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
      Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
      AsyncTcpSocketServerConfiguration configuration = null)
  {}
複製代碼

SAEA 方式:TcpSocketSaeaServer

SAEA 是 SocketAsyncEventArgs 的簡寫。SocketAsyncEventArgs 是 .NET Framework 3.5 開始支持的一種支持高性能 Socket 通訊的實現。SocketAsyncEventArgs 相比於 APM 方式的主要優勢能夠描述以下:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

也就是說,優勢就是無需爲每次調用都生成 IAsyncResult 等對象,向原生 Socket 更靠近一些。

使用 SocketAsyncEventArgs 的推薦步驟以下:

  1. Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
  2. Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).
  3. Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
  4. If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.
  5. If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.
  6. Reuse the context for another operation, put it back in the pool, or discard it.

重點在於池化(Pooling),池化的目的就是爲了重用和減小運行時分配和垃圾回收的壓力。

TcpSocketSaeaServer 便是對 SocketAsyncEventArgs 的應用和封裝,並實現了 Pooling 技術。TcpSocketSaeaServer 中的重點是 SaeaAwaitable 類,SaeaAwaitable 中內置了一個 SocketAsyncEventArgs,並經過 GetAwaiter 返回 SaeaAwaiter 來支持 async/await 操做。同時,經過 SaeaExtensions 擴展方法對來擴展 SocketAsyncEventArgs 的 Awaitable 實現。

  public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)

SaeaPool 則是一個 QueuedObjectPool<SaeaAwaitable> 的衍生實現,用於池化 SaeaAwaitable 實例。同時,爲了減小 TcpSocketSaeaSession 的構建過程,也實現了 SessionPool 即 QueuedObjectPool<TcpSocketSaeaSession>。

TcpSocketSaeaServer 中的 Accept Loop 指的就是,

複製代碼
  while (IsListening)
  {
      var saea = _acceptSaeaPool.Take();
  
      var socketError = await _listener.AcceptAsync(saea);
      if (socketError == SocketError.Success)
      {
          var acceptedSocket = saea.Saea.AcceptSocket;
      }
  
      _acceptSaeaPool.Return(saea);
  }
複製代碼

每個創建成功的 Connection 由 TcpSocketSaeaSession 來處理,因此 TcpSocketSaeaSession 中會包含 Read Loop,

複製代碼
  var saea = _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);
  
  while (State == TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(0, _receiveBuffer.Length);
  
      var socketError = await _socket.ReceiveAsync(saea);
      if (socketError != SocketError.Success)
          break;
  
      var receiveCount = saea.Saea.BytesTransferred;
      if (receiveCount == 0)
          break;
  }
複製代碼

一樣,TcpSocketSaeaServer 對外所暴露的接口也一樣是 Awaitable 的。

  public interface ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }

使用起來也是簡單直接:

複製代碼
  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }
複製代碼

RIO 方式:TcpSocketRioServer

從 Windows 8.1 / Windows Server 2012 R2 開始,微軟推出了 Registered I/O Networking Extensions 來支持高性能 Socket 服務的實現,簡稱 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue
  • RIOCreateCompletionQueue
  • RIOCreateRequestQueue
  • RIODequeueCompletion
  • RIODeregisterBuffer
  • RIONotify
  • RIOReceive
  • RIOReceiveEx
  • RIORegisterBuffer
  • RIOResizeCompletionQueue
  • RIOResizeRequestQueue
  • RIOSend
  • RIOSendEx

到目前爲止,.NET Framework 尚未推出對 RIO 的支持,因此若想在 C# 中實現 RIO 則只能經過 P/Invoke 方式,RioSharp 是開源項目中的一個比較完整的實現。

Cowboy.Sockets 直接引用了 RioSharp 的源代碼,放置在 Cowboy.Sockets.Experimental 名空間下,以供實驗和測試使用。

一樣,經過 TcpSocketRioServer 來實現 Accept Loop,

複製代碼
_listener.OnAccepted = (acceptedSocket) =>
{
    Task.Run(async () =>
    {
        await Process(acceptedSocket);
    })
    .Forget();
};
複製代碼

經過 TcpSocketRioSession 來處理 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
      if (receiveCount == 0)
          break;
  }

測試代碼一如既往的相似:

複製代碼
  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketRioSession session)
      {
          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          Console.WriteLine(string.Format("TCP session has connected {0}.", session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.Write(string.Format("Client : --> "));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }
相關文章
相關標籤/搜索