異步tcp通訊——APM.Core 服務端概述

爲何使用異步html

  異步線程是由線程池負責管理,而多線程,咱們能夠本身控制,固然在多線程中咱們也能夠使用線程池。就拿網絡扒蟲而言,若是使用異步模式去實現,它使用線程池進行管理。異步操做執行時,會將操做丟給線程池中的某個工做線程來完成。當開始I/O操做的時候,異步會將工做線程還給線程池,這意味着獲取網頁的工做不會再佔用任何CPU資源了。直到異步完成,即獲取網頁完畢,異步纔會經過回調的方式通知線程池。可見,異步模式藉助於線程池,極大地節約了CPU的資源。
  注:DMA(Direct Memory Access)直接內存存取,顧名思義DMA功能就是讓設備能夠繞過處理器,直接由內存來讀取資料。經過直接內存訪問的數據交換幾乎能夠不損耗CPU的資源。在硬件中,硬盤、網卡、聲卡、顯卡等都有直接內存訪問功能。異步編程模型就是讓咱們充分利用硬件的直接內存訪問功能來釋放CPU的壓力。
  二者的應用場景:
    計算密集型工做,採用多線程。
    IO密集型工做,採用異步機制。git

C#中實現異步tcp通訊github

  socket中僅僅須要將Blocking=false便可輕鬆實現異步,部分示例以下:編程

 1 /// <summary>
 2         /// 啓動tcp監聽
 3         /// </summary>
 4         public void Start()
 5         {
 6             if (!_isStarted)
 7             {
 8                 _isStarted = true;
 9                 _server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
10 
11                 #region socket配置            
12                 LingerOption lingerOption = new LingerOption(true, 30);
13                 _server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger, lingerOption);
14                 #endregion
15 
16                 _server.Blocking = false;
17                 _server.ExclusiveAddressUse = false;
18                 _server.Bind(new IPEndPoint(IPAddress.Any, this._port));
19                 _server.Listen(1000000);
20                 Parallel.For(0, 1000000, i =>
21                 {
22                     _server.BeginAccept(new AsyncCallback(ProcessAccept), _server);
23                 });
24             }
25         }
View Code

  tcp異步中處理接io操做最關鍵的參數:IAsyncResult,使用通常用begin開始,end結束。網絡

  接收數據處理以下:多線程

 1 /// <summary>
 2         /// 處理傳入的鏈接請求
 3         /// </summary>
 4         private void ProcessAccept(IAsyncResult ar)
 5         {
 6             var s = (Socket)ar.AsyncState;
 7             var remote = s.EndAccept(ar);
 8             var user = new UserToken(this._maxBufferSize) { ID = remote.RemoteEndPoint.ToString(), Client = remote };
 9             remote.BeginReceive(user.ReceiveBuffer, 0, user.ReceiveBuffer.Length, SocketFlags.None, new AsyncCallback(ProcessReceive),
10                 user);
11             s.BeginAccept(new AsyncCallback(ProcessAccept), s);
12         }
View Code
 1 private void ProcessReceive(IAsyncResult ar)
 2         {
 3             var user = (UserToken)ar.AsyncState;
 4             var remote = user.Client;
 5             try
 6             {
 7                 if (remote.Connected)
 8                 {
 9                     var ns = remote.EndReceive(ar);
10 
11                     if (ns > 0)
12                     {
13                         var buffer = new byte[ns];
14 
15                         Buffer.BlockCopy(user.ReceiveBuffer, 0, buffer, 0, buffer.Length);
16 
17                         user.UnPackage(buffer, (p) =>
18                         {
19                             Interlocked.Increment(ref this._receiveCount);
20                             this.RaiseOnOnReceived(user, p);
21                         });
22 
23                         user.ClearReceiveBuffer();
24 
25                         buffer = null;
26 
27                         remote.BeginReceive(user.ReceiveBuffer, 0, user.ReceiveBuffer.Length, SocketFlags.None, new AsyncCallback(ProcessReceive), user);
28                     }
29                 }
30                 else
31                 {
32                     this.RaiseOnDisConnected(user, new Exception("客戶端已斷開鏈接"));
33                     this.CloseClient(user);
34                 }
35             }
36             catch (SocketException sex)
37             {
38                 this.RaiseOnDisConnected(user, sex);
39                 this.CloseClient(user);
40             }
41             catch (Exception ex)
42             {
43                 this.RaiseOnError(user, ex);
44                 this.CloseClient(user);
45             }
46         }
View Code

  發送數據處理以下:異步

 1 /// <summary>
 2         /// 發送信息
 3         /// </summary>
 4         /// <param name="remote"></param>
 5         /// <param name="data"></param>
 6         /// <param name="type"></param>
 7         /// <param name="auth"></param>
 8         private void SendAsync(UserToken remote, byte[] data, TransportType type = TransportType.Heart)
 9         {
10             try
11             {
12                 using (var pakage = new TcpPackage(data, type, remote.Auth))
13                 {
14                     remote.Client.BeginSend(pakage.Data, 0, pakage.Data.Length, SocketFlags.None, new AsyncCallback(EndSend), remote);
15                 }
16 
17             }
18             catch (SocketException sex)
19             {
20                 this.RaiseOnDisConnected(remote, sex);
21             }
22             catch (Exception ex)
23             {
24                 this.RaiseOnError(remote, ex);
25             }
26         }
View Code
1 private void EndSend(IAsyncResult ar)
2         {
3             var remote = (UserToken)ar.AsyncState;
4             remote.Client.EndSend(ar);
5             Interlocked.Increment(ref this._sendCount);
6         }
View Code

  心跳、消息、文件等邏輯均可以基於發送邏輯來完成socket

