using Google.Protobuf;
//using Google.Protobuf.Examples.AddPerson;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Net.Sockets;
using UnityEngine;
using ARProto;
using pb = global::Google.Protobuf;
/// <summary>
/// Demo Unity behaviour: connects to a TCP server, reads protobuf <c>AddPerson</c> packets
/// framed with a varint length prefix, and echoes each parsed packet back to the server.
/// </summary>
public class NewBehaviourScript : MonoBehaviour
{
    // A 32-bit varint length prefix never occupies more than this many bytes.
    const int MaxLengthPrefixBytes = 5;

    // Guards data_buff: it is appended to from the socket callback (not the main thread)
    // and consumed in Update (main thread).
    readonly object bufferLock = new object();

    TcpClient tcpClient;
    byte[] receive_buff;            // scratch buffer handed to the socket for every read
    byte[] data_buff;               // received bytes that have not been parsed yet
    CodedOutputStream outputStream; // wraps the socket stream so proto objects can be written to the server
    int nFalg = 0;                  // test-code guard that stops the client and server echoing forever

    void OnGUI()
    {
        if (GUI.Button(new Rect(100, 10, 120, 100), new GUIContent("Button", "Go")))
        {
            UIGo();
        }
    }

    /// <summary>
    /// Button handler: builds an empty <c>AddPerson</c> request and sends it to the server.
    /// </summary>
    public void UIGo()
    {
        // NOTE(review): person is populated but never attached to addPerson; the AddPerson
        // schema is not visible here, so the wiring is left as in the original - confirm intent.
        Person person = new Person();
        person.Id = 1;
        AddPerson addPerson = new AddPerson();
        SendMsg(addPerson);
    }

    // Use this for initialization
    void Start()
    {
        StartConnect();
    }

    /// <summary>
    /// Opens the TCP connection and starts the first asynchronous read.
    /// On failure the error is logged and outputStream stays null, so SendMsg becomes a no-op.
    /// </summary>
    void StartConnect()
    {
        TcpClient client = new TcpClient();
        tcpClient = client;
        try
        {
            // Put your own server's IP and port here.
            client.Connect("106.2.124.243", 9090);
            receive_buff = new byte[client.ReceiveBufferSize];
            NetworkStream stream = client.GetStream();
            outputStream = new CodedOutputStream(stream);
            // Start listening for server messages.
            stream.BeginRead(receive_buff, 0, receive_buff.Length, ReceiveMessage, null);
        }
        catch (Exception ex)
        {
            outputStream = null;
            Debug.LogError("Connect Error:" + ex);
        }
    }

    /// <summary>
    /// Parses at most one complete packet per frame from the bytes collected by
    /// <see cref="ReceiveMessage"/> and echoes it back to the server.
    /// </summary>
    void Update()
    {
        AddPerson packet;
        lock (bufferLock)
        {
            // Same guard as before: only the first few frames that have data are processed.
            if (data_buff == null || ++nFalg >= 5)
            {
                return;
            }

            int length;
            try
            {
                // Let CodedInputStream decode the length prefix of the next packet.
                length = new CodedInputStream(data_buff).ReadLength();
            }
            catch (InvalidProtocolBufferException ex)
            {
                // Fewer bytes than the longest possible prefix: the prefix itself is still
                // incomplete, so wait for more data. Otherwise the stream is corrupt.
                if (data_buff.Length >= MaxLengthPrefixBytes)
                {
                    Debug.LogError("Invalid length prefix, dropping buffered data: " + ex);
                    data_buff = null;
                }
                return;
            }

            if (length < 0)
            {
                Debug.LogError("Negative packet length, dropping buffered data: " + length);
                data_buff = null;
                return;
            }

            // Number of bytes used by the length prefix itself; the payload starts right after it.
            int lengthLength = CodedOutputStream.ComputeLengthSize(length);
            long packetSize = (long)lengthLength + length;
            if (packetSize > data_buff.Length)
            {
                // Not enough data for a whole packet yet.
                return;
            }

            byte[] real_data = new byte[length];
            Array.Copy(data_buff, lengthLength, real_data, 0, length);

            // Remove the packet from the buffer before parsing, so a bad payload cannot be
            // re-parsed on every frame; keep any trailing bytes for the next Update.
            int remaining = data_buff.Length - (int)packetSize;
            if (remaining == 0)
            {
                data_buff = null;
            }
            else
            {
                byte[] rest = new byte[remaining];
                Array.Copy(data_buff, (int)packetSize, rest, 0, remaining);
                data_buff = rest;
            }

            try
            {
                // The demo assumes the server sent an AddPerson message.
                packet = AddPerson.Parser.ParseFrom(real_data);
            }
            catch (InvalidProtocolBufferException ex)
            {
                Debug.LogError("Failed to parse packet: " + ex);
                return;
            }
        }

        // Echo the message straight back to verify the client-to-server path.
        SendMsg(packet);
    }

