[toc]
程序代碼來自於《Netty權威指南》第8章,已經加了註釋。不過需要注意的是,使用的proto源代碼是在《Google Protobuf入門與使用》中生成的,關於protobuf代碼自動生成工具的使用可以參考這篇文章。
例子中,通過解碼器ProtobufVarint32FrameDecoder和編碼器ProtobufVarint32LengthFieldPrepender的使用已經解決了半包問題,測試時可以把它們註釋掉,這樣就可以演示Netty中使用Protobuf出現的TCP粘包問題。
同時,通過protobuf的使用,也可以深刻感受到,其在Netty中的使用確實非常簡單:編解碼、半包問題,只需要添加相關的處理器即可,而且它可以方便地實現跨語言的遠程服務調用(protobuf本身提供了對不同語言的支持)。
但其實在使用時會發現有一個問題,就是編解碼的對象需要使用其生成的特定的proto對象來進行操作,也就是說,需要編寫.proto文件,再通過protoc來生成相應語言的代碼文件,顯然這樣做還是會有些麻煩(雖然其實也還好,不算麻煩),有沒有方便點的方法呢?後面通過protostuff的使用即可解決這個問題。
package cn.xpleaf.subscribe; import cn.xpleaf.protobuf.SubscribeReqProto; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class SubReqServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // 添加日誌處理器 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加ProtobufVarint32FrameDecoder,主要用於Protobuf的半包處理 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 添加ProtobufDecoder×××,它的參數是com.google.protobuf.MessageLite // 實際上就是要告訴ProtobufDecoder須要解碼的目標類是什麼,不然僅僅從字節數組中是 // 沒法判斷出要解碼的目標類型信息的(服務端須要解析的是客戶端請求,因此是Req) ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance())); /** * 來自源碼的代碼註釋,用於Protobuf的半包處理 * * An encoder that prepends the the Google Protocol Buffers * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base * 128 Varints</a> integer length field. 
For example: * <pre> * BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes) * +---------------+ +--------+---------------+ * | Protobuf Data |-------------->| Length | Protobuf Data | * | (300 bytes) | | 0xAC02 | (300 bytes) | * +---------------+ +--------+---------------+ * </pre> * */ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 添加ProtobufEncoder編碼器,這樣就不須要對SubscribeResp進行手工編碼 ch.pipeline().addLast(new ProtobufEncoder()); // 添加業務處理handler ch.pipeline().addLast(new SubReqServerHandler()); } }); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0) { try { port = Integer.valueOf(port); } catch (NumberFormatException e) { // TODO: handle exception } } new SubReqServer().bind(port); } }
package cn.xpleaf.subscribe; import cn.xpleaf.protobuf.SubscribeReqProto; import cn.xpleaf.protobuf.SubscribeRespProto; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class SubReqServerHandler extends ChannelInboundHandlerAdapter { /** * 因爲ProtobufDecoder已經對消息進行了自動解碼,所以接收到的訂購請求消息能夠直接使用 * 對用戶名進行校驗,校驗經過後構造應答消息返回給客戶端,因爲使用了ProtobufEncoder, * 因此不須要對SubscribeRespProto.SubscribeResp進行手工編碼 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg; String username = req.getUserName(); if("xpleaf".equalsIgnoreCase(username)) { System.out.println("Service accept client subscribe req : [" + req.toString() + "]"); ctx.writeAndFlush(resp(req.getSubReqID())); } } /** * 構建SubscribeRespProto.SubscribeResp對象 * @param subReqID * @return */ private SubscribeRespProto.SubscribeResp resp(int subReqID) { SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder(); builder.setSubReqID(subReqID); builder.setRespCode(0); builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); return builder.build(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 發生異常,關閉鏈路 ctx.close(); } }
package cn.xpleaf.subscribe; import cn.xpleaf.protobuf.SubscribeRespProto; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class SubReqClient { public void connect(String host, int port) throws Exception { // 配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) // 設置TCP鏈接超時時間 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加ProtobufVarint32FrameDecoder,主要用於Protobuf的半包處理 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 添加ProtobufDecoder×××,它的參數是com.google.protobuf.MessageLite // 實際上就是要告訴ProtobufDecoder須要解碼的目標類是什麼,不然僅僅從字節數組中是 // 沒法判斷出要解碼的目標類型信息的(客戶端須要解析的是服務端請求,因此是Resp) ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance())); /** * 來自源碼的代碼註釋,用於Protobuf的半包處理 * * An encoder that prepends the the Google Protocol Buffers * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base * 128 Varints</a> integer length field. 
For example: * <pre> * BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes) * +---------------+ +--------+---------------+ * | Protobuf Data |-------------->| Length | Protobuf Data | * | (300 bytes) | | 0xAC02 | (300 bytes) | * +---------------+ +--------+---------------+ * </pre> * */ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 添加ProtobufEncoder編碼器,這樣就不須要對SubscribeResp進行手工編碼 ch.pipeline().addLast(new ProtobufEncoder()); // 添加業務處理handler ch.pipeline().addLast(new SubReqClientHandler()); } }); // 發起異步鏈接操做 ChannelFuture f = b.connect(host, port).sync(); // 等待客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0) { try { port = Integer.valueOf(port); } catch (NumberFormatException e) { // 採用默認值 } } new SubReqClient().connect("localhost", port); } }
package cn.xpleaf.subscribe; import java.util.ArrayList; import java.util.List; import cn.xpleaf.protobuf.SubscribeReqProto; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class SubReqClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { for(int i = 0; i < 10; i++) { ctx.write(subReq(i)); } ctx.flush(); } /** * 構建SubscribeReqProto.SubscribeReq對象 * @param i * @return */ private SubscribeReqProto.SubscribeReq subReq(int i) { SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder(); builder.setSubReqID(i); builder.setUserName("xpleaf"); builder.setProductName("Netty Book For Protobuf"); List<String> address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChange"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Service accept server subscribe response : [" + msg + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
服務端輸出如下:
Service accept client subscribe req : [subReqID: 0 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 1 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 2 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 3 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 4 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 5 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 6 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 7 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 8 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 9 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen 
HongShuLin" ]
客戶端輸出如下:
Service accept server subscribe response : [subReqID: 0 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 1 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 2 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 3 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 4 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 5 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 6 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 7 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 8 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 9 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ]