Netty 實現 WebSocket 聊天功能

上一次咱們用Netty快速實現了一個 Java 聊天程序(見http://www.waylau.com/netty-chat/)。如今,咱們要作下修改,加入 WebSocket 的支持,使它能夠在瀏覽器裏進行文本聊天。javascript

準備

  • JDK 7+
  • Maven 3.2.x
  • Netty 4.x
  • Eclipse 4.x

WebSocket

WebSocket 經過「Upgrade handshake(升級握手)」從標準的 HTTP 或HTTPS 協議轉爲 WebSocket。所以,使用 WebSocket 的應用程序將始終以 HTTP/S 開始,而後進行升級。在何時發生這種狀況取決於具體的應用;它能夠是在啓動時,或當一個特定的 URL 被請求時。html

在咱們的應用中,當 URL 請求以「/ws」結束時,咱們才升級協議爲WebSocket。不然,服務器將使用基本的 HTTP/S。一旦升級鏈接將使用的WebSocket 傳輸全部數據。java

整個服務器邏輯以下:git

1.客戶端/用戶鏈接到服務器並加入聊天github

2.HTTP 請求頁面或 WebSocket 升級握手web

3.服務器處理全部客戶端/用戶bootstrap

4.響應 URI 「/」的請求,轉到默認 html 頁面api

5.若是訪問的是 URI「/ws」 ,處理 WebSocket 升級握手瀏覽器

6.升級握手完成後 ,經過 WebSocket 發送聊天消息服務器

服務端

讓咱們從處理 HTTP 請求的實現開始。

處理 HTTP 請求

HttpRequestHandler.java

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //1
    private final String wsUri;
    private static final File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "WebsocketChatClient.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            ctx.fireChannelRead(request.retain());                  //2
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) {
                send100Continue(ctx);                               //3
            }

            RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4

            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");

            boolean keepAlive = HttpHeaders.isKeepAlive(request);

            if (keepAlive) {                                        //5
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);                    //6

            if (ctx.pipeline().get(SslHandler.class) == null) {     //7
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);           //8
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);        //9
            }

            file.close();
        }
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"異常");
        // 當出現異常就關閉鏈接
        cause.printStackTrace();
        ctx.close();
    }
}

1.擴展 SimpleChannelInboundHandler 用於處理 FullHttpRequest信息

2.若是請求是 WebSocket 升級,遞增引用計數器(保留)而且將它傳遞給在 ChannelPipeline 中的下個 ChannelInboundHandler

3.處理符合 HTTP 1.1的 "100 Continue" 請求

4.讀取默認的 WebsocketChatClient.html 頁面

5.判斷 keepalive 是否在請求頭裏面

6.寫 HttpResponse 到客戶端

7.寫 index.html 到客戶端,判斷 SslHandler 是否在 ChannelPipeline 來決定是使用 DefaultFileRegion 仍是 ChunkedNioFile

8.寫並刷新 LastHttpContent 到客戶端,標記響應完成

9.若是 keepalive 沒有要求,當寫完成時,關閉 ChannelHttpRequestHandler 作了下面幾件事;

  • 若是該 HTTP 請求被髮送到URI 「/ws」,調用 FullHttpRequest 上的 retain(),並經過調用 fireChannelRead(msg) 轉發到下一個 ChannelInboundHandler。retain() 是必要的,由於 channelRead() 完成後,它會調用 FullHttpRequest 上的 release() 來釋放其資源。 (請參考咱們先前的 SimpleChannelInboundHandler 在第6章中討論)
  • 若是客戶端發送的 HTTP 1.1 頭是「Expect: 100-continue」 ,將發送「100 Continue」的響應。
  • 在 頭被設置後,寫一個 HttpResponse 返回給客戶端。注意,這是否是 FullHttpResponse,惟一的反應的第一部分。此外,咱們不使用 writeAndFlush() 在這裏 - 這個是在最後完成。
  • 若是沒有加密也不壓縮,要達到最大的效率能夠是經過存儲 index.html 的內容在一個 DefaultFileRegion 實現。這將利用零拷貝來執行傳輸。出於這個緣由,咱們檢查,看看是否有一個 SslHandler 在 ChannelPipeline 中。另外,咱們使用 ChunkedNioFile。
  • 寫 LastHttpContent 來標記響應的結束,並終止它
  • 若是不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 對象的最後寫入,並關閉鏈接。注意,這裏咱們調用 writeAndFlush() 來刷新全部之前寫的信息。

