一個Socket數據處理模型

Socket編程中,如何高效地接收和處理數據,這裏介紹一個簡單的編程模型。html

 

Socket索引 - SocketId


在給出編程模型以前,先提這樣一個問題,程序中如何描述Socket鏈接?編程

爲何這麼問呢,你們能夠翻看我以前在項目總結(一)中給出的一個簡單的基本架構,其中的網絡層用來管理Socket的鏈接,並負責接收發送Socket數據,這個模塊中能夠直接使用創建的Socket鏈接對象。但若是上層須要給某個Socket發送數據怎麼辦,若是直接把Socket對象傳送給上層,就破壞了面向對象中封裝原則,上層甚至能夠直接繞過網絡層操做Socket數據收發,顯然不是咱們但願看到的。數組

既然這樣不能直接傳遞Socket對象,那麼就要給上層傳遞一個可以標識這個對象的一個標識,這就是我要說的這個SocketId。緩存

SocketId實際上就是一個無符號整形數據,在網絡層維護一個SocketId與Socket對象的映射表,上層經過SocketId通知網絡層向對應的Socket發送數據。服務器

 

Socket索引 - 如何創建SocketId


SocketId並非說簡單的從1開始,而後來一個Socket鏈接就直接加1做爲對應的SocketId,我但願可以標識更多的東西。網絡

 

如圖所示,我在網絡層創建了一個SocketMark數組,長度是通過配置的容許Socket鏈接的最大個數。其中每一個SocketMark包含兩個主要的成員,鏈接的Socket對象,和對應的索引SocketId,如上所示。對於SocketId我不只但願可以標識出在SocketMark數組中的的位置index(最終找到Socket發出數據),還但願標識出這個SocketMark被使用了多少次(在項目中有特殊用處,在這不作過多說明)。多線程

那麼,怎麼用index和usetimes表示SocketId呢?具體來講,SocketId是一個無符號整形數據,也就是有4個字節,我使用2個高字節來表示index,兩個低字節來表示usetimes。那麼,SocketId就是 index * 65536 + usetimes % 65536,相應的index = socketId / 65536, usetimes = socketId % 65536。架構

SocketMark代碼以下所示:異步

public sealed class SocketMark
{
//用於線程同步
public readonly object m_SyncLock = new object(); public uint SocketId = 0; public Socket WorkSocket = null;
//用於接收Socket數據
public byte[] Buffer;
//當前WorkSocket是否鏈接
public bool Connected = false; public SocketMark(int index, int bufferSize) {
//默認狀況下usetimes爲0 SocketId
= Convert.ToUInt32(index * 65536); Buffer = new byte[bufferSize]; } public void IncReuseTimes() { int reuseTimes = GetReuseTimes(SocketId) + 1; SocketId = Convert.ToUInt32(GetIndex(SocketId) * 65536 + reuseTimes % 65536); } public static int GetIndex(uint socketId) { return Convert.ToInt32(socketId / 65536); } public static int GetReuseTimes(uint socketId) { return Convert.ToInt32(socketId % 65536); } }

當有Socket鏈接創建時,經過查詢SocketMark數組中Connected字段值爲false的元素(能夠直接遍歷查找,也能夠採起其餘方式,我使用的是創建一個對應SOcketMark的棧,保存index,有新鏈接index就出棧,而後設置SocketMark[index]的WorkSocket爲這個鏈接;Socket斷開後index再入棧),設置相應的WorkSocket後,同時要調用一次IncReuseTimes()函數,使用次數加1,並更新SocketId。socket

在這裏,網絡層就可使用Socket鏈接對象接收數據存儲在Buffer中,並把數據連同SocketId傳送給數據協議層。

 

數據模型


接下來是我要說的重點,在數據協議層這裏,我須要定義一個新的結構,用來接收Socket數據,並儘量地使處理高效。

public sealed class ConnCache
{
    public uint SocketId;                     // 鏈接標識

    public byte[] RecvBuffer;                 // 接收數據緩存,傳輸層抵達的數據,首先進入此緩存       
    public int RecvLen;                       // 接收數據緩存中的有效數據長度
    public readonly object RecvLock;          // 數據接收鎖

    public byte[] WaitBuffer;                 // 待處理數據緩存,首先要將數據從RecvBuffer轉移到該緩存,數據處理線程才能進行處理
    public int WaitLen;                       // 待處理數據緩存中的有效數據長度
    public readonly object AnalyzeLock;       // 數據分析鎖

    public ConnCache(int recvBuffSize, int waitBuffSize)
    {
        SocketId = 65535;
        RecvBuffer = new byte[recvBuffSize];
        RecvLen = 0;
        RecvLock = new object();
        WaitBuffer = new byte[waitBuffSize];
        WaitLen = 0;
        AnalyzeLock = new object();
    }
}

解釋一下:ConnCache用於管理從網絡層接收的數據,並維護一個SocketId用來標識數據的歸屬。

