Socket編程中,如何高效地接收和處理數據,這裏介紹一個簡單的編程模型。html
在給出編程模型以前,先提這樣一個問題,程序中如何描述Socket鏈接?編程
爲何這麼問呢,你們能夠翻看我以前在項目總結(一)中給出的一個簡單的基本架構,其中的網絡層用來管理Socket的鏈接,並負責接收發送Socket數據,這個模塊中能夠直接使用創建的Socket鏈接對象。但若是上層須要給某個Socket發送數據怎麼辦,若是直接把Socket對象傳送給上層,就破壞了面向對象中封裝原則,上層甚至能夠直接繞過網絡層操做Socket數據收發,顯然不是咱們但願看到的。數組
既然這樣不能直接傳遞Socket對象,那麼就要給上層傳遞一個可以標識這個對象的一個標識,這就是我要說的這個SocketId。緩存
SocketId實際上就是一個無符號整形數據,在網絡層維護一個SocketId與Socket對象的映射表,上層經過SocketId通知網絡層向對應的Socket發送數據。服務器
SocketId並非說簡單的從1開始,而後來一個Socket鏈接就直接加1做爲對應的SocketId,我但願可以標識更多的東西。網絡
如圖所示,我在網絡層創建了一個SocketMark數組,長度是通過配置的容許Socket鏈接的最大個數。其中每一個SocketMark包含兩個主要的成員,鏈接的Socket對象,和對應的索引SocketId,如上所示。對於SocketId我不只但願可以標識出在SocketMark數組中的的位置index(最終找到Socket發出數據),還但願標識出這個SocketMark被使用了多少次(在項目中有特殊用處,在這不作過多說明)。多線程
那麼,怎麼用index和usetimes表示SocketId呢?具體來講,SocketId是一個無符號整形數據,也就是有4個字節,我使用2個高字節來表示index,兩個低字節來表示usetimes。那麼,SocketId就是 index * 65536 + usetimes % 65536,相應的index = socketId / 65536, usetimes = socketId % 65536。架構
SocketMark代碼以下所示:異步
public sealed class SocketMark {
//用於線程同步 public readonly object m_SyncLock = new object(); public uint SocketId = 0; public Socket WorkSocket = null;
//用於接收Socket數據 public byte[] Buffer;
//當前WorkSocket是否鏈接 public bool Connected = false; public SocketMark(int index, int bufferSize) {
//默認狀況下usetimes爲0 SocketId = Convert.ToUInt32(index * 65536); Buffer = new byte[bufferSize]; } public void IncReuseTimes() { int reuseTimes = GetReuseTimes(SocketId) + 1; SocketId = Convert.ToUInt32(GetIndex(SocketId) * 65536 + reuseTimes % 65536); } public static int GetIndex(uint socketId) { return Convert.ToInt32(socketId / 65536); } public static int GetReuseTimes(uint socketId) { return Convert.ToInt32(socketId % 65536); } }
當有Socket鏈接創建時,經過查詢SocketMark數組中Connected字段值爲false的元素(能夠直接遍歷查找,也能夠採起其餘方式,我使用的是創建一個對應SOcketMark的棧,保存index,有新鏈接index就出棧,而後設置SocketMark[index]的WorkSocket爲這個鏈接;Socket斷開後index再入棧),設置相應的WorkSocket後,同時要調用一次IncReuseTimes()函數,使用次數加1,並更新SocketId。socket
在這裏,網絡層就可使用Socket鏈接對象接收數據存儲在Buffer中,並把數據連同SocketId傳送給數據協議層。
接下來是我要說的重點,在數據協議層這裏,我須要定義一個新的結構,用來接收Socket數據,並儘量地使處理高效。
public sealed class ConnCache { public uint SocketId; // 鏈接標識 public byte[] RecvBuffer; // 接收數據緩存,傳輸層抵達的數據,首先進入此緩存 public int RecvLen; // 接收數據緩存中的有效數據長度 public readonly object RecvLock; // 數據接收鎖 public byte[] WaitBuffer; // 待處理數據緩存,首先要將數據從RecvBuffer轉移到該緩存,數據處理線程才能進行處理 public int WaitLen; // 待處理數據緩存中的有效數據長度 public readonly object AnalyzeLock; // 數據分析鎖 public ConnCache(int recvBuffSize, int waitBuffSize) { SocketId = 65535; RecvBuffer = new byte[recvBuffSize]; RecvLen = 0; RecvLock = new object(); WaitBuffer = new byte[waitBuffSize]; WaitLen = 0; AnalyzeLock = new object(); } }
解釋一下:ConnCache用於管理從網絡層接收的數據,並維護一個SocketId用來標識數據的歸屬。
在這其中,包括上面SocketMark都定義了一個公共的只讀對象,用來提供多線程時數據同步,但你應該注意到這幾個鎖的對象全都是public類型的,實際上這樣並很差。由於這樣,對象就沒法控制程序對鎖的使用,一旦鎖的使用不符合預期,就頗有可能形成程序出現死鎖,因此建議你們在使用的時候仍是考慮使用private修飾符,儘可能由對象來完成資源的同步。
可是,很不幸,我在項目中發現這樣作有點不現實,使用private可能破壞了整個系統的結構。。而使用public只要能徹底掌控代碼,對不產生死鎖有信心,仍是很是方便的,基於這個理由,最終放棄了使用private的想法。
繼續回來講這個結構,在這裏,我把從網絡層接收的數據存儲在RecvBuffer中,但個人解析線程並不直接訪問這個數組,而是另外創建一個新的數據WaitBuffer,這個WaitBuffer的用處就是從RecvBuffer Copy必定的數據,而後提供給解析線程處理。這樣作有兩個好處,第一,避免了接收線程和處理線程直接爭搶Buffer資源,可以提升處理性能。第二,額。。我看着挺清晰的,一個用來接收,一個用來處理,不是麼
注:當初在設計這個模型的時候,還不知道專門有個ReaderWriterLockSlim,我想若是可以代替上面的接收鎖和分析鎖,效果應該更好一點。
介紹了上面兩個主要的結構後,咱們來看下如何寫代碼簡單使用上述模型
首先,實現客戶端,參考項目總結 - 異步中的客戶端代碼,將代碼修改成定時發送數據到服務器,並刪除一些無關的代碼
class Program { static Socket socket; static void Main(string[] args) { socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); socket.Connect(IPAddress.Parse("127.0.0.1"), 1234); //啓動一個線程定時向服務器發送數據 ThreadPool.QueueUserWorkItem(state => { int index = 0; while (true) { byte[] senddata = Encoding.Default.GetBytes("咱們都有" + index++ + "個家,名字叫中國"); socket.BeginSend(senddata, 0, senddata.Length, SocketFlags.None, new AsyncCallback(Send), null); Thread.Sleep(1000); } }); Console.ReadKey(); } static void Send(IAsyncResult ar) { socket.EndSend(ar); } }
對於服務器,新建一個類NetLayer,用來模擬網絡層,網絡層創建Socket鏈接後,啓動異步接收,並把接收到的數據經過委託傳給處理層,傳送時發送SocketId,代碼以下:
public delegate void ArrivedData(uint socketId, byte[] buffer); class NetLayer { Socket socket; SocketMark[] socketMarks; public event ArrivedData arrivedData; public NetLayer(int maxConnNum) { //初始化SocketMark //最大容許鏈接數 socketMarks = new SocketMark[maxConnNum]; for (int i = 0; i < socketMarks.Length; i++) socketMarks[i] = new SocketMark(i, 1024); } public void Start() { //新建Socket,並開始監聽鏈接 socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); socket.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234)); socket.Listen(100); socket.BeginAccept(new AsyncCallback(Accept), "new socket connect"); } public void Accept(IAsyncResult ar) { Console.WriteLine(ar.AsyncState.ToString()); //結束監聽 Socket _socket = socket.EndAccept(ar); //這裏爲了方便直接經過循環的方式查找可用的SocketMark SocketMark socketMark = null; for (int i = 0; i < socketMarks.Length; i++) { if (!socketMarks[i].Connected) { socketMark = socketMarks[i]; break; } } //若是沒有找到可用的SocketMark,說明達到最大鏈接數,關閉該鏈接 if (socketMark == null) { _socket.Close(); return; } socketMark.WorkSocket = _socket; socketMark.Connected = true; socketMark.IncReuseTimes(); _socket.BeginReceive(socketMark.Buffer, 0, socketMark.Buffer.Length, SocketFlags.None, new AsyncCallback(Receive), socketMark); } public void Receive(IAsyncResult ar) { SocketMark mark = (SocketMark)ar.AsyncState; int length = mark.WorkSocket.EndReceive(ar); if (length > 0) { //多線程下資源同步 lock (mark.m_SyncLock) { byte[] data = new byte[length]; Buffer.BlockCopy(mark.Buffer, 0, data, 0, length); if (arrivedData != null) arrivedData(mark.SocketId, data); //再次投遞接收申請 mark.WorkSocket.BeginReceive(mark.Buffer, 0, mark.Buffer.Length, SocketFlags.None, new AsyncCallback(Receive), mark); } } } }
數據處理層收到數據後,先把數據存到對應ConnCache中的RecvBuffer中,並向隊列Queue<ConnCache>寫入一個標記,告訴處理線程應該處理哪一個ConnCache的數據,在這裏你們會看到,我在以前的文章中討論的lock和Monitor是如何使用的。
class Program { static ConnCache[] connCaches; //處理線程經過這個隊列知道有數據須要處理 static Queue<ConnCache> tokenQueue; //接收到數據後,同時通知處理線程處理數據 static AutoResetEvent tokenEvent; static void Main(string[] args) { //最大容許鏈接數 int maxConnNum = 10; //要和底層SocketMark數組的個數相同 connCaches = new ConnCache[maxConnNum]; for (int i = 0; i < maxConnNum; i++) connCaches[i] = new ConnCache(1024, 2048); tokenQueue = new Queue<ConnCache>(); tokenEvent = new AutoResetEvent(false); NetLayer netLayer = new NetLayer(maxConnNum); netLayer.arrivedData += new ArrivedData(netLayer_arrivedData); netLayer.Start(); //處理線程 ThreadPool.QueueUserWorkItem(new WaitCallback(AnalyzeThrd), null); Console.ReadKey(); } static void netLayer_arrivedData(uint socketId, byte[] buffer) { int index = (int)(socketId / 65536); int reusetimes = (int)(socketId % 65536); Console.WriteLine("recv data from - index = {0}, reusetimes = {1}", index, reusetimes); int dataLen = buffer.Length; //僅使用了RecvLock,不影響WaitBuffer中的數據處理 lock (connCaches[index].RecvLock) { //說明已是一個新的Socket鏈接了,須要清理以前的數據 if (connCaches[index].SocketId != socketId) { connCaches[index].SocketId = socketId; connCaches[index].RecvLen = 0; connCaches[index].WaitLen = 0; } //若是收到的數據超過了能夠接收的長度,截斷 if (dataLen > connCaches[index].RecvBuffer.Length - connCaches[index].RecvLen) dataLen = connCaches[index].RecvBuffer.Length - connCaches[index].RecvLen; if (dataLen > 0) { //接收數據到RecvBuffer中,並更新已接收的長度值 Buffer.BlockCopy(buffer, 0, connCaches[index].RecvBuffer, connCaches[index].RecvLen, dataLen); connCaches[index].RecvLen += dataLen; } } lock (((ICollection)tokenQueue).SyncRoot) { tokenQueue.Enqueue(connCaches[index]); } tokenEvent.Set(); } static void AnalyzeThrd(object state) { ConnCache connCache; while (true) { Monitor.Enter(((ICollection)tokenQueue).SyncRoot); if (tokenQueue.Count > 0) { connCache = tokenQueue.Dequeue(); Monitor.Exit(((ICollection)tokenQueue).SyncRoot); } else { Monitor.Exit(((ICollection)tokenQueue).SyncRoot); //若是沒有須要處理的數據,等待15秒後再運行 tokenEvent.WaitOne(15000, false); continue; } //這裏就須要使用兩個鎖,只要保證使用這兩個鎖的順序不變,就不會出現死鎖問題 lock (connCache.AnalyzeLock) { while (connCache.RecvLen > 0) { lock (connCache.RecvBuffer) { //這裏把接收到的數據COPY到待處理數組 int copyLen = connCache.WaitBuffer.Length - connCache.WaitLen; if (copyLen > connCache.RecvLen) copyLen = connCache.RecvLen; Buffer.BlockCopy(connCache.RecvBuffer, 0, connCache.WaitBuffer, connCache.WaitLen, copyLen); connCache.WaitLen += copyLen; connCache.RecvLen -= copyLen; //若是RecvBuffer中還有數據沒有COPY完,把它們提到數組開始位置 if (connCache.RecvLen > 0) Buffer.BlockCopy(connCache.RecvBuffer, copyLen, connCache.RecvBuffer, 0, connCache.RecvLen); } } //這裏就是解析數據的地方,在這我直接把收到的數據打印出來(注意:若是客戶端數據發送很快,有可能打印出亂碼) //還在AnalyzeLock鎖中 { string data = Encoding.Default.GetString(connCache.WaitBuffer, 0, connCache.WaitLen); Console.WriteLine("analyzed: " + data); //WaitLen置0,至關於清理了WaitBuffer中的數據 connCache.WaitLen = 0; } } } } }
至此,整個模型的使用就完成了。代碼圖省事就直接放上去了,見諒!
結果以下:
你們能夠試着修改下代碼使發送更快,一次發送數據更多,再來多個客戶端試一下效果。
注:本文中的代碼是用來進行演示的簡化後的代碼,並不保證沒有缺陷,僅爲了闡述這一模型。