處理 WebSocket frame

WebSockets 在「幀」裏面來發送數據,其中每個都表明了一個消息的一部分。一個完整的消息能夠利用了多個幀。 WebSocket "Request for Comments" (RFC) 定義了六中不一樣的 frame; Netty 給他們每一個都提供了一個 POJO 實現 ,而咱們的程序只須要使用下面4個幀類型:

  • CloseWebSocketFrame
  • PingWebSocketFrame
  • PongWebSocketFrame
  • TextWebSocketFrame

在這裏咱們只須要顯示處理 TextWebSocketFrame,其餘的會由 WebSocketServerProtocolHandler 自動處理。

下面代碼展現了 ChannelInboundHandler 處理 TextWebSocketFrame,同時也將跟蹤在 ChannelGroup 中全部活動的 WebSocket 鏈接

TextWebSocketFrameHandler.java

public class TextWebSocketFrameHandler extends
        SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
            TextWebSocketFrame msg) throws Exception { // (1)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming){
                channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
            } else {
                channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() ));
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
        }
        channels.add(ctx.channel());
        System.out.println("Client:"+incoming.remoteAddress() +"加入");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 離開"));
        }
        System.out.println("Client:"+incoming.remoteAddress() +"離開");
        channels.remove(ctx.channel());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"在線");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"掉線");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"異常");
        // 當出現異常就關閉鏈接
        cause.printStackTrace();
        ctx.close();
    }

}

1.TextWebSocketFrameHandler 繼承自 SimpleChannelInboundHandler,這個類實現了ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法,而後你能夠覆蓋這些方法。如今僅僅只須要繼承 SimpleChannelInboundHandler 類而不是你本身去實現接口方法。

2.覆蓋了 handlerAdded() 事件處理方法。每當從服務端收到新的客戶端鏈接時,客戶端的 Channel 存入ChannelGroup列表中,並通知列表中的其餘客戶端 Channel

3.覆蓋了 handlerRemoved() 事件處理方法。每當從服務端收到客戶端斷開時,客戶端的 Channel 移除 ChannelGroup 列表中,並通知列表中的其餘客戶端 Channel

4.覆蓋了 channelRead0() 事件處理方法。每當從服務端讀到客戶端寫入信息時,將信息轉發給其餘客戶端的 Channel。其中若是你使用的是 Netty 5.x 版本時,須要把 channelRead0() 重命名爲messageReceived()

5.覆蓋了 channelActive() 事件處理方法。服務端監聽到客戶端活動

6.覆蓋了 channelInactive() 事件處理方法。服務端監聽到客戶端不活動

7.exceptionCaught() 事件處理方法是當出現 Throwable 對象纔會被調用,即當 Netty 因爲 IO 錯誤或者處理器在處理事件時拋出的異常時。在大部分狀況下,捕獲的異常應該被記錄下來而且把關聯的 channel 給關閉掉。然而這個方法的處理方式會在遇到不一樣異常的狀況下有不一樣的實現,好比你可能想在關閉鏈接以前發送一個錯誤碼的響應消息。

上面顯示了 TextWebSocketFrameHandler 僅做了幾件事:

  • 當WebSocket 與新客戶端已成功握手完成,經過寫入信息到 ChannelGroup 中的 Channel 來通知全部鏈接的客戶端,而後添加新 Channel 到 ChannelGroup
  • 若是接收到 TextWebSocketFrame,調用 retain() ,並將其寫、刷新到 ChannelGroup,使全部鏈接的 WebSocket Channel 都能接收到它。和之前同樣,retain() 是必需的,由於當 channelRead0()返回時,TextWebSocketFrame 的引用計數將遞減。因爲全部操做都是異步的,writeAndFlush() 可能會在之後完成,咱們不但願它來訪問無效的引用。

因爲 Netty 處理了其他大部分功能,惟一剩下的咱們如今要作的是初始化 ChannelPipeline 給每個建立的新的 Channel 。作到這一點,咱們須要一個ChannelInitializer

WebsocketChatServerInitializer.java

public class WebsocketChatServerInitializer extends
        ChannelInitializer<SocketChannel> { //1

    @Override
    public void initChannel(SocketChannel ch) throws Exception {//2
         ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64*1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler());

    }
}

1.擴展 ChannelInitializer

