須知java
1. 須要的jar包jboss-marshalling-serial-1.3.0.CR9.jar、jboss-marshalling-1.3.0.CR9.jar、netty-all-5.jarbootstrap
2. 傳輸對象須要實現Serializable接口socket
Serveride
package com.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { public static void main(String[] args) throws Exception { //1.第一個線程是用於接收client鏈接的 EventLoopGroup bossGroup = new NioEventLoopGroup(); //2.第二個線程是用於實際的業務處理操做的 EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup). channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }) .option(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(1234).sync(); f.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
Client工具
package com.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture f = b.connect("127.0.0.1", 1234).sync(); for(int i=0;i<5;i++){ Req req = new Req(); req.setId(i+""); req.setName("Hello Word"+i); req.setRequestMessage("數據信息"+i); f.channel().writeAndFlush(req); } f.channel().closeFuture().sync(); group.shutdownGracefully(); } }
ClientHandleroop
package com.netty; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Resp resp = (Resp) msg; System.out.println(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); } }
ServerHandlerui
package com.netty; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Req req = (Req) msg; System.out.println(req); Resp resp = new Resp(); resp.setId(req.getId()); resp.setName(req.getName()); resp.setResponseMessage("響應消息"+req.getId()); ctx.writeAndFlush(resp); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); } }
MarshallingCodeCFactorythis
package com.netty; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; public class MarshallingCodeCFactory { /** * 建立Jboss Marshalling解碼器MarshallingDecoder * @return */ public static MarshallingDecoder buildMarshallingDecoder(){ //首先經過Marshalling工具類的方法獲取Marshalling實例對象 參數serial標識建立的是java序列化工廠對象 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立MarshallingConfiguration對象,配置版本號爲5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根據MarshallerFactory和configuration建立provide UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //構建netty的marshallingDecoder,兩個參數分別爲provider和單個消息序列化後的最大長度 MarshallingDecoder decoder = new MarshallingDecoder(provider,1024*1024*1); return decoder; } /** * 建立Jboss Marshalling編碼器MarshallingEncoder * @return */ public static MarshallingEncoder buildMarshallingEncoder(){ //首先經過Marshalling工具類的方法獲取Marshalling實例對象 參數serial標識建立的是java序列化工廠對象 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立MarshallingConfiguration對象,配置版本號爲5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根據MarshallerFactory和configuration建立provide MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
Req編碼
package com.netty; import java.io.Serializable; public class Req implements Serializable { private static final long serialVersionUID = 1L; private String id; private String name; private String requestMessage; private byte[] attachment; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } public byte[] getAttachment() { return attachment; } public void setAttachment(byte[] attachment) { this.attachment = attachment; } @Override public String toString() { return "Req [id=" + id + ", name=" + name + ", requestMessage=" + requestMessage + "]"; } }
Resp.net
package com.netty; import java.io.Serializable; public class Resp implements Serializable { private static final long serialVersionUID = 1L; private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } @Override public String toString() { return "Resp [id=" + id + ", name=" + name + ", responseMessage=" + responseMessage + "]"; } }
輸出的數據
server端
Req [id=0, name=Hello Word0, requestMessage=數據信息0]
Req [id=1, name=Hello Word1, requestMessage=數據信息1]
Req [id=2, name=Hello Word2, requestMessage=數據信息2]
Req [id=3, name=Hello Word3, requestMessage=數據信息3]
Req [id=4, name=Hello Word4, requestMessage=數據信息4]
client端
Resp [id=0, name=Hello Word0, responseMessage=響應消息0] Resp [id=1, name=Hello Word1, responseMessage=響應消息1] Resp [id=2, name=Hello Word2, responseMessage=響應消息2] Resp [id=3, name=Hello Word3, responseMessage=響應消息3] Resp [id=4, name=Hello Word4, responseMessage=響應消息4]