1.首先交代maven依賴Marshalling API 和Marshalling Serial Protocoljava
<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling</artifactId> <version>2.0.2.Final</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>2.0.2.Final</version> <scope>test</scope> </dependency>
2.bootstrap
package com.test.frame.zookeeper.marshalling; import io.netty.handler.codec.marshalling.*; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * MarshallingCodeFactory class * * @author guanhuifang * @date 2017/10/31 下午2:45 **/ public final class MarshallingCodeFactory { /** * 建立Marshalling解碼器MarshallingDecoder * @return */ public static MarshallingDecoder buildMarshallingDecoder(){ /** * 利用Marshalling工具類的靜態方法getProvidedMarshallerFactory獲取MarshallerFactory實例 */ final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); /** * 建立MarshallingConfiguration對象,設置其版本爲5 */ final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); /** * 建立UnmarshallerProvider實例 */ UnmarshallerProvider provider=new DefaultUnmarshallerProvider(marshallerFactory,configuration); /** * 經過構造函數建立Netty的MarshallingDecoder對象,參數爲UnmarshallerProvider和單個消息序列化後的最大長度 */ MarshallingDecoder decoder = new MarshallingDecoder(provider,1024); return decoder; } public static MarshallingEncoder buildMarshallingEncoder(){ final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); /** * 建立MarshallerProvider對象 */ MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory,configuration); /** * MarshallingEncoder用於將實現序列化接口的pojo對象序列化爲二進制數組 */ MarshallingEncoder encoder =new MarshallingEncoder(provider); return encoder; } }
3.服務端開發數組
package com.test.frame.zookeeper.marshalling; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * SubReqServer class * * @author guanhuifang * @date 2017/10/31 下午2:32 **/ public class SubReqServer { public void bind(int port) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder()); ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder()); ch.pipeline().addLast(new SubReqServerHandler()); } }); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new SubReqServer().bind(8080); } }
package com.test.frame.zookeeper.marshalling; import com.test.frame.zookeeper.pojo.SubscribeReq; import com.test.frame.zookeeper.pojo.SubscribeResp; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * SubReqServerHandler class * * @author guanhuifang * @date 2017/10/31 下午2:35 **/ public class SubReqServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg){ SubscribeReq req = (SubscribeReq) msg; System.out.println("服務端收到消息體:"+req.toString()); if("gholly".equalsIgnoreCase(req.getUserName())){ ctx.writeAndFlush(subResp(req.getSubReqID())); } } private SubscribeResp subResp(int i){ SubscribeResp resp =new SubscribeResp(); resp.setSubReqID(i); resp.setRespCode(0); resp.setDesc("I love China"); return resp; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ ctx.close(); } }
4.客戶端開發socket
package com.test.frame.zookeeper.marshalling; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * SubReqClient class * * @author guanhuifang * @date 2017/10/31 下午2:38 **/ public class SubReqClient { public void connect(String host, int port) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder()); ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new SubReqClient().connect("127.0.0.1",8080); } }
package com.test.frame.zookeeper.marshalling; import com.test.frame.zookeeper.pojo.SubscribeReq; import com.test.frame.zookeeper.pojo.SubscribeResp; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * SubReqClientHandler class * * @author guanhuifang * @date 2017/10/31 下午2:41 **/ public class SubReqClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg){ System.out.println(msg); SubscribeResp resp = (SubscribeResp) msg; System.out.println("客戶端收到消息體爲:"+resp.toString()); } private SubscribeReq subReq(int i){ SubscribeReq req= new SubscribeReq(); req.setAddress("Shenzhen"); req.setPhoneNumber("110"); req.setProductName("netty"); req.setSubReqID(i); req.setUserName("gholly"); return req; } @Override public void channelActive(ChannelHandlerContext ctx){ for(int i=0;i<10;i++){ ctx.write(subReq(i)); } ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ ctx.close(); } }
運行結果:maven
netty的Marshalling編解碼器支持半包粘包的處理ide