【websocket】spring boot 集成 websocket 的四種方式

集成 websocket 的四種方案

1. 原生註解

pom.xml

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

WebSocketConfig

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author buhao
 * @version WebSocketConfig.java, v 0.1 2019-10-18 15:45 buhao
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }
}
說明:

這個配置類很簡單,經過這個配置 spring boot 才能去掃描後面的關於 websocket 的註解html

WsServerEndpoint

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.ws;

import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author buhao
 * @version WsServerEndpoint.java, v 0.1 2019-10-18 16:06 buhao
 */
@ServerEndpoint("/myWs")
@Component
public class WsServerEndpoint {

    /**
     * 鏈接成功
     *
     * @param session
     */
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("鏈接成功");
    }

    /**
     * 鏈接關閉
     *
     * @param session
     */
    @OnClose
    public void onClose(Session session) {
        System.out.println("鏈接關閉");
    }

    /**
     * 接收到消息
     *
     * @param text
     */
    @OnMessage
    public String onMsg(String text) throws IOException {
        return "servet 發送:" + text;
    }
}
說明

這裏有幾個註解須要注意一下,首先是他們的包都在 javax.websocket 下。並非 spring 提供的,而 jdk 自帶的,下面是他們的具體做用。前端

  1.  @ServerEndpointjava

    1. 經過這個 spring boot 就能夠知道你暴露出去的 ws 應用的路徑,有點相似咱們常常用的@RequestMapping。好比你的啓動端口是8080,而這個註解的值是ws,那咱們就能夠經過 ws://127.0.0.1:8080/ws 來鏈接你的應用
  2.  @OnOpengit

    1. 當 websocket 創建鏈接成功後會觸發這個註解修飾的方法,注意它有一個 Session 參數
  3. @OnClosegithub

    1. 當 websocket 創建的鏈接斷開後會觸發這個註解修飾的方法,注意它有一個 Session 參數
  4. @OnMessageweb

    1. 當客戶端發送消息到服務端時,會觸發這個註解修改的方法,它有一個 String 入參代表客戶端傳入的值
  5. @OnErrorredis

    1. 當 websocket 創建鏈接時出現異常會觸發這個註解修飾的方法,注意它有一個 Session 參數

另一點就是服務端如何發送消息給客戶端,服務端發送消息必須經過上面說的 Session 類,一般是在@OnOpen 方法中,當鏈接成功後把 session 存入 Map 的 value,key 是與 session 對應的用戶標識,當要發送的時候經過 key 得到 session 再發送,這裏能夠經過 session.getBasicRemote_()_.sendText_(_) 來對客戶端發送消息。spring

2. Spring封裝

pom.xml

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

HttpAuthHandler

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.handler;

import cn.coder4j.study.example.websocket.config.WsSessionManager;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.time.LocalDateTime;

/**
 * @author buhao
 * @version MyWSHandler.java, v 0.1 2019-10-17 17:10 buhao
 */
@Component
public class HttpAuthHandler extends TextWebSocketHandler {

    /**
     * socket 創建成功事件
     *
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Object token = session.getAttributes().get("token");
        if (token != null) {
            // 用戶鏈接成功,放入在線用戶緩存
            WsSessionManager.add(token.toString(), session);
        } else {
            throw new RuntimeException("用戶登陸已經失效!");
        }
    }

    /**
     * 接收消息事件
     *
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 得到客戶端傳來的消息
        String payload = message.getPayload();
        Object token = session.getAttributes().get("token");
        System.out.println("server 接收到 " + token + " 發送的 " + payload);
        session.sendMessage(new TextMessage("server 發送給 " + token + " 消息 " + payload + " " + LocalDateTime.now().toString()));
    }

    /**
     * socket 斷開鏈接時
     *
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        Object token = session.getAttributes().get("token");
        if (token != null) {
            // 用戶退出,移除緩存
            WsSessionManager.remove(token.toString());
        }
    }


}
說明

經過繼承 TextWebSocketHandler 類並覆蓋相應方法,能夠對 websocket 的事件進行處理,這裏能夠同原生註解的那幾個註解連起來看segmentfault

  1. afterConnectionEstablished 方法是在 socket 鏈接成功後被觸發,同原生註解裏的 @OnOpen 功能
  2. afterConnectionClosed  方法是在 socket 鏈接關閉後被觸發,同原生註解裏的 @OnClose 功能
  3. handleTextMessage 方法是在客戶端發送信息時觸發,同原生註解裏的 @OnMessage 功能

WsSessionManager

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author buhao
 * @version WsSessionManager.java, v 0.1 2019-10-22 10:24 buhao
 */
@Slf4j
public class WsSessionManager {
    /**
     * 保存鏈接 session 的地方
     */
    private static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();

