package object.server.impl; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class SubReqServer { public void start(int port) { NioEventLoopGroup workGroup = new NioEventLoopGroup(); NioEventLoopGroup bossGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup); bootstrap.channel(NioServerSocketChannel.class); // 配置 NioServerSocketChannel 的 tcp 參數, BACKLOG 的大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { /* * 使用 weakCachingConcurrentResolver 建立線程安全的 WeakReferenceMap * ,對類加載器進行緩存 * ,它支持多線程併發訪問,當虛擬機內存不足時,會釋放緩存中的內存,防止內存泄露,爲了房子異常碼流和解碼錯位致使的內存溢出 * ,這裏將當個對象序列化以後的字節數組長度設置爲1M */ ObjectDecoder objectDecoder = new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this .getClass().getClassLoader())); ch.pipeline().addLast(objectDecoder); ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new SubReqHandler()); } }); // 綁定端口,隨後調用它的同步阻塞方法 sync 等等綁定操做成功,完成以後 Netty 會返回一個 ChannelFuture // 它的功能相似於的 Future,主要用於異步操做的通知回調. ChannelFuture channelFuture; try { channelFuture = bootstrap.bind(port).sync(); // 等待服務端監聽端口關閉,調用 sync 方法進行阻塞,等待服務端鏈路關閉以後 main 函數才退出. channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { SubReqServer server = new SubReqServer(); server.start(9091); } }
package object.server.impl; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class SubReqHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); super.exceptionCaught(ctx, cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); SubscriptResp sub = new SubscriptResp(); ctx.writeAndFlush(sub); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
Clientjava
package object.client.impl; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class SubReqClient { public void connect(String host, int port) { NioEventLoopGroup workGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { /* * 禁止堆類加載器進行緩存,他在基於 OSGI 的動態模塊化編程中常用,因爲 OSGI 能夠進行熱部署和熱升級,當某個 * bundle * 升級後,它對應的類加載器也將一塊兒升級,所以在動態模塊化的編程過程當中,不多對類加載器進行緩存,由於他隨時可能會發生變化. */ ch.pipeline().addLast( new ObjectDecoder(1024 >> 2, ClassResolvers .cacheDisabled(getClass().getClassLoader()))); ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); // 發起異步連接操做 ChannelFuture future; try { future = bootstrap.connect(host, port).sync(); // 等待客戶端鏈路關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); } } public static void main(String[] args) { new SubReqClient().connect("localhost", 9091); } }
ClientHandler編程
package object.client.impl; import object.server.impl.SubScriptReq; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class SubReqClientHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { SubScriptReq req = new SubScriptReq(); req.setSubReq(999); ctx.writeAndFlush(req); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
POJObootstrap
package object.server.impl; import java.io.Serializable; public class SubscriptResp implements Serializable { /** * */ private static final long serialVersionUID = 4923081103118853877L; private Integer subScriptID; private String respCode; private String desc; public Integer getSubScriptID() { return subScriptID; } public void setSubScriptID(Integer subScriptID) { this.subScriptID = subScriptID; } public String getRespCode() { return respCode; } public void setRespCode(String respCode) { this.respCode = respCode; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "SubscriptResp [subScriptID=" + subScriptID + ", respCode=" + respCode + ", desc=" + desc + "]"; } }
req數組
package object.server.impl; import java.io.Serializable; public class SubScriptReq implements Serializable { /** * */ private static final long serialVersionUID = 4686274228090335845L; private Integer subReq; private String userName; private String productName; private String address; public Integer getSubReq() { return subReq; } public void setSubReq(Integer subReq) { this.subReq = subReq; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getProductName() { return productName; } public void setProductName(String productName) { this.productName = productName; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } @Override public String toString() { return "SubScriptReq [subReq=" + subReq + ", userName=" + userName + ", productName=" + productName + ", address=" + address + "]"; } }