本文完整代碼,能夠瀏覽:java
我在網上查閱過的 MINA 黏包處理,通常都是放在 Decoder 中作的。也就是黏包處理和消息解碼放在一塊兒作,顯得比較混亂很差打理。而如下這段代碼,我是把黏包處理放在 Filter 中了。在具體使用時能夠這樣:github
1 // 建立 IO 接收器 2 NioSocketAcceptor acceptor = new NioSocketAcceptor(); 3 4 // 獲取責任鏈 5 DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); 6 // 處理網絡粘包 7 chain.addLast("msgCumulative", new MsgCumulativeFilter()); 8 9 // 添加自定義編解碼器 10 chain.addLast("msgCodec", new ProtocolCodecFilter( 11 new XxxEncoder(), 12 new XxxDecoder() 13 )); 14 15 // 獲取會話配置 16 IoSessionConfig cfg = acceptor.getSessionConfig(); 17 18 // 設置緩衝區大小 19 cfg.setReadBufferSize(4096); 20 // 設置 session 空閒時間 21 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 10); 22 23 // 設置 IO 句柄 24 acceptor.setHandler(new XxxHandler()); 25 acceptor.setReuseAddress(true); 26 27 try { 28 // 綁定端口 29 acceptor.bind(new InetSocketAddress("127.0.0.1", 4400)); 30 } catch (Exception ex) { 31 // 輸出錯誤日誌 32 System.error.println(ex); 33 }
目前 Netty 框架要比 MINA 流行的多,並且 Netty 對網絡黏包處理也作了很好的處理,不用開發者本身費那麼大勁。我也考慮過遷移到 Netty 框架上,不過目前尚未找到特別充分的理由。閒話很少說了,如下就是黏包處理代碼:apache
1 package com.game.gameServer.framework.mina; 2 3 import java.util.concurrent.ConcurrentHashMap; 4 5 import org.apache.mina.core.buffer.IoBuffer; 6 import org.apache.mina.core.filterchain.IoFilterAdapter; 7 import org.apache.mina.core.session.IoSession; 8 9 import com.game.gameServer.framework.FrameworkLog; 10 import com.game.gameServer.msg.SpecialMsgSerialUId; 11 import com.game.part.msg.IoBuffUtil; 12 13 /** 14 * 消息粘包處理 15 * 16 * @author hjj2017 17 * @since 2014/3/17 18 * 19 */ 20 class MsgCumulativeFilter extends IoFilterAdapter { 21 /** 22 * 從客戶端接收的消息估計長度, 23 * {@value} 字節, 24 * 對於從客戶端接收的數據來講, 都是簡單的命令! 25 * 不多超過 {@value}B 26 * 27 */ 28 private static final int DECODE_MSG_LEN = 64; 29 /** 容器 Buff 字典 */ 30 private static final ConcurrentHashMap<Long, IoBuffer> _containerBuffMap = new ConcurrentHashMap<>(); 31 32 @Override 33 public void sessionClosed(NextFilter nextFilter, IoSession sessionObj) throws Exception { 34 if (nextFilter == null || 35 sessionObj == null) { 36 // 若是參數對象爲空, 37 // 則直接退出! 38 FrameworkLog.LOG.error("null nextFilter or sessionObj"); 39 return; 40 } 41 42 // 移除容器 Buff 43 removeContainerBuff(sessionObj); 44 // 向下傳遞 45 super.sessionClosed(nextFilter, sessionObj); 46 } 47 48 @Override 49 public void messageReceived( 50 NextFilter nextFilter, IoSession sessionObj, Object msgObj) throws Exception { 51 if (nextFilter == null || 52 sessionObj == null) { 53 // 若是參數對象爲空, 54 // 則直接退出! 55 FrameworkLog.LOG.error("null nextFilter or sessionObj"); 56 return; 57 } 58 59 // 獲取會話 UId 60 long sessionUId = sessionObj.getId(); 61 62 if (!(msgObj instanceof IoBuffer)) { 63 // 若是消息對象不是 ByteBuff, 64 // 則直接向下傳遞! 65 FrameworkLog.LOG.warn("msgObj is not a IoBuff, sessionUId = " + sessionUId); 66 super.messageReceived(nextFilter, sessionObj, msgObj); 67 } 68 69 // 獲取輸入 Buff 70 IoBuffer inBuff = (IoBuffer)msgObj; 71 72 if (!inBuff.hasRemaining()) { 73 // 若是沒有剩餘內容, 74 // 則直接退出! 75 FrameworkLog.LOG.error("inBuff has not remaining, sessionUId = " + sessionUId); 76 return; 77 } else if (inBuff.remaining() <= 8) { 78 // 若是 <= 8 字節, 79 // 那仍是執行粘包處理過程吧 ... 80 // 8 字節 = 消息長度 ( Short ) + 消息類型 ( Short ) + 時間戳 ( Int ) 81 // 若是比這個長度都小, 82 // 那確定不是一條完整消息 ... 83 this.msgRecv_0(nextFilter, sessionObj, inBuff); 84 return; 85 } 86 87 // 獲取消息長度 88 final int msgSize = inBuff.getShort(); 89 inBuff.position(0); 90 91 if (msgSize == inBuff.limit() && 92 containerBuffIsEmpty(sessionObj)) { 93 // 94 // 若是消息長度和極限值恰好相同, 95 // 而且容器 Buff 中沒有任何內容 ( 即, 上一次消息沒有粘包 ), 96 // 那麼直接向下傳遞! 97 // 98 super.messageReceived( 99 nextFilter, sessionObj, inBuff 100 ); 101 } else { 102 // 103 // 若是消息長度和極限值不一樣, 104 // 則說明是網絡粘包! 105 // 這時候跳轉到粘包處理過程 ... 106 // 107 this.msgRecv_0(nextFilter, sessionObj, inBuff); 108 } 109 } 110 111 /** 112 * 接收連包消息 113 * 114 * @param nextFilter 115 * @param sessionObj 116 * @param inBuff 117 * @throws Exception 118 * 119 */ 120 private void msgRecv_0( 121 NextFilter nextFilter, IoSession sessionObj, IoBuffer inBuff) throws Exception { 122 if (nextFilter == null || 123 sessionObj == null) { 124 // 若是參數對象爲空, 125 // 則直接退出! 126 FrameworkLog.LOG.error("null nextFilter or sessionObj"); 127 return; 128 } 129 130 // 獲取會話 UId 131 long sessionUId = sessionObj.getId(); 132 // 獲取容器 Buff 133 IoBuffer containerBuff = getContainerBuff(sessionObj); 134 135 // 添加新 Buff 到容器 Buff 的末尾 136 IoBuffUtil.append(containerBuff, inBuff); 137 // 令 position = 0 138 containerBuff.position(0); 139 140 // // 記錄調試信息 141 // FrameworkLog.LOG.debug("\nin = [ " + inBuff.getHexDump() + " ]"); 142 143 for (int i = 0; ; i++) { 144 // // 記錄調試信息 145 // FrameworkLog.LOG.debug( 146 // "i = " + i 147 // + "\nco = [ " + containerBuff.getHexDump() + " ]" 148 // + "\nco.pos = " + containerBuff.position() 149 // + "\nco.lim = " + containerBuff.limit() 150 // ); 151 152 if (containerBuff.remaining() < 4) { 153 // 154 // 若是剩餘字節數 < 4, 155 // 這樣根本沒法識別出消息類型 msgSerialUId ... 156 // 直接退出! 157 // 在退出前, 158 // 準備好接收下一次消息! 159 // 160 IoBuffUtil.readyToNext(containerBuff); 161 return; 162 } 163 164 // 獲取原始位置 165 final int oldPos = containerBuff.position(); 166 // 獲取消息長度和類型 167 final int msgSize = containerBuff.getShort(); 168 final int msgSerialUId = containerBuff.getShort(); 169 170 // // 記錄調試信息 171 // FrameworkLog.LOG.debug( 172 // "i = " + i 173 // + "\nmsgSize = " + msgSize 174 // + "\nmsgSerialUId = " + msgSerialUId 175 // ); 176 177 // 還原原始位置 178 containerBuff.position(oldPos); 179 180 if (msgSerialUId == SpecialMsgSerialUId.CG_FLASH_POLICY || 181 msgSerialUId == SpecialMsgSerialUId.CG_QQ_TGW) { 182 // 183 // 若是是 Flash 安全策略消息, 184 // 或者是騰訊網關消息, 185 // 則嘗試找一下 0 字節的位置 ... 186 // 187 int pos0 = IoBuffUtil.indexOf(containerBuff, (byte)0); 188 189 if (pos0 <= -1) { 190 // 若是找不到 0 字節的位置, 191 // 則說明消息還沒接收完, 192 // 準備接受下次消息並直接退出! 193 IoBuffUtil.readyToNext(containerBuff); 194 return; 195 } 196 197 // 複製 Buff 內容 198 containerBuff.position(0); 199 IoBuffer realBuff = IoBuffUtil.copy(containerBuff, pos0); 200 201 // 更新 Buff 位置 202 final int newPos = containerBuff.position() + pos0; 203 containerBuff.position(newPos); 204 // 壓縮容器 Buff 205 IoBuffUtil.compact(containerBuff); 206 207 // 向下傳遞 208 super.messageReceived( 209 nextFilter, sessionObj, realBuff 210 ); 211 continue; 212 } 213 214 if (msgSize <= 0) { 215 // 216 // 若是消息長度 <= 0, 217 // 則直接退出! 218 // 這種狀況多是消息已經亂套了 ... 219 // 仍是從新來過吧! 220 // 221 FrameworkLog.LOG.error("i = " + i + ", msgSize = " + msgSize + ", sessionUId = " + sessionUId); 222 // 將容器 Buff 內容清空 223 containerBuff.position(0); 224 containerBuff.flip(); 225 // 壓縮容器 Buff 226 IoBuffUtil.compact(containerBuff); 227 return; 228 } 229 230 if (containerBuff.remaining() < msgSize) { 231 // 232 // 若是消息長度不夠, 233 // 則多是出現網絡粘包狀況了 ... 234 // 直接退出就能夠了! 235 // 236 FrameworkLog.LOG.warn( 237 "i = " + i 238 + ", msgSize = " + msgSize 239 + ", containerBuff.remaining = " + containerBuff.remaining() 240 + ", sessionUId = " + sessionUId 241 ); 242 243 // 準備接受下一次消息 244 IoBuffUtil.readyToNext(containerBuff); 245 return; 246 } 247 248 // 建立新 Buff 並複製字節內容 249 IoBuffer realBuff = IoBuffUtil.copy(containerBuff, msgSize); 250 251 if (realBuff == null) { 252 // 253 // 若是真實的 Buff 爲空, 254 // 則直接退出! 255 // 這種狀況可能也是消息亂套了 ... 256 // 記錄一下錯誤信息 257 // 258 FrameworkLog.LOG.error("i = " + i + ", null realBuff, sessionUId = " + sessionUId); 259 } else { 260 // // 記錄調試信息 261 // FrameworkLog.LOG.debug( 262 // "i = " + i 263 // + "\nreal = [ " + realBuff.getHexDump() + " ]" 264 // + "\nreal.pos = " + realBuff.position() 265 // + "\nreal.lim = " + realBuff.limit() 266 // ); 267 268 // 向下傳遞 269 super.messageReceived( 270 nextFilter, sessionObj, realBuff 271 ); 272 } 273 274 // 更新位置 275 containerBuff.position(containerBuff.position() + msgSize); 276 // 壓縮容器 Buff 277 IoBuffUtil.compact(containerBuff); 278 } 279 } 280 281 /** 282 * 獲取玩家的 Buff, 若是爲空則新建一個! 283 * 284 * @param sessionObj 285 * @return 286 * 287 */ 288 private static IoBuffer getContainerBuff(IoSession sessionObj) { 289 if (sessionObj == null) { 290 // 若是參數對象爲空, 291 // 則直接退出! 292 return null; 293 } 294 295 // 獲取會話 UId 296 long sessionUId = sessionObj.getId(); 297 // 獲取容器 Buff 298 IoBuffer containerBuff = _containerBuffMap.get(sessionUId); 299 300 if (containerBuff == null) { 301 // 建立緩存 Buff 302 containerBuff = IoBuffer.allocate(DECODE_MSG_LEN); 303 containerBuff.setAutoExpand(true); 304 containerBuff.setAutoShrink(true); 305 containerBuff.position(0); 306 containerBuff.flip(); 307 // 緩存 Buff 對象 308 Object oldVal = _containerBuffMap.putIfAbsent(sessionUId, containerBuff); 309 310 if (oldVal != null) { 311 FrameworkLog.LOG.warn("exists oldVal"); 312 } 313 } 314 315 return containerBuff; 316 } 317 318 /** 319 * 移除容器 Buff 320 * 321 * @param sessionObj 322 * 323 */ 324 private static void removeContainerBuff(IoSession sessionObj) { 325 if (sessionObj == null) { 326 // 若是參數對象爲空, 327 // 則直接退出! 328 return; 329 } 330 331 // 獲取會話 UId 332 long sessionUId = sessionObj.getId(); 333 // 獲取容器 Buff 334 IoBuffer containerBuff = _containerBuffMap.get(sessionUId); 335 336 if (containerBuff != null) { 337 // 是否所佔資源 338 containerBuff.clear(); 339 } 340 341 // 移除玩家的 Buff 對象 342 _containerBuffMap.remove(sessionUId); 343 } 344 345 /** 346 * 容器 Buff 爲空 ? 347 * 348 * @param sessionObj 349 * @return 350 * 351 */ 352 private static boolean containerBuffIsEmpty(IoSession sessionObj) { 353 if (sessionObj == null) { 354 // 若是參數對象爲空, 355 // 則直接退出! 356 return false; 357 } 358 359 // 獲取容器 Buff 360 IoBuffer containerBuff = getContainerBuff(sessionObj); 361 362 if (containerBuff == null) { 363 // 若是容器爲空, 364 // 則直接退出! 365 FrameworkLog.LOG.error("null containerBuff, sessionUId = " + sessionObj.getId()); 366 return false; 367 } else { 368 // 若是當前位置和極限值都爲 0, 369 // 則斷定爲空! 370 return (containerBuff.position() == 0 371 && containerBuff.limit() == 0); 372 } 373 } 374 }