[TOC]java
本篇文章將介紹JT808協議的解析思路。
另請大神繞路,不喜勿噴!
先寫個大體的思路,有疑問能夠聯繫本人,聯繫方式:git
emial: hylexus@163.comgithub
數據類型 | 描述及要求 |
---|---|
BYTE | 無符號單字節整形(字節, 8 位) |
WORD | 無符號雙字節整形(字, 16 位) |
DWORD | 無符號四字節整形(雙字, 32 位) |
BYTE[n] | n 字節 |
BCD[n] | 8421 碼, n 字節 |
STRING | GBK 編碼,若無數據,置空 |
標識位 | 消息頭 | 消息體 | 校驗碼 | 標識位 |
---|---|---|---|---|
1byte(0x7e) | 16byte | 1byte | 1byte(0x7e) |
消息ID(0-1) 消息體屬性(2-3) 終端手機號(4-9) 消息流水號(10-11) 消息包封裝項(12-15) byte[0-1] 消息ID word(16) byte[2-3] 消息體屬性 word(16) bit[0-9] 消息體長度 bit[10-12] 數據加密方式 此三位都爲 0,表示消息體不加密 第 10 位爲 1,表示消息體通過 RSA 算法加密 其它保留 bit[13] 分包 1:消息體衛長消息,進行分包發送處理,具體分包信息由消息包封裝項決定 0:則消息頭中無消息包封裝項字段 bit[14-15] 保留 byte[4-9] 終端手機號或設備ID bcd[6] 根據安裝後終端自身的手機號轉換 手機號不足12 位,則在前面補 0 byte[10-11] 消息流水號 word(16) 按發送順序從 0 開始循環累加 byte[12-15] 消息包封裝項 byte[0-1] 消息包總數(word(16)) 該消息分包後得總包數 byte[2-3] 包序號(word(16)) 從 1 開始 若是消息體屬性中相關標識位肯定消息分包處理,則該項有內容 不然無該項
整個消息體結構中最複雜的就是消息頭了。算法
如下是對整個消息體抽象出來的一個java實體類。bootstrap
import java.nio.channels.Channel; public class PackageData { /** * 16byte 消息頭 */ protected MsgHeader msgHeader; // 消息體字節數組 protected byte[] msgBodyBytes; /** * 校驗碼 1byte */ protected int checkSum; //記錄每一個客戶端的channel,以便下發信息給客戶端 protected Channel channel; public MsgHeader getMsgHeader() { return msgHeader; } //TODO set 和 get 方法在此處省略 //消息頭 public static class MsgHeader { // 消息ID protected int msgId; /////// ========消息體屬性 // byte[2-3] protected int msgBodyPropsField; // 消息體長度 protected int msgBodyLength; // 數據加密方式 protected int encryptionType; // 是否分包,true==>有消息包封裝項 protected boolean hasSubPackage; // 保留位[14-15] protected String reservedBit; /////// ========消息體屬性 // 終端手機號 protected String terminalPhone; // 流水號 protected int flowId; //////// =====消息包封裝項 // byte[12-15] protected int packageInfoField; // 消息包總數(word(16)) protected long totalSubPackage; // 包序號(word(16))此次發送的這個消息包是分包中的第幾個消息包, 從 1 開始 protected long subPackageSeq; //////// =====消息包封裝項 //TODO set 和 get 方法在此處省略 } }
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hylexus.jt808.util.BCD8421Operater; import cn.hylexus.jt808.util.BitOperator; import cn.hylexus.jt808.vo.PackageData; import cn.hylexus.jt808.vo.PackageData.MsgHeader; public class MsgDecoder { private static final Logger log = LoggerFactory.getLogger(MsgDecoder.class); private BitOperator bitOperator; private BCD8421Operater bcd8421Operater; public MsgDecoder() { this.bitOperator = new BitOperator(); this.bcd8421Operater = new BCD8421Operater(); } //字節數組到消息體實體類 public PackageData queueElement2PackageData(byte[] data) { PackageData ret = new PackageData(); // 1. 16byte 或 12byte 消息頭 MsgHeader msgHeader = this.parseMsgHeaderFromBytes(data); ret.setMsgHeader(msgHeader); int msgBodyByteStartIndex = 12; // 2. 消息體 // 有子包信息,消息體起始字節後移四個字節:消息包總數(word(16))+包序號(word(16)) if (msgHeader.isHasSubPackage()) { msgBodyByteStartIndex = 16; } byte[] tmp = new byte[msgHeader.getMsgBodyLength()]; System.arraycopy(data, msgBodyByteStartIndex, tmp, 0, tmp.length); ret.setMsgBodyBytes(tmp); // 3. 去掉分隔符以後,最後一位就是校驗碼 // int checkSumInPkg = // this.bitOperator.oneByteToInteger(data[data.length - 1]); int checkSumInPkg = data[data.length - 1]; int calculatedCheckSum = this.bitOperator.getCheckSum4JT808(data, 0, data.length - 1); ret.setCheckSum(checkSumInPkg); if (checkSumInPkg != calculatedCheckSum) { log.warn("檢驗碼不一致,msgid:{},pkg:{},calculated:{}", msgHeader.getMsgId(), checkSumInPkg, calculatedCheckSum); } return ret; } private MsgHeader parseMsgHeaderFromBytes(byte[] data) { MsgHeader msgHeader = new MsgHeader(); // 1. 消息ID word(16) // byte[] tmp = new byte[2]; // System.arraycopy(data, 0, tmp, 0, 2); // msgHeader.setMsgId(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setMsgId(this.parseIntFromBytes(data, 0, 2)); // 2. 消息體屬性 word(16)=================> // System.arraycopy(data, 2, tmp, 0, 2); // int msgBodyProps = this.bitOperator.twoBytesToInteger(tmp); int msgBodyProps = this.parseIntFromBytes(data, 2, 2); msgHeader.setMsgBodyPropsField(msgBodyProps); // [ 0-9 ] 0000,0011,1111,1111(3FF)(消息體長度) msgHeader.setMsgBodyLength(msgBodyProps & 0x1ff); // [10-12] 0001,1100,0000,0000(1C00)(加密類型) msgHeader.setEncryptionType((msgBodyProps & 0xe00) >> 10); // [ 13_ ] 0010,0000,0000,0000(2000)(是否有子包) msgHeader.setHasSubPackage(((msgBodyProps & 0x2000) >> 13) == 1); // [14-15] 1100,0000,0000,0000(C000)(保留位) msgHeader.setReservedBit(((msgBodyProps & 0xc000) >> 14) + ""); // 消息體屬性 word(16)<================= // 3. 終端手機號 bcd[6] // tmp = new byte[6]; // System.arraycopy(data, 4, tmp, 0, 6); // msgHeader.setTerminalPhone(this.bcd8421Operater.bcd2String(tmp)); msgHeader.setTerminalPhone(this.parseBcdStringFromBytes(data, 4, 6)); // 4. 消息流水號 word(16) 按發送順序從 0 開始循環累加 // tmp = new byte[2]; // System.arraycopy(data, 10, tmp, 0, 2); // msgHeader.setFlowId(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setFlowId(this.parseIntFromBytes(data, 10, 2)); // 5. 消息包封裝項 // 有子包信息 if (msgHeader.isHasSubPackage()) { // 消息包封裝項字段 msgHeader.setPackageInfoField(this.parseIntFromBytes(data, 12, 4)); // byte[0-1] 消息包總數(word(16)) // tmp = new byte[2]; // System.arraycopy(data, 12, tmp, 0, 2); // msgHeader.setTotalSubPackage(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setTotalSubPackage(this.parseIntFromBytes(data, 12, 2)); // byte[2-3] 包序號(word(16)) 從 1 開始 // tmp = new byte[2]; // System.arraycopy(data, 14, tmp, 0, 2); // msgHeader.setSubPackageSeq(this.bitOperator.twoBytesToInteger(tmp)); msgHeader.setSubPackageSeq(this.parseIntFromBytes(data, 12, 2)); } return msgHeader; } protected String parseStringFromBytes(byte[] data, int startIndex, int lenth) { return this.parseStringFromBytes(data, startIndex, lenth, null); } private String parseStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) { try { byte[] tmp = new byte[lenth]; System.arraycopy(data, startIndex, tmp, 0, lenth); return new String(tmp, "UTF-8"); } catch (Exception e) { log.error("解析字符串出錯:{}", e.getMessage()); e.printStackTrace(); return defaultVal; } } private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth) { return this.parseBcdStringFromBytes(data, startIndex, lenth, null); } private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) { try { byte[] tmp = new byte[lenth]; System.arraycopy(data, startIndex, tmp, 0, lenth); return this.bcd8421Operater.bcd2String(tmp); } catch (Exception e) { log.error("解析BCD(8421碼)出錯:{}", e.getMessage()); e.printStackTrace(); return defaultVal; } } private int parseIntFromBytes(byte[] data, int startIndex, int length) { return this.parseIntFromBytes(data, startIndex, length, 0); } private int parseIntFromBytes(byte[] data, int startIndex, int length, int defaultVal) { try { // 字節數大於4,從起始索引開始向後處理4個字節,其他超出部分丟棄 final int len = length > 4 ? 4 : length; byte[] tmp = new byte[len]; System.arraycopy(data, startIndex, tmp, 0, len); return bitOperator.byteToInteger(tmp); } catch (Exception e) { log.error("解析整數出錯:{}", e.getMessage()); e.printStackTrace(); return defaultVal; } } }
package cn.hylexus.jt808.util; public class BCD8421Operater { /** * BCD字節數組===>String * * @param bytes * @return 十進制字符串 */ public String bcd2String(byte[] bytes) { StringBuilder temp = new StringBuilder(bytes.length * 2); for (int i = 0; i < bytes.length; i++) { // 高四位 temp.append((bytes[i] & 0xf0) >>> 4); // 低四位 temp.append(bytes[i] & 0x0f); } return temp.toString().substring(0, 1).equalsIgnoreCase("0") ? temp.toString().substring(1) : temp.toString(); } /** * 字符串==>BCD字節數組 * * @param str * @return BCD字節數組 */ public byte[] string2Bcd(String str) { // 奇數,前補零 if ((str.length() & 0x1) == 1) { str = "0" + str; } byte ret[] = new byte[str.length() / 2]; byte bs[] = str.getBytes(); for (int i = 0; i < ret.length; i++) { byte high = ascII2Bcd(bs[2 * i]); byte low = ascII2Bcd(bs[2 * i + 1]); // TODO 只遮罩BCD低四位? ret[i] = (byte) ((high << 4) | low); } return ret; } private byte ascII2Bcd(byte asc) { if ((asc >= '0') && (asc <= '9')) return (byte) (asc - '0'); else if ((asc >= 'A') && (asc <= 'F')) return (byte) (asc - 'A' + 10); else if ((asc >= 'a') && (asc <= 'f')) return (byte) (asc - 'a' + 10); else return (byte) (asc - 48); } }
package cn.hylexus.jt808.util; import java.util.Arrays; import java.util.List; public class BitOperator { /** * 把一個整形該爲byte * * @param value * @return * @throws Exception */ public byte integerTo1Byte(int value) { return (byte) (value & 0xFF); } /** * 把一個整形該爲1位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo1Bytes(int value) { byte[] result = new byte[1]; result[0] = (byte) (value & 0xFF); return result; } /** * 把一個整形改成2位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo2Bytes(int value) { byte[] result = new byte[2]; result[0] = (byte) ((value >>> 8) & 0xFF); result[1] = (byte) (value & 0xFF); return result; } /** * 把一個整形改成3位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo3Bytes(int value) { byte[] result = new byte[3]; result[0] = (byte) ((value >>> 16) & 0xFF); result[1] = (byte) ((value >>> 8) & 0xFF); result[2] = (byte) (value & 0xFF); return result; } /** * 把一個整形改成4位的byte數組 * * @param value * @return * @throws Exception */ public byte[] integerTo4Bytes(int value){ byte[] result = new byte[4]; result[0] = (byte) ((value >>> 24) & 0xFF); result[1] = (byte) ((value >>> 16) & 0xFF); result[2] = (byte) ((value >>> 8) & 0xFF); result[3] = (byte) (value & 0xFF); return result; } /** * 把byte[]轉化位整形,一般爲指令用 * * @param value * @return * @throws Exception */ public int byteToInteger(byte[] value) { int result; if (value.length == 1) { result = oneByteToInteger(value[0]); } else if (value.length == 2) { result = twoBytesToInteger(value); } else if (value.length == 3) { result = threeBytesToInteger(value); } else if (value.length == 4) { result = fourBytesToInteger(value); } else { result = fourBytesToInteger(value); } return result; } /** * 把一個byte轉化位整形,一般爲指令用 * * @param value * @return * @throws Exception */ public int oneByteToInteger(byte value) { return (int) value & 0xFF; } /** * 把一個2位的數組轉化位整形 * * @param value * @return * @throws Exception */ public int twoBytesToInteger(byte[] value) { // if (value.length < 2) { // throw new Exception("Byte array too short!"); // } int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; return ((temp0 << 8) + temp1); } /** * 把一個3位的數組轉化位整形 * * @param value * @return * @throws Exception */ public int threeBytesToInteger(byte[] value) { int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; int temp2 = value[2] & 0xFF; return ((temp0 << 16) + (temp1 << 8) + temp2); } /** * 把一個4位的數組轉化位整形,一般爲指令用 * * @param value * @return * @throws Exception */ public int fourBytesToInteger(byte[] value) { // if (value.length < 4) { // throw new Exception("Byte array too short!"); // } int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; int temp2 = value[2] & 0xFF; int temp3 = value[3] & 0xFF; return ((temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3); } /** * 把一個4位的數組轉化位整形 * * @param value * @return * @throws Exception */ public long fourBytesToLong(byte[] value) throws Exception { // if (value.length < 4) { // throw new Exception("Byte array too short!"); // } int temp0 = value[0] & 0xFF; int temp1 = value[1] & 0xFF; int temp2 = value[2] & 0xFF; int temp3 = value[3] & 0xFF; return (((long) temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3); } /** * 把一個數組轉化長整形 * * @param value * @return * @throws Exception */ public long bytes2Long(byte[] value) { long result = 0; int len = value.length; int temp; for (int i = 0; i < len; i++) { temp = (len - 1 - i) * 8; if (temp == 0) { result += (value[i] & 0x0ff); } else { result += (value[i] & 0x0ff) << temp; } } return result; } /** * 把一個長整形改成byte數組 * * @param value * @return * @throws Exception */ public byte[] longToBytes(long value){ return longToBytes(value, 8); } /** * 把一個長整形改成byte數組 * * @param value * @return * @throws Exception */ public byte[] longToBytes(long value, int len) { byte[] result = new byte[len]; int temp; for (int i = 0; i < len; i++) { temp = (len - 1 - i) * 8; if (temp == 0) { result[i] += (value & 0x0ff); } else { result[i] += (value >>> temp) & 0x0ff; } } return result; } /** * 獲得一個消息ID * * @return * @throws Exception */ public byte[] generateTransactionID() throws Exception { byte[] id = new byte[16]; System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 0, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 2, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 4, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 6, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 8, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 10, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 12, 2); System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 14, 2); return id; } /** * 把IP拆分位int數組 * * @param ip * @return * @throws Exception */ public int[] getIntIPValue(String ip) throws Exception { String[] sip = ip.split("[.]"); // if (sip.length != 4) { // throw new Exception("error IPAddress"); // } int[] intIP = { Integer.parseInt(sip[0]), Integer.parseInt(sip[1]), Integer.parseInt(sip[2]), Integer.parseInt(sip[3]) }; return intIP; } /** * 把byte類型IP地址轉化位字符串 * * @param address * @return * @throws Exception */ public String getStringIPValue(byte[] address) throws Exception { int first = this.oneByteToInteger(address[0]); int second = this.oneByteToInteger(address[1]); int third = this.oneByteToInteger(address[2]); int fourth = this.oneByteToInteger(address[3]); return first + "." + second + "." + third + "." + fourth; } /** * 合併字節數組 * * @param first * @param rest * @return */ public byte[] concatAll(byte[] first, byte[]... rest) { int totalLength = first.length; for (byte[] array : rest) { if (array != null) { totalLength += array.length; } } byte[] result = Arrays.copyOf(first, totalLength); int offset = first.length; for (byte[] array : rest) { if (array != null) { System.arraycopy(array, 0, result, offset, array.length); offset += array.length; } } return result; } /** * 合併字節數組 * * @param rest * @return */ public byte[] concatAll(List<byte[]> rest) { int totalLength = 0; for (byte[] array : rest) { if (array != null) { totalLength += array.length; } } byte[] result = new byte[totalLength]; int offset = 0; for (byte[] array : rest) { if (array != null) { System.arraycopy(array, 0, result, offset, array.length); offset += array.length; } } return result; } public float byte2Float(byte[] bs) { return Float.intBitsToFloat( (((bs[3] & 0xFF) << 24) + ((bs[2] & 0xFF) << 16) + ((bs[1] & 0xFF) << 8) + (bs[0] & 0xFF))); } public float byteBE2Float(byte[] bytes) { int l; l = bytes[0]; l &= 0xff; l |= ((long) bytes[1] << 8); l &= 0xffff; l |= ((long) bytes[2] << 16); l &= 0xffffff; l |= ((long) bytes[3] << 24); return Float.intBitsToFloat(l); } public int getCheckSum4JT808(byte[] bs, int start, int end) { if (start < 0 || end > bs.length) throw new ArrayIndexOutOfBoundsException("getCheckSum4JT808 error : index out of bounds(start=" + start + ",end=" + end + ",bytes length=" + bs.length + ")"); int cs = 0; for (int i = start; i < end; i++) { cs ^= bs[i]; } return cs; } public int getBitRange(int number, int start, int end) { if (start < 0) throw new IndexOutOfBoundsException("min index is 0,but start = " + start); if (end >= Integer.SIZE) throw new IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but end = " + end); return (number << Integer.SIZE - (end + 1)) >>> Integer.SIZE - (end - start + 1); } public int getBitAt(int number, int index) { if (index < 0) throw new IndexOutOfBoundsException("min index is 0,but " + index); if (index >= Integer.SIZE) throw new IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but " + index); return ((1 << index) & number) >> index; } public int getBitAtS(int number, int index) { String s = Integer.toBinaryString(number); return Integer.parseInt(s.charAt(index) + ""); } @Deprecated public int getBitRangeS(int number, int start, int end) { String s = Integer.toBinaryString(number); StringBuilder sb = new StringBuilder(s); while (sb.length() < Integer.SIZE) { sb.insert(0, "0"); } String tmp = sb.reverse().substring(start, end + 1); sb = new StringBuilder(tmp); return Integer.parseInt(sb.reverse().toString(), 2); } }
import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.kkbc.tpms.tcp.service.TCPServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; public class TCPServer2 { private Logger log = LoggerFactory.getLogger(getClass()); private volatile boolean isRunning = false; private EventLoopGroup bossGroup = null; private EventLoopGroup workerGroup = null; private int port; public TCPServer2() { } public TCPServer2(int port) { this(); this.port = port; } private void bind() throws Exception { this.bossGroup = new NioEventLoopGroup(); this.workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup)// .channel(NioServerSocketChannel.class) // .childHandler(new ChannelInitializer<SocketChannel>() { // @Override public void initChannel(SocketChannel ch) throws Exception { //超過15分鐘未收到客戶端消息則自動斷開客戶端鏈接 ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES)); //ch.pipeline().addLast(new Decoder4LoggingOnly()); // 1024表示單條消息的最大長度,解碼器在查找分隔符的時候,達到該長度還沒找到的話會拋異常 ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }), Unpooled.copiedBuffer(new byte[] { 0x7e, 0x7e }))); ch.pipeline().addLast(new TCPServerHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) // .childOption(ChannelOption.SO_KEEPALIVE, true); this.log.info("TCP服務啓動完畢,port={}", this.port); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } public synchronized void startServer() { if (this.isRunning) { throw new IllegalStateException(this.getName() + " is already started ."); } this.isRunning = true; new Thread(() -> { try { this.bind(); } catch (Exception e) { this.log.info("TCP服務啓動出錯:{}", e.getMessage()); e.printStackTrace(); } }, this.getName()).start(); } public synchronized void stopServer() { if (!this.isRunning) { throw new IllegalStateException(this.getName() + " is not yet started ."); } this.isRunning = false; try { Future<?> future = this.workerGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("workerGroup 沒法正常中止:{}", future.cause()); } future = this.bossGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("bossGroup 沒法正常中止:{}", future.cause()); } } catch (InterruptedException e) { e.printStackTrace(); } this.log.info("TCP服務已經中止..."); } private String getName() { return "TCP-Server"; } public static void main(String[] args) throws Exception { TCPServer2 server = new TCPServer2(20048); server.startServer(); // Thread.sleep(3000); // server.stopServer(); } }
package cn.hylexus.jt808.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hylexus.jt808.server.SessionManager; import cn.hylexus.jt808.service.codec.MsgDecoder; import cn.hylexus.jt808.vo.PackageData; import cn.hylexus.jt808.vo.Session; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1) private final Logger logger = LoggerFactory.getLogger(getClass()); // 一個維護客戶端鏈接的類 private final SessionManager sessionManager; private MsgDecoder decoder = new MsgDecoder(); public TCPServerHandler() { this.sessionManager = SessionManager.getInstance(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2) try { ByteBuf buf = (ByteBuf) msg; if (buf.readableBytes() <= 0) { // ReferenceCountUtil.safeRelease(msg); return; } byte[] bs = new byte[buf.readableBytes()]; buf.readBytes(bs); PackageData jt808Msg = this.decoder.queueElement2PackageData(bs); // 處理客戶端消息 this.processClientMsg(jt808Msg); } finally { release(msg); } } private void processClientMsg(PackageData jt808Msg) { // TODO 更加消息ID的不一樣,分別實現本身的業務邏輯 if (jt808Msg.getMsgHeader().getMsgId() == 0x900) { // TODO ... } else if (jt808Msg.getMsgHeader().getMsgId() == 0x9001) { // TODO ... } // else if(){} // else if(){} // else if(){} // else if(){} // ... else { logger.error("位置消息,消息ID={}", jt808Msg.getMsgHeader().getMsgId()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) logger.error("發生異常:{}", cause.getMessage()); cause.printStackTrace(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Session session = Session.buildSession(ctx.channel()); sessionManager.put(session.getId(), session); logger.debug("終端鏈接:{}", session); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { final String sessionId = ctx.channel().id().asLongText(); Session session = sessionManager.findBySessionId(sessionId); this.sessionManager.removeBySessionId(sessionId); logger.debug("終端斷開鏈接:{}", session); ctx.channel().close(); // ctx.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel())); logger.error("服務器主動斷開鏈接:{}", session); ctx.close(); } } } private void release(Object msg) { try { ReferenceCountUtil.release(msg); } catch (Exception e) { e.printStackTrace(); } } }
package cn.hylexus.jt808.server; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.stream.Collectors; import cn.hylexus.jt808.vo.Session; public class SessionManager { private static volatile SessionManager instance = null; // netty生成的sessionID和Session的對應關係 private Map<String, Session> sessionIdMap; // 終端手機號和netty生成的sessionID的對應關係 private Map<String, String> phoneMap; public static SessionManager getInstance() { if (instance == null) { synchronized (SessionManager.class) { if (instance == null) { instance = new SessionManager(); } } } return instance; } public SessionManager() { this.sessionIdMap = new ConcurrentHashMap<>(); this.phoneMap = new ConcurrentHashMap<>(); } public boolean containsKey(String sessionId) { return sessionIdMap.containsKey(sessionId); } public boolean containsSession(Session session) { return sessionIdMap.containsValue(session); } public Session findBySessionId(String id) { return sessionIdMap.get(id); } public Session findByTerminalPhone(String phone) { String sessionId = this.phoneMap.get(phone); if (sessionId == null) return null; return this.findBySessionId(sessionId); } public synchronized Session put(String key, Session value) { if (value.getTerminalPhone() != null && !"".equals(value.getTerminalPhone().trim())) { this.phoneMap.put(value.getTerminalPhone(), value.getId()); } return sessionIdMap.put(key, value); } public synchronized Session removeBySessionId(String sessionId) { if (sessionId == null) return null; Session session = sessionIdMap.remove(sessionId); if (session == null) return null; if (session.getTerminalPhone() != null) this.phoneMap.remove(session.getTerminalPhone()); return session; } public Set<String> keySet() { return sessionIdMap.keySet(); } public void forEach(BiConsumer<? super String, ? super Session> action) { sessionIdMap.forEach(action); } public Set<Entry<String, Session>> entrySet() { return sessionIdMap.entrySet(); } public List<Session> toList() { return this.sessionIdMap.entrySet().stream().map(e -> e.getValue()).collect(Collectors.toList()); } }
請移步: https://github.com/hylexus/jt...數組
另請不要吝嗇,在GitHub給個star讓小可裝裝逼…………^_^服務器
急急忙忙寫的博客,先寫個大體的思路,有疑問能夠聯繫本人,聯繫方式:session
emial: hylexus@163.comapp