1、前言html
時間過得真是快,轉眼就2018年了。首先祝各位博友,軟件開發者新年新氣象,事業有成,身體健康,闔家幸福!最近看到園子裏好多關於本身的2017年度總結以及對本身新一年的願景,以爲咱園子的氛圍是真的好。這三天假期我也沒閒着,一邊看OB海鮮團吃雞一邊寫Socket SocketAsyncEventArgs 代碼。我上一篇博客已經用APM的方式實現了客戶端與服務器端的Socket通訊,並具備了必定的併發能力。因此這三天我就決定對服務器代碼進行改造,使用MS在4.0時發佈的SocketAsyncEventArgs(SAEA)寫法。爲了方便的進行服務器端兩種寫法的對比,我客戶端的代碼沒有進行變化,依然使用APM方式。代碼已經上傳至Github,連接會在文末貼出。git
2、個人業務功能github
個人業務功能依然是實現從服務器多線程下載更新文件。下載以前的那些操做我基本就不講了,上一篇博文裏的都有,本文仍是回到Socket下載文件上。具體流程以下:數組
在我寫SAEA代碼以前,我仔細搜了一下網上的資源:MSDN、CNBLOG、CSDN、CodeProject。這四種來源的代碼示例的主要流程是這樣的:緩存
對比個人流程,您會發現少了一半的通訊過程。客戶端的代碼好寫,可是服務器端如何發送完數據以後再接收數據?這中間的銜接過程仍是有點門道的。特別是SAEA的代碼採用了Buffer池化以及SAEA池化以後,裏面有些小的細節就要想清楚了。下面就是具體的代碼,我會以我本身的視角去論述APM與SAEA到底有什麼區別。服務器
3、對比多線程
其實對於服務器端的APM,我以爲最重要的並非代碼中的BeginXXX或者是EndXXX,由於這就是APM寫法的特徵,BeginXXX或者EndXXX而後裏面有一個回調函數,在回調函數裏去作一些業務上的事情。最重要的是要有一個線程等待的概念,也就是代碼中的ManualResetEvent這個東西,它就像地鐵閘機同樣,處理好一個再放一個進去。APM寫法的好處是顯而易見的,就是代碼看起來十分的簡單。缺點依照MS的說法就是若是有過多的客-服交流,可能會產生較多的IAsyncResult對象,這樣會增長服務器的開銷。 架構
服務器端的APM寫法:併發
1 using System; 2 using System.IO; 3 using System.Linq; 4 using System.Net; 5 using System.Net.Sockets; 6 using System.Threading; 7 using UpdaterShare.GlobalSetting; 8 using UpdaterShare.Model; 9 using UpdaterShare.Utility; 10 11 namespace UpdaterServerAPM 12 { 13 public static class ServerSocket 14 { 15 private static int _downloadChannelsCount; 16 private static string _serverPath; 17 private static readonly ManualResetEvent AllDone = new ManualResetEvent(false); 18 19 public static void StartServer(int port, int backlog) 20 { 21 _downloadChannelsCount = DownloadSetting.DownloadChannelsCount; 22 try 23 { 24 IPAddress ipAddress = IPAddress.Any; 25 IPEndPoint localEndPoint = new IPEndPoint(ipAddress, port); 26 Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 27 listener.Bind(localEndPoint); 28 listener.Listen(backlog); 29 30 while (true) 31 { 32 AllDone.Reset(); 33 listener.BeginAccept(AcceptCallback, listener); 34 AllDone.WaitOne(); 35 } 36 } 37 catch (Exception ex) 38 { 39 var path = $"{AppDomain.CurrentDomain.BaseDirectory}\\RunLog.txt"; 40 File.AppendAllText(path, ex.Message); 41 } 42 } 43 44 45 private static void AcceptCallback(IAsyncResult ar) 46 { 47 AllDone.Set(); 48 Socket listener = (Socket)ar.AsyncState; 49 Socket handler = listener.EndAccept(ar); 50 ComObject state = new ComObject { WorkSocket = handler }; 51 handler.BeginReceive(state.Buffer, 0, ComObject.BufferSize, 0, FindUpdateFileCallback, state); 52 } 53 54 55 private static void FindUpdateFileCallback(IAsyncResult ar) 56 { 57 ComObject state = (ComObject)ar.AsyncState; 58 Socket handler = state.WorkSocket; 59 int bytesRead = handler.EndReceive(ar); 60 if (bytesRead > 0) 61 { 62 var receiveData = state.Buffer.Take(bytesRead).ToArray(); 63 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientFindFileInfoTag()); 64 if (dataList != null && dataList.Any()) 65 { 66 var request = PacketUtils.GetData(PacketUtils.ClientFindFileInfoTag(), dataList.FirstOrDefault()); 67 string str = System.Text.Encoding.UTF8.GetString(request); 68 var infos = str.Split('_'); 69 var productName = infos[0]; 70 var revitVersion = infos[1]; 71 var currentVersion = infos[2]; 72 73 var mainFolder = AppDomain.CurrentDomain.BaseDirectory.Replace("bin", "TestFile"); 74 var serverFileFolder = Path.Combine(mainFolder, "Server"); 75 var serverFileFiles = new DirectoryInfo(serverFileFolder).GetFiles(); 76 77 var updatefile = serverFileFiles.FirstOrDefault(x=>x.Name.Contains(productName) && x.Name.Contains(revitVersion) && x.Name.Contains(currentVersion)); 78 if (updatefile != null) 79 { 80 if (string.IsNullOrEmpty(updatefile.FullName) || !File.Exists(updatefile.FullName)) return; 81 _serverPath = updatefile.FullName; 82 FoundUpdateFileResponse(handler); 83 } 84 } 85 } 86 } 87 88 89 private static void FoundUpdateFileResponse(Socket handler) 90 { 91 byte[] foundUpdateFileData = PacketUtils.PacketData(PacketUtils.ServerFoundFileInfoTag(),null); 92 ComObject state = new ComObject { WorkSocket = handler }; 93 handler.BeginSend(foundUpdateFileData, 0, foundUpdateFileData.Length, 0, HasFoundUpdateFileCallback, state); 94 } 95 96 97 private static void HasFoundUpdateFileCallback(IAsyncResult ar) 98 { 99 ComObject state = (ComObject)ar.AsyncState; 100 Socket handler = state.WorkSocket; 101 handler.EndSend(ar); 102 handler.BeginReceive(state.Buffer, 0, ComObject.BufferSize, 0, ReadFilePositionRequestCallback, state); 103 } 104 105 106 private static void ReadFilePositionRequestCallback(IAsyncResult ar) 107 { 108 ComObject state = (ComObject)ar.AsyncState; 109 Socket handler = state.WorkSocket; 110 int bytesRead = handler.EndReceive(ar); 111 if (bytesRead > 0) 112 { 113 var receiveData = state.Buffer.Take(bytesRead).ToArray(); 114 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientRequestFileTag()); 115 if (dataList != null) 116 { 117 foreach (var request in dataList) 118 { 119 if (PacketUtils.IsPacketComplete(request)) 120 { 121 int startPosition = PacketUtils.GetRequestFileStartPosition(request); 122 SendFileResponse(handler, startPosition); 123 } 124 } 125 } 126 } 127 } 128 129 private static void SendFileResponse(Socket handler, int startPosition) 130 { 131 var packetSize = PacketUtils.GetPacketSize(_serverPath, _downloadChannelsCount); 132 if (packetSize != 0) 133 { 134 byte[] filedata = FileUtils.GetFile(_serverPath, startPosition, packetSize); 135 byte[] packetNumber = BitConverter.GetBytes(startPosition/packetSize); 136 if (filedata != null) 137 { 138 byte[] segmentedFileResponseData = PacketUtils.PacketData(PacketUtils.ServerResponseFileTag(), filedata, packetNumber); 139 ComObject state = new ComObject {WorkSocket = handler}; 140 handler.BeginSend(segmentedFileResponseData, 0, segmentedFileResponseData.Length, 0, SendFileResponseCallback, state); 141 } 142 } 143 else 144 { 145 handler.Shutdown(SocketShutdown.Both); 146 handler.Close(); 147 } 148 } 149 150 151 private static void SendFileResponseCallback(IAsyncResult ar) 152 { 153 try 154 { 155 ComObject state = (ComObject)ar.AsyncState; 156 Socket handler = state.WorkSocket; 157 handler.EndSend(ar); 158 handler.Shutdown(SocketShutdown.Both); 159 handler.Close(); 160 } 161 catch (Exception e) 162 { 163 164 } 165 } 166 } 167 }
說到SAEA,我以爲初入的小夥伴必定要先看MSDN上的實例,特別是它的BufferManager以及SocketAsyncEventArgsPool是怎麼寫的,究竟是幹什麼用的。這裏我能夠簡單的說下:SocketAsyncEventArgsPool是用來存放SAEA對象的,其個數依賴於你服務器所能承擔的隊列長度,好比說我服務器能承擔100個客戶的等待,我就在服務器端生成100個SAEA對象放在池子裏,當有客戶來鏈接時,我從池子裏取出一個來和他對接。客戶走了,我再扔到池子裏去。BufferManager則是對池子裏的SAEA對象進行Buffer分配的,也至關於一個池子,這個池子的大小是隊列長度*通訊緩存長度*2,乘以2是由於讀與寫是分開的。通訊緩存長度很好理解,客戶端要傳個2G的信息給服務器端不可能一會兒接收2G,確定是一口一口吃,那麼這一口的大小就是通訊緩存長度。那麼分配給每一個SAEA的緩存是多大呢?固然就是通訊緩存長度的大小咯。注意!!注意!!注意!!既然是池化了,全部關於Buffer的操做都要圍繞分配給SAEA的Buffer去操做!見148-149行。當服務器拿着分配到的Buffer去接收信息後,若是再要發送信息,所要作的第一件事就是先清空分配的Buffer再使用,BufferManager給你分配哪段你就用哪段,別使用錯了。有幾個參數須要注意:e.Offset(偏移),e.Count(大小),e.Buffer(緩存字節數組), e.BytesTransferred(通訊傳輸的字節長度)。若是服務器端要發送數據,必定要用Array.Copy將信息寫入對應分配的Buffer中。異步
說完池化,接着就是寫法上的小區別,我以爲區別並不大,無非就是委託換了個寫法。固然還要判斷下是否爲異步操做,若是是不然須要進行同步操做,見82-85行代碼。
服務器的SAEA寫法:
1 using System; 2 using System.IO; 3 using System.Linq; 4 using System.Net; 5 using System.Net.Sockets; 6 using System.Threading; 7 using UpdaterShare.GlobalSetting; 8 using UpdaterShare.Model; 9 using UpdaterShare.Utility; 10 11 namespace UpdaterServerSAEA 12 { 13 public class ServerSocket 14 { 15 private readonly int _port; 16 private readonly int _backlog; 17 private Socket _listenSocket; 18 private const int _opsToPreAlloc = 2; 19 private readonly BufferManager _bufferManager; 20 private readonly SocketAsyncEventArgsPool _readWritePool; 21 private readonly Semaphore _maxNumberAcceptedClients; 22 23 private string _serverPath; 24 private static readonly int _downloadChannelsCount = DownloadSetting.DownloadChannelsCount; 25 26 public ServerSocket(int port, int backlog) 27 { 28 _port = port; 29 _backlog = backlog; 30 31 _bufferManager = new BufferManager(ComObject.BufferSize * backlog * _opsToPreAlloc, ComObject.BufferSize); 32 _readWritePool = new SocketAsyncEventArgsPool(backlog); 33 _maxNumberAcceptedClients = new Semaphore(backlog, backlog); 34 } 35 36 37 private void Init() 38 { 39 _bufferManager.InitBuffer(); 40 41 for (var i = 0; i < _backlog; i++) 42 { 43 var readWriteEventArg = new SocketAsyncEventArgs(); 44 _bufferManager.SetBuffer(readWriteEventArg); 45 _readWritePool.Push(readWriteEventArg); 46 } 47 } 48 49 50 public void StartServer() 51 { 52 try 53 { 54 Init(); 55 IPAddress ipAddress = IPAddress.Any; 56 IPEndPoint localEndPoint = new IPEndPoint(ipAddress, _port); 57 _listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 58 _listenSocket.Bind(localEndPoint); 59 _listenSocket.Listen(_backlog); 60 StartAccept(null); 61 } 62 catch (Exception ex) 63 { 64 Console.WriteLine(ex.Message); 65 } 66 } 67 68 private void StartAccept(SocketAsyncEventArgs acceptEventArg) 69 { 70 if (acceptEventArg == null) 71 { 72 acceptEventArg = new SocketAsyncEventArgs(); 73 acceptEventArg.Completed += StartAccept_Completed; 74 } 75 else 76 { 77 acceptEventArg.AcceptSocket = null; 78 } 79 80 _maxNumberAcceptedClients.WaitOne(); 81 82 if (!_listenSocket.AcceptAsync(acceptEventArg)) 83 { 84 ProcessAccept(acceptEventArg); 85 } 86 } 87 88 private void StartAccept_Completed(object sender, SocketAsyncEventArgs e) 89 { 90 ProcessAccept(e); 91 } 92 93 94 private void ProcessAccept(SocketAsyncEventArgs e) 95 { 96 if (e.SocketError == SocketError.Success) 97 { 98 var socket = e.AcceptSocket; 99 if (socket.Connected) 100 { 101 SocketAsyncEventArgs readEventArgs = _readWritePool.Pop(); 102 readEventArgs.AcceptSocket = socket; 103 readEventArgs.Completed += ProcessAccept_Completed; 104 if (!socket.ReceiveAsync(readEventArgs)) 105 { 106 ProcessReceiveFindFileRequest(readEventArgs); 107 } 108 StartAccept(e); 109 } 110 } 111 } 112 113 private void ProcessAccept_Completed(object sender, SocketAsyncEventArgs e) 114 { 115 ProcessReceiveFindFileRequest(e); 116 } 117 118 119 private void ProcessReceiveFindFileRequest(SocketAsyncEventArgs e) 120 { 121 var bytesRead = e.BytesTransferred; 122 if (bytesRead > 0 && e.SocketError == SocketError.Success) 123 { 124 var receiveData = e.Buffer.Skip(e.Offset).Take(bytesRead).ToArray(); 125 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientFindFileInfoTag()); 126 if (dataList != null && dataList.Any()) 127 { 128 var request = PacketUtils.GetData(PacketUtils.ClientFindFileInfoTag(), dataList.FirstOrDefault()); 129 string str = System.Text.Encoding.UTF8.GetString(request); 130 var infos = str.Split('_'); 131 var productName = infos[0]; 132 var revitVersion = infos[1]; 133 var currentVersion = infos[2]; 134 135 var mainFolder = AppDomain.CurrentDomain.BaseDirectory.Replace("bin", "TestFile"); 136 var serverFileFolder = Path.Combine(mainFolder, "Server"); 137 var serverFileFiles = new DirectoryInfo(serverFileFolder).GetFiles(); 138 139 var updatefile = serverFileFiles.FirstOrDefault(x => x.Name.Contains(productName) && x.Name.Contains(revitVersion) && x.Name.Contains(currentVersion)); 140 if (updatefile != null) 141 { 142 if (string.IsNullOrEmpty(updatefile.FullName) || !File.Exists(updatefile.FullName)) return; 143 _serverPath = updatefile.FullName; 144 145 //ready to send back to Client 146 byte[] foundUpdateFileData = PacketUtils.PacketData(PacketUtils.ServerFoundFileInfoTag(), null); 147 148 Array.Clear(e.Buffer, e.Offset, e.Count); 149 Array.Copy(foundUpdateFileData, 0, e.Buffer, e.Offset, foundUpdateFileData.Length); 150 151 e.Completed -= ProcessAccept_Completed; 152 e.Completed += ProcessReceiveFindFileRequest_Completed; 153 154 if (!e.AcceptSocket.SendAsync(e)) 155 { 156 ProcessFilePosition(e); 157 } 158 } 159 } 160 } 161 } 162 163 164 private void ProcessReceiveFindFileRequest_Completed(object sender, SocketAsyncEventArgs e) 165 { 166 ProcessFilePosition(e); 167 } 168 169 170 private void ProcessFilePosition(SocketAsyncEventArgs e) 171 { 172 if (e.SocketError == SocketError.Success) 173 { 174 var socket = e.AcceptSocket; 175 if (socket.Connected) 176 { 177 //clear buffer 178 Array.Clear(e.Buffer, e.Offset, e.Count); 179 180 e.Completed -= ProcessReceiveFindFileRequest_Completed; 181 e.Completed += ProcessFilePosition_Completed; 182 183 if (!socket.ReceiveAsync(e)) 184 { 185 ProcessSendFile(e); 186 } 187 } 188 } 189 } 190 191 private void ProcessFilePosition_Completed(object sender, SocketAsyncEventArgs e) 192 { 193 ProcessSendFile(e); 194 } 195 196 private void ProcessSendFile(SocketAsyncEventArgs e) 197 { 198 var bytesRead = e.BytesTransferred; 199 if (bytesRead > 0 && e.SocketError == SocketError.Success) 200 { 201 var receiveData = e.Buffer.Skip(e.Offset).Take(bytesRead).ToArray(); 202 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientRequestFileTag()); 203 if (dataList != null) 204 { 205 foreach (var request in dataList) 206 { 207 if (PacketUtils.IsPacketComplete(request)) 208 { 209 int startPosition = PacketUtils.GetRequestFileStartPosition(request); 210 211 var packetSize = PacketUtils.GetPacketSize(_serverPath, _downloadChannelsCount); 212 if (packetSize != 0) 213 { 214 byte[] filedata = FileUtils.GetFile(_serverPath, startPosition, packetSize); 215 byte[] packetNumber = BitConverter.GetBytes(startPosition / packetSize); 216 217 Console.WriteLine("Receive File Request PacketNumber: "+startPosition / packetSize); 218 219 if (filedata != null) 220 { 221 //ready to send back to Client 222 byte[] segmentedFileResponseData = PacketUtils.PacketData(PacketUtils.ServerResponseFileTag(), filedata, packetNumber); 223 224 Array.Clear(e.Buffer, e.Offset, e.Count); 225 Array.Copy(segmentedFileResponseData, 0, e.Buffer, e.Offset, segmentedFileResponseData.Length); 226 227 e.Completed -= ProcessFilePosition_Completed; 228 e.Completed += ProcessSendFile_Completed; 229 230 if (!e.AcceptSocket.SendAsync(e)) 231 { 232 CloseClientSocket(e); 233 } 234 } 235 } 236 } 237 } 238 } 239 } 240 else 241 { 242 CloseClientSocket(e); 243 } 244 } 245 246 247 private void ProcessSendFile_Completed(object sender, SocketAsyncEventArgs e) 248 { 249 CloseClientSocket(e); 250 } 251 252 253 private void CloseClientSocket(SocketAsyncEventArgs e) 254 { 255 try 256 { 257 e.AcceptSocket.Shutdown(SocketShutdown.Both); 258 e.AcceptSocket.Close(); 259 } 260 catch (Exception ex) 261 { 262 Console.WriteLine(ex.Message); 263 } 264 finally 265 { 266 _maxNumberAcceptedClients.Release(); 267 _readWritePool.Push(e); 268 } 269 } 270 } 271 }
4、總結
坑坑窪窪總算是寫完了SAEA的代碼,因爲本人知識面有限,若是說的不對,還請各位及時直接提出批評與建議,我這我的比較在意技術不在意麪子的。
附:
MSDN示例:
https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx
啓蒙博客:
http://www.cnblogs.com/gaochundong/p/csharp_tcp_service_models.html
大神改造:
http://freshflower.iteye.com/blog/2285272
架構狂魔:
http://www.cnblogs.com/jiahuafu/archive/2013/01/05/2845631.html
個人GitHub