Mina、Netty、Twisted一塊兒學(三):TCP消息固定大小的前綴(Header)

上一篇博文中,有介紹到用換行符分割消息的方法。可是這種方法有個小問題,若是消息中自己就包含換行符,那將會將這條消息分割成兩條,結果就不對了。html

本文介紹另一種消息分割方式,即上一篇博文中講的第2條:use a fixed length header that indicates the length of the body,用一個固定字節數的Header前綴來指定Body的字節數,以此來分割消息。react

上面圖中Header固定爲4字節,Header中保存的是一個4字節(32位)的整數,例如12即爲0x0000000C,這個整數用來指定Body的長度(字節數)。當讀完這麼多字節的Body以後,又是下一條消息的Header。git

下面分別用MINA、Netty、Twisted來實現對這種消息的切合和解碼。github

MINA:服務器

MINA提供了PrefixedStringCodecFactory來對這種類型的消息進行編碼解碼,PrefixedStringCodecFactory默認Header的大小是4字節,固然也能夠指定成1或2。session

public class TcpServer {  
  
    public static void main(String[] args) throws IOException {  
        IoAcceptor acceptor = new NioSocketAcceptor();  
          
        // 4字節的Header指定Body的字節數,對這種消息的處理  
        acceptor.getFilterChain().addLast("codec",   
                new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));  
          
        acceptor.setHandler(new TcpServerHandle());  
        acceptor.bind(new InetSocketAddress(8080));  
    }  
  
}  
  
class TcpServerHandle extends IoHandlerAdapter {  
  
    @Override  
    public void exceptionCaught(IoSession session, Throwable cause)  
            throws Exception {  
        cause.printStackTrace();  
    }  
  
    // 接收到新的數據  
    @Override  
    public void messageReceived(IoSession session, Object message)  
            throws Exception {  
  
        String msg = (String) message;  
        System.out.println("messageReceived:" + msg);  
          
    }  
  
    @Override  
    public void sessionCreated(IoSession session) throws Exception {  
        System.out.println("sessionCreated");  
    }  
  
    @Override  
    public void sessionClosed(IoSession session) throws Exception {  
        System.out.println("sessionClosed");  
    }  
} 

Netty:異步

Netty使用LengthFieldBasedFrameDecoder來處理這種消息。下面代碼中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5個參數,分別是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength爲消息的最大長度,lengthFieldOffset爲Header的位置,lengthFieldLength爲Header的長度,lengthAdjustment爲長度調整(默認Header中的值表示Body的長度,並不包含Header本身),initialBytesToStrip爲去掉字節數(默認解碼後返回Header+Body的所有內容,這裏設爲4表示去掉4字節的Header,只留下Body)。socket

public class TcpServer {  
  
    public static void main(String[] args) throws InterruptedException {  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try {  
            ServerBootstrap b = new ServerBootstrap();  
            b.group(bossGroup, workerGroup)  
                    .channel(NioServerSocketChannel.class)  
                    .childHandler(new ChannelInitializer<SocketChannel>() {  
                        @Override  
                        public void initChannel(SocketChannel ch)  
                                throws Exception {  
                            ChannelPipeline pipeline = ch.pipeline();  
                              
                            // LengthFieldBasedFrameDecoder按行分割消息,取出body  
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));  
                            // 再按UTF-8編碼轉成字符串  
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));  
                              
                            pipeline.addLast(new TcpServerHandler());  
                        }  
                    });  
            ChannelFuture f = b.bind(8080).sync();  
            f.channel().closeFuture().sync();  
        } finally {  
            workerGroup.shutdownGracefully();  
            bossGroup.shutdownGracefully();  
        }  
    }  
  
}  
  
class TcpServerHandler extends ChannelInboundHandlerAdapter {  
  
    // 接收到新的數據  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) {  
          
        String message = (String) msg;  
        System.out.println("channelRead:" + message);  
    }  
  
    @Override  
    public void channelActive(ChannelHandlerContext ctx) {  
        System.out.println("channelActive");  
    }  
  
    @Override  
    public void channelInactive(ChannelHandlerContext ctx) {  
        System.out.println("channelInactive");  
    }  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        cause.printStackTrace();  
        ctx.close();  
    }  
}  

Twisted:ide

在Twisted中須要繼承Int32StringReceiver,再也不繼承Protocol。Int32StringReceiver表示固定32位(4字節)的Header,另外還有Int16StringReceiver、Int8StringReceiver等。而須要實現的接受數據事件的方法再也不是dataReceived,也不是lineReceived,而是stringReceived。函數

# -*- coding:utf-8 –*-  
  
from twisted.protocols.basic import Int32StringReceiver  
from twisted.internet.protocol import Factory  
from twisted.internet import reactor  
  
class TcpServerHandle(Int32StringReceiver):  
  
    # 新的鏈接創建  
    def connectionMade(self):  
        print 'connectionMade'  
  
    # 鏈接斷開  
    def connectionLost(self, reason):  
        print 'connectionLost'  
  
    # 接收到新的數據  
    def stringReceived(self, data):  
        print 'stringReceived:' + data  
  
factory = Factory()  
factory.protocol = TcpServerHandle  
reactor.listenTCP(8080, factory)  
reactor.run()  

下面是Java編寫的一個客戶端測試程序:

public class TcpClient {  
  
    public static void main(String[] args) throws IOException {  
  
        Socket socket = null;  
        DataOutputStream out = null;  
  
        try {  
  
            socket = new Socket("localhost", 8080);  
            out = new DataOutputStream(socket.getOutputStream());  
  
            // 請求服務器  
            String data1 = "牛頓";  
            byte[] outputBytes1 = data1.getBytes("UTF-8");  
            out.writeInt(outputBytes1.length); // write header  
            out.write(outputBytes1); // write body  
              
            String data2 = "愛因斯坦";  
            byte[] outputBytes2 = data2.getBytes("UTF-8");  
            out.writeInt(outputBytes2.length); // write header  
            out.write(outputBytes2); // write body  
              
            out.flush();  
  
        } finally {  
            // 關閉鏈接  
            out.close();  
            socket.close();  
        }  
    }  
} 

MINA服務器輸出結果:

sessionCreated
messageReceived:牛頓
messageReceived:愛因斯坦
sessionClosed

Netty服務器輸出結果:

channelActive
channelRead:牛頓
channelRead:愛因斯坦
channelInactive

Twisted服務器輸出結果:

connectionMade
stringReceived:牛頓
stringReceived:愛因斯坦
connectionLost

MINA、Netty、Twisted一塊兒學系列

MINA、Netty、Twisted一塊兒學(一):實現簡單的TCP服務器

MINA、Netty、Twisted一塊兒學(二):TCP消息邊界問題及按行分割消息

MINA、Netty、Twisted一塊兒學(三):TCP消息固定大小的前綴(Header)

MINA、Netty、Twisted一塊兒學(四):定製本身的協議

MINA、Netty、Twisted一塊兒學(五):整合protobuf

MINA、Netty、Twisted一塊兒學(六):session

MINA、Netty、Twisted一塊兒學(七):發佈/訂閱(Publish/Subscribe)

MINA、Netty、Twisted一塊兒學(八):HTTP服務器

MINA、Netty、Twisted一塊兒學(九):異步IO和回調函數

MINA、Netty、Twisted一塊兒學(十):線程模型

MINA、Netty、Twisted一塊兒學(十一):SSL/TLS

MINA、Netty、Twisted一塊兒學(十二):HTTPS

源碼

https://github.com/wucao/mina-netty-twisted

相關文章
相關標籤/搜索