Netty解決TCP的粘包和分包(二)
使用LengthFieldBasedFrameDecoder解碼器分包
先看一下這個類的屬性:
// Fields of io.netty.handler.codec.LengthFieldBasedFrameDecoder.
// Each declaration sits on its own line so that a trailing "//" comment
// cannot swallow the declarations that follow it.

// Byte order used when reading the length field (per the Netty Javadoc).
private final ByteOrder byteOrder;
// Maximum allowed frame length.
private final int maxFrameLength;
// Start position (offset) of the length field.
private final int lengthFieldOffset;
// Size of the length field, i.e. how many bytes hold the packet length.
private final int lengthFieldLength;
// Derived value: lengthFieldOffset + lengthFieldLength = end offset of the length field.
private final int lengthFieldEndOffset;
// Compensation value added to the value of the length field (per the Netty Javadoc).
private final int lengthAdjustment;
// Number of leading header bytes to skip from the decoded frame.
private final int initialBytesToStrip;
// Same meaning as in DelimiterBasedFrameDecoder: when true, fail as soon as the
// frame is found to exceed maxFrameLength; otherwise fail only after the whole
// over-long frame has been read.
private final boolean failFast;
// Current decoder state: whether it is discarding an over-long frame.
private boolean discardingTooLongFrame;
// Length of the over-long frame, when one has been encountered.
private long tooLongFrameLength;
// Number of bytes to discard when an over-long frame has been encountered.
private long bytesToDiscard;
實現細節:http://asialee.iteye.com/blog/1784844
http://bylijinnan.iteye.com/blog/1985706
這裏是根據netty裏一個聊天程序改的,代碼如下:
服務器端代碼:
public final class SecureChatServer { static final int PORT = Integer .parseInt(System.getProperty("port", "8992")); public static void main(String[] args) throws Exception { SelfSignedCertificate ssc = new SelfSignedCertificate(); SslContext sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new SecureChatServerInitializer(sslCtx)); // sync 等待異步操做的完成(done) // closeFuture().sync()等待socket關閉操做的完成(done) b.bind(PORT).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class SecureChatServerInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public SecureChatServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(sslCtx.newHandler(ch.alloc())); // On top of the SSL handler, add the text line codec. // 4字節表示消息體的長度,服務器端解碼器 pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); // and then business logic. pipeline.addLast(new SecureChatServerHandler()); } }
public class SecureChatServerHandler extends SimpleChannelInboundHandler<String> { static final ChannelGroup channels = new DefaultChannelGroup( GlobalEventExecutor.INSTANCE); @Override public void channelActive(final ChannelHandlerContext ctx) { // Once session is secured, send a greeting and register the channel to the global channel // list so the channel received the messages from others. ctx.pipeline().get(SslHandler.class).handshakeFuture() .addListener(new GenericFutureListener<Future<Channel>>() { @Override public void operationComplete(Future<Channel> future) throws Exception { String welcome = "Welcome to " + InetAddress.getLocalHost().getHostName() + " secure chat service!"; ctx.writeAndFlush(ctx.alloc().buffer() .writeInt(welcome.length()) .writeBytes(welcome.getBytes())); String message = "Your session is protected by " + ctx.pipeline().get(SslHandler.class).engine() .getSession().getCipherSuite() + " cipher suite."; ctx.writeAndFlush(ctx.alloc().buffer() .writeInt(message.length()) .writeBytes(message.getBytes())); channels.add(ctx.channel()); } }); } @Override public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("[" + msg + "]"); String returnMsg; // Send the received message to all channels but the current one. for (Channel c : channels) { if (c != ctx.channel()) { returnMsg = "[" + ctx.channel().remoteAddress() + "] " + msg; c.writeAndFlush(ctx.alloc().buffer() .writeInt(returnMsg.length()) .writeBytes(returnMsg.getBytes())); } else { returnMsg = "[you] " + msg; c.writeAndFlush(ctx.alloc().buffer() .writeInt(returnMsg.length()) .writeBytes(returnMsg.getBytes())); } } // Close the connection if the client has sent 'bye'. if ("bye".equals(msg.toLowerCase())) { ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
客戶端代碼:
public final class SecureChatClient { static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer .parseInt(System.getProperty("port", "8992")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx = SslContext .newClientContext(InsecureTrustManagerFactory.INSTANCE); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new SecureChatClientInitializer(sslCtx)); // Start the connection attempt. // sync 等待鏈接創建成功。由於鏈接在這裏表現爲異步操做,因此要等待鏈接的Future完成(done)。 Channel ch = b.connect(HOST, PORT).sync().channel(); // Read commands from the stdin. ChannelFuture lastWriteFuture = null; BufferedReader in = new BufferedReader(new InputStreamReader( System.in)); for (;;) { String line = in.readLine(); if (line == null) { break; } //獲取用戶的輸入,而後構造消息,發送消息 ByteBuf content = ch.alloc().buffer().writeInt(line.length()) .writeBytes(line.getBytes()); // Sends the received line to the server. lastWriteFuture = ch.writeAndFlush(content); // If user typed the 'bye' command, wait until the server closes // the connection. if ("bye".equals(line.toLowerCase())) { ch.closeFuture().sync(); break; } } // Wait until all messages are flushed before closing the channel. if (lastWriteFuture != null) { lastWriteFuture.sync(); } } finally { // The connection is closed automatically on shutdown. group.shutdownGracefully(); } } }
public class SecureChatClientInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public SecureChatClientInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(sslCtx.newHandler(ch.alloc(), SecureChatClient.HOST, SecureChatClient.PORT)); // On top of the SSL handler, add the text line codec. //使用該解碼器解碼服務器返回消息 pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); //字符串編碼器 // and then business logic. pipeline.addLast(new SecureChatClientHandler()); } }
/**
 * Client-side handler that echoes every decoded message to standard error
 * and drops the connection when an exception reaches it.
 */
public class SecureChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Writes the received message to {@code System.err}.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded message body
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, String msg) {
        System.err.println(msg);
    }

    /**
     * Prints the stack trace of the failure, then closes the channel.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
netty原來的example在這裏:https://github.com/netty/netty/tree/master/example/src/main/java/io/netty/example/securechat
如何在這裏使用LengthFieldBasedFrameDecoder解碼器?如下是服務器端解碼器的配置:
// lengthFieldLength =4 字節表示實際消息體的長度 // initialBytesToStrip =4 字節表示解碼消息體的時候跳過長度 pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
當服務器端發送消息的時候,是這樣發的:
// Relay the message to every registered channel; only the prefix differs
// between the sender ("[you] ") and the other participants (remote address).
for (Channel c : channels) {
    if (c != ctx.channel()) {
        returnMsg = "[" + ctx.channel().remoteAddress() + "] " + msg;
    } else {
        returnMsg = "[you] " + msg;
    }
    // The length prefix must be the number of encoded BYTES, not
    // String.length(); the two differ for non-ASCII text and a wrong prefix
    // makes the receiver's LengthFieldBasedFrameDecoder cut frames incorrectly.
    byte[] payload = returnMsg.getBytes();
    c.writeAndFlush(ctx.alloc().buffer(4 + payload.length)
            .writeInt(payload.length)
            .writeBytes(payload));
}
一定要把實際消息體的長度(字節數)寫入到buffer中。相應的客戶端的配置見上面的代碼。
==================END==================