    /**
     * 添加 session
     *
     * @param key
     */
    public static void add(String key, WebSocketSession session) {
        // 添加 session
        SESSION_POOL.put(key, session);
    }

    /**
     * 刪除 session,會返回刪除的 session
     *
     * @param key
     * @return
     */
    public static WebSocketSession remove(String key) {
        // 刪除 session
        return SESSION_POOL.remove(key);
    }

    /**
     * 刪除並同步關閉鏈接
     *
     * @param key
     */
    public static void removeAndClose(String key) {
        WebSocketSession session = remove(key);
        if (session != null) {
            try {
                // 關閉鏈接
                session.close();
            } catch (IOException e) {
                // todo: 關閉出現異常處理
                e.printStackTrace();
            }
        }
    }

    /**
     * 得到 session
     *
     * @param key
     * @return
     */
    public static WebSocketSession get(String key) {
        // 得到 session
        return SESSION_POOL.get(key);
    }
}
說明

這裏簡單經過 ConcurrentHashMap 來實現了一個 session 池,用來保存已經登陸的 web socket 的  session。前文提過,服務端發送消息給客戶端必需要經過這個 session。跨域

MyInterceptor

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.interceptor;

import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.HashMap;
import java.util.Map;

/**
 * @author buhao
 * @version MyInterceptor.java, v 0.1 2019-10-17 19:21 buhao
 */
@Component
public class MyInterceptor implements HandshakeInterceptor {

    /**
     * 握手前
     *
     * @param request
     * @param response
     * @param wsHandler
     * @param attributes
     * @return
     * @throws Exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        System.out.println("握手開始");
        // 得到請求參數
        HashMap<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), "utf-8");
        String uid = paramMap.get("token");
        if (StrUtil.isNotBlank(uid)) {
            // 放入屬性域
            attributes.put("token", uid);
            System.out.println("用戶 token " + uid + " 握手成功!");
            return true;
        }
        System.out.println("用戶登陸已失效");
        return false;
    }

    /**
     * 握手後
     *
     * @param request
     * @param response
     * @param wsHandler
     * @param exception
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        System.out.println("握手完成");
    }

}
說明

經過實現 HandshakeInterceptor 接口來定義握手攔截器,注意這裏與上面 Handler 的事件是不一樣的,這裏是創建握手時的事件,分爲握手前與握手後,而  Handler 的事件是在握手成功後的基礎上創建 socket 的鏈接。因此在若是把認證放在這個步驟相對來講最節省服務器資源。它主要有兩個方法 beforeHandshake 與 afterHandshake ,顧名思義一個在握手前觸發,一個在握手後觸發。

WebSocketConfig

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import cn.coder4j.study.example.websocket.handler.HttpAuthHandler;
import cn.coder4j.study.example.websocket.interceptor.MyInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * @author buhao
 * @version WebSocketConfig.java, v 0.1 2019-10-17 15:43 buhao
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private HttpAuthHandler httpAuthHandler;
    @Autowired
    private MyInterceptor myInterceptor;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry
                .addHandler(httpAuthHandler, "myWS")
                .addInterceptors(myInterceptor)
                .setAllowedOrigins("*");
    }
}
說明

經過實現 WebSocketConfigurer 類並覆蓋相應的方法進行 websocket 的配置。咱們主要覆蓋 registerWebSocketHandlers 這個方法。經過向 WebSocketHandlerRegistry 設置不一樣參數來進行配置。其中 addHandler 方法添加咱們上面的寫的 ws 的  handler 處理類,第二個參數是你暴露出的 ws 路徑。addInterceptors 添加咱們寫的握手過濾器。setAllowedOrigins("*") 這個是關閉跨域校驗,方便本地調試,線上推薦打開。

3. TIO

pom.xml

<dependency>
     <groupId>org.t-io</groupId>
     <artifactId>tio-websocket-spring-boot-starter</artifactId>
     <version>3.5.5.v20191010-RELEASE</version>
</dependency>

application.xml

tio:
  websocket:
    server:
      port: 8989
說明

這裏只配置了 ws 的啓動端口,還有不少配置,能夠經過結尾給的連接去尋找

MyHandler

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.handler;

import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.server.handler.IWsMsgHandler;

/**
 * @author buhao
 * @version MyHandler.java, v 0.1 2019-10-21 14:39 buhao
 */
@Component
public class MyHandler implements IWsMsgHandler {
    /**
     * 握手
     *
     * @param httpRequest
     * @param httpResponse
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        return httpResponse;
    }

    /**
     * 握手成功
     *
     * @param httpRequest
     * @param httpResponse
     * @param channelContext
     * @throws Exception
     */
    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        System.out.println("握手成功");
    }