    /// <summary>
    /// Sends a message to the server, prefixed with its varint-encoded length.
    /// Does nothing when the connection was never established or the message is null.
    /// </summary>
    /// <param name="message">The protobuf message to send.</param>
    public void SendMsg(pb::IMessage<AddPerson> message)
    {
        if (outputStream == null || message == null)
        {
            return;
        }
        try
        {
            // WriteMessage writes the length first and then the payload.
            outputStream.WriteMessage(message);
            // Push the buffered bytes into the TcpClient's stream.
            outputStream.Flush();
        }
        catch (Exception ex)
        {
            Debug.LogError("SendMsg Exception:" + ex);
        }
    }

    /// <summary>
    /// Asynchronous read callback: appends the newly received bytes to data_buff and
    /// schedules the next read.
    /// </summary>
    /// <param name="ar">The result handle of the completed BeginRead call.</param>
    public void ReceiveMessage(IAsyncResult ar)
    {
        try
        {
            NetworkStream stream = tcpClient.GetStream();
            // Number of bytes received by this read.
            int bytesRead = stream.EndRead(ar);
            if (bytesRead < 1)
            {
                Debug.LogError("bytesRead < 1");
                return;
            }
            // Log the byte count; concatenating the array itself only prints its type name.
            Debug.Log("消息:" + bytesRead);

            lock (bufferLock)
            {
                // Append the new bytes after whatever is still waiting to be parsed.
                int pending = data_buff == null ? 0 : data_buff.Length;
                byte[] merged = new byte[pending + bytesRead];
                if (pending > 0)
                {
                    Array.Copy(data_buff, merged, pending);
                }
                Array.Copy(receive_buff, 0, merged, pending, bytesRead);
                data_buff = merged;
            }

            // Keep listening for the next chunk of data.
            stream.BeginRead(receive_buff, 0, receive_buff.Length, ReceiveMessage, null);
        }
        catch (Exception ex)
        {
            Debug.Log(ex);
        }
    }

