Netty實戰十一之預置的ChannelHandler和編解碼器

Netty爲許多通用協議提供了編解碼器和處理器,幾乎能夠開箱即用,這減小了你在那些至關繁瑣的事務上原本會花費的時間與精力。咱們將探討這些工具以及它們所帶來的好處,其中包括Netty對於SSL/TLS和WebSocket的支持,以及如何簡單地經過數據壓縮來壓榨HTTP,以獲取更好的性能。java

一、經過SSL/TLS保護Netty應用程序git

SSL和TLS這樣的安全協議,它們層疊在其餘協議之上,用以實現數據安全。咱們在訪問安全網站時遇到過這些協議,可是它們也可用於其餘不是基於HTTP的應用程序,如安全SMTP(SMTPS)郵件服務器甚至是關係型數據庫系統。web

爲了支持SSL/TLS,Java提供了javax.net.ssl包,它的SSLContext和SSLEngine類使得實現解密和加密至關簡單直接。Netty經過一個名爲SslHandler的ChannelHandler實現利用了這個API,其中SslHandler在內部使用了SSLEngine來完成實際的工做。數據庫

下圖展現了使用SslHandler的數據流 輸入圖片說明如下代碼展現瞭如何使用ChannelInitializer來將SslHandler添加到ChannelPipeline中。編程