    /**
     * 接收二進制文件
     *
     * @param wsRequest
     * @param bytes
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        return null;
    }

    /**
     * 斷開鏈接
     *
     * @param wsRequest
     * @param bytes
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        System.out.println("關閉鏈接");
        return null;
    }

    /**
     * 接收消息
     *
     * @param wsRequest
     * @param s
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public Object onText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {
        System.out.println("接收文本消息:" + s);
        return "success";
    }
}
說明

這個同上個例子中的 handler 很像,也是經過實現接口覆蓋方法來進行事件處理,實現的接口是IWsMsgHandler,它的方法功能以下

  1. handshake

    1. 在握手的時候觸發
  2. onAfterHandshaked

    1. 在握手成功後觸發
  3. onBytes

    1. 客戶端發送二進制消息觸發
  4. onClose

    1. 客戶端關閉鏈接時觸發
  5. onText

    1. 客戶端發送文本消息觸發

StudyWebsocketExampleApplication

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */

package cn.coder4j.study.example.websocket;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.tio.websocket.starter.EnableTioWebSocketServer;

@SpringBootApplication
@EnableTioWebSocketServer
public class StudyWebsocketExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(StudyWebsocketExampleApplication.class, args);
    }
}
說明

這個類的名稱不重要,它實際上是你的 spring boot 啓動類,只要記得加上@EnableTioWebSocketServer註解就能夠了

STOMP

pom.xml

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

WebSocketConfig

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * @author buhao
 * @version WebSocketConfig.java, v 0.1 2019-10-21 16:32 buhao
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 配置客戶端嘗試鏈接地址
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 設置廣播節點
        registry.enableSimpleBroker("/topic", "/user");
        // 客戶端向服務端發送消息需有/app 前綴
        registry.setApplicationDestinationPrefixes("/app");
        // 指定用戶發送(一對一)的前綴 /user/
        registry.setUserDestinationPrefix("/user/");
    }
}
說明
  1. 經過實現 WebSocketMessageBrokerConfigurer 接口和加上@EnableWebSocketMessageBroker來進行 stomp 的配置與註解掃描。
  2. 其中覆蓋 registerStompEndpoints 方法來設置暴露的 stomp 的路徑,其它一些跨域、客戶端之類的設置。
  3. 覆蓋 configureMessageBroker 方法來進行節點的配置。

    1. 其中 enableSimpleBroker 配置的廣播節點,也就是服務端發送消息,客戶端訂閱就能接收消息的節點。
    2. 覆蓋setApplicationDestinationPrefixes 方法,設置客戶端向服務端發送消息的節點。
    3. 覆蓋 setUserDestinationPrefix 方法,設置一對一通訊的節點。

WSController

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.controller;

import cn.coder4j.study.example.websocket.model.RequestMessage;
import cn.coder4j.study.example.websocket.model.ResponseMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @author buhao
 * @version WSController.java, v 0.1 2019-10-21 17:22 buhao
 */
@Controller
public class WSController {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/hello")
    @SendTo("/topic/hello")
    public ResponseMessage hello(RequestMessage requestMessage) {
        System.out.println("接收消息:" + requestMessage);
        return new ResponseMessage("服務端接收到你發的:" + requestMessage);
    }

