如何評價一個編解碼技術:html
Java也提供了序列化技術,在工業化工程中有如下缺點:java
下面咱們來測試如下jdk序列化的問題bootstrap
建立一個測試類UserInfo:api
1 import java.io.Serializable; 2 import java.nio.ByteBuffer; 3 4 /** 5 * @author Administrator 6 * @version 1.0 7 * @date 2014年2月23日 8 */ 9 public class UserInfo implements Serializable { 10 11 /** 12 * 默認的序列號 13 */ 14 private static final long serialVersionUID = 1L; 15 16 private String userName; 17 18 private int userID; 19 20 public UserInfo buildUserName(String userName) { 21 this.userName = userName; 22 return this; 23 } 24 25 public UserInfo buildUserID(int userID) { 26 this.userID = userID; 27 return this; 28 } 29 30 /** 31 * @return the userName 32 */ 33 public final String getUserName() { 34 return userName; 35 } 36 37 /** 38 * @param userName the userName to set 39 */ 40 public final void setUserName(String userName) { 41 this.userName = userName; 42 } 43 44 /** 45 * @return the userID 46 */ 47 public final int getUserID() { 48 return userID; 49 } 50 51 /** 52 * @param userID the userID to set 53 */ 54 public final void setUserID(int userID) { 55 this.userID = userID; 56 } 57 58 /** 59 * 將當前對象轉換一個byte[]數組 60 * @return 61 */ 62 public byte[] codeC() { 63 ByteBuffer buffer = ByteBuffer.allocate(1024); 64 //寫入userName長度和內容 65 byte[] value = this.userName.getBytes(); 66 buffer.putInt(value.length); 67 buffer.put(value); 68 //直接寫入Id 69 buffer.putInt(this.userID); 70 buffer.flip(); 71 value = null; 72 byte[] result = new byte[buffer.remaining()]; 73 buffer.get(result); 74 return result; 75 } 76 77 public byte[] codeC(ByteBuffer buffer) { 78 buffer.clear(); 79 byte[] value = this.userName.getBytes(); 80 buffer.putInt(value.length); 81 buffer.put(value); 82 buffer.putInt(this.userID); 83 buffer.flip(); 84 value = null; 85 byte[] result = new byte[buffer.remaining()]; 86 buffer.get(result); 87 return result; 88 } 89 }
其中的codeC是最樸素的編碼方法,咱們來和它比較如下數組
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; /** * @author Administrator * @version 1.0 * @date 2014年2月23日 */ public class TestUserInfo { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { UserInfo info = new UserInfo(); info.buildUserID(100).buildUserName("Welcome to Netty"); ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream os = new ObjectOutputStream(bos); os.writeObject(info); os.flush(); os.close(); byte[] b = bos.toByteArray(); System.out.println("The jdk serializable length is : " + b.length); bos.close(); System.out.println("-------------------------------------"); System.out.println("The byte array serializable length is : " + info.codeC().length); } }
結果有點不能接受,這麼一點就大了6倍緩存
"C:\Program Files (x86)\Java\jdk1.8.0_102\bin\java" -Didea.launcher.port=7537 "-Didea.launcher.bin.path=C:\dev\JetBrains\IntelliJ IDEA 2016.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\rt.jar;G:\projects-helloworld\netty\target\classes;G:\repo\maven\io\netty\netty-all\4.1.5.Final\netty-all-4.1.5.Final.jar;C:\dev\JetBrains\IntelliJ IDEA 2016.2.1\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain demo.codec.serializable.TestUserInfo The jdk serializable length is : 117 ------------------------------------- The byte array serializable length is : 24
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; /** * @author Administrator * @version 1.0 * @date 2014年2月23日 */ public class PerformTestUserInfo { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { UserInfo info = new UserInfo(); info.buildUserID(100).buildUserName("Welcome to Netty"); int loop = 1000000; ByteArrayOutputStream bos = null; ObjectOutputStream os = null; long startTime = System.currentTimeMillis(); for (int i = 0; i < loop; i++) { bos = new ByteArrayOutputStream(); os = new ObjectOutputStream(bos); os.writeObject(info); os.flush(); os.close(); byte[] b = bos.toByteArray(); bos.close(); } long endTime = System.currentTimeMillis(); System.out.println("The jdk serializable cost time is : " + (endTime - startTime) + " ms"); System.out.println("-------------------------------------"); ByteBuffer buffer = ByteBuffer.allocate(1024); startTime = System.currentTimeMillis(); for (int i = 0; i < loop; i++) { byte[] b = info.codeC(buffer); } endTime = System.currentTimeMillis(); System.out.println("The byte array serializable cost time is : " + (endTime - startTime) + " ms"); } }
運行結果,jdk的慢了10倍都不止服務器
The jdk serializable cost time is : 1928 ms ------------------------------------- The byte array serializable cost time is : 164 ms
這裏主要介紹這3種,還有其餘著名好比Hryo等等...數據結構
google內部久經考驗。它將數據結構以.proto文件進行描述,經過代碼生成工具能夠生成對應數據結構的POJO對象和Protobuf相關方法和屬性。app
特色:框架
(1) ProtoBuf使用二進制編碼,而不是XML,儘管XML的可讀性和擴展性都不錯,可是XML犧牲的空間和時間開銷太大,不適合高性能框架
(2) ProtoBuf另外一個吸引人的地方是數據描述文件和代碼生成機制
下面的圖頗有說服力,爲何這麼多人選擇Google的Protobuf
性能對比:
碼流對比:
對當時的Facebook而言,thrift用於解決各系統間大數量的傳輸通訊問題,所以能夠多種語言,C++ C# Cocoa Erlang Haskell Java Perl PHP Python Ruby和Smalltalk
Thrift主要由5部分組成:
(1) 語言系統和IDL編譯器:負責由用戶給定的IDL文件生成相應語言接口代碼;
(2) TProtocol: RPC協議層,能夠選擇多種不一樣的序列化方式,例如Binary和Json;
(3) TTransport:RPC傳輸層,一樣能夠選擇不一樣的傳輸層實現,例如socket NIO和MemoryBuffer等;
(4) Tprocessor: 做爲協議層和用戶提供的服務實現的紐帶,負責調用服務實現的接口;
(5) TServer:聚合TProtocol、TTransport和TProcessor等對象。
關注協議的話就是關於於Tprotocol層,其支持3中典型的編解碼方式:
下圖展現同等測試條件下的編解碼耗時信息:
JBoss內部使用,不能跨語言,能夠看作是jdk的進化版... 擁有優勢以下:
高效、性能、跨語言、碼流小、支持的語言由Java Python Ruby Hashkell C# OCaml Lua Go C C++等。
pom文件,guava是額外能夠不用.
<!-- https://mvnrepository.com/artifact/org.msgpack/msgpack --> <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>0.6.11</version> </dependency> <!-- https://mvnrepository.com/artifact/com.google.guava/guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>20.0</version> </dependency>
import com.google.common.collect.Lists; import org.msgpack.MessagePack; import org.msgpack.template.Templates; import java.util.List; /** * Created by carl.yu on 2016/12/15. */ public class ApiDemo { public static void main(String[] args) throws Exception { //使用了guava List<String> src = Lists.newArrayList("msgpack", "kumofs", "viver"); MessagePack msgpack = new MessagePack(); //序列化 byte[] raw = msgpack.write(src); //反序列化 List<String> dst1 = msgpack.read(raw, Templates.tList(Templates.TString)); System.out.println(dst1); } }
注意,要使用Messagepack,須要在實體類前加上註解@Message.
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import org.msgpack.MessagePack; import java.util.List; /** * Created by carl.yu on 2016/12/15. */ public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { //將msg中的字節寫到array中 System.out.println("開始進行解碼..."); final byte[] array; final int length = msg.readableBytes(); array = new byte[length]; msg.getBytes(msg.readerIndex(), array, 0, length); MessagePack msgpack = new MessagePack(); Object result = msgpack.read(array); out.add(result); } }
import com.google.common.base.Throwables; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import org.msgpack.MessagePack; /** * Created by carl.yu on 2016/12/15. */ public class MsgpackEncoder extends MessageToByteEncoder<Object> { @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { //負責將POJO對象編碼爲byte數組 MessagePack msgpack = new MessagePack(); byte[] raw = null; try { raw = msgpack.write(msg); } catch (Exception e) { e.printStackTrace(); Throwables.propagateIfPossible(e); } out.writeBytes(raw); } }
分別用MessagePack進行編解碼
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 import io.netty.handler.codec.LengthFieldBasedFrameDecoder; 10 import io.netty.handler.codec.LengthFieldPrepender; 11 import io.netty.handler.logging.LogLevel; 12 import io.netty.handler.logging.LoggingHandler; 13 14 /** 15 * Created by carl.yu on 2016/12/15. 16 */ 17 public class EchoServer { 18 public void bind(int port) throws Exception { 19 // 配置服務端的NIO線程組 20 EventLoopGroup bossGroup = new NioEventLoopGroup(); 21 EventLoopGroup workerGroup = new NioEventLoopGroup(); 22 try { 23 ServerBootstrap b = new ServerBootstrap(); 24 b.group(bossGroup, workerGroup) 25 .channel(NioServerSocketChannel.class) 26 .option(ChannelOption.SO_BACKLOG, 100) 27 .handler(new LoggingHandler(LogLevel.INFO)) 28 .childHandler(new ChannelInitializer<SocketChannel>() { 29 @Override 30 public void initChannel(SocketChannel ch) 31 throws Exception { 32 //讀數據的時候用decoder解碼 33 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); 34 ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder()); 35 //寫數據的時候用encoder編碼 36 ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2)); 37 ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder()); 38 // 39 ch.pipeline().addLast(new EchoServerHandler()); 40 } 41 }); 42 43 // 綁定端口,同步等待成功 44 ChannelFuture f = b.bind(port).sync(); 45 46 // 等待服務端監聽端口關閉 47 f.channel().closeFuture().sync(); 48 } finally { 49 // 優雅退出,釋放線程池資源 50 bossGroup.shutdownGracefully(); 51 workerGroup.shutdownGracefully(); 52 } 53 } 54 55 public static void main(String[] args) throws Exception { 56 int port = 8080; 57 if (args != null && args.length > 0) { 58 try { 59 port = Integer.valueOf(args[0]); 60 } catch (NumberFormatException e) { 61 // 採用默認值 62 } 63 } 64 new EchoServer().bind(port); 65 } 66 }
主要在於2個編解碼器。
在MessagePack編碼器以前增長了LengthFieldPrepender,它將在ByteBuf以前增長字節的消息長度。
而後使用LengthFieldBasedFrameDecoder根據消息長度進行解碼,工做原理如圖:
這樣獲取到的永遠是整包消息,很是簡單的解決了煩人的半包問題。
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; /** * Created by carl.yu on 2016/12/15. */ public class EchoClient { public void connect(int port, String host) throws Exception { // 配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { //讀數據的時候用decoder解碼 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder()); //寫數據的時候用encoder編碼 ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder()); ch.pipeline().addLast(new EchoClientHandler(100)); } }); // 發起異步鏈接操做 ChannelFuture f = b.connect(host, port).sync(); // 當代客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } new EchoClient().connect(port, "127.0.0.1"); } }
import demo.codec.serializable.UserInfo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Created by carl.yu on 2016/12/15. */ public class EchoClientHandler extends ChannelInboundHandlerAdapter { private final int sendNumber; public EchoClientHandler(int sendNumber) { this.sendNumber = sendNumber; } private UserInfo[] userInfo() { UserInfo[] userInfos = new UserInfo[sendNumber]; UserInfo userInfo = null; for (int i = 0; i < sendNumber; i++) { userInfo = new UserInfo(); userInfos[i] = userInfo; userInfo.setUserID(i); userInfo.setUserName("ABDCEFG-->" + i); } return userInfos; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { /* UserInfo userInfo = new UserInfo(); userInfo.setUserID(0); userInfo.setUserName("ABDCEFG-->" + 0);*/ UserInfo[] userInfos = userInfo(); for (int i = 0; i < userInfos.length; i++) { ctx.writeAndFlush(userInfos[i]); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("客戶端收到信息:" + msg); // ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // ctx.flush(); // ctx.close(); } }
後面咱們會更加詳細的講解LengthFieldPrepender和LengthFieldBasedFrameDecoder,這裏只須要明白用來解決半包問題便可。
準備環境:
package netty; option java_package="demo.codec.protobuf"; option java_outer_classname="SubscribeReqProto"; message SubscribeReq{ required int32 subReqID = 1; required string userName = 2; required string productName = 3; repeated string address = 4; }
package netty; option java_package="demo.codec.protobuf"; option java_outer_classname="SubscribeRespProto"; message SubscribeResp{ required int32 subReqID = 1; required int32 respCode = 2; required string desc = 3; }
這裏不詳細介紹google protobuf的語法:https://developers.google.com/protocol-buffers/docs/proto?hl=zh-CN
protoc ./proto/*.proto --java_out=../main/java pause
google protobuf依賴maven:
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency>
運行build.bat,生成:
下面咱們運行如下代碼來了解Protobuf的用法:
import com.google.protobuf.InvalidProtocolBufferException; import java.util.ArrayList; import java.util.List; /** * @author Administrator * @version 1.0 * @date 2014年2月23日 */ public class TestSubscribeReqProto { // 編碼方法: Object->byte[] private static byte[] encode(SubscribeReqProto.SubscribeReq req) { return req.toByteArray(); } // 解碼方法: bayte[] -> Object private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException { return SubscribeReqProto.SubscribeReq.parseFrom(body); } /** * 建立實例 * * @return */ private static SubscribeReqProto.SubscribeReq createSubscribeReq() { //(1) Builder模式 SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq .newBuilder(); builder.setSubReqID(1); builder.setUserName("Lilinfeng"); builder.setProductName("Netty Book"); List<String> address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } /** * @param args * @throws InvalidProtocolBufferException */ public static void main(String[] args) throws InvalidProtocolBufferException { SubscribeReqProto.SubscribeReq req = createSubscribeReq(); System.out.println("Before encode : " + req.toString()); SubscribeReqProto.SubscribeReq req2 = decode(encode(req)); System.out.println("After decode : " + req.toString()); System.out.println("Assert equal : --> " + req2.equals(req)); } }
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class SubReqServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast( new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast( new ProtobufDecoder( SubscribeReqProto.SubscribeReq .getDefaultInstance())); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqServerHandler()); } }); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } new SubReqServer().bind(port); } }
咱們來注意如下編解碼器的順序:
(1) ProtobufVarint32FrameDecoder : 半包問題
(2) ProtobufDecoder:解碼
(3) ProtobufVarint32LenghtFiedldPrepender:半包問題
(4) ProtobufEncoder:編碼
因而邏輯處理部分能夠直接使用類:
1 import io.netty.channel.ChannelHandler.Sharable; 2 import io.netty.channel.ChannelHandlerAdapter; 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5 6 /** 7 * @author lilinfeng 8 * @version 1.0 9 * @date 2014年2月14日 10 */ 11 @Sharable 12 public class SubReqServerHandler extends ChannelInboundHandlerAdapter { 13 14 @Override 15 public void channelRead(ChannelHandlerContext ctx, Object msg) 16 throws Exception { 17 SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg; 18 if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) { 19 System.out.println("Service accept client subscribe req : [" 20 + req.toString() + "]"); 21 ctx.writeAndFlush(resp(req.getSubReqID())); 22 } 23 } 24 25 private SubscribeRespProto.SubscribeResp resp(int subReqID) { 26 SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp 27 .newBuilder(); 28 builder.setSubReqID(subReqID); 29 builder.setRespCode(0); 30 builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); 31 return builder.build(); 32 } 33 34 @Override 35 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 36 cause.printStackTrace(); 37 ctx.close();// 發生異常,關閉鏈路 38 } 39 }
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class SubReqClient { public void connect(int port, String host) throws Exception { // 配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast( new ProtobufDecoder( SubscribeRespProto.SubscribeResp .getDefaultInstance())); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); // 發起異步鏈接操做 ChannelFuture f = b.connect(host, port).sync(); // 當代客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } new SubReqClient().connect(port, "127.0.0.1"); } }
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.ArrayList; import java.util.List; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class SubReqClientHandler extends ChannelInboundHandlerAdapter { /** * Creates a client-side handler. */ public SubReqClientHandler() { } @Override public void channelActive(ChannelHandlerContext ctx) { for (int i = 0; i < 10; i++) { ctx.write(subReq(i)); } ctx.flush(); } private SubscribeReqProto.SubscribeReq subReq(int i) { SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq .newBuilder(); builder.setSubReqID(i); builder.setUserName("Lilinfeng"); builder.setProductName("Netty Book For Protobuf"); List<String> address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Receive server response : [" + msg + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
在不怎麼了解Protobuf實現和使用細節的狀況 下,咱們就能夠輕鬆支持Google Protobuf編碼
ProtobufDecoder僅僅負責解碼,所以在ProtobufDecoder前面,必定要可以處理半包的解碼器,有如下3種方式:
(1) 使用Netty提供的ProtobufVarint32FrameDecoder,它能夠處理半包消息;
(2) 繼承Netty提供的通用半包解碼器LengthFieldBasedFrameDecoder;
(3) 繼承ByteToMessageDecoder,本身處理..
半包問題必須解決,不然服務器沒法正常工做。
暫時略。能夠參考netty初探(2)。