    /// <summary>
    /// Closes the connection when the behaviour is destroyed so the socket is not leaked.
    /// </summary>
    void OnDestroy()
    {
        outputStream = null;
        if (tcpClient != null)
        {
            try
            {
                tcpClient.Close();
            }
            catch (Exception ex)
            {
                Debug.Log(ex);
            }
        }
    }
}
服務器的粘包、拆包用的是Netty自帶的編解碼器,如下圖:
服務器粘包、拆包處理方式
總共四行,其中第一行作用在拆包的時候,第三行作用在粘包的時候(我猜的)。
它的拆包、粘包不是普通的那種用固定4個字節標示長度的方式,而是長度字段有時候佔1個字節,有時候佔2、3、4、5個字節,由當前發送的真實數據的長度決定。
在普通的粘包方案裏,數據是這樣的:4個字節的長度+真實數據。
也有的方案是用換行回車作爲標識符來拆包、粘包。
那在Netty的方案裏,包長度到底是幾個字節呢?
其實它用的也是Protobuf裏面的數據讀取、保存方式,感興趣的可以打開protobuf3-for-unity-3.0.0\src\Google.Protobuf.sln工程看一下:在Google.Protobuf項目中,打開CodedInputStream.cs:
SlowReadRawVarint32
包頭佔用幾個字節是由下面這個函數計算的:
這個是計算一個uint數據的真實長度的方法
這也是Protobuf對象編碼後數據會比較小的主要原因。比如一個對象編碼後得到的是440個字節數據,那麼調用ComputeRawVarint32Size(440)的返回值是2,也就是服務器和客戶端發送的數據最終長度是440+2=442個字節。明白了這些,拆包和粘包就都不是問題了。
上面的代碼裏,粘包是這一段:
public void SendMsg(IMessage message)
{
    if (outputStream != null)
    {
        // WriteMessage writes the length first and then the actual payload.
        outputStream.WriteMessage(message);
        // Push the buffered bytes into the TcpClient's stream.
        outputStream.Flush();
    }
}
乍一看,好像沒有在真實數據前面加長度啊?其實,outputStream的WriteMessage裏面已經調用了WriteLength,幫我們做好了。
image.png
再看拆包:
// Hand the buffered bytes to CodedInputStream to read the length of the next packet.
CodedInputStream inputStream = new CodedInputStream(data_buff);
int length = inputStream.ReadLength();
// Number of bytes taken by the length prefix itself; the payload starts right after it.
int lengthLength = CodedOutputStream.ComputeLengthSize(length);
// The buffer holds enough bytes for one complete packet.
if (length + lengthLength <= data_buff.Length)
{
    byte[] real_data = new byte[length];
    // Copy out the actual payload.
    Array.Copy(data_buff, lengthLength, real_data, 0, length);
    // Suppose the server sent an AddressBook.
    AddressBook ab = AddressBook.Parser.ParseFrom(real_data);
    // ...
}
先用CodedInputStream讀出這個「包大小」的值是多少,再用CodedOutputStream.ComputeLengthSize計算這個「包大小」本身佔幾個字節,然後就知道真實數據從哪裏開始、佔多少字節了。
自己寫的消息註冊和分發:
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Net.Sockets;
using UnityEngine;
using ARProto;
using pb = global::Google.Protobuf;
using System.Threading;
using System.Text;
using NIO;
/// <summary>
/// Singleton-style network manager for the Unity client.
/// Owns the TCP connection, frames outgoing protobuf messages with a varint length prefix,
/// splits the incoming byte stream back into <c>ServerMessage</c> objects on a worker thread,
/// and dispatches them on the main thread (in FixedUpdate) to response and broadcast handlers.
/// </summary>
public class ClientManager : MonoBehaviour
{
    // A 32-bit varint length prefix never occupies more than this many bytes.
    private const int MaxLengthPrefixBytes = 5;
    // How long the worker sleeps when there is nothing to do, so it does not spin a CPU core.
    private const int IdleSleepMilliseconds = 10;
    // How long OnDestroy waits for the worker to notice the stop flag.
    private const int WorkerJoinTimeoutMilliseconds = 200;

    // Guards tcpClient, data_buff, resetParseBuffer and the connection-state check.
    private static readonly object lockObj = new object();
    // Guards QueueServerMessage (filled by the worker thread, drained on the main thread).
    private readonly object queueLock = new object();

    public static ClientManager instance;
    public bool isEditIP;
    public string IP = "106.2.124.243";
    public int PORT = 9090;
    public delegate void ResponMsgHandler(ServerMessage msg);
    public delegate void BroadcastHandler(ServerMessage msg);
    Queue<ServerMessage> QueueServerMessage = new Queue<ServerMessage>();
    public event BroadcastHandler broadcastHandler;
    private Dictionary<int, ResponMsgHandler> mDicMsgs = new Dictionary<int, ResponMsgHandler>();
    private List<BroadcastHandler> BroadMsg = new List<BroadcastHandler>();

    Thread ThreadDoMessage;
    // True while the worker thread should keep running (despite the name); written by the
    // main thread and read by the worker, hence volatile.
    volatile bool isThreadDoMessageAbort;
    // Connection state; written from the main thread, the worker and the socket callback.
    volatile bool isSuccess = false;
    // Heartbeats sent minus heartbeats answered; read by the worker thread.
    volatile int heartTime = 0;

