WebSocket實現Web端即時通訊

BST

前言

WebSocket 是HTML5開始提供的一種在瀏覽器和服務器間進行全雙工通訊的協議。目前不少沒有使用WebSocket進行客戶端服務端實時通訊的web應用,大多使用設置規則時間的輪詢,或者使用長輪詢較多來處理消息的實時推送。這樣勢必會較大程度浪費服務器和帶寬資源,而咱們如今要講的WebSocket正是來解決該問題而出現,使得B/S架構的應用擁有C/S架構同樣的實時通訊能力。css

HTTP和WebSocket比較

HTTP

HTTP協議是半雙工協議,也就是說在同一時間點只能處理一個方向的數據傳輸,同時HTTP消息也是過於龐大,裏面包含大量消息頭數據,真正在消息處理中不少數據不是必須的,這也是對資源的浪費。html

  • 定時輪詢:定時輪詢就是客戶端定時去向服務器發送HTTP請求,看是否有數據,服務器接受到請求後,返回數據給客戶端,本次鏈接也會隨着關閉。該實現方案最簡單,可是會存在消息延遲和大量浪費服務器和帶寬資源。
  • 長輪詢:長輪詢與定時輪詢同樣,也是經過HTTP請求實現,但這裏不是定時發送請求。客戶端發送請求給服務端,這時服務端會hold住該請求,當有數據過來或者超時時返回給請求的客戶端並開始下一輪的請求。

WebSocket

WebSocket在客戶端和服務端只需一次請求,就會在客戶端和服務端創建一條通訊通道,能夠實時相互傳輸數據,而且不會像HTTP那樣攜帶大量請求頭等信息。由於WebSocket是基於TCP雙向全雙工通訊的協議,因此支持在同一時間點處理髮送和接收消息,作到實時的消息處理。java

  • 創建WebSocket鏈接:創建WebSocket鏈接,首先客戶端先要向服務端發送一個特殊的HTTP請求,使用的協議不是httphttps,而是使用過wswss(一個非安全的,一個安全的,相似前二者之間的差異),請求頭裏面要附加一個申請協議升級的信息Upgrade: websocket,還有隨機生成一個Sec-WebSocket-Key的值,及版本信息Sec-WebSocket-Version等等。服務端收到客戶端的請求後,會解析該請求的信息,包括請求協議升級,版本校驗,以及將Sec-WebSocket-Key的加密後以sec-websocket-accept的值返回給客戶端,這樣客戶端和服務端的鏈接就創建了。
  • 關閉WebSocket鏈接:客戶端和服務端均可發送一個close控制幀,另外一端主動關閉鏈接。

HTTP輪詢和WebSocket生命週期示意圖jquery

HTTP輪詢和WebSocket生命週期示意圖

服務端

這裏服務端利用Netty的WebSocket開發。這裏首先實現服務端啓動類,而後自定義處理器來處理WebSocket的消息。ios

package com.ytao.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Created by YANGTAO on 2019/11/17 0017.
 */
public class WebSocketServer {

    public static String HOST = "127.0.0.1";
    public static int PORT = 8806;

    public static void startUp() throws Exception {
        // 監聽端口的線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 處理每一條鏈接的數據讀寫的線程組
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 啓動的引導類
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<socketchannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception{
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
                            // 將請求和返回消息編碼或解碼成http
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            // 使http的多個部分組合成一條完整的http
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                            // 向客戶端發送h5文件,主要是來支持websocket通訊
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                            // 服務端自定義處理器
                            pipeline.addLast("handler", new WebSocketServerHandler());
                        }
                    })
                    // 開啓心跳機制
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<nioserversocketchannel>() {
                        protected void initChannel(NioServerSocketChannel ch) {
                            System.out.println("WebSocket服務端啓動中...");
                        }
                    });

            Channel ch = serverBootstrap.bind(HOST, PORT).sync().channel();
            System.out.println("WebSocket host: "+ch.localAddress().toString().replace("/",""));
            ch.closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        startUp();
    }
}