在這其中,包括上面SocketMark都定義了一個公共的只讀對象,用來提供多線程時數據同步,但你應該注意到這幾個鎖的對象全都是public類型的,實際上這樣並很差。由於這樣,對象就沒法控制程序對鎖的使用,一旦鎖的使用不符合預期,就頗有可能形成程序出現死鎖,因此建議你們在使用的時候仍是考慮使用private修飾符,儘可能由對象來完成資源的同步。

可是,很不幸,我在項目中發現這樣作有點不現實,使用private可能破壞了整個系統的結構。。而使用public只要能徹底掌控代碼,對不產生死鎖有信心,仍是很是方便的,基於這個理由,最終放棄了使用private的想法。

繼續回來講這個結構,在這裏,我把從網絡層接收的數據存儲在RecvBuffer中,但個人解析線程並不直接訪問這個數組,而是另外創建一個新的數據WaitBuffer,這個WaitBuffer的用處就是從RecvBuffer Copy必定的數據,而後提供給解析線程處理。這樣作有兩個好處,第一,避免了接收線程和處理線程直接爭搶Buffer資源,可以提升處理性能。第二,額。。我看着挺清晰的,一個用來接收,一個用來處理,不是麼

注:當初在設計這個模型的時候,還不知道專門有個ReaderWriterLockSlim,我想若是可以代替上面的接收鎖和分析鎖,效果應該更好一點。

 

模型使用


介紹了上面兩個主要的結構後,咱們來看下如何寫代碼簡單使用上述模型

 

首先,實現客戶端,參考項目總結 - 異步中的客戶端代碼,將代碼修改成定時發送數據到服務器,並刪除一些無關的代碼

class Program
{
    static Socket socket;
    static void Main(string[] args)
    {
        socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        socket.Connect(IPAddress.Parse("127.0.0.1"), 1234);

        //啓動一個線程定時向服務器發送數據
        ThreadPool.QueueUserWorkItem(state => {
            int index = 0;

            while (true)
            {
                byte[] senddata = Encoding.Default.GetBytes("咱們都有" + index++ + "個家,名字叫中國");
                socket.BeginSend(senddata, 0, senddata.Length, SocketFlags.None, new AsyncCallback(Send), null);

                Thread.Sleep(1000);
            }
        });

        Console.ReadKey();
    }

    static void Send(IAsyncResult ar)
    {
        socket.EndSend(ar);
    }
}

 

對於服務器,新建一個類NetLayer,用來模擬網絡層,網絡層創建Socket鏈接後,啓動異步接收,並把接收到的數據經過委託傳給處理層,傳送時發送SocketId,代碼以下:

public delegate void ArrivedData(uint socketId, byte[] buffer);

class NetLayer
{
    Socket socket;
    SocketMark[] socketMarks;

    public event ArrivedData arrivedData;

    public NetLayer(int maxConnNum)
    {
        //初始化SocketMark
        //最大容許鏈接數
        socketMarks = new SocketMark[maxConnNum];
        for (int i = 0; i < socketMarks.Length; i++)
            socketMarks[i] = new SocketMark(i, 1024);
    }

    public void Start()
    {
        //新建Socket,並開始監聽鏈接
        socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        socket.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234));
        socket.Listen(100);
        socket.BeginAccept(new AsyncCallback(Accept), "new socket connect");
    }

    public void Accept(IAsyncResult ar)
    {
        Console.WriteLine(ar.AsyncState.ToString());

        //結束監聽
        Socket _socket = socket.EndAccept(ar);

        //這裏爲了方便直接經過循環的方式查找可用的SocketMark
        SocketMark socketMark = null;
        for (int i = 0; i < socketMarks.Length; i++)
        {
            if (!socketMarks[i].Connected)
            {
                socketMark = socketMarks[i];
                break;
            }
        }
        //若是沒有找到可用的SocketMark,說明達到最大鏈接數,關閉該鏈接
        if (socketMark == null)
        {
            _socket.Close();
            return;
        }

        socketMark.WorkSocket = _socket;
        socketMark.Connected = true;
        socketMark.IncReuseTimes();

        _socket.BeginReceive(socketMark.Buffer, 0, socketMark.Buffer.Length, SocketFlags.None, new AsyncCallback(Receive), socketMark);
    }

    public void Receive(IAsyncResult ar)
    {
        SocketMark mark = (SocketMark)ar.AsyncState;

        int length = mark.WorkSocket.EndReceive(ar);
        if (length > 0)
        {
            //多線程下資源同步
            lock (mark.m_SyncLock)
            {
                byte[] data = new byte[length];
                Buffer.BlockCopy(mark.Buffer, 0, data, 0, length);

                if (arrivedData != null)
                    arrivedData(mark.SocketId, data);

                //再次投遞接收申請
                mark.WorkSocket.BeginReceive(mark.Buffer, 0, mark.Buffer.Length, SocketFlags.None, new AsyncCallback(Receive), mark);
            }
        }
    }
}

數據處理層收到數據後,先把數據存到對應ConnCache中的RecvBuffer中,並向隊列Queue<ConnCache>寫入一個標記,告訴處理線程應該處理哪一個ConnCache的數據,在這裏你們會看到,我在以前的文章中討論的lock和Monitor是如何使用的。