    TcpClient tcpClient;
    byte[] receive_buff;   // scratch buffer handed to the socket for every read
    byte[] data_buff;      // bytes received by the socket callback, not yet taken by the worker
    byte[] data_buff2;     // bytes owned by the worker that still have to be split into messages
    bool resetParseBuffer; // set on (re)connect so the worker discards bytes of the old connection
    CodedOutputStream outputStream;

    /// <summary>
    /// Sends a request message and registers the callback for its response.
    /// </summary>
    /// <param name="clientMessage">The message to send.</param>
    /// <param name="handler">Callback registered under <c>clientMessage.Seq</c>.</param>
    public void SendRequestMsg(ClientMessage clientMessage, ResponMsgHandler handler)
    {
        AddListener(clientMessage.Seq, handler);
        SendMsg(clientMessage);
    }

    /// <summary>
    /// Subscribes to broadcast messages; a handler that is already registered is ignored.
    /// </summary>
    /// <param name="handler">The callback to invoke for every broadcast.</param>
    public void AddBroadcastListener(BroadcastHandler handler)
    {
        if (BroadMsg == null)
        {
            BroadMsg = new List<BroadcastHandler>();
        }
        if (BroadMsg.Contains(handler))
        {
            return;
        }
        BroadMsg.Add(handler);
        broadcastHandler += handler;
    }

    /// <summary>
    /// Removes the response listener registered under the given key.
    /// </summary>
    /// <param name="msgType">The key the listener was registered under.</param>
    /// <param name="handler">The listener; the whole entry is removed regardless of its value.</param>
    public void RemoveListener(int msgType, ResponMsgHandler handler)
    {
        if (mDicMsgs != null)
        {
            // Each key holds a single handler, so dropping the entry is all that is needed.
            mDicMsgs.Remove(msgType);
        }
    }

    // Registers a response handler; the first handler registered for a key wins.
    private void AddListener(int msgType, ResponMsgHandler handler)
    {
        if (mDicMsgs == null)
        {
            mDicMsgs = new Dictionary<int, ResponMsgHandler>();
        }
        if (!mDicMsgs.ContainsKey(msgType))
        {
            mDicMsgs.Add(msgType, handler);
        }
    }

    private void ClearAllListeners()
    {
        if (mDicMsgs != null)
        {
            mDicMsgs.Clear();
        }
    }

    // Invokes the response handler registered under msgType, if any.
    private void SendResponMsg(int msgType, ServerMessage msg)
    {
        ResponMsgHandler handler;
        if (mDicMsgs != null && mDicMsgs.TryGetValue(msgType, out handler) && handler != null)
        {
            handler(msg);
        }
    }

    // Invokes every subscribed broadcast handler.
    private void SendBroadcastMsg(ServerMessage msg)
    {
        BroadcastHandler handlers = broadcastHandler;
        if (handlers != null)
        {
            handlers(msg);
        }
    }

    void Awake()
    {
        instance = this;
    }

    void Start()
    {
        if (!isEditIP)
        {
            string str = Callback.UnityCheckEnvironment();
            Debug.Log("UnityCheckEnvironment str:" + str);
            // Constant-first Equals so a null environment string cannot throw.
            if ("dev".Equals(str) || "test".Equals(str))
            {
                IP = "106.2.124.243";
                PORT = 29090;
            }
            else if ("online".Equals(str))
            {
                IP = "106.2.124.243";
                PORT = 9090;
            }
        }
        StartConnect(IP, PORT);

        // The run flag must be set BEFORE the thread starts; otherwise the worker can observe
        // false on its first check and exit immediately.
        isThreadDoMessageAbort = true;
        ThreadDoMessage = new Thread(new ThreadStart(DoMessage));
        // Background thread: it must never keep the process alive after the game quits.
        ThreadDoMessage.IsBackground = true;
        ThreadDoMessage.Start();
        InvokeRepeating("SendRepeat", 0, 2);
    }

