unity探索者之socket傳輸protobuf字節流(三)

版權聲明:本文爲原創文章,轉載請聲明http://www.cnblogs.com/unityExplorer/p/6986474.html 
html

上一篇講到了數據的處理,這一篇主要講使用多線程收發消息緩存

  1 //建立消息數據模型
  2 //正式項目中,消息的結構通常是消息長度+消息id+消息主體內容
  3 public class Message
  4 {
  5     public IExtensible protobuf;
  6     public int messageId;
  7 }
  8 
  9 public class SocketClientTemp : MonoBehaviour
 10 {
 11     const int packageMaxLength = 1024;
 12 
 13     Socket mSocket;
 14     Thread threadSend;
 15     Thread threadRecive;
 16     Queue<Message> allMessages = new Queue<Message>();
 17     Queue<byte[]> sendQueue = new Queue<byte[]>();
 18 
 19     public bool Init()
 20     {
 21         //建立一個socket對象
 22         mSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 23         return SocketConnection("此處是ip", 1111);
 24     }
 25 
 26     void Update()
 27     {
 28         AnalysisMessage();
 29     }
 30 
 31     /// <summary>
 32     /// 創建服務器鏈接
 33     /// </summary>
 34     /// <param name="ip">服務器的ip地址</param>
 35     /// <param name="port">端口</param>
 36     bool SocketConnection(string ip, int port)
 37     {
 38         try
 39         {
 40             IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(ip), port);
 41             //同步鏈接服務器,實際使用時推薦使用異步鏈接,處理方式會在下一篇講斷線重連時講到
 42             mSocket.Connect(ipep);
 43             //鏈接成功後,建立兩個線程,分別用於發送和接收消息
 44             threadSend = new Thread(new ThreadStart(SendMessage));
 45             threadSend.Start();
 46             threadRecive = new Thread(new ThreadStart(ReceiveMessage));
 47             threadRecive.Start();
 48             return true;
 49         }
 50         catch (Exception e)
 51         {
 52             Debug.Log(e.ToString());
 53             Close();
 54             return false;
 55         }
 56     }
 57 
 58     #region ...發送消息
 59     /// <summary>
 60     /// 添加數據到發送隊列
 61     /// </summary>
 62     /// <param name="protobufModel"></param>
 63     /// <param name="messageId"></param>
 64     public void AddSendMessageQueue(IExtensible protobufModel, int messageId)
 65     {
 66         sendQueue.Enqueue(BuildPackage(protobufModel, messageId));
 67     }
 68 
 69     void SendMessage()
 70     {
 71         //循環獲取發送隊列中第一個數據,而後發送到服務器
 72         while (true)
 73         {
 74             if (sendQueue.Count == 0)
 75             {
 76                 Thread.Sleep(100);
 77                 continue;
 78             }
 79             if (!mSocket.Connected)
 80             {
 81                 Close();
 82                 break;
 83             }
 84             else
 85                 Send(sendQueue.Peek());//發送隊列中第一條數據
 86         }
 87     }
 88 
 89     void Send(byte[] bytes)
 90     {
 91         try
 92         {
 93             mSocket.Send(bytes, SocketFlags.None);
 94             //發送成功後,從發送隊列中移除已發送的消息
 95             sendQueue.Dequeue();
 96         }
 97         catch (SocketException e)
 98         {
 99             //若是錯誤碼爲10035,說明服務器緩存區滿了,因此等100毫秒再次發送
100             if (e.NativeErrorCode == 10035)
101             {
102                 Thread.Sleep(100);
103                 Send(bytes);
104             }
105             else
106                 Debug.Log(e.ToString());
107         }
108     }
109     #endregion
110 
111     #region ...接收消息
112     /// <summary>
113     /// 解析收到的消息
114     /// </summary>
115     void AnalysisMessage()
116     {
117         while (allMessages.Count > 0)
118         {
119             int id = allMessages.Dequeue().messageId;
120             switch (id)
121             {
122                 //根據消息id作不一樣的處理
123             }
124         }
125     }
126 
127     /// <summary>
128     /// 接收數據
129     /// </summary>
130     void ReceiveMessage()
131     {
132         while (true)
133         {
134             if (!mSocket.Connected)
135                 break;
136             byte[] recvBytesHead = GetBytesReceive(4);
137             int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(recvBytesHead, 0));
138             byte[] recvBytesBody = GetBytesReceive(bodyLength);
139 
140             byte[] messageId = new byte[4];
141             Array.Copy(recvBytesBody, 0, messageId, 0, 4);
142             byte[] messageBody = new byte[bodyLength - 4];
143             Array.Copy(recvBytesBody, 4, messageBody, 0, bodyLength - 4);
144 
145             if (BitConverter.IsLittleEndian)
146                 Array.Reverse(messageId);
147             FillAllPackages(BitConverter.ToInt32(messageId, 0), messageBody);
148         }
149     }
150 
151     /// <summary>
152     /// 填充接收消息隊列
153     /// </summary>
154     /// <param name="messageId"></param>
155     /// <param name="messageBody"></param>
156     void FillAllPackages(int messageId, byte[] messageBody)
157     {
158         switch (messageId)
159         {
160             //根據消息id處理消息,並添加到接收消息隊列
161             case 1:
162                 allMessages.Enqueue(new Message() 
163                 {
164                     protobuf = ProtobufSerilizer.DeSerialize<TestTemp>(messageBody), 
165                     messageId = messageId 
166                 });
167                 break;
168         }
169     }
170 
171     /// <summary>
172     /// 接收數據並處理
173     /// </summary>
174     /// <param name="length"></param>
175     /// <returns></returns>
176     byte[] GetBytesReceive(int length)
177     {
178         byte[] recvBytes = new byte[length];
179         while (length > 0)
180         {
181             byte[] receiveBytes = new byte[length < packageMaxLength ? length : packageMaxLength];
182             int iBytesBody = 0;
183             if (length >= receiveBytes.Length)
184                 iBytesBody = mSocket.Receive(receiveBytes, receiveBytes.Length, 0);
185             else
186                 iBytesBody = mSocket.Receive(receiveBytes, length, 0);
187             receiveBytes.CopyTo(recvBytes, recvBytes.Length - length);
188             length -= iBytesBody;
189         }
190         return recvBytes;
191     }
192     #endregion
193 
194     /// <summary>
195     /// 構建消息數據包
196     /// </summary>
197     /// <param name="protobufModel"></param>
198     /// <param name="messageId"></param>
199     byte[] BuildPackage(IExtensible protobufModel, int messageId)
200     {
201         byte[] b;
202         if (protobufModel != null)
203             b = ProtobufSerilizer.Serialize(protobufModel);
204         else
205             b = new byte[0];
206         //消息長度(int數據,長度4) + 消息id(int數據,長度4) + 消息主體內容
207         ByteBuffer buf = ByteBuffer.Allocate(b.Length + 4 + 4);
208         //消息長度 = 消息主體內容長度 + 消息id長度
209         buf.WriteInt(b.Length + 4);
210         buf.WriteInt(messageId);
211 
212         if (protobufModel != null)
213             buf.WriteBytes(b);
214         return buf.GetBytes();
215     }
216 
217     void OnDestroy()
218     {
219         //中止運行後,若是不關閉socket多線程,再次運行時,unity會卡死
220         Close();
221     }
222 
223     /// <summary>
224     /// 關閉socket,終止線程
225     /// </summary>
226     public void Close()
227     {
228         if (mSocket != null)
229         {
230             //微軟官方推薦在關閉socket前先shutdown,可是通過測試,發現網絡斷開後,shutdown會沒法執行
231             if (mSocket.Connected)
232                 mSocket.Shutdown(SocketShutdown.Both);
233             mSocket.Close();
234             mSocket = null;
235         }
236         //關閉線程
237         if (threadSend != null)
238             threadSend.Abort();
239         if (threadRecive != null)
240             threadRecive.Abort();
241         threadSend = null;
242         threadRecive = null;
243     }
244 }

到這裏,使用socket處理消息的收發就基本結束了,可是,某些項目爲了加強體驗,可能還會增長斷線重連的功能,這個功能會在下一篇講到服務器

相關文章
相關標籤/搜索