public class SSLChannelInitializer extends ChannelInitializer<Channel>{ private final SslContext context; private final boolean startTls; //若是設置爲true,第一個寫入的消息將不會被加密(客戶端應該設置爲true) public SSLChannelInitializer(SslContext context, boolean startTls) { this.context = context; this.startTls = startTls; } @Override protected void initChannel(Channel ch) throws Exception { //對於每一個SslHandler實例,都使用Channel的ByteBufAllocator從SslCOntext獲取一個新的SSLEngine SSLEngine engine = context.newEngine(ch.alloc()); //將SslHandler做爲第一個ChannelHandler添加到ChannelPipeline中 ch.pipeline().addLast("ssl",new SslHandler(engine,startTls)); } } 

在大多數狀況下,SslHandler將是ChannelPipeline中的第一個ChannelHandler。這確保了只有在全部其餘的ChannelHandler將它們的邏輯應用到數據以後,纔會進行加密。安全

例如,在握手階段,兩個節點將相互驗證而且商定一種加密方式。你能夠經過配置SslHandler來修改它的行爲,或者在SSL/TLS握手一旦完成以後提供通知,握手階段完成以後,全部的數據都將會被加密。SSL/TLS握手將會被自動執行。服務器

二、構建基於Netty的HTTP/HTTPS應用程序websocket

HTTP/HTTPS是最多見的協議套件之一,而且隨着智能手機的成功,它的應用也日益普遍,由於對於任何公司來講,擁有一個能夠被移動設備訪問的網站幾乎是必須的。這些協議也被用於其餘方面。markdown

HTTP是基於請求/響應模式的:客戶端向服務器發送一個HTTP請求,而後服務器將會返回一個HTTP響應。Netty提供了多種編碼器和解碼器以簡化對這個協議的使用。網絡

下圖分別展現了生產和消費HTTP請求和HTTP響應的方法。 輸入圖片說明輸入圖片說明正如上圖所示,一個HTTP請求/響應可能由多個數據部分組成,而且它老是以一個LastHttpContent部分做爲結束。FullHttpRequest和FullHttpResponse消息是特殊的子類型,分別表明了完整的請求和響應。全部類型的HTTP消息都實現HttpObject接口。

如下代碼中的HttpPipelineInitializer類展現了將HTTP支持添加到你的應用程序時多麼簡單——幾乎只須要將正確的ChannelHandler添加到ChannelPipeline中。

public class HttpPipelineInitializer extends ChannelInitializer<Channel>{ private final boolean client; public HttpPipelineInitializer(boolean client) { this.client = client; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (client){ //若是是客戶端,則添加HttpResponseDecoder以處理來自服務器的響應 pipeline.addLast("decoder",new HttpResponseDecoder()); //添加HttpResponseEncoder以向服務器發送請求 pipeline.addLast("encoder",new HttpResponseEncoder()); } else { //若是是服務器,則添加HttpRequestDecoder以接收來自客戶端的請求 pipeline.addLast("decoder",new HttpRequestDecoder()); //添加HttpRequestEncoder以向客戶端發送響應 pipeline.addLast("encoder",new HttpRequestEncoder()); } } } 

三、聚合HTTP消息

在ChannelInitializer將ChannelHandler安裝到ChannelPipeline中以後,你即可以處理不一樣類型的HttpObject消息了。可是因爲HTTP的請求和響應可能由許多部分組成,所以你須要聚合它們以造成完整的消息。爲了消除這項繁瑣的任務,Netty提供了一個聚合器,它能夠將多個消息部分合併爲FullHttpRequest或者FullHttpResponse消息。經過這樣的方式,你將老是看到完整的消息內容。

因爲消息分段須要被緩衝,直到能夠轉發一個完整的消息給下一個ChannelInboundHandler,因此這個操做有輕微的開銷。其所帶來的好處即是你沒必要關心消息碎片了。

引入這種自動聚合機制只不過是向ChannelPipeline中添加另一個ChannelHandler罷了。如如下代碼所示。

public class HttpAggregatorInitializer extends ChannelInitializer<Channel>{ private final boolean isClient; public HttpAggregatorInitializer(boolean isClient) { this.isClient = isClient; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (isClient) { //若是是客戶端,則添加HttpClineCodec pipeline.addLast("codec",new HttpClientCodec()); } else { //若是是服務端,則添加HttpServerCodec pipeline.addLast("codec",new HttpServerCodec()); } //將最大的消息大小爲512KB的HttpObjectAggregator添加到ChannelPipeline pipeline.addLast("aggregator",new HttpObjectAggregator(512 * 1024)); } } 

四、HTTP壓縮

當使用HTTP時,建議開啓壓縮功能以儘量多地減小傳輸數據的大小。雖然壓縮會帶來一些CPU時鐘週期上的開銷,可是一般來講它都是一個好主意,特別是對於文本數據來講。

Netty爲壓縮和解壓縮提供了ChannelHandler實現,它們同時支持gzip和deflate編碼。

HTTP請求的頭部信息,客戶端能夠經過提供如下頭部信息來指示服務器它所支持的壓縮格式:

Get /encrypted-area HTTP/1.1

Host: www.example.com

Accept-Encoding:gzip,deflate

然而,須要注意的是,服務器沒有義務壓縮它所發送的數據。

如下代碼展現了一個例子。

public class HttpCompressionInitializer extends ChannelInitializer<Channel>{ private final boolean isClient; public HttpCompressionInitializer(boolean isClient) { this.isClient = isClient; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (isClient) { //若是是客戶端,則添加HttpClientCodec pipeline.addLast("codec",new HttpClientCodec()); //若是是客戶端,則添加HttpContentDecompressor以處理來自服務器的壓縮內容 pipeline.addLast("decompressor",new HttpContentDecompressor()); } else { //若是是服務器,則添加HttpServerCodec pipeline.addLast("codec",new HttpServerCodec()); //若是是服務器,則添加HttpContentCompressor來壓縮數據 pipeline.addLast("decompressor",new HttpContentCompressor()); } } } 

五、使用HTTPS

如下代碼顯示,啓用HTTPS只須要將SslHandler添加到ChannelPipeline的ChannelHandler組合中。

public class HttpsCodecInitializer extends ChannelInitializer<Channel>{ private final SslContext context; private final boolean isClient; public HttpsCodecInitializer(SslContext context, boolean isClient) { this.context = context; this.isClient = isClient; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); SSLEngine engine = context.newEngine(channel.alloc()); //將SslHandler添加到ChannelPipeline中以使用HTTPS pipeline.addLast("ssl",new SslHandler(engine)); if (isClient) { //若是是客戶端,則添加HttpClientCodec pipeline.addLast("codec",new HttpClientCodec()); } else { //若是是服務器,則添加HttpServerCodec pipeline.addLast("codec",new HttpServerCodec()); } } } 

以上例子,說明了Netty的架構方式是如何將代碼重用變爲槓桿做用的。只須要簡單地將一個ChannelHandler添加到ChannelPipeline中,即可以提供一項新的功能,甚至像加密這樣重要的功能都能提供。

六、WebSocket

WebSocket解決了一個長期存在的問題:既然底層的協議(HTTP)是一個請求/響應模式的交互序列,那麼如何實時地發佈信息?AJAX提供了必定程度上的改善,可是數據流仍然是由客戶端所發送的請求驅動。

WebSocket規範以及它的實現表明了對一種更加有效的解決方案的嘗試。簡單地說,WebSocket提供了「在一個單個的TCP鏈接上提供雙向的通訊·······結合WebSocketAPI·····它爲網頁和遠程服務器之間的雙向通訊提供了一種替代HTTP輪詢的方案。」

WebSocket在客戶端和服務器之間提供了真正的雙向數據交換。WebSocket如今能夠用於傳輸任意類型的數據,很像普通的套接字。

下圖給出了WebSocket協議的通常概念。在這個場景下,通訊將做爲普通的HTTP協議開始,隨後升級到雙向的WebSocket協議。 輸入圖片說明 要想向你的應用程序中添加對於WebSocket的支持,你須要將適當的客戶端或者服務器WebSocket ChannelHandler添加到ChannelPipeline中。這個類將處理由WebSocket定義的稱爲幀的特殊消息類型。

由於Netty主要是一種服務器端的技術,因此在這裏咱們重點建立WebSocket服務器。代碼以下所示,這個類處理協議升級握手,以及3種控制幀——Close、Ping和Pong。Text和Binary數據幀將會被傳遞給下一個ChannelHandler進行處理。

public class WebSocketServerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast( new HttpServerCodec(), //爲握手提供聚合的HttpRequest new HttpObjectAggregator(65535), //若是被請求的端點是「/websocket」則處理該升級握手 new WebSocketServerProtocolHandler("/websocket"), new TextFrameHandler(), new BinaryFrameHandler(), new ContinuationFrameHandler() ); } public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { //Handle text frame } } public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame) throws Exception { //Handle binary frame } } public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ContinuationWebSocketFrame continuationWebSocketFrame) throws Exception { //Handle continuation frame } } } 

保護WenSocket:要想爲WebSocket添加安全性,只須要將SslHandler做爲第一個ChannelHandler添加到ChannelPipeline中。

七、空閒的鏈接和超時

只要你有效地管理你的網絡資源,這些技術就可使得你的應用程序更加高效、易用和安全。

檢測空閒鏈接以及超時對於及時釋放資源來講是相當重要的。

讓咱們仔細看看在實踐中使用得最多的IdleStateHandler。如下代碼展現了當使用一般的發送心跳信息到遠程節點的方法時,若是在60秒以內沒有接收或發送任何的數據,咱們將如何獲得通知;若是沒有響應,則鏈接會被關閉。

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //IdleStateHandler將在被觸發時發送一個IdleStateEvent事件 pipeline.addLast(new IdleStateHandler(0,0,60, TimeUnit.SECONDS)); pipeline.addLast(new HeartbeatHandler()); } //實現userEventTriggered方法以發送心跳消息 public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter{ private static final ByteBuf HEARTBEAT_SEQUENCE = //發送到遠程節點的心跳消息 Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //發送心跳消息,並在發送失敗時關閉該鏈接 if (evt instanceof IdleStateEvent){ ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt); } } } } 

以上示例演示瞭如何使用IdleStateHandler來測試遠程節點是否仍然還活着,而且在它失活時經過關閉鏈接來釋放資源

若是鏈接超過60秒沒有接收或者發送任何的數據,那麼IdleStateHandler將會使用一個IdleStateEvent事件來調用fireUserEventTriggered()方法。HeartbeatHandler實現了userEventTriggered()方法,若是這個方法檢測到IdleStateEvent事件,它將會發送心跳消息,而且添加一個將在發送操做失敗時關閉該鏈接的ChannelFutureListener。

八、解碼基於分隔符的協議和基於長度的協議

基於分隔符的(delimited)消息協議使用定義的字符來標記的消息或消息段(一般被稱爲幀)的開頭或者結尾。由RFC文檔正式定義的許多協議(如SMTP、POP三、IMAP以及Telnet)都是這樣。

下圖展現了當幀由行尾序列\r\n分割時是如何被處理的。 輸入圖片說明如下代碼展現瞭如何使用LineBasedFrameDecoder來處理上圖的場景。

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //該LineBasedFrameDecoder將提取的幀轉發給下一個ChannelInboundHandler pipeline.addLast(new LineBasedFrameDecoder(64*1024)); //添加FrameHandler以接收幀 pipeline.addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override //傳入單個幀的內容 protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // do something with the data extracted from the frame } } } 

若是你正在使用除了行尾符以外的分隔符的幀,那麼你能夠以相似的方法使用DelimiterBasedFrameDecoder,只須要將特定的分隔符序列指定到其構造函數便可。

這些解碼器是實現你本身的基於分隔符的協議的工具。做爲示例,咱們將使用下面的協議規範:

——傳入數據流是一系列的幀,每一個幀都由換行符(\n)分割

——每一個幀都由一系列的元素組成,每一個元素都由單個空格字符分割

——一個幀的內容表明瞭一個命令、定義爲一個命令名稱後跟着數目可變的參數

咱們用於這個協議的自定義解碼器將定義如下類:

——Cmd——將幀(命令)的內容存儲在ByteBuf中,一個ByteBuf用於名稱,另外一個用於參數

——CmdDecoder——從被重寫了的decode()方法中獲取一行字符串,並從它的內容構建一個Cmd的實例

——CmdHandler——從CmdDecoder獲取解碼的Cmd對象,並對它進行一些處理;

——CmdHandlerinitializer——爲了簡便起見,咱們將會把前面的這些類定義爲專門的ChannelInitializer的嵌套類,其將會把這些ChannelInboundHandler安裝到ChannelPipeline中。

如下代碼,這個解碼器的關鍵是擴展LineBasedFrameDecoder。

public class CmdHandlerInitializer extends ChannelInitializer<Channel>{ public static final byte SPACE = (byte)' '; @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //添加CmdDecoder以提取Cmd對象,並將它轉發給下一個ChannelInboundHandler pipeline.addLast(new CmdDecoder(64*1024)); //添加CmdHandler以接收和處理Cmd對象 pipeline.addLast(new CmdHandler()); } //Cmd POJO public static final class Cmd{ private final ByteBuf name; private final ByteBuf args; public Cmd(ByteBuf name, ByteBuf args) { this.name = name; this.args = args; } public ByteBuf getName() { return name; } public ByteBuf getArgs() { return args; } } public static final class CmdDecoder extends LineBasedFrameDecoder{ public CmdDecoder(int maxLength) { super(maxLength); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { //從ByteBuf中提取由行尾符序列分隔的幀 ByteBuf frame = (ByteBuf) super.decode(ctx, buffer); if (frame == null){ //若是輸入中沒有幀,則返回null return null; } //查找第一個空格字符的索引 int index = frame.indexOf(frame.readerIndex(),frame.writerIndex(),SPACE); //使用包含有命令名稱和參數的切片建立新的Cmd對象 return new Cmd(frame.slice(frame.readerIndex(),index),frame.slice(index + 1,frame.writerIndex())); } } public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Cmd cmd) throws Exception { //處理傳經ChannelPipeline的Cmd對象 // do something with the command } } } 

九、基於長度的協議

基於長度的協議經過將它的長度編碼到幀的頭部來定義幀,而不是使用特殊的分隔符來標記它的結束。

下圖展現了FixedLengthFrameDecoder的功能,其在構造時已經指定了幀長度爲8字節。 輸入圖片說明你將常常會遇到被編碼到消息頭部的幀大小不是固定值的協議。爲了處理這種變長幀,你可使用LengthFieldBasedFrameDecoder,它將從頭部字段肯定幀長,而後從數據流中提取指定的字節數。

下圖展現了示例,其中長度字段在幀中的偏移量爲0,而且長度爲2字節。 輸入圖片說明LengthFieldBasedFrameDecoder提供了幾個構造函數來支持各類各樣的頭部配置狀況。如下代碼展現瞭如何使用其3個構造參數分別爲maxFrameLength、lengthFieldOffset和lengthFieldLength的構造函數。在這個場景中,幀的長度被編碼到了幀起始的前8個字節中。

public class LengthFieldBasedFrameDecoder extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new io.netty.handler.codec.LengthFieldBasedFrameDecoder(64*1024,0,8)); //添加FrameHandler以處理每一個幀 pipeline.addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { //處理幀的數據 // Do something with the frame } } } 

十、寫大型數據

由於網絡飽和的可能性,如何在異步框架中高效地寫大塊的數據是一個特殊的問題。因爲寫操做是非阻塞的,因此即便沒有寫出全部的數據,寫操做也會在完成時返回並通知ChannelFuture。當這種狀況發生時,若是仍然不停地寫入,就有內存耗盡的風險,因此在寫大型數據時,須要準備好處理到遠程節點的鏈接是慢速鏈接的狀況,這種狀況會致使內存釋放的延遲。

NIO的零拷貝特性,這種特性消除了將文件的內容從文件系統移動到網絡棧的複製過程。全部的一切都發生在Netty的核心中,因此應用程序全部須要作的就是使用一個FileRegion接口的實現,其在Netty的API文檔中的定義是:「經過支持零拷貝的文件傳輸的Channel來發送的文件區域。」

如下代碼展現瞭如何經過從FileInputStream建立一個DefaultFileRegion,並將其寫入Channel,從而利用零拷貝特性來傳輸一個文件的內容。

//建立一個FileInputStream FileInputStream in = new FileInputStream(file); FileRegion region = new DefaultFileRegion( in.getChannel(),0,file.length()); //發送該DefaultFileRegion,並註冊一個ChannelFutureListener channel.writeAndFlush(region).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()) { //處理失敗 Throwable cause = channelFuture.cause(); // Do something } } } ); 

這個示例只適用於文件內容的直接傳輸,不包括應用程序對數據的任何處理。在須要將數據從文件系統複製到用戶內存中時,可使用ChunkedWriteHandler,它支持異步寫大型數據流,而又不會致使大量的內存消耗。

關鍵是interface ChunkedInput ,其中類型參數B是readChunk()方法返回的類型。Netty預置了該接口的4個實現。

如下代碼說明了ChunkedStream的用法,它是實踐中最經常使用的實現。所示的類使用了一個File以及一個SslContext進行實例化。當initChannel()方法被調用時,它將使用所示的ChannelHandler鏈初始化該Channel。當Channel的狀態變爲活動時,WriteStreamHandler將會逐塊地把來自文件中的數據做爲ChunkedStream寫入。數據在傳輸以前將會由SslHandler加密。

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel>{ private final File file; private final SslContext sslCtx; public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) { this.file = file; this.sslCtx = sslCtx; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //將SslHandler添加到ChannelPipeline中 pipeline.addLast(new SslHandler(sslCtx.newEngine(channel.alloc()))); //添加ChunkedWriteHandler以處理做爲ChunkedInput傳入的數據 pipeline.addLast(new ChunkedWriteHandler()); //一旦鏈接創建,WriteStreamHandler就開始寫文件數據 pipeline.addLast(new WriteStreamHandler()); } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter{ //當鏈接創建時,channelActive方法將使用ChunkedInput寫文件數據 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); ctx.writeAndFlush( new ChunkedStream(new FileInputStream(file))); } } } 

逐塊輸入:要使用你本身的ChunkedInput實現,請在ChannelPipeline中安裝一個ChunkedWriteHandler

十一、JDK序列化數據

JDK提供了ObjectOutputStream和ObjectInputStream,用於經過網絡對POJO的基本數據類型和圖進行序列化和反序列化。該API並不複雜,並且能夠被應用於任何實現了java.io.Serializable接口的對象。可是它的性能也不是很是高效。

若是你的應用程序必需要和使用了ObjectOutputStream和ObjectInputStream的遠程節點交互,而且兼容性也是你最關心的,那麼JDK序列化將是正確的選擇。

十二、使用了JBoss Marshalling進行序列化

若是你能夠自由地使用外部依賴,那麼JBoss Marshalling將是一個理想的選擇:他比JDK序列化最多快3倍,並且也更加緊湊。

如下代碼展現瞭如何使用MarshallingDecoder和MarshallingEncoder。一樣,幾乎只是適當地配置ChannelPipeline罷了。

public class MarshallingInitializer extends ChannelInitializer<Channel>{ private final MarshallerProvider marshallerProvider; private final UnmarshallerProvider unmarshallerProvider; public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) { this.marshallerProvider = marshallerProvider; this.unmarshallerProvider = unmarshallerProvider; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new MarshallingDecoder(unmarshallerProvider)); pipeline.addLast(new MarshallingEncoder(marshallerProvider)); pipeline.addLast(new ObjectHandler()); } public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception { //do something } } } 

1三、經過Protocol Buffers序列化

Netty序列化的最後一個解決方案是利用Protocol Buffers的編解碼器,它是一個由Google公司開發的、如今已經開源的數據交換格式。

Protocol Buffers以一種緊湊而高效的方式對結構化的數據進行編碼以及解碼。它具備許多編程語言綁定,使得它很適合跨語言的項目。

在這裏咱們又看到了,使用protobuf只不過是將正確的ChannelHandler添加到ChannelPipeline中,以下代碼。

public class ProtoBufInitializer extends ChannelInitializer<Channel>{ private final MessageLite lite; public ProtoBufInitializer(MessageLite lite) { this.lite = lite; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new ProtobufDecoder(lite)); pipeline.addLast(new ObjectHandler()); } public static final class ObjectHandler extends SimpleChannelInboundHandler<Object>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { // do something with the object } } } 
 
相關文章
相關標籤/搜索