以前寫過一篇 Spring 集成 WebSocket 協議的文章 —— Spring消息之WebSocket ,因此對於 WebSocket 協議的介紹就很少說了,能夠參考這篇文章。這裏只作一些補充說明。另外,Netty 對 WebSocket 協議的支持要比 Spring 好太多了,用起來舒服的多。html
WebSocket 以幀的方式傳輸數據,每一幀表明消息的一部分。一個完整的消息可能會包含許多幀。java
由 IETF 發佈的 WebSocket RFC,定義了 6 種幀, Netty 爲它們每種都提供了一個 POJO 實現。下表列出了這些幀類型,並描述了它們的用法。git
一、A、B、C 等全部用戶均可以加入同一個聊天室。github
二、A 發送的消息,B、C 能夠同時收到,可是 A 收不到本身發送的消息。bootstrap
三、當用戶長時間沒有發送消息,系統將把他踢出聊天室。瀏覽器
一、Netty 版本dom
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency>
二、處理 HTTP 協議的 ChannelHandler —— 非 WebSocket 協議的請求,返回 index.html 頁面socket
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String wsUri; private static File INDEX; static { URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation(); try { String path = location.toURI() + "index.html"; path = !path.contains("file:") ? path : path.substring(5); INDEX = new File(path); } catch (URISyntaxException e) { e.printStackTrace(); } } public HttpRequestHandler(String wsUri) { this.wsUri = wsUri; } @Override protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { // 若是請求了Websocket,協議升級,增長引用計數(調用retain()),並將他傳遞給下一個 ChannelHandler // 之因此須要調用 retain() 方法,是由於調用 channelRead() 以後,資源會被 release() 方法釋放掉,須要調用 retain() 保留資源 if (wsUri.equalsIgnoreCase(request.uri())) { ctx.fireChannelRead(request.retain()); } else { //處理 100 Continue 請求以符合 HTTP 1.1 規範 if (HttpHeaderUtil.is100ContinueExpected(request)) { send100Continue(ctx); } // 讀取 index.html RandomAccessFile randomAccessFile = new RandomAccessFile(INDEX, "r"); HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK); HttpHeaders headers = response.headers(); //在該 HTTP 頭信息被設置之後,HttpRequestHandler 將會寫回一個 HttpResponse 給客戶端 headers.set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8"); boolean keepAlive = HttpHeaderUtil.isKeepAlive(request); if (keepAlive) { headers.setLong(HttpHeaderNames.CONTENT_LENGTH, randomAccessFile.length()); headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); } ctx.write(response); //將 index.html 寫給客戶端 if (ctx.pipeline().get(SslHandler.class) == null) { ctx.write(new DefaultFileRegion(randomAccessFile.getChannel(), 0, randomAccessFile.length())); } else { ctx.write(new ChunkedNioFile(randomAccessFile.getChannel())); } //寫 LastHttpContent 並沖刷至客戶端,標記響應的結束 ChannelFuture channelFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); if (!keepAlive) { channelFuture.addListener(ChannelFutureListener.CLOSE); } } } private void send100Continue(ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }
三、處理 WebSocket 協議的 ChannelHandler —— 處理 TextWebSocketFrame 的消息幀ide
/** * WebSocket 幀:WebSocket 以幀的方式傳輸數據,每一幀表明消息的一部分。一個完整的消息可能會包含許多幀 */ public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private final ChannelGroup group; public TextWebSocketFrameHandler(ChannelGroup group) { this.group = group; } @Override protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //增長消息的引用計數(保留消息),並將他寫到 ChannelGroup 中全部已經鏈接的客戶端 Channel channel = ctx.channel(); //本身發送的消息不返回給本身 group.remove(channel); group.writeAndFlush(msg.retain()); group.add(channel); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //是否握手成功,升級爲 Websocket 協議 if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { // 握手成功,移除 HttpRequestHandler,所以將不會接收到任何消息 // 並把握手成功的 Channel 加入到 ChannelGroup 中 ctx.pipeline().remove(HttpRequestHandler.class); group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined")); group.add(ctx.channel()); } else if (evt instanceof IdleStateEvent) { IdleStateEvent stateEvent = (IdleStateEvent) evt; if (stateEvent.state() == IdleState.READER_IDLE) { group.remove(ctx.channel()); ctx.writeAndFlush(new TextWebSocketFrame("因爲您長時間不在線,系統已自動把你踢下線!")).addListener(ChannelFutureListener.CLOSE); } } else { super.userEventTriggered(ctx, evt); } } }
WebSocket 協議升級完成以後, WebSocketServerProtocolHandler 將會把 HttpRequestDecoder 替換爲 WebSocketFrameDecoder,把 HttpResponseEncoder 替換爲WebSocketFrameEncoder。爲了性能最大化,它將移除任何再也不被 WebSocket 鏈接所須要的 ChannelHandler。這也包括了 HttpObjectAggregator 和 HttpRequestHandler 。工具
四、ChatServerInitializer —— 多個 ChannelHandler 合併成 ChannelPipeline 鏈
public class ChatServerInitializer extends ChannelInitializer<Channel> { private final ChannelGroup group; private static final int READ_IDLE_TIME_OUT = 60; // 讀超時 private static final int WRITE_IDLE_TIME_OUT = 0;// 寫超時 private static final int ALL_IDLE_TIME_OUT = 0; // 全部超時 public ChatServerInitializer(ChannelGroup group) { this.group = group; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(64 * 1024)); // 處理那些不發送到 /ws URI的請求 pipeline.addLast(new HttpRequestHandler("/ws")); // 若是被請求的端點是 "/ws",則處理該升級握手 pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // //當鏈接在60秒內沒有接收到消息時,進會觸發一個 IdleStateEvent 事件,被 HeartbeatHandler 的 userEventTriggered 方法處理 pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS)); pipeline.addLast(new TextWebSocketFrameHandler(group)); } }
tips:上面這些開箱即用 ChannelHandler 的做用,我就不一一介紹了,能夠參考上一篇文章。
五、引導類 ChatServer
public class ChatServer { private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final EventLoopGroup group = new NioEventLoopGroup(); private Channel channel; public ChannelFuture start(InetSocketAddress address) { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChatServerInitializer(channelGroup)); ChannelFuture channelFuture = bootstrap.bind(address); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); return channelFuture; } public void destroy() { if (channel != null) { channel.close(); } channelGroup.close(); group.shutdownGracefully(); } public static void main(String[] args) { final ChatServer chatServer = new ChatServer(); ChannelFuture channelFuture = chatServer.start(new InetSocketAddress(9999)); // 返回與當前Java應用程序關聯的運行時對象 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { chatServer.destroy(); } }); channelFuture.channel().closeFuture().syncUninterruptibly(); } }
在瀏覽器中輸入 http://127.0.0.1:9999 便可看到預先準備好的 index.html 頁面;訪問 ws://127.0.0.1:9999/ws (可隨意找一個 WebSocket 測試工具測試)便可加入聊天室。
有點 low 的聊天室總算是完成了,算是 Netty 對 HTTP 協議和 WebSocket 協議的一次實踐吧!雖然功能欠缺,但千里之行,始於足下!不積硅步,無以致千里;不積小流,無以成江海!
參考資料:《Netty IN ACTION》
演示源代碼:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/chatroom