2.添加 ChannelHandler 到 ChannelPipeline

initChannel() 方法設置 ChannelPipeline 中全部新註冊的 Channel,安裝全部須要的  ChannelHandler。

WebsocketChatServer.java

編寫一個 main() 方法來啓動服務端。

public class WebsocketChatServer {

    private int port;

    public WebsocketChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new WebsocketChatServerInitializer())  //(4)
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            System.out.println("WebsocketChatServer 啓動了");

            // 綁定端口,開始接收進來的鏈接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服務器  socket 關閉 。
            // 在這個例子中,這不會發生,但你能夠優雅地關閉你的服務器。
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();

            System.out.println("WebsocketChatServer 關閉了");
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new WebsocketChatServer(port).run();

    }
}

1.NioEventLoopGroup是用來處理I/O操做的多線程事件循環器,Netty 提供了許多不一樣的EventLoopGroup的實現用來處理不一樣的傳輸。在這個例子中咱們實現了一個服務端的應用,所以會有2個 NioEventLoopGroup 會被使用。第一個常常被叫作‘boss’,用來接收進來的鏈接。第二個常常被叫作‘worker’,用來處理已經被接收的鏈接,一旦‘boss’接收到鏈接,就會把鏈接信息註冊到‘worker’上。如何知道多少個線程已經被使用,如何映射到已經建立的 Channel上都須要依賴於 EventLoopGroup 的實現,而且能夠經過構造函數來配置他們的關係。

2.ServerBootstrap是一個啓動 NIO 服務的輔助啓動類。你能夠在這個服務中直接使用 Channel,可是這會是一個複雜的處理過程,在不少狀況下你並不須要這樣作。

3.這裏咱們指定使用NioServerSocketChannel類來舉例說明一個新的 Channel 如何接收進來的鏈接。

4.這裏的事件處理類常常會被用來處理一個最近的已經接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。也許你想經過增長一些處理類好比 SimpleChatServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline來實現你的網絡程序。當你的程序變的複雜時,可能你會增長更多的處理類到 pipline 上,而後提取這些匿名類到最頂層的類上。

5.你能夠設置這裏指定的 Channel 實現的配置參數。咱們正在寫一個TCP/IP 的服務端,所以咱們被容許設置 socket 的參數選項好比tcpNoDelay 和 keepAlive。請參考ChannelOption和詳細的ChannelConfig實現的接口文檔以此能夠對ChannelOption 的有一個大概的認識。

6.option() 是提供給NioServerSocketChannel用來接收進來的鏈接。childOption() 是提供給由父管道ServerChannel接收到的鏈接,在這個例子中也是 NioServerSocketChannel。

7.咱們繼續,剩下的就是綁定端口而後啓動服務。這裏咱們在機器上綁定了機器全部網卡上的 8080 端口。固然如今你能夠屢次調用 bind() 方法(基於不一樣綁定地址)。

恭喜!你已經完成了基於 Netty 聊天服務端程序。

客戶端

在程序的 resources 目錄下,咱們建立一個 WebsocketChatClient.html 頁面來做爲客戶端

WebsocketChatClient.html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8080/ws");
            socket.onmessage = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + '\n' + event.data
            };
            socket.onopen = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = "鏈接開啓!";
            };
            socket.onclose = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + "鏈接被關閉";
            };
        } else {
            alert("你的瀏覽器不支持 WebSocket!");
        }

        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("鏈接沒有開啓.");
            }
        }
    </script>
    <form onsubmit="return false;">
        <h3>WebSocket 聊天室:</h3>
        <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
        <br> 
        <input type="text" name="message"  style="width: 300px" value="Welcome to www.waylau.com">
        <input type="button" value="發送消息" onclick="send(this.form.message.value)">
        <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天記錄">
    </form>
    <br> 
    <br> 
    <a href="http://www.waylau.com/" >更多例子請訪問 www.waylau.com</a>
</body>
</html>

邏輯比較簡單,不累述。

運行效果

先運行 WebsocketChatServer,再打開多個瀏覽器頁面實現多個 客戶端訪問 http://localhost:8080

源碼

https://github.com/waylau/netty-4-user-guide-demos中 websocketchat

參考 Netty 4.x 用戶指南https://github.com/waylau/netty-4-user-guide Netty 實戰(精髓)https://github.com/waylau/essential-netty-in-action

相關文章
相關標籤/搜索