TCP在網絡通信的時候,一般在解決TCP粘包、拆包問題的時候,通常會用如下幾種方式:java
一、 消息定長 例如每一個報文的大小固定爲200個字節,若是不夠,空位補空格;bootstrap
二、 在消息尾部添加特殊字符進行分割,如添加回車;服務器
三、 將消息分爲消息體和消息頭,在消息頭裏麪包含表示消息長度的字段,而後進行業務邏輯的處理。網絡
在Netty中咱們主要利用對象的序列化進行對象的傳輸,雖然Java自己的序列化也能完成,可是Java序列化有不少問題,如後字節碼流太大,以及序列化程度過低等。Jboss的序列化有程度較高、序列化後碼流較小。這裏利用Jboss的Marshalling測試一個簡單的對象序列化。異步
新建Maven工程,引入Netty5和Jboss的Marshalling。socket
注:這裏的Marshalling的版本,若是版本過低,可能會出現消息發送失敗的問題。我在測試的時候起先用的是1.3.9,結果就是消息發送失敗,打印異常信息發現是空指針的問題。ide
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>2.0.0.Beta2</version> </dependency>
一、服務端工具
package com.netty.parry.ende4; import com.netty.parry.ende3.MarshallingCodeCFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { public void start(int port) throws Exception { // 配置NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { // 服務器輔助啓動類配置 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_RCVBUF, 32 * 1024) .option(ChannelOption.SO_SNDBUF, 32 * 1024) .option(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChildChannelHandler()); // 綁定端口 同步等待綁定成功 ChannelFuture f = b.bind(port).sync(); // 等到服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅釋放線程資源 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } /** * 網絡事件處理器 */ private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加Jboss的序列化,編解碼工具 ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); // 處理網絡IO ch.pipeline().addLast(new ServerHandler()); } } public static void main(String[] args) throws Exception { new Server().start(8765); } }
二、服務端IO處理類oop
package com.netty.parry.ende4; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter { // 用於獲取客戶端發送的信息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 用於獲取客戶端發來的數據信息 Message body = (Message) msg; System.out.println("Server接受的客戶端的信息 :" + body.toString()); // 寫數據給客戶端 Message response = new Message("歡迎您,與服務端鏈接成功"); // 當服務端完成寫操做後,關閉與客戶端的鏈接 ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
三、客戶端測試
package com.netty.parry.ende4; import com.netty.parry.ende3.MarshallingCodeCFactory; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { /** * 鏈接服務器 * * @param port * @param host * @throws Exception */ public void connect(int port, String host) throws Exception { // 配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { // 客戶端輔助啓動類 對客戶端配置 Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new MyChannelHandler()); // 異步連接服務器 同步等待連接成功 ChannelFuture f = b.connect(host, port).sync(); // 等待連接關閉 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); System.out.println("客戶端優雅的釋放了線程資源..."); } } /** * 網絡事件處理器 */ private class MyChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("MyChannelHandler"); // 添加Jboss的序列化,編解碼工具 ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); // 處理網絡IO ch.pipeline().addLast(new ClientHandler()); } } public static void main(String[] args) throws Exception { new Client().connect(8765, "127.0.0.1"); } }
四、客戶端IO處理類
package com.netty.parry.ende4; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter { // 客戶端與服務端,鏈接成功的售後 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 發送消息 Message request1 = new Message("666"); ctx.writeAndFlush(request1).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("成功發送到服務端消息"); } else { System.out.println("失敗服務端消息失敗:"+future.cause().getMessage()); future.cause().printStackTrace(); } } }); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Message response = (Message) msg; System.out.println(response); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
五、消息體
package com.netty.parry.ende4; import java.io.Serializable; public class Message implements Serializable{ /** * */ private static final long serialVersionUID = -5296315429304117678L; private String body; public String getBody() { return body; } public void setBody(String body) { this.body = body; } public Message(String body) { super(); this.body = body; } public Message() { super(); } @Override public String toString() { return "Message [body=" + body + "]"; } }