    @GetMapping("/sendMsgByUser")
    public @ResponseBody
    Object sendMsgByUser(String token, String msg) {
        simpMessagingTemplate.convertAndSendToUser(token, "/msg", msg);
        return "success";
    }

    @GetMapping("/sendMsgByAll")
    public @ResponseBody
    Object sendMsgByAll(String msg) {
        simpMessagingTemplate.convertAndSend("/topic", msg);
        return "success";
    }

    @GetMapping("/test")
    public String test() {
        return "test-stomp.html";
    }
}
說明
  1. 經過 @MessageMapping 來暴露節點路徑,有點相似 @RequestMapping。注意這裏雖然寫的是 hello ,可是咱們客戶端調用的真正地址是 /app/hello。 由於咱們在上面的 config 裏配置了registry.setApplicationDestinationPrefixes("/app")
  2. @SendTo這個註解會把返回值的內容發送給訂閱了 /topic/hello 的客戶端,與之相似的還有一個@SendToUser 只不過他是發送給用戶端一對一通訊的。這兩個註解通常是應答時響應的,若是服務端主動發送消息能夠經過 simpMessagingTemplate類的convertAndSend方法。注意 simpMessagingTemplate.convertAndSendToUser(token, "/msg", msg) ,聯繫到咱們上文配置的 registry.setUserDestinationPrefix("/user/"),這裏客戶端訂閱的是/user/{token}/msg,千萬不要搞錯。

Session 共享的問題

上面反覆提到一個問題就是,服務端若是要主動發送消息給客戶端必定要用到 session。而你們都知道的是 session 這個東西是不跨 jvm 的。若是有多臺服務器,在 http 請求的狀況下,咱們能夠經過把 session 放入緩存中間件中來共享解決這個問題,經過 spring session 幾條配置就解決了。可是 web socket  不能夠。他的 session 是不能序列化的,固然這樣設計的目的不是爲了爲難你,而是出於對 http 與 web socket 請求的差別致使的。
目前網上找到的最簡單方案就是經過 redis 訂閱廣播的形式,主要代碼跟第二種方式差很少,你要在本地放個 map 保存請求的 session。也就是說每臺服務器都會保存與他鏈接的 session 於本地。而後發消息的地方要修改,並非如今這樣直接發送,而經過 redis 的訂閱機制。服務器要發消息的時候,你經過 redis 廣播這條消息,全部訂閱的服務端都會收到這個消息,而後本地嘗試發送。最後確定只有有這個對應用戶 session 的那臺才能發送出去。

如何選擇

  1. 若是你在使用 tio,那推薦使用 tio 的集成。由於它已經實現了不少功能,包括上面說的經過 redis 的 session 共享,只要加幾個配置就能夠了。可是 tio 是半開源,文檔是須要收費的。若是沒有使用,那就忘了他。
  2. 若是你的業務要求比較靈活多變,推薦使用前兩種,更推薦第二種 Spring 封裝的形式。
  3. 若是隻是簡單的服務器雙向通訊,推薦 stomp 的形式,由於他更容易規範使用。

其它

  1. websocket 在線驗證

寫完服務端代碼後想調試,可是不會前端代碼怎麼辦,點這裏,這是一個在線的 websocket 客戶端,功能徹底夠咱們調試了。

  1. stomp 驗證

這個沒找到在線版的,可是網上有不少 demo 能夠下載到本地進行調試,也能夠經過後文的鏈接找到。

  1. 另外因爲篇幅有限,並不能放上全部代碼,可是測試代碼全都上傳 gitlab,保證能夠正常運行,能夠在 這裏 找到

參考連接

  1. SpringBoot 系統 - 集成 WebSocket 實時通訊
  2. WebSocket 的故事(二)—— Spring 中如何利用 STOMP 快速構建 WebSocket 廣播式消息模式
  3. [SpringBoot集成WebSocket【基於純H5】進行點對點[一對一]和廣播[一對多]實時推送](https://blog.csdn.net/Ouyzc/a...
  4. Spring Framework 參考文檔(WebSocket STOMP)
  5. Spring Boot中使用WebSocket總結(一):幾種實現方式詳解
  6. Spring Boot 系列 - WebSocket 簡單使用
  7. tio-websocket-spring-boot-starter
相關文章
相關標籤/搜索