Netty:使用 Marshalling 傳遞對象,並使用 GzipUtils 壓縮傳輸附件
前端與服務端傳輸文件時,雙方需要對附件進行壓縮與解壓縮;而傳遞對象則需要進行序列化。可以使用 Java 自帶的序列化來序列化對象,再由 Netty 傳輸,但 Java 序列化硬傷太多(無法跨語言、碼流太大、性能太低),因此最好使用主流的編解碼框架來配合 Netty 使用。此處使用的是 JBoss Marshalling 框架。
用到的包:
<!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling</artifactId> <version>2.0.0.CR1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>2.0.0.CR1</version> </dependency>
用到的壓縮工具類:
GzipUtils.java
/**
 * Helpers for GZIP-compressing and decompressing in-memory byte arrays.
 */
public class GzipUtils {

    /** Size of the scratch buffer used while inflating data. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Compresses the given bytes with GZIP.
     *
     * @param data the raw bytes to compress
     * @return the GZIP-compressed representation of {@code data}
     * @throws Exception if the compression stream reports an I/O error
     */
    public static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the GZIP stream finishes the deflater and writes the trailer,
        // and try-with-resources guarantees that happens even if write() fails.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        return bos.toByteArray();
    }

    /**
     * Decompresses GZIP data produced by {@link #gzip(byte[])}.
     *
     * @param data the GZIP-compressed bytes
     * @return the decompressed bytes
     * @throws Exception if {@code data} is not valid GZIP or cannot be read
     */
    public static byte[] ungzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[BUFFER_SIZE];
            int num;
            while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                bos.write(buf, 0, num);
            }
        }
        return bos.toByteArray();
    }

    /**
     * Manual round-trip check: reads a file from {@code sources}, compresses and
     * decompresses it, prints the three sizes and writes the result to {@code receive}.
     *
     * @param args unused
     * @throws Exception if the file cannot be read, written or (de)compressed
     */
    public static void main(String[] args) throws Exception {
        // Read the input file.
        String readPath = System.getProperty("user.dir") + File.separatorChar + "sources"
                + File.separatorChar + "Netty+3.1中文用戶手冊.doc.jpg";
        File file = new File(readPath);
        // available() plus a single read() is not guaranteed to return the whole file;
        // readAllBytes reads to EOF and closes the stream. Fully qualified so that no
        // additional import is required.
        byte[] data = java.nio.file.Files.readAllBytes(file.toPath());
        System.out.println("文件原始大小:" + data.length);

        // Exercise compression and decompression.
        byte[] ret1 = GzipUtils.gzip(data);
        System.out.println("壓縮以後大小:" + ret1.length);

        byte[] ret2 = GzipUtils.ungzip(ret1);
        System.out.println("還原以後大小:" + ret2.length);

        // Write the restored bytes back out.
        String writePath = System.getProperty("user.dir") + File.separatorChar + "receive"
                + File.separatorChar + "Netty+3.1中文用戶手冊.doc.jpg";
        try (FileOutputStream fos = new FileOutputStream(writePath)) {
            fos.write(ret2);
        }
    }
}
Request.java 類
/**
 * Serializable message carrying an id, a name, a text message and a
 * binary attachment.
 */
public class Request implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String requestMessage;
    private byte[] attachment;

    /** @return the request id, or {@code null} if it was never set */
    public String getId() {
        return this.id;
    }

    /** @param id the request id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the request name, or {@code null} if it was never set */
    public String getName() {
        return this.name;
    }

    /** @param name the request name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the text message, or {@code null} if it was never set */
    public String getRequestMessage() {
        return this.requestMessage;
    }

    /** @param requestMessage the text message */
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }

    /** @return the stored attachment array itself (not a copy), or {@code null} */
    public byte[] getAttachment() {
        return this.attachment;
    }

    /** @param attachment the attachment bytes; the array is stored as-is, not copied */
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}
Response.java 類
/**
 * Serializable message carrying an id, a name and a text message.
 */
public class Response implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    /** @return the response id, or {@code null} if it was never set */
    public String getId() {
        return this.id;
    }

    /** @param id the response id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the response name, or {@code null} if it was never set */
    public String getName() {
        return this.name;
    }

    /** @param name the response name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the text message, or {@code null} if it was never set */
    public String getResponseMessage() {
        return this.responseMessage;
    }

    /** @param responseMessage the text message */
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
MarshallingCodeCFactory.java
序列化編碼解碼類
public final class MarshallingCodeCFactory { /** * 解碼器 * @return */ public static MarshallingDecoder buildMarshallingDecoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //構建MarshallingDecoder對象,兩個參數分別爲provider和消息序列化後的最大長度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024*1024*1); return decoder; } /** * 編碼器 * @return */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //構建MarshallingEncoder對象,參數爲provider; MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
開始開發 client、server 功能
Server.java
public class Server { public static void main(String[] args) throws InterruptedException { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // TODO Auto-generated method stub //設置編碼解碼 ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); boss.shutdownGracefully(); worker.shutdownGracefully(); } }
ServerHandler.java
需要繼承 ChannelHandlerAdapter 類
public class ServerHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub //super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub //super.channelRead(ctx, msg); Request request = (Request) msg; System.out.println("Server: " + request.getId() + ","+request.getName()+","+request.getRequestMessage()); //接收附件 寫入文件 byte[] attachment = GzipUtils.ungzip(request.getAttachment()); String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + request.getId() +".png"; FileOutputStream outputStream = new FileOutputStream(path); outputStream.write(attachment); outputStream.close(); //返回數據 Response response = new Response(); response.setId(request.getId()); response.setName("response: " + request.getName()); response.setResponseMessage("相應的內容: " + request.getRequestMessage()); ctx.writeAndFlush(response); } }
Client.java 類
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup worker = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(worker) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // TODO Auto-generated method stub //設置編碼解碼 ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); for(int i=0; i< 5; i++) { Request request = new Request(); request.setId(i + ""); request.setName( "pro"+ i); request.setRequestMessage("數據信息Client~Server:" + i); //發送附件 String path = System.getProperty("user.dir") + File.separatorChar + "resources" + File.separatorChar + "1.png"; File file = new File(path); FileInputStream inputStream = new FileInputStream(file); byte[] data = new byte[inputStream.available()]; inputStream.read(data); inputStream.close(); request.setAttachment(GzipUtils.gzip(data)); cf.channel().writeAndFlush(request); } System.out.println("user.dir: " + System.getProperty("user.dir") + File.separatorChar + "resources" + File.separatorChar + "1.png" ); cf.channel().closeFuture().sync(); worker.shutdownGracefully(); } }
ClientHandler.java 類
需要繼承 ChannelHandlerAdapter 類
public class ClientHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub //super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub //super.channelRead(ctx, msg); try { Response response = (Response) msg; System.out.println("Client : " + response.getId() + ","+response.getName()+","+response.getResponseMessage()); } finally { // TODO: handle finally clause ReferenceCountUtil.release(msg); } } }
目錄如下: