WebSocket原理與實踐

開題思考:如何實現客戶端及時獲取服務端數據?

Polling

指客戶端每隔一段時間(週期性)請求服務端獲取數據,可能有更新數據返回,也可能什麼都沒有,它並不在意服務端數據有無更新。(Web端通常採用ajax polling實現)javascript

Long Polling

阻塞型Polling,和Polling不一樣的是假如服務端數據沒有準備好,那麼可能會hold住請求,直到服務端有相關數據,或者等待必定時間超時纔會返回。html

WebSocket

HTML5 WebSocket規範定義了一種API,使Web頁面可以使用WebSocket協議與遠程主機進行雙向通訊。與輪詢和長輪詢相比,巨大減小了沒必要要的網絡流量和等待時間。 

Websocket體系結構

前端

Websocket協議

WebSocket協議被設計成與現有的Web基礎結構很好地工做。該協議規範定義了HTTP鏈接做爲WebSocket鏈接生命的開始,從Http協議轉換成WebSocket,被稱爲WebSocket握手。
  • 瀏覽器向服務器發送請求,表示它但願將協議從HTTP切換到WebSocket。客戶端經過升級報頭表達其願望:java

    GET /chat HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw== Sec-WebSocket-Protocol: chat, superchat Sec-WebSocket-Version: 13 Origin: http://example.com 
  • 從上面的報文能夠看到,和HTTP協議的請求中,多了幾樣東西,核心就是Upgrade和Connection兩個參數,用來告訴服務器,我須要升級爲Websocket:jquery

    Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw== Sec-WebSocket-Protocol: chat, superchat Sec-WebSocket-Version: 13 
  • 若是服務端可以理解WebSocket協議,它贊成以Upgrade頭字段來升級協議,會響應如下信息:nginx

    HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept:HSmrc0sMlYUkAGmm5OPpG2HaGWk= Sec-WebSocket-Protocol: chat 
  • 此時,HTTP鏈接中斷,並由同一底層TCP/IP鏈接上的WebSocket鏈接替換。 默認狀況下,WebSocket鏈接使用與HTTP(80)和HTTPS(443)相同的端口。git

Spring-WebSocket實戰

Spring框架提供了WebSocket支持,很容易實現相關功能,此處分享一下使用Spring集成WebSocket實現簡單的多人會議系統。

