netty網絡通訊中的tcp拆包問題

    工做中的一個項目,咱們的一個應用與銀行系統進行tcp通訊的時候,銀行下送的報文有時會分屢次返回。在tcp中這種數據包分屢次小數據包發送的狀況成爲拆包問題。java

其中一個,也是最多見的思路就是在報文的報文頭部分規定某一段表明本次發送的完整報文的長度,這樣接收方就會心中有數,在沒有接收到這個長度的報文以前,認爲本次通訊未完成,數據包還不完整,從而繼續等待下去。以前曾經遇到過這樣的問題,那時候是用的java socket逐個字節對報文進行接收,直到看到結尾符爲止。程序員

    只是此次項目原來的程序員用的netty框架,一開始沒有注意到如何在netty正確處理拆包問題。致使後續投產後,銀行返回的報文出現沒有完整接收的狀況,截斷在中文漢字處產生亂碼,致使異常。bootstrap

    下面介紹如何在netty中處理拆包問題。框架

    

server端代碼:socket

public class NettyServer {
    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

        // Set up the default event pipeline.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new StringDecoder(), new StringEncoder(), new ServerHandler());
            }
        });

        // Bind and start to accept incoming connections.
        Channel bind = bootstrap.bind(new InetSocketAddress(8000));
        System.out.println("Server已經啓動,監聽端口: " + bind.getLocalAddress() + ", 等待客戶端註冊。。。");
    }

    private static class ServerHandler extends SimpleChannelHandler {
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            if (e.getMessage() instanceof String) {
                String message = (String) e.getMessage();
                System.out.println("Client發來:" + message);

              //  e.getChannel().write("Server已收到剛發送的:" + message+"\n");
                e.getChannel().write("000287<?xml version=\"1.0\" encoding=\"GB18030\"?><root><head><TransCode>1002</TransCode><TransDate>20161025</TransDate><TransTime>092745</TransTime>"+
   "<SeqNo>2016110542160157</SeqNo><ZoneCode>HZCQ</ZoneCode><TransRltCode>-25330</TransRltCode><TransRltMsg>000</TransRltMsg></head><body></body></root>");
               

                System.out.println("\n等待客戶端輸入。。。");
            }

            super.messageReceived(ctx, e);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            super.exceptionCaught(ctx, e);
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            System.out.println("有一個客戶端註冊上來了。。。");
            System.out.println("Client:" + e.getChannel().getRemoteAddress());
            System.out.println("Server:" + e.getChannel().getLocalAddress());
            System.out.println("\n等待客戶端輸入。。。");
            super.channelConnected(ctx, e);
        }
    }
}

 

client端代碼:tcp

public class NettyClient {

    public static void main(String[] args) {
        // Configure the client.
        ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

        // Set up the default event pipeline.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new DeliDecoder(),
                                        new StringEncoder(), 
                                        new ClientHandler());
            }
        });

        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8000));

        // Wait until the connection is closed or the connection attempt fails.
        future.getChannel().getCloseFuture().awaitUninterruptibly();

        // Shut down thread pools to exit.
        bootstrap.releaseExternalResources();
    }

    private static class ClientHandler extends SimpleChannelHandler {
        private BufferedReader sin = new BufferedReader(new InputStreamReader(System.in));

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            if (e.getMessage() instanceof String) {
                String message = (String) e.getMessage();
                System.out.println(message);
                e.getChannel().write(sin.readLine());
            }

            super.messageReceived(ctx, e);
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            System.out.println("已經與Server創建鏈接。。。。");
            System.out.println("\n請輸入要發送的信息:");
            super.channelConnected(ctx, e);

            e.getChannel().write("ddddd");
        }
    }
}

拆包問題解決的關鍵:ide

public class DeliDecoder extends FrameDecoder{
    
    private static final Logger LOG = Logger.getLogger(DeliDecoder.class);
    private final int headLen = 6; //銀行回傳的報文前6位爲報文長度,前6位不計算在長度內

    @Override
    protected Object decode(ChannelHandlerContext chc, Channel channel,
            ChannelBuffer buffer) throws Exception {
        LOG.info("進入DeliDecoder.decode()");
        
         if (buffer.readableBytes() < headLen) {
            return null;    //return null表示繼續讀取,下同
        }
         LOG.info("buffer copy...");
         ChannelBuffer buffer2 = buffer.copy();    //直接用buffer.array()可能會報UnsupportedOperationException,故使用其copy
         LOG.info("buffer copy done");
         byte[] arr = buffer2.array();
         LOG.info("buffer array init");
         String temStr = new String(arr, "GB18030");
         LOG.info(temStr);
         int dataLength = Integer.parseInt(temStr.substring(0, 6));
        LOG.info("dataLength : " + dataLength);
        
        if (buffer.readableBytes() < dataLength + headLen) {
            return null;
        }

        buffer.skipBytes(headLen);        //從第7位開始讀取報文正文
        byte[] decoded = new byte[dataLength];
        buffer.readBytes(decoded);
        String msg = new String(decoded, "GB18030");
        return msg;
    }
}
相關文章
相關標籤/搜索