上面啓動類和HTTP協議的相似,因此較好理解。啓動類啓動後,咱們須要處理WebSocket請求,這裏自定義WebSocketServerHandler。 咱們在處理中設計的業務邏輯有,若是隻有一個鏈接來發送信息聊天,那麼咱們就以服務器自動回覆,若是存在一個以上,咱們就將信息發送給其餘人。web

package com.ytao.websocket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by YANGTAO on 2019/11/17 0017.
 */
public class WebSocketServerHandler extends SimpleChannelInboundHandler<object> {

    private WebSocketServerHandshaker handshaker;

    private static Map<string, channelhandlercontext> channelHandlerContextConcurrentHashMap = new ConcurrentHashMap&lt;&gt;();

    private static final Map<string, string> replyMap = new ConcurrentHashMap&lt;&gt;();
    static {
        replyMap.put("博客", "https://ytao.top");
        replyMap.put("公衆號", "ytao公衆號");
        replyMap.put("在嗎", "在");
        replyMap.put("吃飯了嗎", "吃了");
        replyMap.put("你好", "你好");
        replyMap.put("誰", "ytao");
        replyMap.put("幾點", "如今本地時間:"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));
    }

    @Override
    public void messageReceived(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception{
        channelHandlerContextConcurrentHashMap.put(channelHandlerContext.channel().toString(), channelHandlerContext);
        // http
        if (msg instanceof FullHttpRequest){
            handleHttpRequest(channelHandlerContext, (FullHttpRequest) msg);
        }else if (msg instanceof WebSocketFrame){ // WebSocket
            handleWebSocketFrame(channelHandlerContext, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception{
        if (channelHandlerContextConcurrentHashMap.size() &gt; 1){
            for (String key : channelHandlerContextConcurrentHashMap.keySet()) {
                ChannelHandlerContext current = channelHandlerContextConcurrentHashMap.get(key);
                if (channelHandlerContext == current)
                    continue;
                current.flush();
            }
        }else {
            // 單條處理
            channelHandlerContext.flush();
        }
    }

    private void handleHttpRequest(ChannelHandlerContext channelHandlerContext, FullHttpRequest request) throws Exception{
        // 驗證解碼是否異常
        if (!"websocket".equals(request.headers().get("Upgrade")) || request.decoderResult().isFailure()){
            // todo send response bad
            System.err.println("解析http信息異常");
            return;
        }

        // 建立握手工廠類
        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
          "ws:/".concat(channelHandlerContext.channel().localAddress().toString()),
                null,
                false
        );
        handshaker = factory.newHandshaker(request);

        if (handshaker == null)
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channelHandlerContext.channel());
        else
            // 響應握手消息給客戶端
            handshaker.handshake(channelHandlerContext.channel(), request);

    }

    private void handleWebSocketFrame(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame){
        // 關閉鏈路
        if (webSocketFrame instanceof CloseWebSocketFrame){
            handshaker.close(channelHandlerContext.channel(), (CloseWebSocketFrame) webSocketFrame.retain());
            return;
        }

        // Ping消息
        if (webSocketFrame instanceof PingWebSocketFrame){
            channelHandlerContext.channel().write(
              new PongWebSocketFrame(webSocketFrame.content().retain())
            );
            return;
        }

        // Pong消息
        if (webSocketFrame instanceof PongWebSocketFrame){
            // todo Pong消息處理
        }

        // 二進制消息
        if (webSocketFrame instanceof BinaryWebSocketFrame){
            // todo 二進制消息處理
        }

        // 拆分數據
        if (webSocketFrame instanceof ContinuationWebSocketFrame){
            // todo 數據被拆分爲多個websocketframe處理
        }

        // 文本信息處理
        if (webSocketFrame instanceof TextWebSocketFrame){
            // 推送過來的消息
            String  msg = ((TextWebSocketFrame) webSocketFrame).text();
            System.out.println(String.format("%s 收到消息 : %s", new Date(), msg));

            String responseMsg = "";
            if (channelHandlerContextConcurrentHashMap.size() &gt; 1){
                responseMsg = msg;
                for (String key : channelHandlerContextConcurrentHashMap.keySet()) {
                    ChannelHandlerContext current = channelHandlerContextConcurrentHashMap.get(key);
                    if (channelHandlerContext == current)
                        continue;
                    Channel channel = current.channel();
                    channel.write(
                            new TextWebSocketFrame(responseMsg)
                    );
                }
            }else {
                // 自動回覆
                responseMsg = this.answer(msg);
                if(responseMsg == null)
                    responseMsg = "暫時沒法回答你的問題 -&gt;_-&gt;";
                System.out.println("回覆消息:"+responseMsg);
                Channel channel = channelHandlerContext.channel();
                channel.write(
                        new TextWebSocketFrame("【服務端】" + responseMsg)
                );
            }
        }

    }

    private String answer(String msg){
        for (String key : replyMap.keySet()) {
            if (msg.contains(key))
                return replyMap.get(key);
        }
        return null;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable){
        throwable.printStackTrace();
        channelHandlerContext.close();
    }

    @Override
    public void close(ChannelHandlerContext channelHandlerContext, ChannelPromise promise) throws Exception {
        channelHandlerContextConcurrentHashMap.remove(channelHandlerContext.channel().toString());
        channelHandlerContext.close(promise);
    }

}

剛創建鏈接時,第一次握手有HTTP協議處理,因此WebSocketServerHandler#messageReceived會判斷是HTTP仍是WebSocket,若是是HTTP時,交由WebSocketServerHandler#handleHttpRequest處理,裏面會去驗證請求,而且處理握手後將消息返回給客戶端。 若是不是HTTP協議,而是WebSocket協議時,處理交給WebSocketServerHandler#handleWebSocketFrame處理,進入WebSocket處理後,這裏面有判斷消息屬於哪一種類型,裏面包括CloseWebSocketFramePingWebSocketFramePongWebSocketFrameBinaryWebSocketFrameContinuationWebSocketFrameTextWebSocketFrame,他們都是WebSocketFrame的子類,而且WebSocketFrame又繼承自DefaultByteBufHolderbootstrap

channelHandlerContextConcurrentHashMap是緩存WebSocket已鏈接的信息,由於咱們實現的需求要記錄鏈接數量,當有鏈接關閉時咱們要刪除以緩存的鏈接,因此在WebSocketServerHandler#close中要移除緩存。promise

最後的發送文本到客戶端,根據鏈接數量判斷。若是鏈接數量不大於1,那麼,咱們"價值一個億的AI核心代碼"WebSocketServerHandler#answer來回復客戶端消息。不然除了本次接收的鏈接,消息會發送給其餘全部鏈接的客戶端。瀏覽器

客戶端

客戶端使用JS實現WebSocket的操做,目前主流的瀏覽器基本都支持WebSocket。支持狀況如圖:緩存

支持WebSocket的瀏覽器

客戶端H5的代碼實現:

<meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>ytao-websocket</title>
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
    <style type="text/css">
        #msgContent{
            line-height:200%;
            width: 500px;
            height: 300px;
            resize: none;
            border-color: #FF9900;
        }
        .clean{
            background-color: white;
        }
        .send{
            border-radius: 10%;
            background-color: #2BD56F;
        }
        @media screen and (max-width: 600px) {
            #msgContent{
                line-height:200%;
                width: 100%;
                height: 300px;
            }
        }
    </style>