    /// <summary>
    /// (Re)connects to the server, closing any previous connection and discarding data that
    /// belonged to it, then starts the asynchronous read loop.
    /// </summary>
    /// <param name="IP">Server address.</param>
    /// <param name="PORT">Server port.</param>
    /// <returns>True when the socket is connected.</returns>
    bool StartConnect(string IP, int PORT)
    {
        TcpClient client = new TcpClient();
        TcpClient previous;
        lock (lockObj)
        {
            previous = tcpClient;
            tcpClient = client;
            // Bytes of the old connection must not be mixed into the new stream.
            data_buff = null;
            resetParseBuffer = true;
        }
        // Close the old socket instead of leaking it on every reconnect.
        CloseClient(previous);
        outputStream = null;
        lock (queueLock)
        {
            QueueServerMessage.Clear();
        }
        heartTime = 0;

        try
        {
            client.Connect(IP, PORT);
            receive_buff = new byte[client.ReceiveBufferSize];
            NetworkStream stream = client.GetStream();
            outputStream = new CodedOutputStream(stream);
            // The client is passed as state so the callback can tell stale connections apart.
            stream.BeginRead(receive_buff, 0, receive_buff.Length, new AsyncCallback(ReceiveMessage), client);
        }
        catch (Exception e)
        {
            Debug.LogError("Connect Error:" + e);
        }

        bool connected = client.Connected;
        lock (lockObj)
        {
            // Under the same lock as the worker's health check, so the worker cannot overwrite
            // this with a result computed before the connection was established.
            isSuccess = connected;
        }
        Debug.Log("StartConnect isSuccess:" + connected);
        return connected;
    }

    // Heartbeat tick (InvokeRepeating): sends a heartbeat and counts it as unanswered.
    void SendRepeat()
    {
        SendMsg(PackMessage.PackClientHearMessage());
        heartTime++;
    }

    // Probes the socket: it is considered disconnected when it is readable but has no data
    // available (the remote side closed), or when polling fails.
    private bool SocketConnected()
    {
        TcpClient client = tcpClient;
        if (client == null || client.Client == null)
        {
            return false;
        }
        try
        {
            bool closedByPeer = client.Client.Poll(1, SelectMode.SelectRead) && client.Client.Available == 0;
            return !closedByPeer;
        }
        catch (SocketException)
        {
            return false;
        }
        catch (ObjectDisposedException)
        {
            return false;
        }
    }

    // Main-thread pump: shows the reconnect UI while disconnected and dispatches every
    // message the worker thread has queued.
    void FixedUpdate()
    {
        if (!isSuccess)
        {
            Debug.Log("ReconnectUI");
            ReconnectUI();
        }
        while (true)
        {
            ServerMessage next;
            lock (queueLock)
            {
                if (QueueServerMessage.Count == 0)
                {
                    break;
                }
                next = QueueServerMessage.Dequeue();
            }
            // Handlers run outside the lock so they cannot block the worker thread.
            ServerMessageDo(next);
        }
    }

    // Worker thread body: watches the connection state and turns received bytes into
    // queued ServerMessage objects until the run flag is cleared.
    void DoMessage()
    {
        Debug.Log("DoMessage 1");
        while (isThreadDoMessageAbort)
        {
            bool didWork = false;
            try
            {
                CheckConnectionState();
                didWork = TakeReceivedBytes();
                if (ParseBufferedMessages())
                {
                    didWork = true;
                }
            }
            catch (Exception exception)
            {
                Debug.Log("Socket exception: " + exception);
            }
            if (!didWork)
            {
                // Nothing to do: yield instead of busy-spinning.
                Thread.Sleep(IdleSleepMilliseconds);
            }
        }
        Debug.Log("Exit DoMessage");
    }

    // Marks the connection as lost when the socket reports disconnected or when two or more
    // heartbeats are unanswered.
    private void CheckConnectionState()
    {
        lock (lockObj)
        {
            if (tcpClient != null && tcpClient.Client != null)
            {
                if (!tcpClient.Client.Connected || heartTime >= 2)
                {
                    isSuccess = false;
                }
            }
        }
    }

    // Moves the bytes collected by the socket callback into the worker-owned parse buffer.
    // Returns true when any bytes were moved.
    private bool TakeReceivedBytes()
    {
        lock (lockObj)
        {
            if (resetParseBuffer)
            {
                // A new connection was started: drop the partial frame of the old one.
                data_buff2 = null;
                resetParseBuffer = false;
            }
            if (data_buff == null)
            {
                return false;
            }
            data_buff2 = Concat(data_buff2, data_buff, data_buff.Length);
            data_buff = null;
            return true;
        }
    }