class Program
{
    static ConnCache[] connCaches;
    
    //處理線程經過這個隊列知道有數據須要處理
    static Queue<ConnCache> tokenQueue;
    //接收到數據後,同時通知處理線程處理數據
    static AutoResetEvent tokenEvent;

    static void Main(string[] args)
    {
        //最大容許鏈接數
        int maxConnNum = 10;

        //要和底層SocketMark數組的個數相同
        connCaches = new ConnCache[maxConnNum];
        for (int i = 0; i < maxConnNum; i++)
            connCaches[i] = new ConnCache(1024, 2048);
        
        tokenQueue = new Queue<ConnCache>();
        tokenEvent = new AutoResetEvent(false);

        NetLayer netLayer = new NetLayer(maxConnNum);
        netLayer.arrivedData += new ArrivedData(netLayer_arrivedData);
        netLayer.Start();

        //處理線程
        ThreadPool.QueueUserWorkItem(new WaitCallback(AnalyzeThrd), null);

        Console.ReadKey();
    }

    static void netLayer_arrivedData(uint socketId, byte[] buffer)
    {
        int index = (int)(socketId / 65536);
        int reusetimes = (int)(socketId % 65536);

        Console.WriteLine("recv data from - index = {0}, reusetimes = {1}", index, reusetimes);

        int dataLen = buffer.Length;
        //僅使用了RecvLock,不影響WaitBuffer中的數據處理
        lock (connCaches[index].RecvLock)
        {
            //說明已是一個新的Socket鏈接了,須要清理以前的數據
            if (connCaches[index].SocketId != socketId)
            {
                connCaches[index].SocketId = socketId;
                connCaches[index].RecvLen = 0;
                connCaches[index].WaitLen = 0;
            }

            //若是收到的數據超過了能夠接收的長度,截斷
            if (dataLen > connCaches[index].RecvBuffer.Length - connCaches[index].RecvLen)
                dataLen = connCaches[index].RecvBuffer.Length - connCaches[index].RecvLen;
            if (dataLen > 0)
            {
                //接收數據到RecvBuffer中,並更新已接收的長度值
                Buffer.BlockCopy(buffer, 0, connCaches[index].RecvBuffer, connCaches[index].RecvLen, dataLen);
                connCaches[index].RecvLen += dataLen;
            }
        }

        lock (((ICollection)tokenQueue).SyncRoot)
        {
            tokenQueue.Enqueue(connCaches[index]);
        }
        tokenEvent.Set();
    }

    static void AnalyzeThrd(object state)
    {
        ConnCache connCache;

        while (true)
        {
            Monitor.Enter(((ICollection)tokenQueue).SyncRoot);
            if (tokenQueue.Count > 0)
            {
                connCache = tokenQueue.Dequeue();
                Monitor.Exit(((ICollection)tokenQueue).SyncRoot);
            }
            else
            {
                Monitor.Exit(((ICollection)tokenQueue).SyncRoot);
                //若是沒有須要處理的數據,等待15秒後再運行
                tokenEvent.WaitOne(15000, false);
                continue;
            }

            //這裏就須要使用兩個鎖,只要保證使用這兩個鎖的順序不變,就不會出現死鎖問題
            lock (connCache.AnalyzeLock)
            {
                while (connCache.RecvLen > 0)
                {
                    lock (connCache.RecvBuffer)
                    {
                        //這裏把接收到的數據COPY到待處理數組
                        int copyLen = connCache.WaitBuffer.Length - connCache.WaitLen;
                        if (copyLen > connCache.RecvLen)
                            copyLen = connCache.RecvLen;
                        Buffer.BlockCopy(connCache.RecvBuffer, 0, connCache.WaitBuffer, connCache.WaitLen, copyLen);
                        connCache.WaitLen += copyLen;
                        connCache.RecvLen -= copyLen;
                        //若是RecvBuffer中還有數據沒有COPY完,把它們提到數組開始位置
                        if (connCache.RecvLen > 0)
                            Buffer.BlockCopy(connCache.RecvBuffer, copyLen, connCache.RecvBuffer, 0, connCache.RecvLen);
                    }
                }

                //這裏就是解析數據的地方,在這我直接把收到的數據打印出來(注意:若是客戶端數據發送很快,有可能打印出亂碼)
                //還在AnalyzeLock鎖中
                {
                    string data = Encoding.Default.GetString(connCache.WaitBuffer, 0, connCache.WaitLen);
                    Console.WriteLine("analyzed: " + data);

                    //WaitLen置0,至關於清理了WaitBuffer中的數據
                    connCache.WaitLen = 0;
                }
            }

        }
    }
}

至此,整個模型的使用就完成了。代碼圖省事就直接放上去了,見諒!

結果以下:

你們能夠試着修改下代碼使發送更快,一次發送數據更多,再來多個客戶端試一下效果。 

 

注:本文中的代碼是用來進行演示的簡化後的代碼,並不保證沒有缺陷,僅爲了闡述這一模型。

相關文章
相關標籤/搜索