趁着今天有時間,多發一點。這一章主要介紹Protobuf編解碼器,首先先說protobuf是個什麼東西。它實際上是Google開發的一個數據交換的格式,它獨立於語言,獨立於平臺。用過它的都會以爲它太好用了,記得當初在iesLab開發電力系統高級應用軟件的服務端用到的通訊協議就是它,除了代碼風格不是很好以外,其餘的真是太方便了,膜拜Google的技術啊。java
在詳細介紹以前,我先來問你們一個問題,若是讓你開發一款通訊協議,你但願如何設計協議?我相信,你們既然都已經習慣了面向對象編程,都是選擇將對象轉換爲協議發送,固然接受的時候是將協議轉換爲對象。先拋開半包問題不說,先談數據幀的傳輸,咱們是否是但願:1. 通訊的時候最好能用最少的字節來傳遞更多的消息,這樣就能夠加快消息的傳輸,產生很小的延遲。2. 咱們的消息能夠跨平臺傳輸,無論對方用的什麼語言?C仍是C++仍是java,對一樣的通訊協議能夠通用,不會由於不一樣的平臺由於字節大小而痛苦。那麼java給我帶來的序列化能夠用麼?固然能夠可是有個問題,首先java的序列化的對象生成的字節數目太過龐大,幾乎是正常二進制編碼的5倍左右,固然是protobuf的更度倍。第二,你用java平臺序列化的對象,在C++的平臺是無法解析的。因此,基於這個protobuf給咱們解決了,他的編碼字節足夠小,不一樣平臺直接能夠通用。編程
在Netty中天生給咱們集成了protobuf的編解碼器,不須要咱們本身去實現,主要有解碼器:ProtobufVarint32FrameDecoder(負責半包解碼),ProtobufDecoder(負責protobuf的解碼)bootstrap
編碼器:ProtobufVarint32LengthFieldPrepender(負責半包編碼),ProtobufEncoder(負責協議編碼)服務器
這個其實很好理解,一個是爲了解決半包問題,另外一個解決協議解析問題。socket
咱們看一下服務端的代碼:ide
SubscribeProto.Subscribe.getDefaultInstance()這個是protobuf協議工具給咱們生成的,具體的百度一下就知道了,它是什麼意思呢?你想啊,Netty的解碼器又不是萬能的,他怎麼知道你的幀格式是什麼樣子的?因此這個就是告訴他你給我照着這個樣式解碼,是否是很方便啊!函數
package com.dlb.note.server; import com.dlb.note.doj.SubscribeProto; import com.google.protobuf.ProtocolStringList; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * 功能:protobuf時間服務器 * 版本:1.0 * 日期:2016/12/9 18:33 * 做者:馟蘇 */ public class ProtobufTimeServer { /** * main函數 * @param args */ public static void main(String []args) { // 構造nio線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer() { protected void initChannel(Channel channel) throws Exception { /** * 如下是支持protobuf的解碼器 */ channel.pipeline() .addLast(new ProtobufVarint32FrameDecoder()) // 半包解碼 .addLast(new ProtobufDecoder(SubscribeProto.Subscribe.getDefaultInstance())) // protobuf解碼 .addLast(new ProtobufVarint32LengthFieldPrepender()) // 半包編碼 .addLast(new ProtobufEncoder()) // protobuf編碼 .addLast(new MyProtoBufHandler()); } }); // 綁定端口,同步等待成功 ChannelFuture future = bootstrap.bind(8888).sync(); System.out.println("----服務端在8888端口監聽----"); // 等待服務端監聽端口關閉 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 優雅的退出,釋放線程池資源 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } class MyProtoBufHandler extends ChannelHandlerAdapter { // 客戶端連接異常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exception,ip=" + ctx.channel().remoteAddress()); ctx.close(); super.exceptionCaught(ctx, cause); } // 客戶端連接到來 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client come,ip=" + ctx.channel().remoteAddress()); super.channelActive(ctx); } // 客戶端連接關閉 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("client close,ip=" + ctx.channel().remoteAddress()); ctx.close(); super.channelInactive(ctx); } // 可讀 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 讀數據 SubscribeProto.Subscribe subscribe = (SubscribeProto.Subscribe) msg; ProtocolStringList addressList = subscribe.getAddressList(); System.out.println(subscribe.getName()); for (String str : addressList) { System.out.println(str); } System.out.println("---------------------------------------------------"); super.channelRead(ctx, msg); } }