若是你有跟進Web技術的最新進展,你極可能就遇到過「實時Web」這個短語,這裏並非指所謂的硬實時服務質量(QoS),硬實時服務質量是保證計算結果將在指定的時間間隔內被遞交。僅HTTP的請求/響應模式設計就使得其很難被支持。html
實時Web利用技術和實踐,使用戶在信息的做者發佈信息以後就可以當即收到信息,而不須要他們或者他們的軟件週期性地檢查信息源以及獲取更新。bootstrap
一、WebSocket簡介瀏覽器
WebSocket協議是徹底從新設計的協議,旨在爲Web上的雙向數據傳輸問題提供一個切實可行的解決方案,使得客戶端和服務器之間能夠在任意時刻傳輸信息,所以,這也就要求他們異步地處理消息回執(做爲HTML5客戶端API的一部分,大部分最新的瀏覽器都已經支持了WebSocket)服務器
Netty對於WebSocket的支持包含了全部正在使用中的主要實現,所以在你的下一個應用程序中採用它將是簡單直接的。和往常使用Netty同樣,你能夠徹底使用該協議,而無需關心它內部的實現細節,咱們將經過建立一個基於WbeSocket的實時聊天應用程序來演示。dom
二、WebSocket示例應用程序異步
爲了讓示例應用程序展現它的實時功能,咱們將經過使用WebSocket協議來實現一個基於瀏覽器的聊天應用程序,就像你可能在FaceBook的文本消息功能中見到過的那樣。咱們將經過使用多個用戶之間能夠同時進行相互通訊,從而更進一步。ide
下圖說明應用邏輯:oop
——客戶端發送一個消息性能
——該消息將被廣播到全部其餘連接的客戶端this
這正如你可能會預期的一個聊天室應當的工做方式:全部的人均可以和其餘的人聊天。在示例中,咱們將只實現服務器端,而客戶端則是經過Web頁面訪問該聊天室的瀏覽器。正如同你將在接下來的幾頁中所看到的,WebSocket簡化了編寫這樣的服務器的過程。
三、添加WebSocket支持
在從標準的HTTP或者HTTPS協議切換到WebSocket時,將會使用一種稱爲升級握手的機制。所以,使用WebSocket的應用程序將始終以HTTP/S做爲開始,而後再執行升級。這個升級動做發生的確切時刻特定於應用程序;他可能會發生在啓動時,也可能會發生在請求了某個特定的URL以後。
咱們的應用程序將採用下面的約定:若是被請求的URL以/ws結尾,那麼咱們將會把該協議升級爲WebSocket;不然,服務器將使用基本的HTTP/S。在鏈接已經升級完成以後,全部數據都將會使用WebSocket進行傳輸。下圖說明了該服務器邏輯,一如在Netty中同樣,它由一組ChannelHandler實現。
四、處理HTTP請求
首先,咱們將實現該處理HTTP請求的組件。這個組件將提供用於訪問聊天室並顯示由鏈接的客戶端發送的消息的網頁。以下代碼給出了這個HttpRequestHandler對應的代碼,其擴展了SimpleChannelInboundHandler以處理FullHttpRequest消息。須要注意是,channelRead0()方法的實現是如何轉發任何目標URI爲/ws的請求的。
//擴展SimpleChannelInboundHandler以處理FullHttpReuqest消息public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest>{ private final String wsUri; private static final File INDEX; static { URL location = HttpRequestHandler.class.getProtectionDomain() .getCodeSource().getLocation(); try { String path = location.toURI() + "index.html"; path = !path.contains("file:") ? path : path.substring(5); INDEX = new File(path); }catch (URISyntaxException e){ throw new IllegalStateException("Unable to locate index.html",e); } } public HttpRequestHandler(String wsUri){ this.wsUri = wsUri; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { //若是請求了WebSocket協議升級,則增長引用技術,並將它傳遞給下一個ChannelInboundHandler if (wsUri.equalsIgnoreCase(request.getUri())){ ctx.fireChannelRead(request.retain()); } else { //處理100Continue請求以符合HTTP1.1規範 if (HttpHeaders.is100ContinueExpected(request)){ send100Continue(ctx); } //讀取「index.html」 RandomAccessFile file = new RandomAccessFile(INDEX,"r"); HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(),HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE,"text/html;charset=UTF-8"); boolean keepAlive = HttpHeaders.isKeepAlive(request); //若是請求了keep-alive,則添加所須要的HTTP頭信息 if (keepAlive){ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,file.length()); response.headers().set(HttpHeaders.Names.CONNECTION,HttpHeaders.Values.KEEP_ALIVE); } //將HttpResponse寫到客戶端 ctx.write(response); //將index.html寫到客戶端 if (ctx.pipeline().get(SslHandler.class) == null){ ctx.write(new DefaultFileRegion(file.getChannel(),0,file.length())); } else { ctx.write(new ChunkedNioFile(file.getChannel())); } //寫LastHttpContent並沖刷至客戶端 ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); if (!keepAlive){ //若是沒有請求keep-alive,則在寫操做完成後關閉Channel future.addListener(ChannelFutureListener.CLOSE); } } } private static void send100Continue(ChannelHandlerContext ctx){ FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
若是該HTTP請求指向了地址爲/ws的URI,那麼HttpRequestHandler將調用FullHttpRequest對象上的retain()方法。並經過調用fireChannelRead(msg)方法將它轉發給下一個ChannelInboundHandler。之因此須要調用retain()方法,是由於調用channelRead()方法完成以後,它將調用FullHttpRequest對象上的release()方法以釋放它的資源。
若是客戶端發送了HTTP1.1的HTTP頭信息Expect:100-continue,那麼HttpRequestHandler將會發送一個100Continue響應。在該HTTP頭信息被設置以後,HttpRequestHandler將會寫回一個HttpResponse給客戶端。這不是一個FullHttpResponse,由於它只是響應的第一部分。此外,這裏也不會調用writeAndFlush()方法,在結束的時候纔會調用。
若是不須要加密和壓縮,那麼能夠經過將index.html的內容存儲到DefaultFileRegion中來達到最佳效率。這將會利用零拷貝特性來進行內容的傳輸。爲此,你能夠檢查一下,是否有SslHandler存在於在ChannelPipeline中。不然,你可使用ChunkedNioFile。
HttpRequestHandler將寫一個LastHttpContent來標記響應的結束。若是沒有請求keep-alive,那麼HttpRequestHandler將會添加一個ChannelFutureListener到最後一次寫出動做的ChannelFuture,並關閉該鏈接。在這裏,你將調用writeAndFlush()方法以沖刷全部以前寫入的消息。
這部分代碼表明瞭聊天服務器的第一個部分,它管理純粹的HTTP請求和響應。接下來,咱們將處理傳輸實際聊天消息的WebSocket幀。
WEBSOCKET幀:WebSocket以幀的方式傳輸數據,每一幀表明消息的一部分。一個完整的消息可能會包含許多幀。
五、處理WebSocket幀
有IETF發佈的WebSocket RFC,定義了6種幀,Netty爲它們都提供了一個POJO實現。
BinaryWebSocketFrame——包含了二進制數據
TextWebSocketFrame——包含了文本數據
ContinuationWebSocketFrame——包含屬於上一個BinaryWebSocketFrame或TextWebSocketFrame的文本數據或者二進制數據
CloseWebSocketFrame——表示一個CLOSE請求,包含一個關閉的狀態碼和關閉的緣由
PingWebSocketFrame——請求傳輸一個PongWebSocketFrame
PongWebSocketFrame——做爲一個對於PingWebSocketFrame的響應被髮送
TextWebSocketFrame是咱們惟一真正須要處理的幀類型。爲了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler來處理其餘類型的幀。
如下代碼展現了咱們用於處理TextWebSocketFrame的ChannelInboundHandler,其還將在它的ChannelGroup中跟蹤全部活動的WebSocket鏈接。
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ private final ChannelGroup group; public TextWebSocketFrameHandler(ChannelGroup group){ this.group = group; } //重寫userEventTriggered方法以處理自定義事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){ //若是該事件表示握手成功,則從該ChannelPipeline中移除HttpRequestHandler,由於將不會接收到任何HTTP消息了 ctx.pipeline().remove(HttpRequestHandler.class); //通知全部已經鏈接的WebSocket客戶端新的客戶端鏈接上了 group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined")); //將新的WebSocket Channel添加到ChannelGroup中,以便它能夠接收到全部的消息 group.add(ctx.channel()); } else { super.userEventTriggered(ctx,evt); } } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception { //增長消息的引用計數,並將它寫到ChannelGroup中全部已經鏈接的客戶端 group.writeAndFlush(msg.retain()); } }
TextWebSocketFrameHandler只有一組很是少許的責任。當和新客戶端的WebSocket握手成功完成以後,它將經過把通知消息寫到ChannelGroup中的全部Channel來通知全部已經鏈接的客戶端,而後它將把這個新Channel加入到該ChannelGroup中。
若是接收到了TextWebSocketFrame消息,TextWebSocketFrameHandler將調用TextWebSocketFrame消息上的retain()方法,並使用writeAndFlush()方法來將它傳輸給ChannelGroup,以便全部已經鏈接的WebSocket Channel都將接收到它。
和以前同樣,對於retain()方法的調用時必需的。由於當ChannelRead0()方法返回時,TextWebSocketFrame的引用技術將會被減小。因爲全部的操做都是異步的,所以,writeAndFlush()方法可能會在channelRead0()方法返回以後完成,並且它絕對不能訪問一個已經失效的引用。
由於Netty在內部處理了大部分剩下的功能,全部如今剩下惟一須要作的事情就是爲每一個新建立的Channel初始化其ChannelPipeline。爲此,咱們須要一個ChannelInitializer。
六、初始化ChannelPipeline
如下代碼展現了生成的ChatServerInitializer。
public class ChatServerInitializer extends ChannelInitializer<Channel>{ private final ChannelGroup group; public ChatServerInitializer(ChannelGroup group) { this.group = group; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(64 * 1024)); pipeline.addLast(new HttpRequestHandler("/ws")); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new TextWebSocketFrameHandler(group)); }
對於initChannel()方法調用,經過安裝全部必須的ChannelHandler來設置該新註冊的Channel的ChannelPipeline。
Netty的WebSocketServerProtocolHandler處理了全部委託管理的WebSocket幀類型以及升級握手自己。若是握手成功,那麼所需的ChannelHandler將會被添加到ChannelPipeline中,而那些再也不須要的ChannelHandler則將會被移除。
WebSocket協議升級以前的ChannelPipeline的狀態以下圖,這表明了剛剛被ChatServerInitializer初始化以後的ChannelPipeline。 圖片當WebSocket協議升級完成以後,WebSocketServerProtocolHandler將會把HttpRequestDecoder替換爲WebSocketFrameDecoder,把HttpResponseEncoder替換爲WebSocketFrameEncoder。爲了性能最大化,它將移除任何再也不被WebSocket鏈接所須要的ChannelHandler。這也包括上圖所示的HttpObjectAggregator和HttpRequestHandler。
下圖展現了這些操做完成以後的ChannelPipeline。須要注意的是,Netty目前支持4個版本的WebSocket協議,他們每一個都具備本身的實現類。Netty將會根據客戶端(這裏指瀏覽器)所支持的版本,自動地選擇正確版本的WebSocketFrameDecoder和WebSocketFrameEncoder。 圖片
七、引導
這幅拼圖最後的一部分是引導該服務器,並安裝ChatSererInitializer的代碼。這將有ChatServer類處理,以下代碼所示。
public class ChatServer { //建立DefaultChannelGroup,其將保存全部已經鏈接的WebSocket Channel private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final EventLoopGroup group = new NioEventLoopGroup(); private Channel channel; public ChannelFuture start(InetSocketAddress address){ //引導服務器 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(createInitializer(channelGroup)); ChannelFuture future = bootstrap.bind(address); future.syncUninterruptibly(); channel = future.channel(); return future; } //建立ChatServerInitializer protected ChannelInitializer<Channel> createInitializer(ChannelGroup group){ return new ChatServerInitializer(group); } //處理服務器關閉,並釋放全部的資源 public void destroy(){ if (channel != null){ channel.close(); } channelGroup.close(); group.shutdownGracefully(); } public static void main(String[] args) throws Exception{ if (args.length != 1){ System.out.println("Please give port as argument"); System.exit(1); } int port = Integer.parseInt(args[0]); final ChatServer endpoint = new ChatServer(); ChannelFuture future = endpoint.start( new InetSocketAddress(port)); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { endpoint.destroy(); } }); future.channel().closeFuture().syncUninterruptibly(); } }
八、如何進行加密
在真實世界的場景中,你將很快就會被要求向該服務器添加加密。使用Netty,這不過是將一個SslHandler添加到ChannelPipeline中,並配置它的問題。如下代碼展現瞭如何經過擴展咱們的ChatServerInitializer來建立一個SecureChatServerInitializer以完成需求。
//擴展ChatServerInitializer以添加加密public class SecureChatServerInitializer extends ChatServerInitializer{ private final SslContext context; public SecureChatServerInitializer(ChannelGroup group,SslContext context) { super(group); this.context = context; } @Override protected void initChannel(Channel channel) throws Exception { //調用父類的initChannel()方法 super.initChannel(channel); SSLEngine engine = context.newEngine(channel.alloc()); engine.setUseClientMode(false); //將SslHandler添加到ChannelPipeline中 channel.pipeline().addFirst(new SslHandler(engine)); } }
最後一步是調整ChatServer以使用SecureChatServerInitializer,以便在ChannelPipeline中安裝SslHandler。
public class SecureChatServer extends ChatServer{ private final SslContext context; public SecureChatServer(SslContext context) { this.context = context; } @Override protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) { //返回以前建立的SecureChatServerInitializer以啓用加密 return new SecureChatServerInitializer(group,context); } public static void main(String[] args) throws Exception{ if (args.length != 1){ System.out.println("Please give port as argument"); System.exit(1); } int port = Integer.parseInt(args[0]); SelfSignedCertificate cert = new SelfSignedCertificate(); SslContext context = SslContext.newServerContext(cert.certificate(),cert.privateKey()); final SecureChatServer endpoint = new SecureChatServer(context); ChannelFuture future = endpoint.start( new InetSocketAddress(port)); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { endpoint.destroy(); } }); future.channel().closeFuture().syncUninterruptibly(); } }
這就是爲全部的通訊啓用SSL/TLS加密須要作的所有。