1 /// <summary>
2         /// 回覆心跳
3         /// </summary>
4         /// <param name="remote"></param>
5         /// <param name="package"></param>
6         private void ReplyHeart(UserToken remote, TcpPackage package)
7         {
8             this.SendAsync(remote, null, TransportType.Heart);
9         }
View Code
1 /// <summary>
2         /// 發送信息
3         /// </summary>
4         /// <param name="remote"></param>
5         /// <param name="msg"></param>
6         public void SendMsg(UserToken remote, byte[] msg)
7         {
8             this.SendAsync(remote, msg, TransportType.Message);
9         }
View Code
 1 /// <summary>
 2         /// 發送文件
 3         /// </summary>
 4         /// <param name="remote"></param>
 5         /// <param name="filePath"></param>
 6         public void SendFile(UserToken remote, string filePath)
 7         {
 8             using (var file = new TransferFileInfo()
 9             {
10                 ID = remote.ID,
11                 FileBytes = File.ReadAllBytes(filePath),
12                 Name = filePath.Substring(filePath.LastIndexOf("\\") + 1),
13                 CreateTime = DateTime.Now.Ticks
14             })
15             {
16                 var buffer = TransferFileInfo.Serialize(file);
17                 this.SendAsync(remote, buffer, TransportType.File);
18                 buffer = null;
19             }
20         }
View Code
 1 /// <summary>
 2         /// 發送文件
 3         /// </summary>
 4         /// <param name="remote"></param>
 5         /// <param name="fileName"></param>
 6         /// <param name="file"></param>
 7         public void SendFile(UserToken remote, string fileName, byte[] file)
 8         {
 9             using (var fileInfo = new TransferFileInfo()
10             {
11                 ID = remote.ID,
12                 FileBytes = file,
13                 Name = fileName,
14                 CreateTime = DateTime.Now.Ticks
15             })
16             {
17                 var buffer = TransferFileInfo.Serialize(fileInfo);
18                 this.SendAsync(remote, buffer, TransportType.File);
19                 buffer = null;
20             }
21         }
View Code

 

異步tcp通訊——APM.Core 服務端概述

異步tcp通訊——APM.Core 解包

異步tcp通訊——APM.Server 消息推送服務的實現

異步tcp通訊——APM.ConsoleDemo

 


轉載請標明本文來源:http://www.cnblogs.com/yswenli/
更多內容歡迎star做者的github:https://github.com/yswenli/APM
若是發現本文有什麼問題和任何建議,也隨時歡迎交流~tcp

相關文章
相關標籤/搜索