    // Splits the parse buffer into complete frames, parses each one and queues it for the
    // main thread. Returns true when at least one frame was consumed.
    private bool ParseBufferedMessages()
    {
        bool consumedAny = false;
        while (data_buff2 != null)
        {
            int length;
            try
            {
                length = new CodedInputStream(data_buff2).ReadLength();
            }
            catch (InvalidProtocolBufferException ex)
            {
                // Fewer bytes than the longest possible prefix: the prefix itself is still
                // incomplete, so wait for more data. Otherwise the stream is corrupt and
                // retrying would fail forever.
                if (data_buff2.Length >= MaxLengthPrefixBytes)
                {
                    Debug.LogError("Invalid length prefix, dropping buffered data: " + ex);
                    data_buff2 = null;
                }
                break;
            }

            if (length < 0)
            {
                Debug.LogError("Negative message length, dropping buffered data: " + length);
                data_buff2 = null;
                break;
            }

            // Number of bytes used by the length prefix itself; the payload starts right after it.
            int lengthLength = CodedOutputStream.ComputeLengthSize(length);
            long frameSize = (long)lengthLength + length;
            if (frameSize > data_buff2.Length)
            {
                // The frame has not fully arrived yet.
                break;
            }

            byte[] real_data = new byte[length];
            Array.Copy(data_buff2, lengthLength, real_data, 0, length);
            if (real_data.Length == 0)
            {
                Debug.LogError("real_data.Length==0");
            }

            // Remove the frame from the buffer before parsing so that a bad payload is
            // skipped instead of being re-parsed in an endless loop.
            int remaining = data_buff2.Length - (int)frameSize;
            if (remaining == 0)
            {
                data_buff2 = null;
            }
            else
            {
                byte[] rest = new byte[remaining];
                Array.Copy(data_buff2, (int)frameSize, rest, 0, remaining);
                data_buff2 = rest;
            }
            consumedAny = true;

            ServerMessage sMsg;
            try
            {
                sMsg = ServerMessage.Parser.ParseFrom(real_data);
            }
            catch (InvalidProtocolBufferException ex)
            {
                Debug.LogError("Failed to parse ServerMessage, frame skipped: " + ex);
                continue;
            }
            lock (queueLock)
            {
                QueueServerMessage.Enqueue(sMsg);
            }
        }
        return consumedAny;
    }

    // Returns a new array holding head (may be null) followed by the first tailCount bytes of tail.
    private static byte[] Concat(byte[] head, byte[] tail, int tailCount)
    {
        int headCount = head == null ? 0 : head.Length;
        byte[] merged = new byte[headCount + tailCount];
        if (headCount > 0)
        {
            Array.Copy(head, merged, headCount);
        }
        Array.Copy(tail, 0, merged, headCount, tailCount);
        return merged;
    }

    // Writes a length-prefixed message to the socket; marks the connection as lost on failure.
    private void SendMsg(pb::IMessage message)
    {
        if (!isSuccess)
        {
            Debug.Log("!isSuccess");
            return;
        }
        try
        {
            if (outputStream != null)
            {
                // WriteMessage writes the length first and then the payload.
                outputStream.WriteMessage(message);
                outputStream.Flush();
            }
        }
        catch (Exception e)
        {
            Debug.LogError("SendMsg Exception:" + e);
            isSuccess = false;
        }
    }

