Netty就是這麼回事(九)

很久沒更新了,以前一段時間一直在忙雙11,外加其餘項目實在是沒有時間寫,好在如今快過春節了,終於有時間寫了。這一章主要介紹netty的websocket協議開發,websocket協議其實不是很新鮮了,可是比較重要,自己設計它的目的就是爲了取代輪訓和Comet技術,使客戶端具備像C/S架構下桌面系統同樣的實時通訊能力。websocket協議有以下特色:javascript

  • 單一的TCP鏈接,採用全雙工模式通訊;
  • 對代理、防火牆和路由器透明;
  • 無頭部信息、Cookie和身份驗證;
  • 無安全開銷;
  • 經過」ping/pong「幀保持鏈路激活;
  • 服務端能夠主動傳遞消息給客戶端,再也不須要客戶端輪訓;

下面看下websocket在netty是怎麼實現的?廢話很少說,直接上代碼。html

服務端代碼:java

package com.dlb.note.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

/**
 * 做者:dlb
 * 功能: websocket服務器
 * 日期: 2018/1/16 11:26
 * 版本:V0.1
 */
public class WebSocketServer {
    /**
     * 主函數
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8888;
        new WebSocketServer().start(port);
    }

    /**
     * 執行函數
     * @param port
     * @throws Exception
     */
    public void start(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    // 將請求和應答消息編碼或解碼爲HTTP消息
                                    .addLast("http-codec", new HttpServerCodec())
                                    // 將HTTP消息的多個部分組合成一個完整的HTTP消息
                                    .addLast("aggregator", new HttpObjectAggregator(65536))
                                    // 向客戶端發送HTML5文件,主要用於支持瀏覽器和服務端進行Websocket通訊
                                    .addLast("http-chunked", new ChunkedWriteHandler())
                                    // websocket處理器
                                    .addLast("handler", new WebSocketServerHandler());
                        }
                    });

            Channel ch = b.bind(port).sync().channel();
            System.out.println("Websocket server start,port=" + port);

            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

/**
 * websocket處理器
 */
class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private WebSocketServerHandshaker handshaker;

    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, Object o)
            throws Exception {
        if (o instanceof WebSocketFrame) { // websocket
            System.out.println("websocket body=【 " + o + " 】");
            handleWebSocketFrame(channelHandlerContext, (WebSocketFrame) o);
        }else if(o instanceof FullHttpRequest){ // http
            System.out.println("http body=【 " + o + " 】");
            handleHttpRequest(channelHandlerContext, (FullHttpRequest) o);

        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * 處理http請求
     * @param ctx
     * @param req
     * @throws Exception
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
        // 若是HTTP解碼失敗或者不是websocket請求,返回HTTP異常
        if (!req.getDecoderResult().isSuccess()
                || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                    HttpResponseStatus.BAD_REQUEST));
            return;
        }

        // 構造握手響應返回,本機測試
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                "ws://localhost:8888/websocket", null, false);

        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * 處理websocket請求
     * @param ctx
     * @param frame
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判斷是不是關閉鏈路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }

        // 判斷是不是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }

        // 本例程僅支持文本消息,不支持二進制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(
                    String.format("%s frame types not supported", frame.getClass().getName()));
        }

        // 返回應答消息
        String data = ((TextWebSocketFrame) frame).text();
        System.out.println("服務器接收數據=" + data);

        ctx.channel().write(new TextWebSocketFrame(data + " , welocme to:"
                + new java.util.Date().toString()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

   /** 
    * 返回應答給客戶端
    * @param ctx
    * @param req
    * @param res
    */
    private static void sendHttpResponse(
            ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // 返回應答給客戶端
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(res, res.content().readableBytes());
        }

        // 若是是非Keep-Alive,關閉鏈接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}

客戶端代碼:web

<!DOCTYPE html>  
<html>  
<head>  
<meta charset="UTF-8">  
Netty WebSocket 時間服務器  
</head>  
<br>  
<body>  
<br>  
<script type="text/javascript">  
var socket;  
if (!window.WebSocket)   
{  
    window.WebSocket = window.MozWebSocket;  
}  
if (window.WebSocket) {  
    socket = new WebSocket("ws://localhost:8888/websocket");  
    socket.onmessage = function(event) {  
        var ta = document.getElementById('responseText');  
        ta.value="";  
        ta.value = event.data  
    };  
    socket.onopen = function(event) {  
        var ta = document.getElementById('responseText');  
        ta.value = "打開WebSocket服務正常,瀏覽器支持WebSocket!";  
    };  
    socket.onclose = function(event) {  
        var ta = document.getElementById('responseText');  
        ta.value = "";  
        ta.value = "WebSocket 關閉!";   
    };  
}  
else  
    {  
    alert("抱歉,您的瀏覽器不支持WebSocket協議!");  
    }  
  
function send(message) {  
    if (!window.WebSocket) { return; }  
    if (socket.readyState == WebSocket.OPEN) {  
        socket.send(message);  
    }  
    else  
        {  
          alert("WebSocket鏈接沒有創建成功!");  
        }  
}  
</script>  
<form onsubmit="return false;">  
<input type="text" name="message" value="Netty是個好東西"/>
<br><br>  
<input type="button" value="發送WebSocket請求消息" onclick="send(this.form.message.value)"/>  
<hr color="blue"/>  
<h3>服務端返回的應答消息</h3>  
<textarea id="responseText" style="width:500px;height:300px;"></textarea>  
</form>  
</body>  
</html>
相關文章
相關標籤/搜索