Netty 系列八(基於 WebSocket 的簡單聊天室).

1、前言

    以前寫過一篇 Spring 集成 WebSocket 協議的文章 —— Spring消息之WebSocket ,因此對於 WebSocket 協議的介紹就很少說了,能夠參考這篇文章。這裏只作一些補充說明。另外,Netty 對 WebSocket 協議的支持要比 Spring 好太多了,用起來舒服的多。html

    WebSocket 以幀的方式傳輸數據,每一幀表明消息的一部分。一個完整的消息可能會包含許多幀。java

    由 IETF 發佈的 WebSocket RFC,定義了 6 種幀, Netty 爲它們每種都提供了一個 POJO 實現。下表列出了這些幀類型,並描述了它們的用法。git

2、聊天室功能說明

    一、A、B、C 等全部用戶均可以加入同一個聊天室。github

    二、A 發送的消息,B、C 能夠同時收到,可是 A 收不到本身發送的消息。bootstrap

    三、當用戶長時間沒有發送消息,系統將把他踢出聊天室。瀏覽器

   

3、聊天室功能實現

    一、Netty 版本dom

<dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>5.0.0.Alpha2</version>
</dependency>

    二、處理 HTTP 協議的 ChannelHandler —— 非 WebSocket 協議的請求,返回 index.html 頁面socket

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final String wsUri;
    private static File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }


    @Override
    protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // 若是請求了Websocket,協議升級,增長引用計數(調用retain()),並將他傳遞給下一個 ChannelHandler
        // 之因此須要調用 retain() 方法,是由於調用 channelRead() 以後,資源會被 release() 方法釋放掉,須要調用 retain() 保留資源
        if (wsUri.equalsIgnoreCase(request.uri())) {
            ctx.fireChannelRead(request.retain());
        } else {
            //處理 100 Continue 請求以符合 HTTP 1.1 規範
            if (HttpHeaderUtil.is100ContinueExpected(request)) {
                send100Continue(ctx);
            }
            // 讀取 index.html
            RandomAccessFile randomAccessFile = new RandomAccessFile(INDEX, "r");
            HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
            HttpHeaders headers = response.headers();
            //在該 HTTP 頭信息被設置之後,HttpRequestHandler 將會寫回一個 HttpResponse 給客戶端
            headers.set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
            boolean keepAlive = HttpHeaderUtil.isKeepAlive(request);
            if (keepAlive) {
                headers.setLong(HttpHeaderNames.CONTENT_LENGTH, randomAccessFile.length());
                headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }
            ctx.write(response);
            //將 index.html 寫給客戶端
            if (ctx.pipeline().get(SslHandler.class) == null) {
                ctx.write(new DefaultFileRegion(randomAccessFile.getChannel(), 0, randomAccessFile.length()));
            } else {
                ctx.write(new ChunkedNioFile(randomAccessFile.getChannel()));
            }
            //寫 LastHttpContent 並沖刷至客戶端,標記響應的結束
            ChannelFuture channelFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            if (!keepAlive) {
                channelFuture.addListener(ChannelFutureListener.CLOSE);
            }
        }

    }

    private void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    三、處理 WebSocket 協議的 ChannelHandler —— 處理 TextWebSocketFrame 的消息幀ide

/**
 * WebSocket 幀:WebSocket 以幀的方式傳輸數據,每一幀表明消息的一部分。一個完整的消息可能會包含許多幀
 */
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private final ChannelGroup group;

    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //增長消息的引用計數(保留消息),並將他寫到 ChannelGroup 中全部已經鏈接的客戶端
        Channel channel = ctx.channel();
        //本身發送的消息不返回給本身
        group.remove(channel);
        group.writeAndFlush(msg.retain());
        group.add(channel);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //是否握手成功,升級爲 Websocket 協議
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            // 握手成功,移除 HttpRequestHandler,所以將不會接收到任何消息
            // 並把握手成功的 Channel 加入到 ChannelGroup 中
            ctx.pipeline().remove(HttpRequestHandler.class);
            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
            group.add(ctx.channel());
        } else if (evt instanceof IdleStateEvent) {
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            if (stateEvent.state() == IdleState.READER_IDLE) {
                group.remove(ctx.channel());
                ctx.writeAndFlush(new TextWebSocketFrame("因爲您長時間不在線,系統已自動把你踢下線!")).addListener(ChannelFutureListener.CLOSE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

   WebSocket 協議升級完成以後, WebSocketServerProtocolHandler 將會把 HttpRequestDecoder 替換爲 WebSocketFrameDecoder,把 HttpResponseEncoder 替換爲WebSocketFrameEncoder。爲了性能最大化,它將移除任何再也不被 WebSocket 鏈接所須要的 ChannelHandler。這也包括了 HttpObjectAggregator 和 HttpRequestHandler 。工具

    四、ChatServerInitializer —— 多個 ChannelHandler 合併成 ChannelPipeline 鏈

public class ChatServerInitializer extends ChannelInitializer<Channel> {

    private final ChannelGroup group;
    private static final int READ_IDLE_TIME_OUT = 60; // 讀超時
    private static final int WRITE_IDLE_TIME_OUT = 0;// 寫超時
    private static final int ALL_IDLE_TIME_OUT = 0; // 全部超時

    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        // 處理那些不發送到 /ws URI的請求
        pipeline.addLast(new HttpRequestHandler("/ws"));
        // 若是被請求的端點是 "/ws",則處理該升級握手
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        // //當鏈接在60秒內沒有接收到消息時,進會觸發一個 IdleStateEvent 事件,被 HeartbeatHandler 的 userEventTriggered 方法處理
        pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS));
        pipeline.addLast(new TextWebSocketFrameHandler(group));

    }
}
ChatServerInitializer.java

tips:上面這些開箱即用 ChannelHandler 的做用,我就不一一介紹了,能夠參考上一篇文章

    五、引導類 ChatServer

public class ChatServer {

    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture start(InetSocketAddress address) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChatServerInitializer(channelGroup));
        ChannelFuture channelFuture = bootstrap.bind(address);
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
        return channelFuture;
    }

    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }

    public static void main(String[] args) {
        final ChatServer chatServer = new ChatServer();
        ChannelFuture channelFuture = chatServer.start(new InetSocketAddress(9999));
        // 返回與當前Java應用程序關聯的運行時對象
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                chatServer.destroy();
            }
        });
        channelFuture.channel().closeFuture().syncUninterruptibly();
    }

}
ChatServer.java

3、效果展現

    在瀏覽器中輸入 http://127.0.0.1:9999 便可看到預先準備好的 index.html 頁面;訪問 ws://127.0.0.1:9999/ws (可隨意找一個 WebSocket 測試工具測試)便可加入聊天室。

有點 low 的聊天室總算是完成了,算是 Netty 對 HTTP 協議和 WebSocket 協議的一次實踐吧!雖然功能欠缺,但千里之行,始於足下!不積硅步,無以致千里;不積小流,無以成江海!

 

參考資料:《Netty IN ACTION》

演示源代碼:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/chatroom

相關文章
相關標籤/搜索