Netty實戰之使用Netty解析交通部JT808協議

1.寫此文的目的

使用Netty也有一段時間了,對Netty也有個大概的瞭解。回想起剛剛使用Netty的時候踩了不少坑,不少Netty的組件也不會使用,或者說用得不夠好,不能稱之爲"最佳實踐"。此文的目的即是帶領你們使用Netty構建出一個完整的項目,將本身在實際開發經驗中整理出的一些最佳實踐分享出來,固然這些最佳實踐不必定就是真正的最佳實踐,只是本身在開發中整理的,或者參考其餘優秀的代碼一塊兒整理出的,你們若是有什麼不一樣意見或者更好的實踐,歡迎你們在評論區分享,你們一塊兒學習一塊兒進步!php

2. 項目準備

先奉上完整版代碼 zpsw/jt808-nettygit

開發環境:IDEA+JDK1.8+Mavengithub

使用框架: Netty + Spring Boot + Spring Data JPA數據庫

其餘工具: lombok(沒用過的同窗建議瞭解一下,很方便)bash

3. 開發過程

3.1.認識JT808協議
3.2.構建編/解碼器
3.3.構建業務Handler
3.4.Channel的高效管理方式
3.5.一些改進
複製代碼

3.1 認識JT808協議

下面簡單介紹一下JT808協議的格式說明,徹底版在JT808協議技術規範.pdf網絡

其中消息體屬性中咱們先只關注消息體長度,不關注其餘,分包狀況先不考慮。數據結構

根據消息頭和消息體咱們能夠抽象出一個最基本的數據結構併發

@Data
public class DataPacket {

    protected Header header = new Header(); //消息頭
    protected ByteBuf byteBuf; //消息流

    @Data
    public static class Header {
        private short msgId;// 消息ID 2字節
        private short msgBodyProps;//消息體屬性 2字節
        private String terminalPhone; // 終端手機號 6字節
        private short flowId;// 流水號 2字節

        //獲取包體長度
        public short getMsgBodyLength() {
            return (short) (msgBodyProps & 0x3ff);
        }

        //獲取加密類型 3bits
        public byte getEncryptionType() {
            return (byte) ((msgBodyProps & 0x1c00) >> 10);
        }

        //是否分包
        public boolean hasSubPackage() {
            return ((msgBodyProps & 0x2000) >> 13) == 1;
        }
    }
}
複製代碼

咱們能夠先將Header解析出來,而後由子類本身解析包體框架

public void parse() {
        try{
            this.parseHead();
            //驗證包體長度
            if (this.header.getMsgBodyLength() != this.byteBuf.readableBytes()) {
                throw new RuntimeException("包體長度有誤");
            }
            this.parseBody();//由子類重寫
        }finally {
            ReferenceCountUtil.safeRelease(this.byteBuf);//注意釋放
        }
    }

    protected void parseHead() {
        header.setMsgId(byteBuf.readShort());
        header.setMsgBodyProps(byteBuf.readShort());
        header.setTerminalPhone(BCD.BCDtoString(readBytes(6)));
        header.setFlowId(byteBuf.readShort());
    }
    protected void parseBody() {

    }
複製代碼

其中readByte(int length)方法是對ByteBuf.readBytes(byte[] dst)的一個簡單封裝ide

public byte[] readBytes(int length) {
        byte[] bytes = new byte[length];
        this.byteBuf.readBytes(bytes);
        return bytes;
}
複製代碼

由於沒有在Netty官方的Api中找到相似的方法,因此本身定義了一個

另外定義一個方法用於響應重寫。

響應重寫:

public ByteBuf toByteBufMsg() {
        ByteBuf bb = ByteBufAllocator.DEFAULT.heapBuffer();
        bb.writeInt(0);//先佔4字節用來寫msgId和msgBodyProps
        bb.writeBytes(BCD.toBcdBytes(StringUtils.leftPad(this.header.getTerminalPhone(), 12, "0")));
        bb.writeShort(this.header.getFlowId());
        return bb;
}
**
"最佳實踐":儘可能使用內存池分配ByteBuf,效率相比非池化Unpooled.buffer()高不少,可是得注意釋放,不然會內存泄漏
在ChannelPipeLine中咱們可使用ctx.alloc()或者channel.alloc()獲取Netty默認內存分配器,
其餘地方不必定要創建獨有的內存分配器,能夠經過ByteBufAllocator.DEFAULT獲取,跟前面獲取的是同一個(不特別配置的話)。
**
複製代碼

這裏當咱們將響應轉化爲ByteBuf寫出去的時候,此時並不知道消息體的具體長度,全部此時咱們先佔住位置,回頭再來寫。

全部的消息都繼承自DataPacket,咱們挑出一個字段相對較多的-》 位置上報消息

而後咱們創建位置上報消息的數據結構,先看位置消息的格式

創建結構以下:

@Data
public class LocationMsg extends DataPacket {

    private int alarm; //告警信息 4字節
    private int statusField;//狀態 4字節
    private float latitude;//緯度 4字節
    private float longitude;//經度 4字節
    private short elevation;//海拔高度 2字節
    private short speed; //速度 2字節
    private short direction; //方向 2字節
    private String time; //時間 6字節BCD

    public LocationMsg(ByteBuf byteBuf) {
        super(byteBuf);
    }
    
    @Override
    public void parseBody() {
        ByteBuf bb = this.byteBuf;
        this.setAlarm(bb.readInt());
        this.setStatusField(bb.readInt());
        this.setLatitude(bb.readUnsignedInt() * 1.0F / 1000000);
        this.setLongitude(bb.readUnsignedInt() * 1.0F / 1000000);
        this.setElevation(bb.readShort());
        this.setSpeed(bb.readShort());
        this.setDirection(bb.readShort());
        this.setTime(BCD.toBcdTimeString(readBytes(6)));
    }
}
複製代碼

全部的消息若是沒有本身的應答的話,須要默認應答,默認應答格式以下

@Data
public class CommonResp extends DataPacket {

    private short replyFlowId; //應答流水號 2字節
    private short replyId; //應答 ID  2字節
    private byte result;    //結果 1字節

    public CommonResp() {
        this.getHeader().setMsgId(JT808Const.SERVER_RESP_COMMON);
    }

    @Override
    public ByteBuf toByteBufMsg() {
        ByteBuf bb = super.toByteBufMsg();
        bb.writeShort(replyFlowId);
        bb.writeShort(replyId);
        bb.writeByte(result);
        return bb;
    }
}

複製代碼

3.2 構建編/解碼器

解碼器

前面協議能夠看到,標識位爲0x7e,因此咱們第一個解碼器能夠用Netty自帶的DelimiterBasedFrameDecoder,其中的delimiters天然就是0x7e了。(Netty有不少自帶的編解碼器,建議先確認Netty自帶的不能知足需求,再本身自定義)

通過DelimiterBasedFrameDecoder幫咱們截斷以後,信息就到了咱們本身的解碼器中了,咱們的目的是將ByteBuf轉化爲咱們前面定義的數據結構。 定義解碼器

public class JT808Decoder extends ByteToMessageDecoder {
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    
    }
}
複製代碼

第一步:轉義還原,轉義規則以下

0x7d 0x01 -> 0x7d

0x7d 0x02 -> 0x7e

public ByteBuf revert(byte[] raw) {
        int len = raw.length;
        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(len);//DataPacket parse方法回收
        for (int i = 0; i < len; i++) {
            if (raw[i] == 0x7d && raw[i + 1] == 0x01) {
                buf.writeByte(0x7d);
                i++;
            } else if (raw[i] == 0x7d && raw[i + 1] == 0x02) {
                buf.writeByte(0x7e);
                i++;
            } else {
                buf.writeByte(raw[i]);
            }
        }
        return buf;
    }
複製代碼

第二步:校驗

byte pkgCheckSum = escape.getByte(escape.writerIndex() - 1);
    escape.writerIndex(escape.writerIndex() - 1);//排除校驗碼
    byte calCheckSum = JT808Util.XorSumBytes(escape);
    if (pkgCheckSum != calCheckSum) {
        log.warn("校驗碼錯誤,pkgCheckSum:{},calCheckSum:{}", pkgCheckSum, calCheckSum);
        ReferenceCountUtil.safeRelease(escape);//必定不要漏了釋放
        return null;
    }
複製代碼

第三步:解碼

public DataPacket parse(ByteBuf bb) {
        DataPacket packet = null;
        short msgId = bb.getShort(bb.readerIndex());
        switch (msgId) {
            case TERNIMAL_MSG_HEARTBEAT:
                packet = new HeartBeatMsg(bb);
                break;
            case TERNIMAL_MSG_LOCATION:
                packet = new LocationMsg(bb);
                break;
            case TERNIMAL_MSG_REGISTER:
                packet = new RegisterMsg(bb);
                break;
            case TERNIMAL_MSG_AUTH:
                packet = new AuthMsg(bb);
                break;
            case TERNIMAL_MSG_LOGOUT:
                packet = new LogOutMsg(bb);
                break;
            default:
                packet = new DataPacket(bb);
                break;
        }
        packet.parse();
        return packet;
    }
複製代碼

switch裏咱們儘可能將收到頻率高的放在前面,避免過多的if判斷

而後咱們將消息out.add(msg)就可讓消息到咱們的業務Handler中了。

編碼器

編碼器須要講咱們的DataPacket轉化爲ByteBuf,而後再轉義發送出去。 定義編碼器

public class JT808Encoder extends MessageToByteEncoder<DataPacket> {
    protected void encode(ChannelHandlerContext ctx, DataPacket msg, ByteBuf out) throws Exception {
    
    }
}
複製代碼

第一步:轉換

ByteBuf bb = msg.toByteBufMsg();
複製代碼

還記得咱們DataPacket轉換header時佔用了4個字節等到後面覆蓋嗎

bb.markWriterIndex();//標記一下,先到前面去寫覆蓋的,而後回到標記寫校驗碼
        short bodyLen = (short) (bb.readableBytes() - 12);
        short bodyProps = createDefaultMsgBodyProperty(bodyLen);
        //覆蓋佔用的4字節
        bb.writerIndex(0);
        bb.writeShort(msg.getHeader().getMsgId());
        bb.writeShort(bodyProps);
        bb.resetWriterIndex();
        bb.writeByte(JT808Util.XorSumBytes(bb));
複製代碼

第二步:轉義

public ByteBuf escape(ByteBuf raw) {
        int len = raw.readableBytes();
        ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(len + 12);//假設最多有12個須要轉義
        buf.writeByte(JT808Const.PKG_DELIMITER);
        while (len > 0) {
            byte b = raw.readByte();
            if (b == 0x7e) {
                buf.writeByte(0x7d);
                buf.writeByte(0x02);
            } else if (b == 0x7d) {
                buf.writeByte(0x7d);
                buf.writeByte(0x01);
            } else {
                buf.writeByte(b);
            }
            len--;
        }
        ReferenceCountUtil.safeRelease(raw);
        buf.writeByte(JT808Const.PKG_DELIMITER);
        return buf;
    }
**
"最佳實踐":咱們這裏返回ByteBuf是寫出去的,因此採用directBuffer效率更高
**
複製代碼

轉義完成,就直接發送出去了,固然不能忘了釋放。

ByteBuf escape = escape(bb);
        out.writeBytes(escape);
        ReferenceCountUtil.safeRelease(escape);
複製代碼

3.3 構建業務Handler

解碼器中咱們返回的是DataPacket對象,因此編寫Handler此時咱們有兩種選擇:

一種是定義一個Handler接收DataPacket而後判斷具體類型,以下圖

@Component
@ChannelHandler.Sharable
public class JT808ServerHandler extends SimpleChannelInboundHandler<DataPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataPacket msg) throws Exception {
        log.debug(msg.toString());
        if (msg instanceof AuthMsg || msg instanceof HeartBeatMsg || msg instanceof LocationMsg || msg instanceof LogOutMsg) {
            CommonResp resp = CommonResp.success(msg, getFlowId(ctx));
            ctx.writeAndFlush(resp);
        } else if (msg instanceof RegisterMsg) {
            RegisterResp resp = RegisterResp.success(msg, getFlowId(ctx));
            ctx.writeAndFlush(resp);
        }
    }

}
複製代碼

另外一種是每一個DataPacket的子類型都定義一個Handler,以下圖

public class LocationMsgHandler extends SimpleChannelInboundHandler<LocationMsg> 
public class HeartBeatMsgHandler extends SimpleChannelInboundHandler<HeartBeatMsg> 
public class RegisterMsgHandler extends SimpleChannelInboundHandler<LogOutMsg> 
複製代碼

這裏我選擇第二種,一個緣由是由於代碼風格好,另外一個緣由後面會具體說明。

這裏列舉一個LocationMsgHandler的詳細代碼,將位置保存到數據庫而後回覆設備

@Slf4j
@Component
@ChannelHandler.Sharable
public class LocationMsgHandler extends BaseHandler<LocationMsg> {

    @Autowired
    private LocationRepository locationRespository;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LocationMsg msg) throws Exception {
        log.debug(msg.toString());
        locationRespository.save(LocationEntity.parseFromLocationMsg(msg));
        CommonResp resp = CommonResp.success(msg, getSerialNumber(ctx.channel()));
        write(ctx, resp);
    }
}
複製代碼

BaseHandler繼承SimpleChannelInboundHandler ,裏面定義了一些通用的方法,例如getSerialNumber()獲取應答的流水號

private static final AttributeKey<Short> SERIAL_NUMBER = AttributeKey.newInstance("serialNumber");

    public short getSerialNumber(Channel channel){
        Attribute<Short> flowIdAttr = channel.attr(SERIAL_NUMBER);
        Short flowId = flowIdAttr.get();
        if (flowId == null) {
            flowId = 0;
        } else {
            flowId++;
        }
        flowIdAttr.set(flowId);
        return flowId;
    }

複製代碼

咱們將流水號存入Channel內部,方便維護。

3.4.Channel的高效管理方式

假設如今出現了一個需求,咱們須要找到一個特定的鏈接發送一條消息,在咱們這個項目裏,特定指的是根據header中的手機號找到鏈接併發送消息。咱們能夠本身維護一個Map用來存放全部Channel,可是這樣就浪費了Netty自帶的DefaultChannelGroup提供的一系列方法了。因此咱們改進一下,定義一個ChannelManager,內部採用DefaultChannelGroup維護Channel,本身維護手機號->ChannelId的映射關係。

@Component
public class ChannelManager {

    private static final AttributeKey<String> TERMINAL_PHONE = AttributeKey.newInstance("terminalPhone");
    
    private ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private Map<String, ChannelId> channelIdMap = new ConcurrentHashMap<>();

    private ChannelFutureListener remover = future ->
            channelIdMap.remove(future.channel().attr(TERMINAL_PHONE).get());


    public boolean add(String terminalPhone, Channel channel) {
        boolean added = channelGroup.add(channel);
        if (added) {
            channel.attr(TERMINAL_PHONE).set(terminalPhone);
            channel.closeFuture().addListener(remover);
            channelIdMap.put(terminalPhone, channel.id());
        }
        return added;
    }

    public boolean remove(String terminalPhone) {
        return channelGroup.remove(channelIdMap.remove(terminalPhone));
    }

    public Channel get(String terminalPhone) {
        return channelGroup.find(channelIdMap.get(terminalPhone));
    }

