JBoss Marshalling 是一個Java 對象序列化包,對 JDK 默認的序列化框架進行了優化,但又保持跟 Java.io.Serializable 接口的兼容,同時增長了一些可調的參數和附件的特性, 這些參數和附加的特性, 這些參數和特性可經過工廠類進行配置.java
1. 下載 org.jboss.marshallingbootstrap
<dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>2.0.0.Beta2</version> </dependency>
2. 定義 POJO 對象,進行編解碼.框架
SubScriptReq
package object.server.impl; import java.io.Serializable; public class SubScriptReq implements Serializable { /** * */ private static final long serialVersionUID = 4686274228090335845L; private Integer subReq; private String userName; private String productName; private String address; public Integer getSubReq() { return subReq; } public void setSubReq(Integer subReq) { this.subReq = subReq; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getProductName() { return productName; } public void setProductName(String productName) { this.productName = productName; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } @Override public String toString() { return "SubScriptReq [subReq=" + subReq + ", userName=" + userName + ", productName=" + productName + ", address=" + address + "]"; } }
SubscriptResp
package object.server.impl; import java.io.Serializable; public class SubscriptResp implements Serializable { /** * */ private static final long serialVersionUID = 4923081103118853877L; private Integer subScriptID; private String respCode; private String desc; public Integer getSubScriptID() { return subScriptID; } public void setSubScriptID(Integer subScriptID) { this.subScriptID = subScriptID; } public String getRespCode() { return respCode; } public void setRespCode(String respCode) { this.respCode = respCode; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "SubscriptResp [subScriptID=" + subScriptID + ", respCode=" + respCode + ", desc=" + desc + "]"; } }
3. Marshalling 構造工具異步
package object.server.impl; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; public class MarshallingCodeCFactory { public static MarshallingDecoder buildMarshallingDecoder() { /* * 經過 Marshalling 工具類的 getProvidedMarshallerFactory * 靜態方法獲取MarshallerFactory 實例, , 參數 serial 表示建立的是 Java 序列化工廠對象.它是由 * jboss-marshalling-serial 包提供 */ final MarshallerFactory marshallerFactory = Marshalling .getProvidedMarshallerFactory("serial"); /* * 建立 */ final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); UnmarshallerProvider provider = new DefaultUnmarshallerProvider( marshallerFactory, configuration); /* * provider : 提供商 maxSize : 單個對象最大尺寸 */ int maxSize = 1024 << 2; MarshallingDecoder decoder = new MarshallingDecoder(provider, maxSize); return decoder; } public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling .getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider( marshallerFactory, configuration); MarshallingEncoder decoder = new MarshallingEncoder(provider); return decoder; } }
4. Netty 服務端代碼:socket
package object.server.impl; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.marshalling.MarshallingDecoder; public class SubReqServer { public void start(int port) { NioEventLoopGroup workGroup = new NioEventLoopGroup(); NioEventLoopGroup bossGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup); bootstrap.channel(NioServerSocketChannel.class); // 配置 NioServerSocketChannel 的 tcp 參數, BACKLOG 的大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( MarshallingCodeCFactory.buildMarshallingDecoder()); ch.pipeline().addLast( MarshallingCodeCFactory.buildMarshallingEncoder()); ch.pipeline().addLast(new SubReqHandler()); } }); // 綁定端口,隨後調用它的同步阻塞方法 sync 等等綁定操做成功,完成以後 Netty 會返回一個 ChannelFuture // 它的功能相似於的 Future,主要用於異步操做的通知回調. ChannelFuture channelFuture; try { channelFuture = bootstrap.bind(port).sync(); // 等待服務端監聽端口關閉,調用 sync 方法進行阻塞,等待服務端鏈路關閉以後 main 函數才退出. channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { SubReqServer server = new SubReqServer(); server.start(9091); } }
serverHandlertcp
package object.server.impl; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class SubReqHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); super.exceptionCaught(ctx, cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); SubscriptResp sub = new SubscriptResp(); sub.setDesc("desc"); sub.setSubScriptID(999); sub.setRespCode("0"); ctx.writeAndFlush(sub); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
5. client ide
package object.client.impl; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import object.server.impl.MarshallingCodeCFactory; public class SubReqClient { public void connect(String host, int port) { NioEventLoopGroup workGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( MarshallingCodeCFactory.buildMarshallingDecoder()); ch.pipeline().addLast( MarshallingCodeCFactory.buildMarshallingEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); // 發起異步連接操做 ChannelFuture future; try { future = bootstrap.connect(host, port).sync(); // 等待客戶端鏈路關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); } } public static void main(String[] args) { new SubReqClient().connect("localhost", 9091); } }
clientHandler 函數
package object.client.impl; import object.server.impl.SubScriptReq; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class SubReqClientHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { SubScriptReq req = new SubScriptReq(); for (int i = 0; i < 100; i++) { req.setSubReq(999); req.setProductName("productName"); req.setUserName("userName"); req.setAddress("address"); ctx.writeAndFlush(req); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
注 : MarshallingDecoder 是自帶半包處理的.工具