本章介紹java
使用SSL/TLS建立安全的Netty程序web
使用Netty建立HTTP/HTTPS程序安全
處理空閒鏈接和超時服務器
解碼分隔符和基於長度的協議websocket
寫大數據網絡
序列化數據併發
上一章講解了如何建立本身的編解碼器,咱們如今能夠用上一章的知識來編寫本身的編解碼器。不過Netty提供了一些標準的ChannelHandler和Codec。框架
Netty提供的這些實現能夠解決咱們的大部分需求。本章講解Netty中使用SSL/TLS編寫安全的應用程序,編寫HTTP協議服務器,以及使用如WebSocket或Google的SPDY協議來使HTTP服務得到更好的性能;這些都是很常見的應用,本章還會介紹數據壓縮,在數據量比較大的時候,壓縮數據是頗有必要的。異步
8.1 使用SSL/TLS建立安全的Netty程序socket
SSL和TLS是衆所周知的標準和分層的協議,它們能夠確保數據時私有的。例如,使用HTTPS或SMTPS都使用了SSL/TLS對數據進行了加密。
對於SSL/TLS,Java中提供了抽象的SslContext和SslEngine。實際上,SslContext能夠用來獲取SslEngine來進行加密和解密。使用指定的加密技術是高度可配置的。
Netty擴展了Java的SslEngine,添加了一些新功能,使其更適合基於Netty的應用程序。Netty提供的這個擴展是SslHandler,是SslEngine的包裝類,用來對網絡數據進行加密和解密。
下圖顯示SslHandler實現的數據流:
上圖顯示瞭如何使用ChannelInitializer將SslHandler添加到ChannelPipeline,看下面代碼:
public class SslChannelInitializer extends ChannelInitializer<Channel> { private final SSLContext context; private final boolean client; private final boolean startTls; public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) { this.context = context; this.client = client; this.startTls = startTls; } @Override protected void initChannel(Channel ch) throws Exception { SSLEngine engine = context.createSSLEngine(); engine.setUseClientMode(client); ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls)); } }
SslHandler必需要添加到ChannelPipeline的第一個位置,可能有一些例外,可是最好這樣來作。
ChannelPipeline就像是一個在處理「入站」數據時先進先出,在處理「出站」數據時後進先出的隊列。最早添加的SslHandler會啊在其餘Handler處理邏輯數據以前對數據進行加密,從而確保Netty服務端的全部的Handler的變化都是安全的。
SslHandler提供了一些有用的方法,能夠用來修改其行爲或獲得通知,一旦SSL/TLS完成握手(在握手過程當中的兩個對等通道互相驗證對方,而後選擇一個加密密碼),SSL/TLS是自動執行的。看下面方法列表:
setHandshakeTimeout(long handshakeTimeout, TimeUnit unit),設置握手超時時間,ChannelFuture將獲得通知 setHandshakeTimeoutMillis(long handshakeTimeoutMillis),設置握手超時時間,ChannelFuture將獲得通知 getHandshakeTimeoutMillis(),獲取握手超時時間值 setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit),設置關閉通知超時時間,若超時,ChannelFuture會關閉失敗 setHandshakeTimeoutMillis(long handshakeTimeoutMillis),設置關閉通知超時時間,若超時,ChannelFuture會關閉失敗 getCloseNotifyTimeoutMillis(),獲取關閉通知超時時間 handshakeFuture(),返回完成握手後的ChannelFuture close(),發送關閉通知請求關閉和銷燬
8.2 使用Netty建立HTTP/HTTPS程序
HTTP/HTTPS是最經常使用的協議之一,能夠經過HTTP/HTTPS訪問網站,或者是提供對外公開的接口服務等等。Netty附帶了使用HTTP/HTTPS的handlers,而不須要咱們本身來編寫編解碼器。
8.2.1 Netty的HTTP編碼器,解碼器和編解碼器
HTTP是請求-響應模式,客戶端發送一個http請求,服務就響應此請求。Netty提供了簡單的編碼解碼HTTP協議消息的Handler。下圖顯示了http請求和響應:
如上面兩個圖所示,一個HTTP請求/響應消息可能包含不止一個,但最終都會有LastHttpContent消息。FullHttpRequest和FullHttpResponse是Netty提供的兩個接口,分別用來完成http請求和響應。全部的HTTP消息類型都實現了HttpObject接口。下面是類關係圖:
Netty提供了HTTP請求和響應的編碼器和解碼器,看下面列表:
HttpRequestEncoder,將HttpRequest或HttpContent編碼成ByteBuf
HttpRequestDecoder,將ByteBuf解碼成HttpRequest和HttpContent
HttpResponseEncoder,將HttpResponse或HttpContent編碼成ByteBuf
HttpResponseDecoder,將ByteBuf解碼成HttpResponse和HttpContent
看下面代碼:
public class HttpDecoderEncoderInitializer extends ChannelInitializer<Channel> { private final boolean client; public HttpDecoderEncoderInitializer(boolean client) { this.client = client; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (client) { pipeline.addLast("decoder", new HttpResponseDecoder()); pipeline.addLast("", new HttpRequestEncoder()); } else { pipeline.addLast("decoder", new HttpRequestDecoder()); pipeline.addLast("encoder", new HttpResponseEncoder()); } } }
若是你須要在ChannelPipeline中有一個解碼器和編碼器,還分別有一個在客戶端和服務器簡單的編解碼器:HttpClientCodec和HttpServerCodec。
在ChannelPipeline中有解碼器和編碼器(或編解碼器)後就能夠操做不一樣的HttpObject消息了;可是HTTP請求和響應能夠有不少消息數據,你須要處理不一樣的部分,可能也須要聚合這些消息數據,這是很麻煩的。爲了解決這個問題,Netty提供了一個聚合器,它將消息部分合併到FullHttpRequest和FullHttpResponse,所以不須要擔憂接收碎片消息數據
處理HTTP時可能接收HTTP消息片斷,Netty須要緩衝直到接收完整個消息。要完成的處理HTTP消息,而且內存開銷也不會很大,Netty爲此提供了HttpObjectAggregator。經過HttpObjectAggregator,Netty能夠聚合HTTP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一個ChannelHandler,這就消除了斷裂消息,保證了消息的完整。下面代碼顯示瞭如何聚合:
/** * 添加聚合http消息的Handler */ public class HttpAggregatorInitializer extends ChannelInitializer<Channel> { private final boolean client; public HttpAggregatorInitializer(boolean client) { this.client = client; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (client) { pipeline.addLast("codec", new HttpClientCodec()); } else { pipeline.addLast("codec", new HttpServerCodec()); } pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024)); } }
如上面代碼,很容使用Netty自動聚合消息。可是請注意,爲了防止Dos攻擊服務器,須要合理的限制消息的大小。應設置多大取決於實際的需求,固然也得有足夠的內存可用。
8.2.3 HTTP壓縮
使用HTTP時建議壓縮數據以減小傳輸流量,壓縮數據會增長CPU負載,如今的硬件設施都很強大,大多數時候壓縮數據時一個好主意。Netty支持「gzip」和「deflate」,爲此提供了兩個ChannelHandler實現分別用於壓縮和解壓。看下面代碼:
@Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (client) { pipeline.addLast("codec", new HttpClientCodec()); //添加解壓縮Handler pipeline.addLast("decompressor", new HttpContentDecompressor()); } else { pipeline.addLast("codec", new HttpServerCodec()); //添加解壓縮Handler pipeline.addLast("decompressor", new HttpContentDecompressor()); } pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024)); }
網絡中傳輸的重要數據須要加密來保護,使用Netty提供的SslHandler能夠很容易實現,看下面代碼:
/** * 使用SSL對HTTP消息加密 */ public class HttpsCodecInitializer extends ChannelInitializer<Channel> { private final SSLContext context; private final boolean client; public HttpsCodecInitializer(SSLContext context, boolean client) { this.context = context; this.client = client; } @Override protected void initChannel(Channel ch) throws Exception { SSLEngine engine = context.createSSLEngine(); engine.setUseClientMode(client); ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst("ssl", new SslHandler(engine)); if (client) { pipeline.addLast("codec", new HttpClientCodec()); } else { pipeline.addLast("codec", new HttpServerCodec()); } } }
HTTP是不錯的協議,可是若是須要實時發佈信息怎麼作?有個作法就是客戶端一直輪詢請求服務器,這種方式雖然能夠達到目的,可是其缺點不少,也不是優秀的解決方案,爲了解決這個問題,便出現了WebSocket。
WebSocket容許數據雙向傳輸,而不須要請求-響應模式。早期的WebSocket只能發送文本數據,而後如今不只能夠發送文本數據,也能夠發送二進制數據,這使得可使用WebSocket構建你想要的程序。下圖是WebSocket的通訊示例圖:
在應用程序中添加WebSocket支持很容易,Netty附帶了WebSocket的支持,經過ChannelHandler來實現。使用WebSocket有不一樣的消息類型須要處理。下面列表列出了Netty中WebSocket類型:
BinaryWebSocketFrame,包含二進制數據 TextWebSocketFrame,包含文本數據 ContinuationWebSocketFrame,包含二進制數據或文本數據,BinaryWebSocketFrame和TextWebSocketFrame的結合體 CloseWebSocketFrame,WebSocketFrame表明一個關閉請求,包含關閉狀態碼和短語 PingWebSocketFrame,WebSocketFrame要求PongWebSocketFrame發送數據 PongWebSocketFrame,WebSocketFrame要求PingWebSocketFrame響應
爲了簡化,咱們只看看如何使用WebSocket服務器。客戶端使用能夠看Netty自帶的WebSocket例子。
Netty提供了許多方法來使用WebSocket,但最簡單經常使用的方法是使用WebSocketServerProtocolHandler。看下面代碼:
/** * WebSocket Server,若想使用SSL加密,將SslHandler加載ChannelPipeline的最前面便可 */ public class WebSocketServerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerProtocolHandler("/websocket"), new TextFrameHandler(), new BinaryFrameHandler(), new ContinuationFrameHandler()); } public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // handler text frame } } public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception { //handler binary frame } } public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception { //handler continuation frame } } }
SPDY(讀做「SPeeDY」)是Google開發的基於TCP的應用層協議,用以最小化網絡延遲,提高網絡速度,優化用戶的網絡使用體驗。SPDY並非一種用於替代HTTP的協議,而是對HTTP協議的加強。新協議的功能包括數據流的多路複用、請求優先級以及HTTP報頭壓縮。谷歌表示,引入SPDY協議後,在實驗室測試中頁面加載速度比原先快64%。
SPDY的定位:
將頁面加載時間減小50%。
最大限度地減小部署的複雜性。SPDY使用TCP做爲傳輸層,所以無需改變現有的網絡設施。
避免網站開發者改動內容。 支持SPDY惟一須要變化的是客戶端代理和Web服務器應用程序。
SPDY實現技術:
單個TCP鏈接支持併發的HTTP請求。
壓縮報頭和去掉沒必要要的頭部來減小當前HTTP使用的帶寬。
定義一個容易實現,在服務器端高效率的協議。經過減小邊緣狀況、定義易解析的消息格式來減小HTTP的複雜性。
強制使用SSL,讓SSL協議在現存的網絡設施下有更好的安全性和兼容性。
容許服務器在須要時發起對客戶端的鏈接並推送數據。
8.3 處理空閒鏈接和超時
處理空閒鏈接和超時是網絡應用程序的核心部分。當發送一條消息後,能夠檢測鏈接是否還處於活躍狀態,若很長時間沒用了就能夠斷開鏈接。Netty提供了很好的解決方案,有三種不一樣的ChannelHandler處理閒置和超時鏈接:
IdleStateHandler,當一個通道沒有進行讀寫或運行了一段時間後出發IdleStateEvent
ReadTimeoutHandler,在指定時間內沒有接收到任何數據將拋出ReadTimeoutException
WriteTimeoutHandler,在指定時間內有寫入數據將拋出WriteTimeoutException
最經常使用的是IdleStateHandler,下面代碼顯示瞭如何使用IdleStateHandler,若是60秒內沒有接收數據或發送數據,操做將失敗,鏈接將關閉:
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)); pipeline.addLast(new HeartbeatHandler()); } public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter { private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer( "HEARTBEAT", CharsetUtil.UTF_8)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt); } } } }
8.4 解碼分隔符和基於長度的協議
使用Netty時會遇到須要解碼以分隔符和長度爲基礎的協議,本節講解Netty如何解碼這些協議。
常常須要處理分隔符協議或建立基於它們的協議,例如SMTP、POP三、IMAP、Telnet等等;Netty附帶的handlers能夠很容易的提取一些序列分隔:
DelimiterBasedFrameDecoder,解碼器,接收ByteBuf由一個或多個分隔符拆分,如NUL或換行符
LineBasedFrameDecoder,解碼器,接收ByteBuf以分割線結束,如"\n"和"\r\n"
下圖顯示了使用"\r\n"分隔符的處理
下面代碼顯示使用LineBasedFrameDecoder提取"\r\n"分隔幀:
/** * 處理換行分隔符消息 * @author c.k * */ public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1204), new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // do something with the frame } } }
若是框架的東西除了換行符還有別的分隔符,可使用DelimiterBasedFrameDecoder,只須要將分隔符傳遞到構造方法中。若是想實現本身的以分隔符爲基礎的協議,這些解碼器是有用的。例如,如今有個協議,它只處理命令,這些命令由名稱和參數造成,名稱和參數由一個空格分隔,實現這個需求的代碼以下:
/** * 自定義以分隔符爲基礎的協議 */ public class CmdHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new CmdDecoder(65 * 1024), new CmdHandler()); } public static final class Cmd { private final ByteBuf name; private final ByteBuf args; public Cmd(ByteBuf name, ByteBuf args) { this.name = name; this.args = args; } public ByteBuf getName() { return name; } public ByteBuf getArgs() { return args; } } public static final class CmdDecoder extends LineBasedFrameDecoder { public CmdDecoder(int maxLength) { super(maxLength); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { ByteBuf frame = (ByteBuf) super.decode(ctx, buffer); if (frame == null) { return null; } int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), (byte) ' '); return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex())); } } public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> { @Override protected void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception { // do something with the command } } }
通常常常會碰到以長度爲基礎的協議,對於這種狀況Netty有兩個不一樣的解碼器能夠幫助咱們來解碼:
FixedLengthFrameDecoder
LengthFieldBasedFrameDecoder
下圖顯示了FixedLengthFrameDecoder的處理流程:
如上圖所示,FixedLengthFrameDecoder提取固定長度,例子中的是8字節。大部分時候幀的大小被編碼在頭部,這種狀況可使用LengthFieldBasedFrameDecoder,它會讀取頭部長度並提取幀的長度。下圖顯示了它是如何工做的:
若是長度字段是提取框架的一部分,能夠在LengthFieldBasedFrameDecoder的構造方法中配置,還能夠指定提供的長度。FixedLengthFrameDecoder很容易使用,咱們重點講解LengthFieldBasedFrameDecoder。下面代碼顯示如何使用LengthFieldBasedFrameDecoder提取8字節長度:
public class LengthBasedInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65*1024, 0, 8)) .addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { //do something with the frame } } }
8.5 寫大數據
寫大量的數據的一個有效的方法是使用異步框架,若是內存和網絡都處於飽滿負荷狀態,你須要中止寫,不然會報OutOfMemoryError。Netty提供了寫文件內容時zero-memory-copy機制,這種方法再將文件內容寫到網絡堆棧空間時能夠得到最大的性能。使用零拷貝寫文件的內容時經過DefaultFileRegion、ChannelHandlerContext、ChannelPipeline,看下面代碼:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { File file = new File("test.txt"); FileInputStream fis = new FileInputStream(file); FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length()); Channel channel = ctx.channel(); channel.writeAndFlush(region).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if(!future.isSuccess()){ Throwable cause = future.cause(); // do something } } }); }
若是隻想發送文件中指定的數據塊應該怎麼作呢?Netty提供了ChunkedWriteHandler,容許經過處理ChunkedInput來寫大的數據塊。下面是ChunkedInput的一些實現類:
ChunkedFile
ChunkedNioFile
ChunkedStream
ChunkedNioStream
看下面代碼:
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> { private final File file; public ChunkedWriteHandlerInitializer(File file) { this.file = file; } @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ChunkedWriteHandler()) .addLast(new WriteStreamHandler()); } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file))); } } }
8.6 序列化數據
開發網絡程序過程當中,不少時候須要傳輸結構化對象數據POJO,Java中提供了ObjectInputStream和ObjectOutputStream及其餘的一些對象序列化接口。Netty中提供基於JDK序列化接口的序列化接口。
8.6.1 普通的JDK序列化
若是你使用ObjectInputStream和ObjectOutputStream,而且須要保持兼容性,不想有外部依賴,那麼JDK的序列化是首選。Netty提供了下面的一些接口,這些接口放在io.netty.handler.codec.serialization包下面:
CompatibleObjectEncoder
CompactObjectInputStream
CompactObjectOutputStream
ObjectEncoder
ObjectDecoder
ObjectEncoderOutputStream
ObjectDecoderInputStream
8.6.2 經過JBoss編組序列化
若是你想使用外部依賴的接口,JBoss編組是個好方法。JBoss Marshalling序列化的速度是JDK的3倍,而且序列化的結構更緊湊,從而使序列化後的數據更小。Netty附帶了JBoss編組序列化的實現,這些實現接口放在io.netty.handler.codec.marshalling包下面:
CompatibleMarshallingEncoder
CompatibleMarshallingDecoder
MarshallingEncoder
MarshallingDecoder
看下面代碼:
/** * 使用JBoss Marshalling */ public class MarshallingInitializer extends ChannelInitializer<Channel> { private final MarshallerProvider marshallerProvider; private final UnmarshallerProvider unmarshallerProvider; public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) { this.marshallerProvider = marshallerProvider; this.unmarshallerProvider = unmarshallerProvider; } @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider)) .addLast(new MarshallingEncoder(marshallerProvider)) .addLast(new ObjectHandler()); } public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> { @Override protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception { // do something } } }
最有一個序列化方案是Netty附帶的ProtoBuf。protobuf是Google開源的一種編碼和解碼技術,它的做用是使序列化數據更高效。而且谷歌提供了protobuf的不一樣語言的實現,因此protobuf在跨平臺項目中是很是好的選擇。Netty附帶的protobuf放在io.netty.handler.codec.protobuf包下面:
ProtobufDecoder
ProtobufEncoder
ProtobufVarint32FrameDecoder
ProtobufVarint32LengthFieldPrepender
看下面代碼:
/** * 使用protobuf序列化數據,進行編碼解碼 * 注意:使用protobuf須要protobuf-java-2.5.0.jar */ public class ProtoBufInitializer extends ChannelInitializer<Channel> { private final MessageLite lite; public ProtoBufInitializer(MessageLite lite) { this.lite = lite; } @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufEncoder()) .addLast(new ProtobufDecoder(lite)) .addLast(new ObjectHandler()); } public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> { @Override protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception { // do something } } }