ProtoBuff3 unity_TCP網絡發包解包&&消息訂閱

using Google.Protobuf;
//using Google.Protobuf.Examples.AddPerson;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Net.Sockets;
using UnityEngine;
using ARProto;
using pb = global::Google.Protobuf;
public class NewBehaviourScript : MonoBehaviour
{服務器


    void OnGUI()
    {
        if (GUI.Button(new Rect(100, 10, 120, 100), new GUIContent("Button", "Go")))
        {
            UIGo();
        }網絡

    }異步

    public void UIGo()
    {
        Person person =new    Person();
        person.Id=01;
        AddPerson addPerson =new AddPerson();
      SendMsg(addPerson);tcp

    }
    // Use this for initialization
    void Start()
    {函數

        StartConnect();
    }測試

    TcpClient tcpClient;                // 
    byte[] receive_buff;                // 專門用來接收Socket裏面的數據的
    byte[] data_buff;                   // 用來存當前未處理的數據ui

    CodedOutputStream outputStream;     // 用來綁定SocketStream,方便把proto對象轉換成字節流Stream輸送給服務器this

    void StartConnect()
    {
        TcpClient client = new TcpClient();
        tcpClient = client;編碼

        //這裏寫上你本身服務器的ip和端口
        client.Connect("106.2.124.243", 9090);線程

        receive_buff = new byte[client.ReceiveBufferSize];

        outputStream = new CodedOutputStream(client.GetStream());

        // 監聽一波服務器消息
        client.GetStream().BeginRead(receive_buff, 0, client.ReceiveBufferSize, ReceiveMessage, null);
    }

    int nFalg = 0;        // 這個變量主要是爲了防止和服務端無休無止互發消息,測試代碼
    void Update()
    {

        // 由於ReceiveMessage接收數據是異步的方式,不是在主線程,有些方法不能用,好比ToString,因此消息處理放在這裏處理
        // 但主要是由於後面要加上消息廣播,能夠添加在這裏
        if (data_buff != null && ++nFalg < 5)
        {
            // 把數據傳給CodedInputStream計算本次包的長度
            CodedInputStream inputStream = new CodedInputStream(data_buff);
            int length = inputStream.ReadLength();
            // 計算"包長度"佔用的字節數,後面取數據的時候扣掉這個字節數,就是真實數據長度
            int lengthLength = CodedOutputStream.ComputeLengthSize(length);

            // 當前數據足夠解析一個包了
            if (length + lengthLength <= data_buff.Length)
            {
                byte[] real_data = new byte[length];
                // 拷貝真實數據
                Array.Copy(data_buff, lengthLength, real_data, 0, length);

                // 假設服務器給你發了個AddressBook
                AddPerson ab = AddPerson.Parser.ParseFrom(real_data);

                // 把這個數據直接還給服務器,驗證客戶端發給服務器的狀況
                 SendMsg(ab);

                // 數據剛恰好,沒有多餘的
                if (length + lengthLength == data_buff.Length)
                {
                    data_buff = null;
                }
                else
                {
                    // 數據有剩餘,保存剩餘數據,等下一個Update解析
                    byte[] t = new byte[data_buff.Length - length - lengthLength];
                    Array.Copy(data_buff, lengthLength + length, t, 0, t.Length);
                    data_buff = t;
                }
            }
        }
    }

    // 發送數據
    public void SendMsg( pb::IMessage<AddPerson> message)
    {
        if (outputStream != null)
        {
            // WriteMessage 裏面會先write一個長度,而後再write真實數據
            ////     outputStream.WriteMessage(message);
            outputStream.Flush();       // 把buffer數據寫入到tcpClient的流裏面
        }
    }

