本章介紹java
Codec,編解碼器web
Decoder,解碼器服務器
Encoder,編碼器websocket
7.1 編解碼器Codec網絡
編寫一個網絡應用程序須要實現某種編解碼器,編解碼器的做用就是講原始字節數據與自定義的消息對象進行互轉。網絡中都是以字節碼的數據形式來傳輸數據的,服務器編碼數據後發送到客戶端,客戶端須要對數據進行解碼,由於編解碼器由兩部分組成:app
Decoder(解碼器)框架
Encoder(編碼器)socket
解碼器負責將消息從字節或其餘序列形式轉成指定的消息對象,編碼器則相反;解碼器負責處理「入站」數據,編碼器負責處理「出站」數據。編碼器和解碼器的結構很簡單,消息被編碼後解碼後會自動經過ReferenceCountUtil.release(message)釋放,若是不想釋放消息可使用ReferenceCountUtil.retain(message),這將會使引用數量增長而沒有消息發佈,大多數時候不須要這麼作。ide
7.2 解碼器this
Netty提供了豐富的解碼器抽象基類,咱們能夠很容易的實現這些基類來自定義解碼器。下面是解碼器的一個類型:
解碼字節到消息
解碼消息到消息
解碼消息到字節
一般你須要將消息從字節解碼成消息或者從字節解碼成其餘的序列化字節。
Netty中提供的ByteToMessageDecoder能夠將字節消息解碼成POJO對象,下面列出了ByteToMessageDecoder兩個主要方法:
decode(ChannelHandlerContext, ByteBuf, List<Object>),這個方法是惟一的一個須要本身實現的抽象方法,做用是將ByteBuf數據解碼成其餘形式的數據。
decodeLast(ChannelHandlerContext, ByteBuf, List<Object>),實際上調用的是decode(...)。
例如服務器從某個客戶端接收到一個整數值的字節碼,服務器將數據讀入ByteBuf並通過ChannelPipeline中的每一個ChannelInboundHandler進行處理,看下圖:
上圖顯示了從「入站」ByteBuf讀取bytes後由ToIntegerDecoder進行解碼,而後向解碼後的消息傳遞到ChannelPipeline中的下一個ChannelInboundHandler。看下面ToIntegerDecoder的實現代碼:
public class ToIntegerDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if(in.readableBytes() >= 4){ out.add(in.readInt()); } } }
從上面的代碼可能會發現,咱們須要檢查ByteBuf讀以前是否有足夠的字節,若沒有這個檢查豈不更好?是的,Netty提供了這樣的處理容許byte-to-message解碼,在下一節講解。除了ByteToMessageDecoder以外,Netty還提供了許多其餘的解碼接口。
ReplayingDecoder是byte-to-message解碼的一種特殊的抽象基類,讀取緩衝區的數據以前須要檢查緩衝區是否有足夠的字節,使用ReplayingDecoder就無需本身檢查;若ByteBuf中有足夠的字節,則會正常讀取;若沒有足夠的字節則會中止解碼。也正由於這樣的包裝使得ReplayingDecoder帶有必定的侷限性。
不是全部的操做都被ByteBuf支持,若是調用一個不支持的操做會拋出DecoderException。
ByteBuf.readableBytes()大部分時間不會返回指望值
若是你能忍受上面列出的限制,相比ByteToMessageDecoder,你可能更喜歡ReplayingDecoder。在知足需求的狀況下推薦使用ByteToMessageDecoder,由於它的處理比較簡單,沒有ReplayingDecoder實現的那麼複雜。ReplayingDecoder繼承與ByteToMessageDecoder,因此他們提供的接口是相同的。
下面代碼是ReplayingDecoder的實現:
public class ToIntegerReplayingDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(in.readInt()); } }
當從接收的數據ByteBuf讀取integer,若沒有足夠的字節可讀,decode(...)會中止解碼,如有足夠的字節可讀,則會讀取數據添加到List列表中。使用ReplayingDecoder或ByteToMessageDecoder是我的喜愛的問題,Netty提供了這兩種實現,選擇哪個均可以。
上面講了byte-to-message的解碼實現方式,那message-to-message該如何實現呢?Netty提供了MessageToMessageDecoder抽象類。
7.2.3 MessageToMessageDecoder
將消息對象轉成消息對象但是使用MessageToMessageDecoder,它是一個抽象類,須要咱們本身實現其decode(...)。message-to-message同上面講的byte-to-message的處理機制同樣,看下圖:
看下面的實現代碼:
/** * 將接收的Integer消息轉成String類型,MessageToMessageDecoder實現 * @author c.k * */ public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> { @Override protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception { out.add(String.valueOf(msg)); } }
解碼器是用來處理入站數據,Netty提供了不少解碼器的實現,能夠根據需求詳細瞭解。那咱們發送數據須要將數據編碼,Netty中也提供了編碼器的支持。下一節將講解如何實現編碼器。
7.3 編碼器
Netty提供了一些基類,咱們能夠很簡單的編碼器。一樣的,編碼器有下面兩種類型:
消息對象編碼成消息對象
消息對象編碼成字節碼
相對解碼器,編碼器少了一個byte-to-byte的類型,由於出站數據這樣作沒有意義。編碼器的做用就是將處理好的數據轉成字節碼以便在網絡中傳輸。對照上面列出的兩種編碼器類型,Netty也分別提供了兩個抽象類:MessageToByteEncoder和MessageToMessageEncoder。下面是類關係圖:
MessageToByteEncoder是抽象類,咱們自定義一個繼承MessageToByteEncoder的編碼器只須要實現其提供的encode(...)方法。其工做流程以下圖:
實現代碼以下:
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> { @Override protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { out.writeInt(msg); } }
須要將消息編碼成其餘的消息時可使用Netty提供的MessageToMessageEncoder抽象類來實現。例如將Integer編碼成String,其工做流程以下圖:
代碼實現以下:
public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> { @Override protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception { out.add(String.valueOf(msg)); } }
7.4 編解碼器
實際編碼中,通常會將編碼和解碼操做封裝在一個類中,解碼處理「入站」數據,編碼處理「出站」數據。知道了編碼和解碼器,對於下面的狀況不會感受驚訝:
byte-to-message編碼和解碼
message-to-message編碼和解碼
若是肯定須要在ChannelPipeline中使用編碼器和解碼器,須要更好的使用一個抽象的編解碼器。一樣,使用編解碼器的時候,不可能只刪除解碼器或編碼器而離開ChannelPipeline致使某種不一致的狀態。使用編解碼器將強制性的要麼都在ChannelPipeline,要麼都不在ChannelPipeline。
7.4.1 byte-to-byte編解碼器
在Netty4中實現byte-to-byte提供了2個類:ByteArrayEncoder和ByteArrayDecoder。這兩個類用來處理字節到字節的編碼和解碼。
下面是這兩個類的源碼:
public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { // copy the ByteBuf content to a byte array byte[] array = new byte[msg.readableBytes()]; msg.getBytes(0, array); out.add(array); } }
@Sharable public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]> { @Override protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception { out.add(Unpooled.wrappedBuffer(msg)); }
ByteToMessageCodec用來處理byte-to-message和message-to-byte。若是想要解碼字節消息成POJO或編碼POJO消息成字節,對於這種狀況,ByteToMessageCodec<I>是一個不錯的選擇。ByteToMessageCodec是一種組合,其等同於ByteToMessageDecoder和MessageToByteEncoder的組合。MessageToByteEncoder是個抽象類,其中有2個方法須要咱們本身實現:
encode(ChannelHandlerContext, I, ByteBuf),編碼
decode(ChannelHandlerContext, ByteBuf, List<Object>),解碼
7.4.3 MessageToMessageCodec
MessageToMessageCodec用於message-to-message的編碼和解碼,能夠當作是MessageToMessageDecoder和MessageToMessageEncoder的組合體。MessageToMessageCodec是抽象類,其中有2個方法須要咱們本身實現:
encode(ChannelHandlerContext, OUTBOUND_IN, List<Object>) decode(ChannelHandlerContext, INBOUND_IN, List<Object>)
可是,這種編解碼器能有用嗎?
有許多用例,最多見的就是須要將消息從一個API轉到另外一個API。這種狀況下須要自定義API或舊的API使用另外一種消息類型。下面的代碼顯示了在WebSocket框架APIs之間轉換消息:
package netty.in.action; import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandler.Sharable; import io.netty.handler.codec.MessageToMessageCodec; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; @Sharable public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> { public static final WebSocketConvertHandler INSTANCE = new WebSocketConvertHandler(); @Override protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception { switch (msg.getType()) { case BINARY: out.add(new BinaryWebSocketFrame(msg.getData())); break; case CLOSE: out.add(new CloseWebSocketFrame(true, 0, msg.getData())); break; case PING: out.add(new PingWebSocketFrame(msg.getData())); break; case PONG: out.add(new PongWebSocketFrame(msg.getData())); break; case TEXT: out.add(new TextWebSocketFrame(msg.getData())); break; case CONTINUATION: out.add(new ContinuationWebSocketFrame(msg.getData())); break; default: throw new IllegalStateException("Unsupported websocket msg " + msg); } } @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception { if (msg instanceof BinaryWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, msg.content().copy())); return; } if (msg instanceof CloseWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, msg.content().copy())); return; } if (msg instanceof PingWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, msg.content().copy())); return; } if (msg instanceof PongWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, msg.content().copy())); return; } if (msg instanceof TextWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, msg.content().copy())); return; } if (msg instanceof ContinuationWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, msg.content().copy())); return; } throw new IllegalStateException("Unsupported websocket msg " + msg); } public static final class MyWebSocketFrame { public enum FrameType { BINARY, CLOSE, PING, PONG, TEXT, CONTINUATION } private final FrameType type; private final ByteBuf data; public MyWebSocketFrame(FrameType type, ByteBuf data) { this.type = type; this.data = data; } public FrameType getType() { return type; } public ByteBuf getData() { return data; } } }
7.5 其餘編解碼方式
使用編解碼器來充當編碼器和解碼器的組合失去了單獨使用編碼器或解碼器的靈活性,編解碼器是要麼都有要麼都沒有。你可能想知道是否有解決這個僵化問題的方式,還可讓編碼器和解碼器在ChannelPipeline中做爲一個邏輯單元。幸運的是,Netty提供了一種解決方案,使用CombinedChannelDuplexHandler。雖然這個類不是編解碼器API的一部分,可是它常常被用來簡歷一個編解碼器。
7.5.1 CombinedChannelDuplexHandler
如何使用CombinedChannelDuplexHandler來結合解碼器和編碼器呢?下面咱們從兩個簡單的例子看了解
/** * 解碼器,將byte轉成char * @author c.k * */ public class ByteToCharDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while(in.readableBytes() >= 2){ out.add(Character.valueOf(in.readChar())); } } }
/** * 編碼器,將char轉成byte * @author Administrator * */ public class CharToByteEncoder extends MessageToByteEncoder<Character> { @Override protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception { out.writeChar(msg); } }
/** * 繼承CombinedChannelDuplexHandler,用於綁定解碼器和編碼器 * @author c.k * */ public class CharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> { public CharCodec(){ super(new ByteToCharDecoder(), new CharToByteEncoder()); } }
從上面代碼能夠看出,使用CombinedChannelDuplexHandler綁定解碼器和編碼器很容易實現,比使用*Codec更靈活。
Netty還提供了其餘的協議支持,放在io.netty.handler.codec包下,如:
Google的protobuf,在io.netty.handler.codec.protobuf包下
Google的SPDY協議
RTSP(Real Time Streaming Protocol,實時流傳輸協議),在io.netty.handler.codec.rtsp包下
SCTP(Stream Control Transmission Protocol,流控制傳輸協議),在io.netty.handler.codec.sctp包下