    public ChannelGroup getChannelGroup() {
        return channelGroup;
    }
}

複製代碼

咱們定義了一個ChannelFutureListener,當channel關閉時,會執行這個回調,幫助咱們維護本身的channelIdMap不至於太過臃腫,提高效率,DefaultChannelGroup中也是如此,因此沒必要擔憂Channel都不存在了 還佔用着內存這種狀況。另外咱們能夠將DefaultChannelGroup提供出去,以便某些時候進行廣播。

3.5.一些改進

1.咱們的LocationMsgHandler中出現了數據庫操做

locationRespository.save(LocationEntity.parseFromLocationMsg(msg));
複製代碼

然而在Netty中,默認狀況下Handler由Reactor線程驅動,一旦阻塞就會大大下降併發能力,因此咱們定義一個專門的EventExecutorGroup(不認識的話能夠先理解爲線程池),用來驅動耗時的Handler,只要在初始化Channel時指定便可。前面所說的每一個DataPacket子類型定義一個Handler的另外一個好處就體如今這裏,咱們可讓那些耗時的Handler用專門的業務線程池去驅動,而不耗時的Handler由默認的Reactor線程驅動,增長了靈活性。

pipeline.addLast(heartBeatMsgHandler);
        pipeline.addLast(businessGroup,locationMsgHandler);//由於locationMsgHandler中涉及到數據庫操做,因此放入businessGroup
        pipeline.addLast(authMsgHandler);
        pipeline.addLast(registerMsgHandler);
        pipeline.addLast(logOutMsgHandler);
複製代碼

另外如解碼器parse()中的switch裏的case順序同樣,咱們這裏也能夠利用增長Handler的順序,節省一些if判斷。

2.接上面的,如今咱們LocationMsgHandler由businessGroup驅動了,然而寫響應的時候仍是會移交給Reactor線程,因此爲了減小一些判斷提高略微的性能,咱們能夠將write(ctx, resp);改成

workerGroup.execute(() -> write(ctx, resp));
複製代碼

其中的workerGroup正是啓動引導中的,咱們藉助Spring把它單獨定義成了bean,用的時候直接註解引入便可

serverBootstrap.group(bossGroup, workerGroup)
複製代碼

3.藉助Spring的力量咱們能夠將幾乎全部的組件定義成單例,提高了略微的性能,除了編碼器和解碼器,由於他們有一些屬性須要維護,不能定義爲單例。

結束語

一直看到這裏的朋友感謝大家的耐心,這是我第一次寫文章,有錯誤的地方還請多多包涵。

另外將徹底版代碼奉上 zpsw/jt808-netty

這也是我的開源的第一個項目,若是對你有幫助,給個Star將不勝感激。

附上一些其餘的Netty最佳實踐(轉自best practice in netty):

  • writeAndFlush不要一直調用, 是否能夠經過調用write,而且在適當的時間flush,由於每次系統flush都是一次系統調用,若是能夠的話write的調用次數也應該減小,由於它會通過整個pipeline(github.com/netty/netty…)
  • 若是你不是很關注write的結果,可使用channel.voidPromise(),能夠減小對象的建立
  • 一直寫對於處理能力較弱的接受者來講,可能會引發OutMemoryError,關注channel.isWritable()和channelhandler中的cahnnelWritabilityChanged()將會頗有幫助,channel.bytesBeforeUnwritable和channel.bytesBeforeWritable()一樣值得關注
  • 關注write_buffer_high_water_mark和write_buffer_low_water_mark的配置, 例如high:32kb(default 64kb), low:8kb(default 32kb)
  • 能夠經過channelpipeline 觸發custome events (pipeline.fireUserEventTriggered(MyCustomEvent)), 能夠在DuplexChannelHandler中處理相應的事件

還有一些英文的就不貼過來了

另外給新手安利一個網絡調試工具NetAssist網絡調試助手

再見

相關文章
相關標籤/搜索