版權聲明:本文爲原創文章,轉載請聲明http://www.cnblogs.com/unityExplorer/p/6986474.html
html
上一篇講到了數據的處理,這一篇主要講使用多線程收發消息緩存
1 //建立消息數據模型 2 //正式項目中,消息的結構通常是消息長度+消息id+消息主體內容 3 public class Message 4 { 5 public IExtensible protobuf; 6 public int messageId; 7 } 8 9 public class SocketClientTemp : MonoBehaviour 10 { 11 const int packageMaxLength = 1024; 12 13 Socket mSocket; 14 Thread threadSend; 15 Thread threadRecive; 16 Queue<Message> allMessages = new Queue<Message>(); 17 Queue<byte[]> sendQueue = new Queue<byte[]>(); 18 19 public bool Init() 20 { 21 //建立一個socket對象 22 mSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 23 return SocketConnection("此處是ip", 1111); 24 } 25 26 void Update() 27 { 28 AnalysisMessage(); 29 } 30 31 /// <summary> 32 /// 創建服務器鏈接 33 /// </summary> 34 /// <param name="ip">服務器的ip地址</param> 35 /// <param name="port">端口</param> 36 bool SocketConnection(string ip, int port) 37 { 38 try 39 { 40 IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(ip), port); 41 //同步鏈接服務器,實際使用時推薦使用異步鏈接,處理方式會在下一篇講斷線重連時講到 42 mSocket.Connect(ipep); 43 //鏈接成功後,建立兩個線程,分別用於發送和接收消息 44 threadSend = new Thread(new ThreadStart(SendMessage)); 45 threadSend.Start(); 46 threadRecive = new Thread(new ThreadStart(ReceiveMessage)); 47 threadRecive.Start(); 48 return true; 49 } 50 catch (Exception e) 51 { 52 Debug.Log(e.ToString()); 53 Close(); 54 return false; 55 } 56 } 57 58 #region ...發送消息 59 /// <summary> 60 /// 添加數據到發送隊列 61 /// </summary> 62 /// <param name="protobufModel"></param> 63 /// <param name="messageId"></param> 64 public void AddSendMessageQueue(IExtensible protobufModel, int messageId) 65 { 66 sendQueue.Enqueue(BuildPackage(protobufModel, messageId)); 67 } 68 69 void SendMessage() 70 { 71 //循環獲取發送隊列中第一個數據,而後發送到服務器 72 while (true) 73 { 74 if (sendQueue.Count == 0) 75 { 76 Thread.Sleep(100); 77 continue; 78 } 79 if (!mSocket.Connected) 80 { 81 Close(); 82 break; 83 } 84 else 85 Send(sendQueue.Peek());//發送隊列中第一條數據 86 } 87 } 88 89 void Send(byte[] bytes) 90 { 91 try 92 { 93 mSocket.Send(bytes, SocketFlags.None); 94 //發送成功後,從發送隊列中移除已發送的消息 95 sendQueue.Dequeue(); 96 } 97 catch (SocketException e) 98 { 99 //若是錯誤碼爲10035,說明服務器緩存區滿了,因此等100毫秒再次發送 100 if (e.NativeErrorCode == 10035) 101 { 102 Thread.Sleep(100); 103 Send(bytes); 104 } 105 else 106 Debug.Log(e.ToString()); 107 } 108 } 109 #endregion 110 111 #region ...接收消息 112 /// <summary> 113 /// 解析收到的消息 114 /// </summary> 115 void AnalysisMessage() 116 { 117 while (allMessages.Count > 0) 118 { 119 int id = allMessages.Dequeue().messageId; 120 switch (id) 121 { 122 //根據消息id作不一樣的處理 123 } 124 } 125 } 126 127 /// <summary> 128 /// 接收數據 129 /// </summary> 130 void ReceiveMessage() 131 { 132 while (true) 133 { 134 if (!mSocket.Connected) 135 break; 136 byte[] recvBytesHead = GetBytesReceive(4); 137 int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(recvBytesHead, 0)); 138 byte[] recvBytesBody = GetBytesReceive(bodyLength); 139 140 byte[] messageId = new byte[4]; 141 Array.Copy(recvBytesBody, 0, messageId, 0, 4); 142 byte[] messageBody = new byte[bodyLength - 4]; 143 Array.Copy(recvBytesBody, 4, messageBody, 0, bodyLength - 4); 144 145 if (BitConverter.IsLittleEndian) 146 Array.Reverse(messageId); 147 FillAllPackages(BitConverter.ToInt32(messageId, 0), messageBody); 148 } 149 } 150 151 /// <summary> 152 /// 填充接收消息隊列 153 /// </summary> 154 /// <param name="messageId"></param> 155 /// <param name="messageBody"></param> 156 void FillAllPackages(int messageId, byte[] messageBody) 157 { 158 switch (messageId) 159 { 160 //根據消息id處理消息,並添加到接收消息隊列 161 case 1: 162 allMessages.Enqueue(new Message() 163 { 164 protobuf = ProtobufSerilizer.DeSerialize<TestTemp>(messageBody), 165 messageId = messageId 166 }); 167 break; 168 } 169 } 170 171 /// <summary> 172 /// 接收數據並處理 173 /// </summary> 174 /// <param name="length"></param> 175 /// <returns></returns> 176 byte[] GetBytesReceive(int length) 177 { 178 byte[] recvBytes = new byte[length]; 179 while (length > 0) 180 { 181 byte[] receiveBytes = new byte[length < packageMaxLength ? length : packageMaxLength]; 182 int iBytesBody = 0; 183 if (length >= receiveBytes.Length) 184 iBytesBody = mSocket.Receive(receiveBytes, receiveBytes.Length, 0); 185 else 186 iBytesBody = mSocket.Receive(receiveBytes, length, 0); 187 receiveBytes.CopyTo(recvBytes, recvBytes.Length - length); 188 length -= iBytesBody; 189 } 190 return recvBytes; 191 } 192 #endregion 193 194 /// <summary> 195 /// 構建消息數據包 196 /// </summary> 197 /// <param name="protobufModel"></param> 198 /// <param name="messageId"></param> 199 byte[] BuildPackage(IExtensible protobufModel, int messageId) 200 { 201 byte[] b; 202 if (protobufModel != null) 203 b = ProtobufSerilizer.Serialize(protobufModel); 204 else 205 b = new byte[0]; 206 //消息長度(int數據,長度4) + 消息id(int數據,長度4) + 消息主體內容 207 ByteBuffer buf = ByteBuffer.Allocate(b.Length + 4 + 4); 208 //消息長度 = 消息主體內容長度 + 消息id長度 209 buf.WriteInt(b.Length + 4); 210 buf.WriteInt(messageId); 211 212 if (protobufModel != null) 213 buf.WriteBytes(b); 214 return buf.GetBytes(); 215 } 216 217 void OnDestroy() 218 { 219 //中止運行後,若是不關閉socket多線程,再次運行時,unity會卡死 220 Close(); 221 } 222 223 /// <summary> 224 /// 關閉socket,終止線程 225 /// </summary> 226 public void Close() 227 { 228 if (mSocket != null) 229 { 230 //微軟官方推薦在關閉socket前先shutdown,可是通過測試,發現網絡斷開後,shutdown會沒法執行 231 if (mSocket.Connected) 232 mSocket.Shutdown(SocketShutdown.Both); 233 mSocket.Close(); 234 mSocket = null; 235 } 236 //關閉線程 237 if (threadSend != null) 238 threadSend.Abort(); 239 if (threadRecive != null) 240 threadRecive.Abort(); 241 threadSend = null; 242 threadRecive = null; 243 } 244 }
到這裏,使用socket處理消息的收發就基本結束了,可是,某些項目爲了加強體驗,可能還會增長斷線重連的功能,這個功能會在下一篇講到服務器