<script>
    var socket;
    var URL = "ws://127.0.0.1:8806/ytao";

    connect();

    function connect() {
        $("#status").html("<span>鏈接中.....</span>");
        window.WebSocket = !window.WebSocket == true? window.MozWebSocket : window.WebSocket;
        if(window.WebSocket){
            socket = new WebSocket(URL);
            socket.onmessage = function(event){
                var msg = event.data + "\n";
                addMsgContent(msg);
            };

            socket.onopen = function(){
                $("#status").html("<span style='background-color: #44b549'>WebSocket已鏈接</span>");
            };

            socket.onclose = function(){
                $("#status").html("<span style='background-color: red'>WebSocket已斷開鏈接</span>");
                setTimeout("connect()", 3000);
            };
        }else{
            $("#status").html("<span style='background-color: red'>該瀏覽器不支持WebSocket協議!</span>");
        }
    }

    function addMsgContent(msg) {
        var contet = $("#msgContent").val() + msg;
        $("#msgContent").val(contet)
    }

    function clean() {
        $("#msgContent").val("");
    }

    function getUserName() {
        var n = $("input[name=userName]").val();
        if (n == "")
            n = "匿名";
        return n;
    }

    function send(){
        var message = $("input[name=message]").val();
        if(!window.WebSocket) return;
        if ($.trim(message) == ""){
            alert("不能發送空消息!");
            return;
        }
        if(socket.readyState == WebSocket.OPEN){
            var msg = "【我】" + message + "\n";
            this.addMsgContent(msg);
            socket.send("【"+getUserName()+"】"+message);
            $("input[name=message]").val("");
        }else{
            alert("沒法創建WebSocket鏈接!");
        }
    }

    $(document).keyup(function(){
        if(event.keyCode ==13){
            send()
        }
    });
