Netty Beginner Tutorial: Netty Packet Splitting and Sticking Explained

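What does "codec" mean in Netty? Put plainly, encoding and decoding here means Java object serialization. Serialization serves two purposes:

1. Network transmission
2. Object persistence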

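We could serialize objects with Java's built-in serialization and let Netty transmit them, but built-in serialization has serious weaknesses: it does not work across languages, the serialized stream is large, and serialization is slow.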

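What does "the stream is too large" mean? Suppose a document is 1 MB and comes out at about 0.5 MB after Java serialization. Only half the size has been saved, so the result is still large and not ideal to send over the network. With some other mainstream serialization frameworks the result might be around 0.01 MB, which is far smaller and much cheaper to transmit. These figures are illustrative, not measurements.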

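"Slow" means, for example, that Java serialization might take 10 s where a high-performance serializer takes 0.1 s for the same data. Again the numbers are only illustrative, but the gap can be that large.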
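To make the size overhead concrete, here is a minimal sketch that writes the same two strings once with built-in Java serialization and once by hand with DataOutputStream. The Message class and the method names are hypothetical and exist only for this illustration; the hand-written encoding is not what Marshalling or Protobuf produce, it only shows how much of the Java output is metadata.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedSizeDemo {
    // Hypothetical message type used only for this illustration.
    static class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;

        Message(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Size of the message when written with built-in Java serialization.
    static int javaSerializedSize(Message m) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(m);
        }
        return bos.size();
    }

    // Size of the same fields written by hand as two length-prefixed strings.
    static int handWrittenSize(Message m) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeUTF(m.id);   // 2-byte length followed by the encoded characters
            dos.writeUTF(m.name);
        }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        Message m = new Message("1", "pro1");
        System.out.println("Java serialization: " + javaSerializedSize(m) + " bytes");
        System.out.println("Hand-written encoding: " + handWrittenSize(m) + " bytes");
    }
}
```

The Java stream carries a stream header plus the class name, serialVersionUID and field descriptors, which is why it is several times larger than the payload for small objects.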

In this tutorial the purpose of serialization is network transmission. The mainstream serialization frameworks today include:

1. JBoss Marshalling
2. Google Protobuf
3. Kryo
4. MessagePack

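We will mainly look at JBoss Marshalling and Google Protobuf, both of which are widely used in industry. JBoss Marshalling can even be faster than Protobuf when both ends are Java, because it does not have to be cross-language. So for Java-to-Java communication, Marshalling is enough. If you need cross-language communication, for example C# on one side and Java on the other, use Google Protobuf. Its performance is also very good, and it takes care of byte-order (big-endian versus little-endian) details itself.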
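For readers unfamiliar with the byte-order issue mentioned above, here is a small sketch using only java.nio.ByteBuffer. It is not Protobuf's encoding; the class and method names are made up for the illustration. It only shows why two platforms must agree on byte order before they exchange binary numbers.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ByteOrderDemo {
    // Encode a 32-bit integer into 4 bytes using the given byte order.
    static byte[] encode(int value, ByteOrder order) {
        return ByteBuffer.allocate(4).order(order).putInt(value).array();
    }

    // Decode 4 bytes back into an integer using the given byte order.
    static int decode(byte[] bytes, ByteOrder order) {
        return ByteBuffer.wrap(bytes).order(order).getInt();
    }

    public static void main(String[] args) {
        int value = 0x01020304;
        byte[] big = encode(value, ByteOrder.BIG_ENDIAN);       // bytes 01 02 03 04
        byte[] little = encode(value, ByteOrder.LITTLE_ENDIAN); // bytes 04 03 02 01

        // Reading with the matching order restores the value.
        System.out.println(Integer.toHexString(decode(big, ByteOrder.BIG_ENDIAN)));
        // Reading with the wrong order silently yields a different number.
        System.out.println(Integer.toHexString(decode(little, ByteOrder.BIG_ENDIAN)));
    }
}
```

A cross-language format fixes the byte order in its specification so that neither side has to guess.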

Now let's implement the codec with Marshalling.

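First create a new Java project, add the netty and jboss-marshalling jars, and copy a few images into a sources folder for testing. The code below also writes into a receive folder under the project root, so create that folder as well.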

 

Create a Req class with the following code:

package com.it448.serial;

import java.io.Serializable;

// Request object sent from the client to the server.
public class Req implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String requestMessage;
    // Binary attachment, GZIP-compressed by the client before sending.
    private byte[] attachment;

    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getRequestMessage() {
        return requestMessage;
    }
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }
    public byte[] getAttachment() {
        return attachment;
    }
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}

 

Create a Resp class with the following code:

package com.it448.serial;

import java.io.Serializable;

// Response object sent from the server back to the client.
public class Resp implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getResponseMessage() {
        return responseMessage;
    }
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}

 

Create a utility class GzipUtils so that compression is easy to call:

package com.it448.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipUtils {
    // Compress a byte array with GZIP.
    public static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the GZIP stream finishes the compression and writes the trailer.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        return bos.toByteArray();
    }

    // Decompress a GZIP-compressed byte array.
    public static byte[] ungzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int num;
            while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                bos.write(buf, 0, num);
            }
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        // Read the file. Files.readAllBytes reads the whole file, unlike
        // available() plus a single read(), which may return fewer bytes.
        String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "006.jpg";
        byte[] data = Files.readAllBytes(Paths.get(readPath));

        System.out.println("Original file size: " + data.length);

        // Test compression
        byte[] ret1 = GzipUtils.gzip(data);
        System.out.println("Size after compression: " + ret1.length);

        // Test decompression
        byte[] ret2 = GzipUtils.ungzip(ret1);
        System.out.println("Size after restoring: " + ret2.length);

        // Write the restored file (the receive folder must already exist)
        String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "006.jpg";
        Files.write(Paths.get(writePath), ret2);
    }
}
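The main method above needs an image on disk. As a quick check that does not depend on any file, the following sketch runs the same compress and decompress steps on an in-memory array. The class name GzipRoundTrip and the sample data are assumptions for the illustration; the two helper methods simply repeat the GZIP logic so that the block is self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTrip {
    // Compress a byte array with GZIP.
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(data);
        }
        return bos.toByteArray();
    }

    // Decompress a GZIP-compressed byte array.
    static byte[] ungzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Highly repetitive sample data, which compresses very well.
        byte[] data = new byte[10_000];
        Arrays.fill(data, (byte) 'a');

        byte[] compressed = gzip(data);
        byte[] restored = ungzip(compressed);

        System.out.println("original:   " + data.length);
        System.out.println("compressed: " + compressed.length);
        System.out.println("identical:  " + Arrays.equals(data, restored));
    }
}
```

Keep in mind that JPEG images are already compressed, so GZIP usually saves little on them; the step is still useful for text or other uncompressed attachments.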

 

Create a Marshalling factory class, MarshallingCodeCFactory.java:

package com.it448.serial;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling factory
 * @author(xyh)
 * @since 2019-06-12
 */
public final class MarshallingCodeCFactory {

    /**
     * Creates the JBoss Marshalling decoder, MarshallingDecoder.
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // Obtain a MarshallerFactory through the static method of the Marshalling utility class.
        // The "serial" argument selects the Java-serialization-compatible factory.
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        // Create a MarshallingConfiguration and set its version to 5.
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        // Build the provider from marshallerFactory and configuration.
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingDecoder. The two arguments are the provider and
        // the maximum length of a single serialized message (1 MB here).
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }

    /**
     * Creates the JBoss Marshalling encoder, MarshallingEncoder.
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingEncoder, which serializes POJOs that implement
        // Serializable into a binary byte array.
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
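The maximum-length argument of the decoder ties back to the packet splitting and sticking problem in the title: TCP delivers a byte stream, so one message may arrive in several pieces and several messages may arrive glued together. To my knowledge, Netty's Marshalling codec handles this with a length field in front of each serialized object. The sketch below shows that general technique with the JDK only; LengthPrefixFraming and Deframer are hypothetical names, and this is not Netty's implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LengthPrefixFraming {
    // Prepend a 4-byte big-endian length to the payload.
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Accumulates incoming chunks and extracts every complete frame.
    static class Deframer {
        private final int maxFrameLength;
        private byte[] pending = new byte[0];

        Deframer(int maxFrameLength) {
            this.maxFrameLength = maxFrameLength;
        }

        List<byte[]> feed(byte[] chunk) {
            // Join the bytes left over from earlier calls with the new chunk.
            byte[] joined = new byte[pending.length + chunk.length];
            System.arraycopy(pending, 0, joined, 0, pending.length);
            System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

            ByteBuffer buf = ByteBuffer.wrap(joined);
            List<byte[]> frames = new ArrayList<>();
            while (buf.remaining() >= 4) {
                buf.mark();
                int length = buf.getInt();
                if (length < 0 || length > maxFrameLength) {
                    throw new IllegalStateException("invalid frame length: " + length);
                }
                if (buf.remaining() < length) {
                    buf.reset(); // incomplete frame: wait for more bytes
                    break;
                }
                byte[] payload = new byte[length];
                buf.get(payload);
                frames.add(payload);
            }
            // Keep whatever was not consumed for the next call.
            pending = new byte[buf.remaining()];
            buf.get(pending);
            return frames;
        }
    }

    public static void main(String[] args) {
        byte[] a = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = frame("netty!".getBytes(StandardCharsets.UTF_8));

        Deframer deframer = new Deframer(1024 * 1024);
        // The first chunk holds only part of message one (a split packet).
        byte[] first = new byte[6];
        System.arraycopy(a, 0, first, 0, 6);
        // The second chunk holds the rest of message one plus all of message two (stuck packets).
        byte[] second = new byte[a.length - 6 + b.length];
        System.arraycopy(a, 6, second, 0, a.length - 6);
        System.arraycopy(b, 0, second, a.length - 6, b.length);

        System.out.println("frames after first chunk: " + deframer.feed(first).size());
        for (byte[] payload : deframer.feed(second)) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

The length limit plays the same role as the 1 MB argument passed to MarshallingDecoder: a corrupt or hostile length field cannot make the receiver buffer an unbounded amount of data.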

 

Create the server-side handler class, ServerHandler.java:

package com.it448.serial;

import java.io.File;
import java.io.FileOutputStream;

import com.it448.utils.GzipUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Nothing to do when the connection becomes active.
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The MarshallingDecoder earlier in the pipeline has already turned the bytes into a Req.
        Req req = (Req) msg;
        System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
        byte[] attachment = GzipUtils.ungzip(req.getAttachment());

        // Write the decompressed attachment to receive/001.jpg (overwritten on every request).
        String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg";
        try (FileOutputStream fos = new FileOutputStream(path)) {
            fos.write(attachment);
        }

        // Reply with a Resp; the MarshallingEncoder serializes it on the way out.
        Resp resp = new Resp();
        resp.setId(req.getId());
        resp.setName("resp" + req.getId());
        resp.setResponseMessage("response content " + req.getId());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the cause instead of silently dropping it, then close the connection.
        cause.printStackTrace();
        ctx.close();
    }
}

 

Create the server class, Server.java:

package com.it448.serial;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {
    public static void main(String[] args) throws Exception {
        // pGroup accepts connections, cGroup handles I/O on accepted connections.
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             // Enable logging
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ServerHandler());
                }
            });

            ChannelFuture cf = b.bind(8765).sync();

            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads even if binding fails.
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
}

 

Create the client-side handler class, ClientHandler.java:

package com.it448.serial;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Nothing to do when the connection becomes active.
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // The MarshallingDecoder has already turned the bytes into a Resp.
            Resp resp = (Resp) msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
        } finally {
            // Harmless for a plain POJO; releases the message if it is reference-counted.
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the cause instead of silently dropping it, then close the connection.
        cause.printStackTrace();
        ctx.close();
    }
}

 

 

Create the client class, Client.java:

package com.it448.serial;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

import com.it448.utils.GzipUtils;

public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ClientHandler());
                }
            });

            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

            // Read and compress the image once; the same attachment is sent with every request.
            // The serialized request must stay under the decoder's 1 MB limit.
            String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg";
            byte[] attachment = GzipUtils.gzip(Files.readAllBytes(Paths.get(path)));

            for (int i = 0; i < 1000; i++) {
                Req req = new Req();
                req.setId("" + i);
                req.setName("pro" + i);
                req.setRequestMessage("data message " + i);
                req.setAttachment(attachment);
                cf.channel().writeAndFlush(req);
            }

            // Block until the channel is closed. Neither side closes it in this demo,
            // so stop the process manually once the responses have been printed.
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

 

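Testing the code

First start the server by running the main method of the Server class.

Then start the client by running the main method of the Client class.

Test result

After the run, the receive folder contains a new image, 001.jpg, which shows that the image was transferred.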
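The presence of the file shows that something arrived; to confirm that the received bytes match the original, one option is to compare digests of the two files. The sketch below assumes the sources/001.jpg and receive/001.jpg paths used in this tutorial; the class name TransferCheck and its methods are made up for the illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class TransferCheck {
    // SHA-256 digest of the whole file.
    static byte[] sha256(Path file) throws IOException, NoSuchAlgorithmException {
        return MessageDigest.getInstance("SHA-256").digest(Files.readAllBytes(file));
    }

    // True when both files have the same digest.
    static boolean sameContent(Path a, Path b) throws IOException, NoSuchAlgorithmException {
        return MessageDigest.isEqual(sha256(a), sha256(b));
    }

    public static void main(String[] args) throws Exception {
        Path root = Paths.get(System.getProperty("user.dir"));
        Path sent = root.resolve("sources").resolve("001.jpg");
        Path received = root.resolve("receive").resolve("001.jpg");
        System.out.println("identical: " + sameContent(sent, received));
    }
}
```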

That is all for this part.