    public void ReceiveMessage(IAsyncResult ar)
    {
          Debug.Log("消息:" + receive_buff);
        try
        {
            // 本次接收到的數據長度
            int bytesRead = tcpClient.GetStream().EndRead(ar);
            if (bytesRead < 1)
            {
                Debug.LogError("bytesRead < 1");
                return;
            }
            else
            {
                if (data_buff == null)
                {
                    // buff裏面沒有數據
                    data_buff = new byte[bytesRead];
                    Array.Copy(receive_buff, data_buff, bytesRead);
                }
                else
                {
                    // buff裏面有數據,要和新數據整合起來
                    byte[] new_data = new byte[bytesRead + data_buff.Length];
                    Array.Copy(data_buff, new_data, data_buff.Length);

                    Array.Copy(receive_buff, 0, new_data, data_buff.Length, bytesRead);

                    data_buff = new_data;
                  
                }
            }

            // 繼續監聽下一波數據
            tcpClient.GetStream().BeginRead(receive_buff, 0, tcpClient.ReceiveBufferSize, ReceiveMessage, null);
        }
        catch (Exception ex)
        {
            // 爲了防止報ex沒被使用的警告
            Debug.Log(ex);
        }
    }
}

 

處理與Netty服務器通訊的粘包、拆包

服務器的粘包拆包是Netty自己支持的解碼編碼器,以下圖

 

服務器粘包、拆包處理方式

 

總共四行,其中第一行做用在拆包的時候,第三行做用在粘包的時候(我猜的)。
它這個拆包粘包不是普通的那種固定4個字節標示長度的,而是有時候1個字節,有時候是二、三、四、5個字節,根據當前發送的真實數據的長度定的。

在普通的方案粘包方案,數據是這樣的:4個字節+真實數據
有的是用換行回車做爲標識符拆包、粘包

那在Netty的方案裏,包長度到底是幾個字節呢?
其實它也是用到了Protobuff裏面的數據讀取、保存方式,感興趣的能夠打開protobuf3-for-unity-3.0.0\src\Google.Protobuf.sln工程看一下,在Google.Protobuf項目中,打開CodedInputStream.cs

SlowReadRawVarint32


包頭佔用幾個字節是由下面這個函數計算的:

這個是計算一個uint數據的真實長度的方法


這也是protobuff對象編碼後數據會比較小的主要緣由。好比一個對象編碼後獲得的是440個字節數據,那麼調用ComputeRawVarint32Size(440)的返回值是2,也就是服務器和客戶端發送的數據最終長度是440+2=442個字節。明白了這些,拆包和粘包就都不是問題了。

 

上面的代碼裏,粘包是這一段:

public void SendMsg(IMessage message)
    {
        if (outputStream != null)
        {
            // WriteMessage 裏面會先write一個長度,而後再write真實數據
            outputStream.WriteMessage(message);
            outputStream.Flush();       // 把buffer數據寫入到tcpClient的流裏面
        }
    }

乍一看,好像沒有在真實數據前面加長度啊?其實,在outputStream的WriteMessage裏面已經有WriteLength了,幫咱們作好了。

image.png

再看拆包:

// 把數據傳給CodedInputStream計算本次包的長度
            CodedInputStream inputStream = new CodedInputStream(data_buff);
            int length = inputStream.ReadLength();
            // 計算"包長度"佔用的字節數,後面取數據的時候扣掉這個字節數,就是真實數據長度
            int lengthLength = CodedOutputStream.ComputeLengthSize(length);

            // 當前數據足夠解析一個包了
            if (length + lengthLength <= data_buff.Length)
            {
                byte[] real_data = new byte[length];
                // 拷貝真實數據
                Array.Copy(data_buff, lengthLength, real_data, 0, length);

                // 假設服務器給你發了個AddressBook
                AddressBook ab = AddressBook.Parser.ParseFrom(real_data);
                ...
            }

先用CodedInputStream 看看這個「包大小」值是多少,再用CodedOutputStream.ComputeLengthSize計算這個「包大小」佔幾個字節,而後就明白真實數據從哪裏開始,佔多少字節了。

 

本身寫的消息註冊和分發:

using Google.Protobuf;

using Google.Protobuf.WellKnownTypes;

using System;

using System.Collections;

using System.Collections.Generic;

using System.Net.Sockets;

using UnityEngine;

using ARProto;

using pb = global::Google.Protobuf;

using System.Threading;

using System.Text;

using NIO;

 

public class ClientManager : MonoBehaviour

{

private static object lockObj = new object();

public static ClientManager instance;

public bool isEditIP;

public string IP = "106.2.124.243";

public int PORT = 9090;

 

public delegate void ResponMsgHandler(ServerMessage msg);

 

public delegate void BroadcastHandler(ServerMessage msg);

 

Queue<ServerMessage> QueueServerMessage = new Queue<ServerMessage>();

 

public event BroadcastHandler broadcastHandler;

 

private Dictionary<int, ResponMsgHandler> mDicMsgs = new Dictionary<int, ResponMsgHandler>();

private List<BroadcastHandler> BroadMsg = new List<BroadcastHandler>();

 

/// <summary>

/// 發送請求消息

/// </summary>

/// <param name="clientMessage">消息</param>

/// <param name="handler">被監聽回調方法</param>

public void SendRequestMsg(ClientMessage clientMessage, ResponMsgHandler handler)

{

AddListener(clientMessage.Seq, handler);

SendMsg(clientMessage);

}

 

/// <summary>

/// 訂閱廣播消息 只須要註冊一遍

/// </summary>

/// <param name="handler">被監聽方法</param>

public void AddBroadcastListener(BroadcastHandler handler)

{

if (BroadMsg == null)

BroadMsg = new List<BroadcastHandler>();

if (BroadMsg.Contains(handler))

{

return;

}

else

{

BroadMsg.Add(handler);

broadcastHandler += handler;

}

}

 

/// <summary>

/// 取消對參數handler的監聽

/// </summary>

/// <param name="msgType">消息類型</param>

/// <param name="handler">被監聽方法</param>

public void RemoveListener(int msgType, ResponMsgHandler handler)

{

if (mDicMsgs != null && mDicMsgs.ContainsKey(msgType))

{

mDicMsgs[msgType] -= handler;

mDicMsgs.Remove(msgType);

}

}


 

private void AddListener(int msgType, ResponMsgHandler handler)

{

if (mDicMsgs == null)

mDicMsgs = new Dictionary<int, ResponMsgHandler>();

if (mDicMsgs.ContainsKey(msgType))

{

return;

}

else

{

mDicMsgs.Add(msgType, null);

mDicMsgs[msgType] += handler;

}

 

}

 

private void ClearAllListeners()

{

if (mDicMsgs != null)

mDicMsgs.Clear();

}

 

private void SendResponMsg(int msgType, ServerMessage msg)

{

ResponMsgHandler handler;

if (mDicMsgs != null && mDicMsgs.TryGetValue(msgType, out handler))

{

if (handler != null)

handler(msg);

}

}

 

private void SendBroadcastMsg(ServerMessage msg)

{

if (broadcastHandler != null)

broadcastHandler(msg);

}

 

void Awake()

{

instance = this;

}

Thread ThreadDoMessage;

bool isThreadDoMessageAbort;

void Start()

{

if (!isEditIP)

{

string str = Callback.UnityCheckEnvironment();

Debug.Log("UnityCheckEnvironment str:" + str);

if (str.Equals("dev") || str.Equals("test"))

{

IP = "106.2.124.243";

PORT = 29090;

}

if (str.Equals("online"))

{

IP = "106.2.124.243";

PORT = 9090;

}

}

StartConnect(IP, PORT);

ThreadDoMessage = new Thread(new ThreadStart(DoMessage));

//ThreadDoMessage.IsBackground = true;

//ThreadDoMessage.Priority = System.Threading.ThreadPriority.Normal;

 

ThreadDoMessage.Start();

isThreadDoMessageAbort = true;

InvokeRepeating("SendRepeat", 0, 2);

 

}

 

TcpClient tcpClient;

byte[] receive_buff;

byte[] data_buff;

byte[] data_buff2;

CodedOutputStream outputStream;

 

bool StartConnect(string IP, int PORT)

{

TcpClient client = new TcpClient();

tcpClient = client;

try

{

client.Connect(IP, PORT);

receive_buff = new byte[client.ReceiveBufferSize];

outputStream = new CodedOutputStream(client.GetStream());

client.GetStream().BeginRead(receive_buff, 0, client.ReceiveBufferSize, new AsyncCallback(ReceiveMessage), null);

}

catch (Exception e)

{

Debug.LogError("Connect Error:" + e);

}

 

isSuccess = client.Connected;

QueueServerMessage.Clear();

Debug.Log("StartConnect isSuccess:" + client.Connected);

heartTime = 0;

return isSuccess;

 

}

int heartTime = 0;

void SendRepeat()

{

SendMsg(PackMessage.PackClientHearMessage());

heartTime++;

// Debug.Log("SendMsg PackMessage.PackClientHearMessage:" + heartTime);

}

private bool SocketConnected()

{

try

{

return !tcpClient.Client.Poll(1, SelectMode.SelectRead) && (tcpClient.Client.Available == 0);

}

catch (SocketException)

{

return false;

}

catch (ObjectDisposedException)

{

return false;

}

}

 

void FixedUpdate()

{

if (!isSuccess)

{

Debug.Log("ReconnectUI");

ReconnectUI();

// return;

}

while (QueueServerMessage.Count > 0)

{

ServerMessageDo(QueueServerMessage.Dequeue());

}

 

}

 

void DoMessage()

{

Debug.Log("DoMessage 1");

while (true)

{

try

{

if (!isThreadDoMessageAbort)

{

Debug.Log("Exit DoMessage");

return;

}

// Debug.Log("DoMessage while (true)");

// if (((tcpClient.Client.Poll(1000, SelectMode.SelectRead) && (tcpClient.Client.Available == 0)) || !tcpClient.Client.Connected))

// {

// Debug.Log("tcpClient false");

// isSuccess = false;

// }

// isSuccess=SocketConnected();

 

lock (lockObj)

{

if (null != tcpClient && null != tcpClient.Client)

{

if (!tcpClient.Client.Connected || heartTime >= 2)

{

// Debug.Log("Update heartTime:" + heartTime);

isSuccess = false;

}

}

}


 

// if (!isSuccess)

// {

// Debug.Log("!isSuccess");

// // ReconnectUI();

// return;

// }

 

while (true)

{

if (data_buff != null)

{

lock (lockObj)

{

if (data_buff2 == null)

{

data_buff2 = new byte[data_buff.Length];

Array.Copy(data_buff, data_buff2, data_buff.Length);

}

else

{

byte[] temp = new byte[data_buff2.Length + data_buff.Length];

Array.Copy(data_buff2, temp, data_buff2.Length);

Array.Copy(data_buff, 0, temp, data_buff2.Length, data_buff.Length);

data_buff2 = temp;

}

data_buff = null;

}

}

else

{

break;

}

}

 

while (true)

{

if (data_buff2 != null)

{

CodedInputStream inputStream = new CodedInputStream(data_buff2);

int length = inputStream.ReadLength();

int lengthLength = CodedOutputStream.ComputeLengthSize(length);

 

if (length + lengthLength <= data_buff2.Length)

{

byte[] real_data = new byte[length];

Array.Copy(data_buff2, lengthLength, real_data, 0, length);

if (real_data.Length == 0)

{

Debug.LogError("real_data.Length==0");

}

ServerMessage sMsg = ServerMessage.Parser.ParseFrom(real_data);

// ServerMessageDo(sMsg);

QueueServerMessage.Enqueue(sMsg);

// Debug.Log("--ServerMessage:" + sMsg);

// Write.Log("--ServerMessage:" + sMsg);

if (length + lengthLength == data_buff2.Length)

{

data_buff2 = null;

}

else

{

byte[] temp = new byte[data_buff2.Length - length - lengthLength];

Array.Copy(data_buff2, lengthLength + length, temp, 0, temp.Length);

data_buff2 = temp;

}

}

else

{

break;

}

}

else

{

break;

}

}

}

catch (Exception exception)

{

Debug.Log("Socket exception: " + exception);

}

}

}


 

private void SendMsg(pb::IMessage message)

{

// Debug.Log("++SendMsg:" + message);

// Write.Log("++SendMsg:" + message);

if (!isSuccess)

{

Debug.Log("!isSuccess");

return;

}

try

{

if (outputStream != null)

{

outputStream.WriteMessage(message);

outputStream.Flush();

}

}

catch (Exception e)

{

Debug.LogError("SendMsg Exception:" + e);

isSuccess = false;

}

}

 

private void ServerMessageDo(ServerMessage sMsg)

{

if (sMsg == null)

{

Debug.LogError("ServerMessageDo sMsg==null");

return;

}

if (sMsg.MessageCategory == ServerMessageCategory.UndefinedServerMessageCategory)

{

Debug.LogError("sMsg.MessageCategory == ServerMessageCategory.UndefinedServerMessageCategory:" + sMsg);

return;

}

// Debug.Log("--sMsg:" + sMsg);

if (sMsg.MessageCategory == ServerMessageCategory.Broadcast)

{

// Debug.Log("Broadcast:" + sMsg);

// Write.Log("Broadcast:" + sMsg);

Broadcast broadcast = sMsg.Broadcast; // 廣播

SendBroadcastMsg(sMsg);

 

}

else if (sMsg.MessageCategory == ServerMessageCategory.Response)

{

if (null == sMsg.Response)

{

Debug.LogError("ServerMessageDo null==sMsg.Response:" + sMsg);

return;

}

// 對客戶端請求的響應

Response response = sMsg.Response;

// Debug.Log("response.Ack:" + response.Ack);

SendResponMsg(response.Ack, sMsg);

 

}

else if (sMsg.MessageCategory == ServerMessageCategory.ServerHeartbeat)

{

ServerHeartbeat serverHeartbeat = sMsg.ServerHeartbeat; // 服務端心跳

heartTime--;

// Debug.Log("PackClientHearMessage serverHeartbeat 心跳:" + heartTime);

}

 

}

 

private void ReceiveMessage(IAsyncResult ar)

{

// Debug.Log("receive_buff:" + receive_buff.Length);

try

{

// Debug.Log("ReceiveMessage id =" + Thread.CurrentThread.ManagedThreadId);

int bytesRead;

bytesRead = tcpClient.GetStream().EndRead(ar);

 

if (bytesRead < 1)

{

Debug.LogError("網絡已斷開 bytesRead < 1");

//TODO 重連

isSuccess = false;

return;

}

else

{

lock (lockObj)

{

if (data_buff == null)

{

data_buff = new byte[bytesRead];

Array.Copy(receive_buff, data_buff, bytesRead);

}

else

{

byte[] new_data = new byte[bytesRead + data_buff.Length];

Array.Copy(data_buff, new_data, data_buff.Length);

Array.Copy(receive_buff, 0, new_data, data_buff.Length, bytesRead);

data_buff = new_data;

}

// Debug.Log("ThreadId:" + Thread.CurrentThread.ManagedThreadId + "data_buff:" + data_buff.Length);

}

}

 

tcpClient.GetStream().BeginRead(receive_buff, 0, tcpClient.ReceiveBufferSize, new AsyncCallback(ReceiveMessage), null);

 

//string strbuff = Encoding.UTF8.GetString(receive_buff, 0, bytesRead);

// Debug.Log("bytesRead:" + bytesRead + "/" + receive_buff.Length + " receive_buff:" + strbuff);

// Write.Log("bytesRead:" + bytesRead + "/" + receive_buff.Length + " receive_buff:" + strbuff);


 

}

catch (Exception ex)

{

Debug.Log("ReceiveMessageException:" + ex);

}

}

 

// int reconnetTime = 0;

bool isSuccess = false;

 

public void ReconnectUI()

{

ARCarUIManager.Instance.ARNetPanel.SetActive(true);

}

 

public bool Reconnect()

{

Debug.Log("正在嘗試從新鏈接網絡...");

bool isok = StartConnect(IP, PORT);

ARCarUIManager.Instance.ARNetPanel.SetActive(false);

return isok;

}

 

// void Reconnect()

// {

// Debug.Log("Reconnect");

// if (!isSuccess)

// {

// tcpClient.Close();

// InvokeRepeating("InvokeReconnect", 0, 8);

// }

// }

 

// void InvokeReconnect()

// {

// reconnetTime++;

// Debug.LogError("正在嘗試從新鏈接網絡..." + reconnetTime);

// StartConnect(IP, PORT);

// if (isSuccess || reconnetTime >= 5)

// {

// CancelInvoke("InvokeReconnect");

// }

// }

 

public void OnDestroy()

{

try

{

tcpClient.GetStream().Close();

tcpClient.Close();

isSuccess = false;

ThreadDoMessage.Abort();

isThreadDoMessageAbort = false;

}

catch (Exception e)

{

Debug.Log(" tcpClient.Close e:" + e);

}

broadcastHandler = null;

mDicMsgs = null;

BroadMsg = null;

ClientManager.instance = null;

}

 

}

相關文章
相關標籤/搜索