根據上一篇博文 Google Protobuf 使用 Java 版 netty 集成 protobuf 的方法很是簡單.代碼以下:html
server編程
package protobuf.server.impl; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import object.server.impl.SubReqServer; import object.server.impl.SubScriptReqProto; public class SubReqProtobufServer { public void start(int port) { NioEventLoopGroup workGroup = new NioEventLoopGroup(); NioEventLoopGroup bossGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup); bootstrap.channel(NioServerSocketChannel.class); // 配置 NioServerSocketChannel 的 tcp 參數, BACKLOG 的大小 bootstrap.option(ChannelOption.SO_BACKLOG, 100); bootstrap.handler(new LoggingHandler(LogLevel.INFO)); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { /* * 首先添加 ProtobufVarint32FrameDecoder 處理器 , 它主要用於半包處理 */ ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); /* * 而後添加 ProtobufDecoder 解碼器 , * 他的參數com.google.protobuf.MessageLite 實際上就是要告訴 ProtobufDecoder * 須要解碼的目標類是什麼,不然僅僅從字節數組中是沒法判斷出須要解碼的目標類型信息 */ ProtobufDecoder protobufDecoder = new ProtobufDecoder( SubScriptReqProto.SubScriptReq.getDefaultInstance()); ch.pipeline().addLast(protobufDecoder); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqProtobufHandler()); } }); // 綁定端口,隨後調用它的同步阻塞方法 sync 等等綁定操做成功,完成以後 Netty 會返回一個 ChannelFuture // 它的功能相似於的 Future,主要用於異步操做的通知回調. ChannelFuture channelFuture; try { channelFuture = bootstrap.bind(port).sync(); // 等待服務端監聽端口關閉,調用 sync 方法進行阻塞,等待服務端鏈路關閉以後 main 函數才退出. channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { SubReqProtobufServer server = new SubReqProtobufServer(); server.start(9091); } }
serverHandlerbootstrap
package protobuf.server.impl; import object.server.impl.SubScriptReqProto; import object.server.impl.SubScriptRespProto; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class SubReqProtobufHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { SubScriptReqProto.SubScriptReq req = (SubScriptReqProto.SubScriptReq) msg; System.out.println("SubReqProtobufHandler : " + req); ctx.writeAndFlush(resp(req.getSubReqID())); } catch (Exception e) { e.printStackTrace(); throw e; } } private Object resp(int subReqID) { SubScriptRespProto.SubScriptResp.Builder builder = SubScriptRespProto.SubScriptResp .newBuilder(); builder.setSubReqID(subReqID); builder.setDesc("desc"); builder.setRespCode(2); return builder.build(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
client 數組
package protobuf.client.impl; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.codec.serialization.ObjectEncoder; import object.client.impl.SubReqClient; import object.server.impl.SubScriptReqProto; import object.server.impl.SubScriptRespProto; public class SubReqProtobufClient { public void connect(String host, int port) { NioEventLoopGroup workGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { /* * 禁止堆類加載器進行緩存,他在基於 OSGI 的動態模塊化編程中常常使用,因爲 OSGI 能夠進行熱部署和熱升級,當某個 * bundle * 升級後,它對應的類加載器也將一塊兒升級,所以在動態模塊化的編程過程當中,不多對類加載器進行緩存,由於他隨時可能會發生變化. */ ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast( new ProtobufDecoder(SubScriptRespProto.SubScriptResp .getDefaultInstance())); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); // 發起異步連接操做 ChannelFuture future; try { future = bootstrap.connect(host, port).sync(); // 等待客戶端鏈路關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); } } public static void main(String[] args) { new SubReqProtobufClient().connect("localhost", 9091); } }
clientHandler緩存
package protobuf.client.impl; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import object.server.impl.SubScriptReqProto; public class SubReqClientHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { try { SubScriptReqProto.SubScriptReq.Builder builder = SubScriptReqProto.SubScriptReq .newBuilder(); for (int i = 0; i < 100; i++) { builder.setSubReqID(999 + i); builder.setAddress("address" + i); builder.setProductName("productvalue" + i); builder.setUserName("userName" + i); ctx.writeAndFlush(builder.build()); } } catch (Exception e) { e.printStackTrace(); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("SubReqClientHandler : " + msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
ProtobufDecode 只負責解碼,它不支持半包讀寫,所以,在 protoBuf 前必定要一個能處理半包讀寫的處理器.他有三種方法能夠選擇:異步
以上內容出自 : <Netty 權威指南socket
>tcp