編解碼器的做用是將原始字節數據與自定義的消息對象進行互轉。編碼器負責處理「出站」數據。java
解碼器web
解碼器負責解碼「入站」數據從一種格式到另外一種格式,解碼器處理入站數據是抽象ChannelInboundHandler的實現。安全
解碼器有三種類型:解碼字節到消息;解碼消息到消息以及解碼消息到字節。服務器
ByteToMessageDecoderwebsocket
decode(ChannelHandlerContext, ByteBuf, List<Object>):這個方法須要本身實現的抽象方法,做用是將ByteBuf數據解碼成其餘形式的數據併發
decodeLast(ChannelHandlerContext, ByteBuf, List<Object>)app
ReplayingDecodersocket
ReplayingDecoder是ByteToMessageDecoder的一種特殊的抽象基類,讀取緩衝區的數據以前須要檢查緩衝區是否有足夠的字節,使用ReplayingDecoder就無需本身檢查,若ByteBuf中有足夠的字節,則正常讀取,不然就是中止解碼。不是全部的操做都被ByteBuf支持,如有一個不支持的就會拋出DecoderException;ByteBuf.readableBytes()大部分不會返回指望值ide
MessageToMessageDecoder大數據
decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Excetopn;
編碼器
編碼器有兩種類型:消息對象編碼成消息對象和消息對象編碼成字節碼
Netty也提供了兩個抽象類:MessageToByteEncoder和MessageToMessageEncoder。須要重寫encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception;
byte-to-byte編解碼器
Netty提供了ByteArrayEncoder和ByteArrayDecoder兩個類。
public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf>{ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception{ byte[] array = new byte[msg.readableBytes()]; msg.getBytes(0, array); out.add(array); } } @Sharable public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]>{ protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception{ out.add(Unpooled.wrappedBuffer(msg)); } }
ByteToMessageCodec
ByteToMessageCodec用來處理byte-to-message和message-to-byte。ByteToMessageCodec是一種組合,等同於ByteToMessageDecoder和MessageToByteEncoder的組合。MessageToByteEncoder有兩個抽象方法:
encode(ChannelHandlerContext, I, ByteBuf) //編碼
decode(ChannelHandlerContext, ByteBuf, List<Object>) //解碼
MessageToMessageCodec
MessageToMessageCodec用於message-to-message的編碼和解碼。能夠當作是MessageToMessageDecoder和MessageToMessageEncoder的組合體。MessageToMessageCodec有兩個抽象方法:
encode(ChannelHandlerContext, OUTBOUNG_IN, List<Object>)
decode(ChannelHandlerContext, INBOUND_IN, List<Object>)
CombinedChannelDuplexHandler
自定義編碼器和解碼器
public class CharCodec extends CombinedChannelDuplexHandler<Decoder, Encoder>{ public CharCodec(){ super(new Decoder(), new Encoder()); } }
使用SSL/TLS建立安全的netty程序
java提供了SslContext和SslEngine支持SSL/TLS。Netty提供了SslHandler,它擴展了Java的SslEngine。
public class SslChannelInitializer extends ChannelInitializer<Channel>{ private final SSLContext context; private final boolean client; private final boolean startTls; public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) { this.context = context; this.client = client; this.startTls = startTls; } protected void initChannel(Channel ch) throws Exception{ SSLEngine engine = context.createSSLEngine(); engine.setUserClientMode(client); ch.pipeline().addFirst("ssl", new SslHander(engine, startTls)); } }
SslHandler必需要添加到ChannelPipeline的第一個位置。
SSL/TLS的方法:
setHandshakeTimeout(long handshakeTimeout, TimeUnit unit):設置握手超時時間,ChannelFuture將獲得通知
setHandshakeTimeoutMillis(long handshakeTimeoutMillis):設置握手超時時間,ChannelFuture將會獲得通知
getHandshakeTimeoutMillis():獲取握手超時時間值
setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit):設置關閉通知超時事件,若超時,ChannelFuture會關閉失敗
setHandshakeTimeoutMillis(long handshakeTimeoutMillis):設置關閉通知超時時間,若超時,ChannelFuture會關閉失敗
getCloseNotifyTimeoutMillis():獲取關閉通知超時時間
handshakeFuture():返回完成握手後的ChannelFuture
close():發送關閉通知請求關閉和銷燬
一個HTTP請求/響應消息可能不止一個,但最終都會包含LastHttpContent消息。FullHttpRequest和FullHttpResponse是Netty提供的兩個接口,分別用來完成http請求和響應。
Netty提供了HTTP請求和響應的編碼器和解碼器:
HttpRequestEncoder:將HttpRequest或HttpContent編碼成ByteBuf
HttpRequestDecoder:將ByteBuf解碼成HttpRequest和HttpContent
HttpResponseEncoder:將HttpResponse或HttpContent編碼成ByteBuf
HttpResponseDecoder:將ByteBuf解碼成HttpResponse和HttpContent
HTTP消息聚合
處理HTTP時可能接受HTTP消息片斷,Netty須要緩衝直到接受完整個消息。要處理HTTP消息,Netty提供了HttpObjectAggregator。經過HttpObjectAggregator,Netty能夠聚合HTPP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一個ChannelHandler,這樣就消除了斷裂消息,保證了消息的完整性。
HTTP壓縮
Netty支持「gzip」和「deflate」。
protected void intiChannel(Channel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); if(client){ pipeline.addLast("codec", new HttpClientCodec()); pipeline.addLast("decompressor", new HttpContentDecompressor()); }else{ pipeline.addLast("codec", new HttpServerCodec()); pipeline.addLast("decompressor", new HttpContentDecompressor()); } pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024)); }
使用HTTPS
public class HttpsCodecInitializer extends ChannelInitializer<Channel>{ private final SSLContext context; private final boolean client; public HttpsCodecInitializer(SSLContext context, boolean client){ this.context = context; this.client = client; } protected void initChannel(Channel ch) throws Exception{ SSLEngine engine = context.createSSLEngine(); engine.setUseClientMode(client); ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst("ssl", new SslHandler(engine)); if(client) pipeline.addLast("codec", new HttpClientCodec()); else pipeline.addLast("codec", new HttpServerCodec()); } }
WebSocket
Netty經過ChannelHandler對WebSocket進行支持。Netty支持以下WebSocket:
BinaryWebSocketFrame:包含二進制數據
TextWebSocketFrame:包含文本數據
ContinuationWebSocketFrame:包含二進制數據或文本數據
CloseWebSocketFrame:表明一個關閉請求,包含關閉狀態碼和短語
PingWebSocketFrame:WebSocketFrame要求PongWebSocketFrame發送數據
PongWebSocketFrame:WebSocketFrame要求PingWebSocketFrame響應
public class WebSocketServerInitializer extends ChannelInitializer<Channel>{ protected void initChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerProtocolHandler("/websocket"), new TextFrameHandler(), new BinaryFrameHandler(), new ContinuationFrameHandler()); } public static final class TextFrameHanlder extends SimpleChannelInboundHandler<TextWebSocketFrame>{ protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{ } } public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{ protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception{ } } public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{ protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception{ } } }
SPDY
SPDY是Google開發的基於TCP的應用層協議。SPDY的定位:將頁面加載時間減小50%;最大限度減小部署的複雜性;避免網站開發者改動內容
SPDY技術實現:單個TCP鏈接支持併發的HTTP請求;壓縮包頭和去掉沒必要要的頭部來減小當前HTTP使用的帶寬;定義一個易實現,在服務器端高效的協議。經過減小邊緣狀況,定義易解析的消息格式來減小HTTP的複雜性;強制使用SSL;容許服務器在須要時發起對客戶端的鏈接並推送數據。
Netty用三種不一樣的ChannelHanlder處理閒置和超時鏈接:
IdeStateHandler:當一個通道沒有進行讀寫或運行了一段時間後發出IdleStateEvent
ReadTimeoutHandler:在指定時間內沒有接收到任何數據將拋出ReadTimeoutException
WriteTimeoutHandler:在指定時間內寫入數據將拋出WriteTimeoutException
public class IdleStateHandlerInitializer extends ChannelInitialzier<Channel>{ protected void initChannel(Channel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)); pipeline.addLast(new HeartbeatHandler()); } public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter{ private static final ByteBuf HEARTBEAT_SEQUEUE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)); public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{ if(evt instanceof IdleStateEvent) ctx.writeAndFlush(HEARTBEAT_SEQUEUE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); else super.userEventTriggered(ctx, evt); } } }
解碼分隔符和基於長度的協議
Netty提供兩個類用於提取序列分割:
DelimiterBasedFrameDecoder:接收ByteBuf由一個或多個分隔符拆分
LineBasedFrameDecoder:接收ByteBuf以分隔線結束,如\n, \r\n
public class LineBaseHandlerInitializer extends ChannelInitializer<Channel>{ protected void initChannel(Channel ch) throw Exception{ ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1024), new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception{ } } }
Netty提供了兩個以長度爲單位的解碼器:FixedLengthFrameDecoder和LengthFieldBasedFrameDecoder。
public class LengthBasedInitializer extends ChannelInitializer<Channel>{ protected void intiChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new LengthFileBasedFrameDecoder(65 * 1024, 0, 8)).addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception{ } } }
寫大數據
Netty使用零拷貝寫文件內容時經過DefaultFileRegion,ChannelHandlerContext和ChannelPipeline。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ File file = new File("test.txt"); FileInputStream fis = new FileInputStream(file); FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length()); Channel channel = ctx.channel(); channel.writeAndFlush(region).addListener(new ChannelFutureListener(){ protected void operationComplelte(ChannelFuture future) throws Exception{ if(!future.isSuccess()) Throwable cause = future.cause(); } }); }
Netty提供了ChunkedWriteHandler,容許經過處理ChunkedInput來寫大的數據塊。ChunkedFile,ChunkedNioFile,ChunkedStream和ChunkedNioStream實現了ChunkedInput。
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel>{ private final File file; public ChunkedWriteHandlerInitializer(File file){ this.file = file; } protected void initChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new ChunkedWriteHandler()).addLast(new WriteStreamHandler()); } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter{ public void channelActive(ChannelHandlerContext ctx) throws Exception{ super.channelActive(ctx); ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file))); } } }
序列化
Java提供了ObjectInputStream和ObjectOutputStream等序列化接口。
io.netty.handler.codec.serialization提供了以下接口:CompatibleObjectEncoder,CompactObjectInputStream,CompactObjectOutputStream,ObjectEncoder,ObjectDecoder,ObjectEncoderOutputStream和ObjectDecoderInputStream。
JBoss Marshalling序列化的速度是JDK的3倍且序列化的結構更緊湊。
io.netty.handler.codec.marshalling提供以下接口:CompatibleMarshallingEncoder,CompatibleMarshallingDecoder,MarshallingEncoder和MarshallingDecoder。
pubilc class MarshallingInitializer extends ChannelInitializer<Channel>{ private final MarshallerProvider marshallerProvider; private final UnmarshallerProvider unmarshallerProvider; public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider){ this.marshallerProvider = marshallerProvider; this.unmarshallerProvider = unmarshallerProvider; } protected void initChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider)).addLast(new MarshallingEncoder(marshallerProvider)).addLast(new ObjectHandler()); } public final class ObjectHandler extends SimpleChannelInboundHander<Serializable>{ protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception{ } } }
ProtoBuf是Google開源的一種編碼和解碼技術,做用是使序列化數據更高效。
io.netty.handler.codec.protobuf提供了:ProtoBufDecoder,ProtobufEncoder,ProtobufVarint32FrameDecoder和ProtibufVarint32LengthFieldPrepender
public class ProtoBufInitializer extends ChannelInitializer<Channel>{ private final MessageLite lite; public ProtoBufInitializer(MessageLite lite){ this.lite = lite; } protected void initChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufEncoder()).addLast(new ProtobufDecoder(lite)).addLast(new ObjectHandler()); } public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable>{ protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception{ } } }