服務端相關代碼

  • MeetingController (很簡單的一個入口,建立會議,並生成會議id和對應隨機串)web

    @Controller
    public class MeetingController {
    
        private static AtomicInteger id = new AtomicInteger(0);
    
        @RequestMapping(value = "/meeting", method = RequestMethod.POST)
        @ResponseBody
        public Map<String, Object> createMeeting() {
            int meetingId = id.incrementAndGet();
            String randStr = RandomStringUtils.random(6, true, true);
            SystemCache.idRandStrMap.put(meetingId, randStr);
            Map<String, Object> meetingVO = new HashMap<>();
            meetingVO.put("id", meetingId);
            meetingVO.put("randStr", randStr);
            return meetingVO;
        }
    }
  • WebSocketConfig (經過WebSocketConfigurer來配置定義本身的Websocket處理器和攔截器)ajax

    @Configuration
    @EnableWebSocket
    public class WebSocketConfig implements WebSocketConfigurer {
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            /**
             * 註冊websocket處理器以及攔截器
             */
            registry.addHandler(meetingWebSocketHandler(), "/websocket/spring/meeting").addInterceptors(myInterceptor());
        }
    
        @Bean
        public MeetingWebSocketHandler meetingWebSocketHandler() {
            return new MeetingWebSocketHandler();
        }
    
        @Bean
        public WebSocketHandshakeInterceptor myInterceptor() {
            return new WebSocketHandshakeInterceptor();
        }
    }
  • WebSocketHandshakeInterceptor (握手攔截器,用於處理請求攜帶參數)redis

    public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
    
        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                Map<String, Object> attributes) throws Exception {
            if (request instanceof ServletServerHttpRequest) {
                ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
                String randStr = serverHttpRequest.getServletRequest().getParameter("randStr");
                String role = serverHttpRequest.getServletRequest().getParameter("role");
                if (StringUtils.isNotBlank(randStr)) {
                    attributes.put("randStr", randStr);
                }
                if (StringUtils.isNotBlank(role)) {
                    attributes.put("role", role);
                }
            }
            return true;
        }
    
       @Override
       public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
            Exception exception) {
       }
    }
  • MeetingWebSocketHandler(websocket處理器,用於接受客戶端發送各類類型數據,主要分爲數據幀和控制幀)

    @Service
    public class MeetingWebSocketHandler extends TextWebSocketHandler {
    
        private static final Log LOG = LogFactory.getLog(MeetingWebSocketHandler.class);
        // 會議id和wsSession列表
        private static final ConcurrentHashMap<Integer, CopyOnWriteArraySet<WebSocketSession>> meetingWsSeesionMap = new ConcurrentHashMap<>();
    
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            LOG.info("spring websocket成功創建鏈接...");
            int meetingId = getMeetingId(session);
            if (meetingId <= 0) {
                singleMessage(session, new TextMessage("會議不存在!"));
                session.close();
            }
            // 若是該會議已存在,則直接加入
            if (meetingWsSeesionMap.containsKey(meetingId)) {
                CopyOnWriteArraySet<WebSocketSession> webSocketSessions = meetingWsSeesionMap.get(meetingId);
                webSocketSessions.add(session);
            }
            // 若是不存在,則新建
            else {
                CopyOnWriteArraySet<WebSocketSession> webSocketSessions = new CopyOnWriteArraySet<>();
                webSocketSessions.add(session);
                meetingWsSeesionMap.put(meetingId, webSocketSessions);
            }
        }
    
        @Override
        public void handleTextMessage(WebSocketSession session, TextMessage message) {
            if (!session.isOpen())
                return;
            LOG.info(message.getPayload());
            int meetingId = getMeetingId(session);
            TextMessage wsMessage = new TextMessage(message.getPayload());
            broadcastMessage(meetingId, wsMessage);
        }
    
        /**
         * 發送信息給指定用戶
         * @param clientId
         * @param message
         * @return
         */
        public void singleMessage(WebSocketSession session, TextMessage message) {
            if (!session.isOpen())
                return;
            try {
                session.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 廣播信息
         * @param message
         * @return
         */
        public void broadcastMessage(int meetingId, TextMessage message) {
            // 獲取會議全部的wsSession
            CopyOnWriteArraySet<WebSocketSession> webSocketSessions = meetingWsSeesionMap.get(meetingId);
            for (WebSocketSession session : webSocketSessions) {
                try {
                    if (session.isOpen()) {
                        session.sendMessage(message);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
            if (session.isOpen()) {
                session.close();
            }
            LOG.info("鏈接出錯");
        }
    
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
            LOG.info("鏈接已關閉:" + status);
            int meetingId = getMeetingId(session);
            // role 1爲主持人
            String role = String.valueOf(session.getAttributes().get("role"));
            // 若是是主持人,則關閉全部該會議鏈接
            CopyOnWriteArraySet<WebSocketSession> webSocketSessions = meetingWsSeesionMap.get(meetingId);
            if (StringUtils.equals("1", role)) {
                SystemCache.idRandStrMap.remove(meetingId);
                for (WebSocketSession webSocketSession : webSocketSessions) {
                    webSocketSession.close();
                }
                webSocketSessions.remove(meetingId);
            } else {
                webSocketSessions.remove(session);
            }
        }
    
        @Override
        public boolean supportsPartialMessages() {
            return false;
        }
    
        private int getMeetingId(WebSocketSession session) {
            String randStr = String.valueOf(session.getAttributes().get("randStr"));
            int meetingId = SystemCache.getMeetingIdByRandStr(randStr);
            return meetingId;
        }
     }   
  • SystemCache(系統緩存,集羣部署的狀況下,可改成redis實現分佈式緩存,單機則不須要)

    public class SystemCache {
    
        // 會議id和隨機字符串的映射關係
        public static ConcurrentHashMap<Integer, String> idRandStrMap = new ConcurrentHashMap<>();
    
        public static int getMeetingIdByRandStr(String randStr) {
            int meetingId = 0;
            for (Map.Entry<Integer, String> entry : idRandStrMap.entrySet()) {
                if (randStr.equals(entry.getValue())) {
                    meetingId = entry.getKey();
                }
            }
            return meetingId;
        }
    }

前端相關代碼

  • meeting-create.html(主持人頁面,用於建立會議而且能夠發送消息)

    <!DOCTYPE html>
    <html>
    <head>
    <meta http-equiv="Content-Type" content="text/html;charset=utf-8" />
    <title>在線會議系統</title>
    </head>
    <body>
        <h2>歡迎使用會議系統</h2>
        <button id="create" onclick="createMeeting()">建立會議</button>
        <hr />
        <div id="meeting"></div>
        消息內容:
        <input id="text" type="text" />
        <button id="send" disabled="disabled" onclick="send()">發送消息</button>
        <hr />
        <button id="close" onclick="closeWebSocket()">結束會議</button>
        <hr />
        <div id="message"></div>
    </body>
    
    <script type="text/javascript" src="js/jquery-1.12.0.js"></script>
    <script type="text/javascript">
        var websocket = null;
        var randStr;
        var remote = window.location.host;
        function openWebsocket() {
            //判斷當前瀏覽器是否支持WebSocket
            if ('WebSocket' in window) {
                websocket = new WebSocket("ws://" + window.location.host
                        + "/websocket/spring/meeting?role=1&randStr=" + randStr);
    
                //鏈接發生錯誤的回調方法
                websocket.onerror = function() {
                    setMessageInnerHTML("會議鏈接發生錯誤!");
                };
    
                //鏈接成功創建的回調方法
                websocket.onopen = function() {
                    setMessageInnerHTML("會議鏈接成功...");
                    document.getElementById("send").disabled = false;
                }
    
                //接收到消息的回調方法
                websocket.onmessage = function(event) {
                    setMessageInnerHTML(event.data);
                }
    
                //鏈接關閉的回調方法
                websocket.onclose = function() {
                    setMessageInnerHTML("會議結束,鏈接關閉!");
                    document.getElementById("create").disabled = false;
                    document.getElementById("send").disabled = true;
                }
    
                //監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket鏈接,防止鏈接還沒斷開就關閉窗口,server端會拋異常
                window.onbeforeunload = function() {
                    closeWebSocket();
                }
            } else {
                alert('當前瀏覽器 Not support websocket');
            }
        }
    
        //將消息顯示在網頁上
        function setMessageInnerHTML(innerHTML) {
            document.getElementById('message').innerHTML += innerHTML + '<br/>';
        }
    
        //關閉WebSocket鏈接
        function closeWebSocket() {
            websocket.close();
        }
    
        //發送消息
        function send() {
            var content = document.getElementById('text').value;
            websocket.send(content);
        }
    
        function createMeeting() {
            $.post("/meeting", function(data, status) {
                randStr = data.randStr;
                $("#create").after("<p>會議邀請碼:" + randStr + "</p>");
                $("#create").attr("disabled", true);
                openWebsocket();
            });
    }
    </script>
    </html>
  • meeting-join.html(觀衆頁面,用於加入會議而且也能夠發送消息)

    <!DOCTYPE html>
    <html>
    <head>
    <meta http-equiv="Content-Type" content="text/html;charset=utf-8" />
    <title>在線會議系統</title>
    </head>
    <body>
        <h2>歡迎使用會議系統</h2>
        會議邀請碼:
        <input id="randStr" type="text" />
        <button id="open" onclick="openWebsocket()">加入會議</button>
        <hr />
        消息內容:
        <input id="text" type="text" />
        <button id="send" disabled="disabled" onclick="send()">發送消息</button>
        <hr />
        <button id="close" disabled="disabled" onclick="closeWebSocket()">離開會議</button>
        <hr />
        <div id="message"></div>
    </body>
    
    <script type="text/javascript">
        var websocket = null;
        var remote = window.location.host;
        function openWebsocket() {
            var randStr = document.getElementById('randStr').value;
            //判斷當前瀏覽器是否支持WebSocket
            if ('WebSocket' in window) {
                websocket = new WebSocket("ws://" + window.location.host
                        + "/websocket/spring/meeting?randStr=" + randStr);
    
                //鏈接發生錯誤的回調方法
                websocket.onerror = function() {
                    setMessageInnerHTML("會議鏈接發生錯誤!");
                };
    
                //鏈接成功創建的回調方法
                websocket.onopen = function() {
                    setMessageInnerHTML("會議鏈接成功...");
                    document.getElementById("open").disabled = true;
                    document.getElementById("randStr").disabled = true;
                    document.getElementById("send").disabled = false;
                    document.getElementById("close").disabled = false;
                }
    
                //接收到消息的回調方法
                websocket.onmessage = function(event) {
                    setMessageInnerHTML(event.data);
                }
    
                //鏈接關閉的回調方法
                websocket.onclose = function() {
                    setMessageInnerHTML("會議結束,鏈接關閉!");
                    document.getElementById("randStr").disabled = false;
                    document.getElementById("open").disabled = false;
                    document.getElementById("send").disabled = true;
                    document.getElementById("close").disabled = true;
                }
    
                //監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket鏈接,防止鏈接還沒斷開就關閉窗口,server端會拋異常
                window.onbeforeunload = function() {
                    closeWebSocket();
                }
            } else {
                alert('當前瀏覽器 Not support websocket');
            }
        }
    
        //將消息顯示在網頁上
        function setMessageInnerHTML(innerHTML) {
            document.getElementById('message').innerHTML += innerHTML + '<br/>';
        }
    
        //關閉WebSocket鏈接
        function closeWebSocket() {
            websocket.close();
        }
    
        //發送消息
        function send() {
            var content = document.getElementById('text').value;
            websocket.send(content);
    }
    </script>
    </html>

     

項目演示

  • 訪問meeting-create.html進入主持人界面,點擊建立會議,生成會議邀請碼,並顯示會議鏈接成功,界面以下:

  • 訪問meeting-join.html進入觀衆界面,並經過上面的邀請碼加入會議,界面以下:

  • 此時雙方就能夠互相發送消息,主持人離開會議,則全部人退出,觀衆離開,不影響會議進行。

  • 具體代碼地址:https://gitee.com/yehx/websocket-meeting

總結

WebSocket做爲一個雙通道的協議,顛覆了傳統的Client請求Server這種單向通道的模式。因爲WebSocket的興起,Web領域的實時推送技術也被普遍使用,能夠簡單實現讓用戶不須要刷新瀏覽器就能夠得到實時更新。它有着普遍的應用場景,好比在線聊天室、在線客服系統、評論系統、WebIM等。 
相關文章
相關標籤/搜索