    // Routes one server message by category: broadcasts go to the broadcast subscribers,
    // responses to the handler registered under response.Ack, and a server heartbeat
    // acknowledges one outstanding client heartbeat.
    private void ServerMessageDo(ServerMessage sMsg)
    {
        if (sMsg == null)
        {
            Debug.LogError("ServerMessageDo sMsg==null");
            return;
        }
        if (sMsg.MessageCategory == ServerMessageCategory.UndefinedServerMessageCategory)
        {
            Debug.LogError("sMsg.MessageCategory == ServerMessageCategory.UndefinedServerMessageCategory:" + sMsg);
            return;
        }
        if (sMsg.MessageCategory == ServerMessageCategory.Broadcast)
        {
            SendBroadcastMsg(sMsg);
        }
        else if (sMsg.MessageCategory == ServerMessageCategory.Response)
        {
            if (null == sMsg.Response)
            {
                Debug.LogError("ServerMessageDo null==sMsg.Response:" + sMsg);
                return;
            }
            // Response to an earlier client request.
            Response response = sMsg.Response;
            SendResponMsg(response.Ack, sMsg);
        }
        else if (sMsg.MessageCategory == ServerMessageCategory.ServerHeartbeat)
        {
            heartTime--;
        }
    }

    // Asynchronous read callback: appends the received bytes to data_buff and schedules the
    // next read. Callbacks that belong to a connection that has since been replaced are ignored.
    private void ReceiveMessage(IAsyncResult ar)
    {
        TcpClient client = (TcpClient)ar.AsyncState;
        try
        {
            NetworkStream stream = client.GetStream();
            int bytesRead = stream.EndRead(ar);
            if (bytesRead < 1)
            {
                if (client == tcpClient)
                {
                    Debug.LogError("網絡已斷開 bytesRead < 1");
                    isSuccess = false;
                }
                return;
            }
            lock (lockObj)
            {
                if (client != tcpClient)
                {
                    // Stale callback from a connection that was replaced by a reconnect.
                    return;
                }
                data_buff = Concat(data_buff, receive_buff, bytesRead);
            }
            stream.BeginRead(receive_buff, 0, receive_buff.Length, new AsyncCallback(ReceiveMessage), client);
        }
        catch (Exception ex)
        {
            if (client == tcpClient)
            {
                // The read loop of the current connection has ended, so the connection is unusable.
                Debug.Log("ReceiveMessageException:" + ex);
                isSuccess = false;
            }
        }
    }

    /// <summary>
    /// Shows the "network lost" panel.
    /// </summary>
    public void ReconnectUI()
    {
        ARCarUIManager.Instance.ARNetPanel.SetActive(true);
    }

    /// <summary>
    /// Tries to reconnect to the configured server and hides the "network lost" panel.
    /// </summary>
    /// <returns>True when the connection was re-established.</returns>
    public bool Reconnect()
    {
        Debug.Log("正在嘗試從新鏈接網絡...");
        bool isok = StartConnect(IP, PORT);
        ARCarUIManager.Instance.ARNetPanel.SetActive(false);
        return isok;
    }

    /// <summary>
    /// Stops the heartbeat and the worker thread, closes the socket and clears all listeners.
    /// </summary>
    public void OnDestroy()
    {
        CancelInvoke("SendRepeat");

        // Clear the run flag first so the worker stops even if closing the socket fails.
        isThreadDoMessageAbort = false;
        isSuccess = false;
        outputStream = null;
        CloseClient(tcpClient);

        // Cooperative shutdown instead of Thread.Abort: the worker checks the flag on every
        // iteration and sleeps only briefly, so it exits within a few milliseconds.
        if (ThreadDoMessage != null && ThreadDoMessage.IsAlive)
        {
            if (!ThreadDoMessage.Join(WorkerJoinTimeoutMilliseconds))
            {
                Debug.Log("DoMessage thread did not stop in time");
            }
        }

        broadcastHandler = null;
        mDicMsgs = null;
        BroadMsg = null;
        if (ClientManager.instance == this)
        {
            ClientManager.instance = null;
        }
    }

    // Closes a client and its stream; failures are logged and otherwise ignored because the
    // connection is being discarded anyway.
    private static void CloseClient(TcpClient client)
    {
        if (client == null)
        {
            return;
        }
        try
        {
            if (client.Connected)
            {
                client.GetStream().Close();
            }
        }
        catch (Exception e)
        {
            Debug.Log(" tcpClient.Close e:" + e);
        }
        try
        {
            client.Close();
        }
        catch (Exception e)
        {
            Debug.Log(" tcpClient.Close e:" + e);
        }
    }
}