</script>

    <div style="text-align: center;">
        <div id="status">
            <span>鏈接中.....</span>
        </div>
        <div>
            <h2>信息面板</h2>
            <textarea id="msgContent" readonly></textarea>
        </div>
        <div>
            <input class="clean" type="button" value="清除聊天紀錄" onclick="clean()">
            <input type="text" name="userName" value="" placeholder="用戶名">
        </div>
        <hr>
        <div>
            <form onsubmit="return false">
                <input type="text" name="message" value="" placeholder="請輸入消息">
                <input class="send" type="button" name="msgBtn" value="send" onclick="send()">
            </form>
        </div>
        <div>
            <br><br>
            <img src="https://oscimg.oschina.net/oscnet/ytao%E5%85%AC%E4%BC%97%E5%8F%B7.jpg">
        </div>
    </div>

JS這裏實現相對較簡單,主要用到:

  • new WebSocket(URL)建立WebSocket對象
  • onopen()打開鏈接
  • onclose()關閉鏈接
  • onmessage接收消息
  • send()發送消息

當斷開鏈接後,客戶端這邊從新發起鏈接,直到鏈接成功爲止。

啓動

客戶端和服務端鏈接後,咱們從日誌和請求中能夠看到上面所提到的驗證信息。 客戶端:

客戶端鏈接信息

服務端:

服務端鏈接信息

啓動服務端後,先實驗咱們"價值一個億的AI",只有一個鏈接用戶時,發送信息結果如圖:

多個用戶鏈接,這裏使用三個鏈接用戶羣聊。

用戶一:

用戶二:

用戶三:

到目前爲止,WebSocket已幫助咱們實現即時通訊的需求,相信你們也基本入門了WebSocket的基本使用。

總結

經過本文了解,能夠幫助你們入門WebSocket而且解決當前可能存在的一些Web端的通訊問題。我曾經在兩個項目中也有看到該類解決方案都是經過定時輪詢去作的,也或多或少對服務器資源形成必定的浪費。由於WebSocket自己是較複雜的,它提供的API也是比較多,因此在使用過程,要去真正使用好或去優化它,並非一件很簡單的事,也是須要根據現實場景針對性的去作。

我的博客: https://ytao.top

個人公衆號 ytao

個人公衆號

相關文章
相關標籤/搜索