跟着源碼一塊兒學:手把手教你用WebSocket打造Web端IM聊天

本文做者芋艿,原題「芋道 Spring Boot WebSocket 入門」,本次有修訂和改動。html

1、引言

WebSocket現在在Web端即時通信技術應用裏使用普遍,不只用於傳統PC端的網頁裏,也被不少移動端開發者用於基於HTML5的混合APP裏。對於想要在基於Web的應用裏添加IM、推送等實時通訊功能,WebSocket幾乎是必需要掌握的技術。前端

本文將基於Tomcat和Spring框架實現一個邏輯簡單的入門級IM應用,對於即時通信初學者來講,能找到一個簡單直接且能順利跑通的實例代碼,顯然意義更大,本文正是如此。但願能給你的IM開發和學習帶來啓發。java

注:源碼在本文第4、五節開頭的附件處可下載。node

學習交流:git

(本文同步發佈於:http://www.52im.net/thread-3483-1-1.htmlgithub

2、知識準備

若是你對Web端即時通信知識一頭霧水,務必先讀:《新手入門貼:史上最全Web端即時通信技術原理詳解》、《Web端即時通信技術盤點:短輪詢、Comet、Websocket、SSE》。web

限於篇幅,本文不會深究WebSocket技術理論,若有興趣請從基礎學習:算法

若是想要更硬核一點的,能夠讀讀下面這幾篇:spring

3、內容概述

相比 HTTP 協議來講,WebSocket 協議對大多數後端開發者是比較陌生的。數據庫

相對而言:WebSocket 協議重點是提供了服務端主動向客戶端發送數據的能力,這樣咱們就能夠完成實時性較高的需求。例如:聊天 IM 即便通信功能、消息訂閱服務、網頁遊戲等等。

同時:由於 WebSocket 使用 TCP 通訊,能夠避免重複建立鏈接,提高通訊質量和效率。例如:美團的長鏈接服務,具體能夠看看 《美團點評的移動端網絡優化實踐:大幅提高鏈接成功率、速度等》 。

友情提示:

這裏有個誤區,WebSocket 相比普通的 Socket 來講,僅僅是藉助 HTTP 協議完成握手,建立鏈接。後續的全部通訊,都和 HTTP 協議無關。

看到這裏,你們必定覺得又要開始嗶嗶 WebSocket 的概念。哈哈,我偏不~若是對這塊不了的朋友,能夠閱讀本文「二、知識準備」這一章。

要想使用WebSocket,通常有以下幾種解決方案可選:

目前筆者手頭有個涉及到 IM 即便通信的項目,採用的是方案三。

主要緣由是:咱們對 Netty 框架的實戰、原理與源碼,都相對熟悉一些,因此就考慮了它。而且,除了須要支持 WebSocket 協議,咱們還想提供原生的 Socket 協議。

若是僅僅是僅僅提供 WebSocket 協議的支持,能夠考慮採用方案一或者方案二,在使用上,兩個方案是比較接近的。相比來講,方案一 Spring WebSocket 內置了對 STOMP 協議的支持。

不過:本文仍是採用方案二「Tomcat WebSocket」來做爲入門示例。咳咳咳,沒有特殊的緣由,主要是開始寫本文以前,已經花了 2 小時使用它寫了一個示例。實在是有點懶,不想改。若是能重來,我要選李白,哈哈哈哈~

固然,不要慌,方案一和方案二的實現代碼,真心沒啥差異。

在開始搭建 Tomcat WebSocket 入門示例以前,咱們先來了解下 JSR-356 規範,定義了 Java 針對 WebSocket 的 API :即 Javax WebSocket 。規範是大哥,打死不會提供實現,因此 JSR-356 也是如此。目前,主流的 Web 容器都已經提供了 JSR-356 的實現,例如說 Tomcat、Jetty、Undertow 等等。

4、Tomcat WebSocket 實戰入門

4.一、基本介紹

示例代碼下載:

(因附件沒法上傳到此處,請從同步連接處下載: http://www.52im.net/thread-3483-1-1.html

代碼目錄內容是這樣: 

在本小節中,咱們會使用 Tomcat WebSocket 搭建一個 WebSocket 的示例。

提供以下消息的功能支持:

  • 1)身份認證請求;
  • 2)私聊消息;
  • 3)羣聊消息。

考慮到讓示例更加易懂,咱們先作成全局有且僅有一個大的聊天室,即創建上 WebSocket 的鏈接,都自動動進入該聊天室。

下面,開始遨遊 WebSocket 這個魚塘...

4.二、引入依賴

在 pom.xml 文件中,引入相關依賴。

<?xml version="1.0"encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 [url= http://maven.apache.org/xsd/m...">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>lab-25-01</artifactId>
    <dependencies>
        <!-- 實現對 WebSocket 相關依賴的引入,方便~ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- 引入 Fastjson ,實現對 JSON 的序列化,由於後續咱們會使用它解析消息 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
</project>
具體每一個依賴的做用,本身認真看下注釋。

4.三、WebsocketServerEndpoint

在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路徑下,建立 WebsocketServerEndpoint 類,定義 Websocket 服務的端點(EndPoint)。

代碼以下:

// WebsocketServerEndpoint.java
@Controller
@ServerEndpoint("/")
public class WebsocketServerEndpoint {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        logger.info("onOpen", session);
    }
    @OnMessage
    public void onMessage(Session session, String message) {
        logger.info("onOpen", session, message); // 生產環境下,請設置成 debug 級別
    }
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        logger.info("onClose", session, closeReason);
    }
    @OnError
    public void onError(Session session, Throwable throwable) {
        logger.info("onClose", session, throwable);
    }
}

如代碼所示:

  • 1)在類上,添加 @Controller 註解,保證建立一個 WebsocketServerEndpoint Bean;
  • 2)在類上,添加 JSR-356 定義的 @ServerEndpoint 註解,標記這是一個 WebSocket EndPoint ,路徑爲 / ;
  • 3)WebSocket 一共有四個事件,分別對應使用 JSR-356 定義的 @OnOpen@OnMessage@OnClose@OnError 註解。

這是最簡版的 WebsocketServerEndpoint 的代碼。在下文,咱們會慢慢把代碼補全。

4.四、WebSocketConfiguration

在 cn.iocoder.springboot.lab24.springwebsocket.config 包路徑下,建立 WebsocketServerEndpoint 配置類。

代碼以下:

// WebSocketConfiguration.java
@Configuration
// @EnableWebSocket // 無需添加該註解,由於咱們並非使用 Spring WebSocket
public class WebSocketConfiguration {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

PS:在 #serverEndpointExporter() 方法中,建立 ServerEndpointExporter Bean 。該 Bean 的做用,是掃描添加有 @ServerEndpoint 註解的 Bean 。

4.五、Application

建立 Application.java 類,配置 @SpringBootApplication 註解便可。

代碼以下:

// Application.java
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

執行 Application 啓動該示例項目。

考慮到你們可能不會或者不肯意寫前端代碼,因此咱們直接使用 WebSocket在線測試工具,測試 WebSocket 鏈接。

以下圖:

至此,最簡單的一個 WebSocket 項目的骨架,咱們已經搭建完成。下面,咱們開始改造,把相應的邏輯補全。

4.六、消息

在 HTTP 協議中,是基於 Request/Response 請求響應的同步模型,進行交互。在 Websocket 協議中,是基於 Message 消息的異步模型,進行交互。這一點,是很大的不一樣的,等會看到具體的消息類,感覺會更明顯。

由於 WebSocket 協議,不像 HTTP 協議有 URI 能夠區分不一樣的 API 請求操做,因此咱們須要在 WebSocket 的 Message 裏,增長可以標識消息類型,這裏咱們採用 type 字段。

因此在這個示例中,咱們採用的 Message 採用 JSON 格式編碼。

格式以下:

{
    type: "", // 消息類型
    body: {} // 消息體
}

解釋一下:

  • 1)type 字段,消息類型。經過該字段,咱們知道使用哪一個 MessageHandler 消息處理器(關於 MessageHandler ,咱們在下一節中,詳細解析);
  • 2)body 字段,消息體。不一樣的消息類型,會有不一樣的消息體;
  • 3)Message 採用 JSON 格式編碼,主要考慮便捷性,實際項目下,也能夠考慮 Protobuf 等更加高效且節省流量的編碼格式。

實際上:咱們在該示例中,body 字段對應的 Message 相關的接口和類,實在想不到名字了。全部的 Message 們,咱們都放在 cn.iocoder.springboot.lab25.springwebsocket.message 包路徑下。

4.6.1 Message

建立 Message 接口,基礎消息體,全部消息體都要實現該接口。

代碼以下:

// Message.java
publicinterfaceMessage {
}

目前做爲一個標記接口,未定義任何操做。

4.6.2 認證相關 Message

建立 AuthRequest 類,用戶認證請求。

代碼以下:

// AuthRequest.java
public class AuthRequest implements Message {
    public static final String TYPE = "AUTH_REQUEST";
    /**
     * 認證 Token
     */
    private String accessToken;
    // ... 省略 set/get 方法
}

解釋一下:

  • 1)TYPE 靜態屬性,消息類型爲 AUTH_REQUEST 。
  • 2)accessToken 屬性,認證 Token 。

對於第2)點,在 WebSocket 協議中,咱們也須要認證當前鏈接,用戶身份是什麼。通常狀況下,咱們採用用戶調用 HTTP 登陸接口,登陸成功後返回的訪問令牌 accessToken 。這裏,咱們先不拓展開講,過後能夠看看 《基於 Token 認證的 WebSocket 鏈接》 文章。

雖說,WebSocket 協議是基於 Message 模型,進行交互。可是,這並不意味着它的操做,不須要響應結果。例如說,用戶認證請求,是須要用戶認證響應的。因此,咱們建立 AuthResponse 類,做爲用戶認證響應。

代碼以下:

// AuthResponse.java
public class AuthResponse implements Message {
    public static final String TYPE = "AUTH_RESPONSE";
    /**
     * 響應狀態碼
     */
    private Integer code;
    /**
     * 響應提示
     */
    private String message;
    // ... 省略 set/get 方法
}

解釋一下:

  • 1)TYPE 靜態屬性,消息類型爲 AUTH_REQUEST ;
  • 2)code 屬性,響應狀態碼;
  • 3)message 屬性,響應提示。

對於第1)點,實際上,咱們在每一個 Message 實現類上,都增長了 TYPE 靜態屬性,做爲消息類型。下面,咱們就不重複贅述了。

在本示例中,用戶成功認證以後,會廣播用戶加入羣聊的通知 Message ,使用 UserJoinNoticeRequest 。

代碼以下:

// UserJoinNoticeRequest.java
public class UserJoinNoticeRequest implements Message {
    public static final String TYPE = "USER_JOIN_NOTICE_REQUEST";
    /**
     * 暱稱
     */
    private String nickname;
    // ... 省略 set/get 方法
}

實際上,咱們能夠在須要使用到 Request/Response 模型的地方,將 Message 進行拓展:

  • 1)Request 抽象類,增長 requestId 字段,表示請求編號;
  • 2)Response 抽象類,增長 requestId 字段,和每個 Request 請求映射上(同時,裏面統必定義 code 和 message 屬性,表示響應狀態碼和響應提示)。

這樣,在使用到同步模型的業務場景下,Message 實現類使用 Request/Reponse 做爲後綴。例如說,用戶認證請求、刪除一個好友請求等等。

而在使用到異步模型能的業務場景下,Message 實現類仍是繼續 Message 做爲後綴。例如說,發送一條消息,用戶操做完後,無需阻塞等待結果

4.6.3 發送消息相關 Message

建立 SendToOneRequest 類,發送給指定人的私聊消息的 Message。

代碼以下:

// SendToOneRequest.java
public class SendToOneRequest implements Message {
    public static final String TYPE = "SEND_TO_ONE_REQUEST";
    /**
     * 發送給的用戶
     */
    private String toUser;
    /**
     * 消息編號
     */
    private String msgId;
    /**
     * 內容
     */
    private String content;
    // ... 省略 set/get 方法
}

每一個字段,本身看註釋噢。

建立 SendToAllRequest 類,發送給全部人的羣聊消息的 Message。

代碼以下:

// SendToAllRequest.java
public class SendToAllRequest implements Message {
    public static final String TYPE = "SEND_TO_ALL_REQUEST";
    /**
     * 消息編號
     */
    private String msgId;
    /**
     * 內容
     */
    private String content;
    // ... 省略 set/get 方法
}

每一個字段,本身看註釋噢。

在服務端接收到發送消息的請求,須要異步響應發送是否成功。因此,建立 SendResponse 類,發送消息響應結果的 Message 。

代碼以下:

// SendResponse.java
public class SendResponse implements Message {
    public static final String TYPE = "SEND_RESPONSE";
    /**
     * 消息編號
     */
    private String msgId;
    /**
     * 響應狀態碼
     */
    private Integer code;
    /**
     * 響應提示
     */
    private String message;
    // ... 省略 set/get 方法
}

重點看 msgId 字段:即消息編號。客戶端在發送消息,經過使用 UUID 算法,生成全局惟一消息編號(惟一ID的生成技術見:《重新手到專家:如何設計一套億級消息量的分佈式IM系統》的「_五、惟一ID的技術方案_」章節)。這樣,服務端經過 SendResponse 消息響應,經過 msgId 作映射。

在服務端接收到發送消息的請求,須要轉發消息給對應的人。因此,建立 SendToUserRequest 類,發送消息給一個用戶的 Message 。

代碼以下:

// SendResponse.java
public class SendToUserRequest implements Message {
    public static final String TYPE = "SEND_TO_USER_REQUEST";
    /**
     * 消息編號
     */
    private String msgId;
    /**
     * 內容
     */
    private String content;
    // ... 省略 set/get 方法
}

相比 SendToOneRequest 來講,少一個 toUser 字段。由於,咱們能夠經過 WebSocket 鏈接,已經知道發送給誰了。

4.七、消息處理器

每一個客戶端發起的 Message 消息類型,咱們會聲明對應的 MessageHandler 消息處理器。這個就相似在 SpringMVC 中,每一個 API 接口對應一個 Controller 的 Method 方法。

全部的 MessageHandler 們,咱們都放在 cn.iocoder.springboot.lab25.springwebsocket.handler 包路徑下。

4.7.1 MessageHandler

建立 MessageHandler 接口,消息處理器接口。

代碼以下:

// MessageHandler.java
public interface MessageHandler<T extends Message> {
    /**
     * 執行處理消息
     *
     * @param session 會話
     * @param message 消息
     */
    void execute(Session session, T message);
    /**
     * @return 消息類型,即每一個 Message 實現類上的 TYPE 靜態字段
     */
    String getType();
}

解釋一下:

  • 1)定義了泛型 <T> ,須要是 Message 的實現類;
  • 2)定義的兩個接口方法,本身看下注釋哈。

4.7.2 AuthMessageHandler

建立 AuthMessageHandler 類,處理 AuthRequest 消息。

代碼以下:

// AuthMessageHandler.java
@Component
public class AuthMessageHandler implements MessageHandler<AuthRequest> {
    @Override
    public void execute(Session session, AuthRequest message) {
        // 若是未傳遞 accessToken
        if(StringUtils.isEmpty(message.getAccessToken())) {
            WebSocketUtil.send(session, AuthResponse.TYPE,
                    new AuthResponse().setCode(1).setMessage("認證 accessToken 未傳入"));
            return;
        }
        // 添加到 WebSocketUtil 中
        WebSocketUtil.addSession(session, message.getAccessToken()); // 考慮到代碼簡化,咱們先直接使用 accessToken 做爲 User
        // 判斷是否定證成功。這裏,僞裝直接成功
        WebSocketUtil.send(session, AuthResponse.TYPE,newAuthResponse().setCode(0));
        // 通知全部人,某我的加入了。這個是可選邏輯,僅僅是爲了演示
        WebSocketUtil.broadcast(UserJoinNoticeRequest.TYPE,
                newUserJoinNoticeRequest().setNickname(message.getAccessToken())); // 考慮到代碼簡化,咱們先直接使用 accessToken 做爲 User
    }
    @Override
    public String getType() {
        return AuthRequest.TYPE;
    }
}

代碼比較簡單,跟着代碼讀讀便可。

關於 WebSocketUtil 類,咱們在「5.八、WebSocketUtil」一節中再來詳細看看。

4.7.3 SendToOneRequest

建立 SendToOneHandler 類,處理 SendToOneRequest 消息。

代碼以下:

// SendToOneRequest.java
@Component
public class SendToOneHandler implements MessageHandler<SendToOneRequest> {
    @Override
    public void execute(Session session, SendToOneRequest message) {
        // 這裏,僞裝直接成功
        SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);
        WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
        // 建立轉發的消息
        SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        // 廣播發送
        WebSocketUtil.send(message.getToUser(), SendToUserRequest.TYPE, sendToUserRequest);
    }
    @Override
    public String getType() {
        return SendToOneRequest.TYPE;
    }
}

代碼比較簡單,跟着代碼讀讀便可。

4.7.4 SendToAllHandler

建立 SendToAllHandler 類,處理 SendToAllRequest 消息。

代碼以下:

// SendToAllRequest.java
@Component
public class SendToAllHandler implements MessageHandler<SendToAllRequest> {
    @Override
    public void execute(Session session, SendToAllRequest message) {
        // 這裏,僞裝直接成功
        SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);
        WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
        // 建立轉發的消息
        SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        // 廣播發送
        WebSocketUtil.broadcast(SendToUserRequest.TYPE, sendToUserRequest);
    }
    @Override
    public String getType() {
        return SendToAllRequest.TYPE;
    }
}

代碼比較簡單,跟着代碼讀讀便可。

4.八、WebSocketUtil

代碼在 cn.iocoder.springboot.lab25.springwebsocket.util 包路徑下。

建立 WebSocketUtil 工具類,主要提供兩方面的功能:

  • 1)Session 會話的管理;
  • 2)多種發送消息的方式。

總體代碼比較簡單,本身瞅瞅喲。

代碼在目錄中的以下位置:

4.九、完善 WebsocketServerEndpoint

在本小節,咱們會修改 WebsocketServerEndpoint 的代碼,完善其功能。

4.9.1 初始化 MessageHandler 集合

實現 InitializingBean 接口,在 #afterPropertiesSet() 方法中,掃描全部 MessageHandler Bean ,添加到 MessageHandler 集合中。

代碼以下:

// WebsocketServerEndpoint.java
/**
 * 消息類型與 MessageHandler 的映射
 *
 * 注意,這裏設置成靜態變量。雖說 WebsocketServerEndpoint 是單例,可是 Spring Boot 仍是會爲每一個 WebSocket 建立一個 WebsocketServerEndpoint Bean 。
 */
private static final Map<String, MessageHandler> HANDLERS = newHashMap<>();
@Autowired
private ApplicationContext applicationContext;
@Override
public void afterPropertiesSet() throws Exception {
    // 經過 ApplicationContext 得到全部 MessageHandler Bean
    applicationContext.getBeansOfType(MessageHandler.class).values() // 得到全部 MessageHandler Bean.forEach(messageHandler -> HANDLERS.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中
    logger.info("afterPropertiesSet", HANDLERS.size());
}

經過這樣的方式,能夠避免手動配置 MessageHandler 與消息類型的映射。

4.9.2 onOpen

從新實現 #onOpen(Session session, EndpointConfig config) 方法,實現鏈接時,使用 accessToken 參數進行用戶認證。

代碼以下:

// WebsocketServerEndpoint.java
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
    logger.info("onOpen", session);
    // <1> 解析 accessToken
    List<String> accessTokenValues = session.getRequestParameterMap().get("accessToken");
    String accessToken = !CollectionUtils.isEmpty(accessTokenValues) ? accessTokenValues.get(0) : null;
    // <2> 建立 AuthRequest 消息類型
    AuthRequest authRequest = newAuthRequest().setAccessToken(accessToken);
    // <3> 得到消息處理器
    MessageHandler<AuthRequest> messageHandler = HANDLERS.get(AuthRequest.TYPE);
    if(messageHandler == null) {
        logger.error("onOpen");
        return;
    }
    messageHandler.execute(session, authRequest);
}

如代碼所示:

  • <1> 處:解析 ws:// 地址上的 accessToken 的請求參。例如說:ws://127.0.0.1:8080?accessToken=999999;
  • <2> 處:建立 AuthRequest 消息類型,並設置 accessToken 屬性;
  • <3> 處:得到 AuthRequest 消息類型對應的 MessageHandler 消息處理器,而後調用 MessageHandler#execute(session, message) 方法,執行處理用戶認證請求。

打開三個瀏覽器建立,分別設置服務地址以下:

  • 1)ws://127.0.0.1:8080/?accessToken=芋艿;
  • 2)ws://127.0.0.1:8080/?accessToken=番茄;
  • 3)ws://127.0.0.1:8080/?accessToken=土豆。

而後,逐個點擊「開啓鏈接」按鈕,進行 WebSocket 鏈接。

最終效果以下圖:

如上圖所示:

  • 1)在紅圈中,能夠看到 AuthResponse 的消息;
  • 2)在黃圈中,能夠看到 UserJoinNoticeRequest 的消息。

4.9.3 onMessage

從新實現 _#onMessage(Session session, String message)_ 方法,實現不一樣的消息,轉發給不一樣的 MessageHandler 消息處理器。

代碼以下:

// WebsocketServerEndpoint.java
@OnMessage
public void onMessage(Session session, String message) {
    logger.info("onOpen", session, message); // 生產環境下,請設置成 debug 級別
    try{
        // <1> 得到消息類型
        JSONObject jsonMessage = JSON.parseObject(message);
        String messageType = jsonMessage.getString("type");
        // <2> 得到消息處理器
        MessageHandler messageHandler = HANDLERS.get(messageType);
        if(messageHandler == null) {
            logger.error("onMessage", messageType);
            return;
        }
        // <3> 解析消息
        Class<? extendsMessage> messageClass = this.getMessageClass(messageHandler);
        // <4> 處理消息
        Message messageObj = JSON.parseObject(jsonMessage.getString("body"), messageClass);
        messageHandler.execute(session, messageObj);
    } catch(Throwable throwable) {
        logger.info("onMessage", session, throwable);
    }
}

代碼中:

  • <1> 處,得到消息類型,從 "type" 字段中;
  • <2> 處,得到消息類型對應的 MessageHandler 消息處理器;
  • <3> 處,調用 #getMessageClass(MessageHandler handler) 方法,經過 MessageHandler 中,經過解析其類上的泛型,得到消息類型對應的 Class 類。

代碼以下:

// WebsocketServerEndpoint.java
private Class<? extends Message> getMessageClass(MessageHandler handler) {
    // 得到 Bean 對應的 Class 類名。由於有可能被 AOP 代理過。
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);
    // 得到接口的 Type 數組
    Type[] interfaces = targetClass.getGenericInterfaces();
    Class<?> superclass = targetClass.getSuperclass();
    while((Objects.isNull(interfaces) || 0== interfaces.length) && Objects.nonNull(superclass)) { // 此處,是以父類的接口爲準
        interfaces = superclass.getGenericInterfaces();
        superclass = targetClass.getSuperclass();
    }
    if(Objects.nonNull(interfaces)) {
        // 遍歷 interfaces 數組
        for(Type type : interfaces) {
            // 要求 type 是泛型參數
            if(type instanceof ParameterizedType) {
                ParameterizedType parameterizedType = (ParameterizedType) type;
                // 要求是 MessageHandler 接口
                if(Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
                    Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
                    // 取首個元素
                    if(Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
                        return(Class<Message>) actualTypeArguments[0];
                    } else{
                        thrownewIllegalStateException(String.format("類型(%s) 得到不到消息類型", handler));
                    }
                }
            }
        }
    }
    throw new IllegalStateException(String.format("類型(%s) 得到不到消息類型", handler));
}

這是參考 rocketmq-spring 項目的 DefaultRocketMQListenerContainer#getMessageType() 方法,進行略微修改。

若是你們對 Java 的泛型機制沒有作過一點了解,可能略微有點硬核。能夠先暫時跳過,知道意圖便可。

<4> 處,調用 MessageHandler#execute(session, message) 方法,執行處理請求。

另外:這裏增長了 try-catch 代碼,避免整個執行的過程當中,發生異常。若是在 onMessage 事件的處理中,發生異常,該消息對應的 Session 會話會被自動關閉。顯然,這個不符合咱們的要求。例如說,在 MessageHandler 處理消息的過程當中,發生一些異常是沒法避免的。

繼續基於上述建立的三個瀏覽器,咱們先點擊「清空消息」按鈕,清空下消息,打掃下上次測試展現出來的接收穫得的 Message 。固然,WebSocket 的鏈接,不須要去斷開。

在第一個瀏覽器中,分別發送兩種聊天消息。

一條 SendToOneRequest 私聊消息:

{
    type: "SEND_TO_ONE_REQUEST",
    body: {
        toUser: "番茄",
        msgId: "eaef4a3c-35dd-46ee-b548-f9c4eb6396fe",
        content: "我是一條單聊消息"
    }
}

一條 SendToAllHandler 羣聊消息:

{
    type: "SEND_TO_ALL_REQUEST",
    body: {
        msgId: "838e97e1-6ae9-40f9-99c3-f7127ed64747",
        content: "我是一條羣聊消息"
    }
}

最終結果以下圖:

如上圖所示:

  • 1)在紅圈中,能夠看到一條 SendToUserRequest 的消息,僅有第二個瀏覽器(番茄)收到;
  • 2)在黃圈中,能夠看到三條 SendToUserRequest 的消息,全部瀏覽器都收到。

4.9.4 onClose

從新實現 _#onClose(Session session, CloseReason closeReason)_ 方法,實現移除關閉的 Session 。

代碼以下:

// WebsocketServerEndpoint.java
@OnClose
public void onClose(Session session, CloseReason closeReason) {
    logger.info("onClose", session, closeReason);
    WebSocketUtil.removeSession(session);
}

4.9.5 onError

#onError(Session session, Throwable throwable) 方法,保持不變。

代碼以下:

// WebsocketServerEndpoint.java
@OnError
public void onError(Session session, Throwable throwable) {
    logger.info("onClose", session, throwable);
}

5、Spring WebSocket 實戰入門

5.0、基礎介紹

示例代碼下載:

(因附件沒法上傳到此處,請從同步連接處下載: http://www.52im.net/thread-3483-1-1.html

仔細一個捉摸,虎軀一震,仍是提供一個 Spring WebSocket 快速入門的示例。

在 上章「Tomcat WebSocket 實戰入門」 的 _lab-websocket-25-01_ 示例的基礎上,咱們複製出 lab-websocket-25-02 項目,進行改造。

改造的代碼目錄內容是這樣:

5.一、WebSocketUtil

由於 Tomcat WebSocket 使用的是 Session 做爲會話,而 Spring WebSocket 使用的是 WebSocketSession 做爲會話,致使咱們須要略微修改下 WebSocketUtil 工具類。改動很是略微,點擊 WebSocketUtil.java 查看下,秒懂的噢。

主要有兩點:

  • 1)將全部使用 Session 類的地方,調整成 WebSocketSession 類;
  • 2)將發送消息,從 Session 修改爲 WebSocketSession 。

5.二、消息處理器

將 _cn.iocoder.springboot.lab25.springwebsocket.handler_ 包路徑下的消息處理器們,使用到 Session 類的地方,調整成 WebSocketSession 類。

5.三、DemoWebSocketShakeInterceptor

在 _cn.iocoder.springboot.lab25.springwebsocket.websocket_ 包路徑下,建立 DemoWebSocketShakeInterceptor 攔截器。由於 WebSocketSession 沒法得到 ws 地址上的請求參數,因此只好經過該攔截器,得到 accessToken 請求參數,設置到 attributes 中。

代碼以下:

// DemoWebSocketShakeInterceptor.java
public class DemoWebSocketShakeInterceptor extends HttpSessionHandshakeInterceptor {
    @Override// 攔截 Handshake 事件
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throwsException {
        // 得到 accessToken
        if(request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
            attributes.put("accessToken", serverRequest.getServletRequest().getParameter("accessToken"));
        }
        // 調用父方法,繼續執行邏輯
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }
}

5.四、DemoWebSocketHandler

在 _cn.iocoder.springboot.lab25.springwebsocket.websocket_ 包路徑下,建立 DemoWebSocketHandler 處理器。該處理器參考 「5.九、完善 WebsocketServerEndpoint」 小節,編寫它的代碼。

DemoWebSocketHandler.java代碼位於以下目錄處,具體內容就不貼出來了,自已去讀一讀:

代碼極其類似,簡單擼下便可。

5.五、WebSocketConfiguration

修改 WebSocketConfiguration 配置類,代碼以下:

// WebSocketConfiguration.java
@Configuration
@EnableWebSocket// 開啓 Spring WebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(this.webSocketHandler(), "/") // 配置處理器
                .addInterceptors(newDemoWebSocketShakeInterceptor()) // 配置攔截器
                .setAllowedOrigins("*"); // 解決跨域問題
    }
    @Bean
    public DemoWebSocketHandler webSocketHandler() {
        return new DemoWebSocketHandler();
    }
    @Bean
    public DemoWebSocketShakeInterceptor webSocketShakeInterceptor() {
        return new DemoWebSocketShakeInterceptor();
    }
}

解釋一下:

  • 1)在類上,添加 @EnableWebSocket 註解,開啓 Spring WebSocket 功能;
  • 2)實現 WebSocketConfigurer 接口,自定義 WebSocket 的配置(具體能夠看看 #registerWebSocketHandlers(registry) 方法,配置 WebSocket 處理器、攔截器,以及容許跨域)。

至此,咱們已經完成 Spring WebSocket 的示例。

後面,咱們執行 Application 來啓動項目。具體的測試,這裏就不重複了,能夠本身使用 WebSocket 在線測試工具 來測試下。

7、寫在最後

雖說,WebSocket 協議已經在主流的瀏覽器上,獲得很是好的支持,可是總有一些「異類」,是不兼容的。因此就誕生了 SockJS、Socket.io這類庫。關於它們的介紹與使用,能夠看看 《SockJS 簡單介紹》 、《Web端即時通信技術的發展與WebSocket、Socket.io的技術實踐》文章。

實際場景下,咱們在使用 WebSocket 仍是原生 Socket 也好,都須要考慮「如何保證消息必定送達給用戶?」

你們確定可以想到的是:若是用戶不處於在線的時候,消息持久化到 MySQL、MongoDB 等等數據庫中。這個是正確,且是必需要作的。

咱們在一塊兒考慮下邊界場景:客戶端網絡環境較差,特別是在移動端場景下,出現網絡閃斷,可能會出現鏈接實際已經斷開,而服務端覺得客戶端處於在線的狀況。此時,服務端會將消息發給客戶端,那麼消息實際就發送到「空氣」中,產生丟失的狀況。

要解決這種狀況下的問題,須要引入客戶端的 ACK 消息機制。

目前,主流的有兩種作法。

第一種:基於每一條消息編號 ACK

總體流程以下:

  • 1)不管客戶端是否在線,服務端都先把接收到的消息持久化到數據庫中。若是客戶端此時在線,服務端將完整消息推送給客戶端;
  • 2)客戶端在接收到消息以後,發送 ACK 消息編號給服務端,告知已經收到該消息。服務端在收到 ACK 消息編號的時候,標記該消息已經發送成功;
  • 3)服務端定時輪詢,在線的客戶端,是否有超過 N 秒未 ACK 的消息。若是有,則從新發送消息給對應的客戶端。

這種方案,由於客戶端逐條 ACK 消息編號,因此會致使客戶端和服務端交互次數過多。固然,客戶端能夠異步批量 ACK 多條消息,從而減小次數。

不過由於服務端仍然須要定時輪詢,也會致使服務端壓力較大。因此,這種方案基本已經不採用了。

第二種:基於滑動窗口 ACK

總體流程以下:

  • 1)不管客戶端是否在線,服務端都先把接收到的消息持久化到數據庫中。若是客戶端此時在線,服務端將消息編號推送給客戶端;
  • 2)客戶端在接收到消息編號以後,和本地的消息編號進行比對。若是比本地的小,說明該消息已經收到,忽略不處理;若是比本地的大,使用本地的消息編號,向服務端拉取大於本地的消息編號的消息列表,即增量消息列表。拉取完成後,更新消息列表中最大的消息編號爲新的本地的消息編號;
  • 3)服務端在收到客戶端拉取增量的消息列表時,將請求的編號記錄到數據庫中,用於知道客戶端此時本地的最新消息編號;
  • 4)考慮到服務端將消息編號推送給客戶端,也會存在丟失的狀況,因此客戶端會每 N 秒定時向服務端拉取大於本地的消息編號的消息列表。

這種方式,在業務被稱爲推拉結合的方案,在分佈式消息隊列、配置中心、註冊中心實現實時的數據同步,常常被採用。

而且,採用這種方案的狀況下,客戶端和服務端不必定須要使用長鏈接,也可使用長輪詢所替代。

作法好比,客戶端發送帶有消息版本號的 HTTP 請求到服務端:

  • 1)若是服務端已有比客戶端新的消息編號,則直接返回增量的消息列表;
  • 2)若是服務端沒有比客戶端新的消息編號,則 HOLD 住請求,直到有新的消息列表能夠返回,或者 HTTP 請求超時;
  • 3)客戶端在收到 HTTP 請求超時時,當即又從新發起帶有消息版本號的 HTTP 請求到服務端。如此反覆循環,經過消息編號做爲增量標識,達到實時獲取消息的目的。

若是你們對消息可靠投遞這塊感興趣,能夠看看下面這幾篇:

畢竟,本篇這裏寫的有點簡略哈 ~

最後:若是你想系統的學習IM開發方面方面的知識,推薦詳讀:《新手入門一篇就夠:從零開發移動端IM》。若是你自認爲已經有點小牛x了,能夠看看生產環境下的大用戶量IM系統架構設計方面的知識:《重新手到專家:如何設計一套億級消息量的分佈式IM系統》。

限於篇幅,這裏就再也不繼續展開了。

附錄:更多IM開發動手實踐文章

自已開發IM有那麼難嗎?手把手教你自擼一個Andriod版簡易IM (有源碼)
一種Android端IM智能心跳算法的設計與實現探討(含樣例代碼)
手把手教你用Netty實現網絡通訊程序的心跳機制、斷線重連機制
《[輕量級即時通信框架MobileIMSDK的iOS源碼(開源版)[附件下載]](http://www.52im.net/thread-35...
《[開源IM工程「蘑菇街TeamTalk」2015年5月前未刪減版完整代碼 [附件下載]](http://www.52im.net/thread-77...
《[NIO框架入門(一):服務端基於Netty4的UDP雙向通訊Demo演示 [附件下載]](http://www.52im.net/thread-36...
《[NIO框架入門(二):服務端基於MINA2的UDP雙向通訊Demo演示 [附件下載]](http://www.52im.net/thread-37...
《[NIO框架入門(三):iOS與MINA二、Netty4的跨平臺UDP雙向通訊實戰 [附件下載]](http://www.52im.net/thread-37...
《[NIO框架入門(四):Android與MINA二、Netty4的跨平臺UDP雙向通訊實戰 [附件下載]](http://www.52im.net/thread-38...
《[一個WebSocket實時聊天室Demo:基於node.js+socket.io [附件下載]](http://www.52im.net/thread-51...
適合新手:從零開發一個IM服務端(基於Netty,有完整源碼)
拿起鍵盤就是幹:跟我一塊兒徒手開發一套分佈式IM系統
正確理解IM長鏈接的心跳及重連機制,並動手實現(有完整IM源碼)
適合新手:手把手教你用Go快速搭建高性能、可擴展的IM系統(有源碼)
跟着源碼一塊兒學:手把手教你用WebSocket打造Web端IM聊天

本文已同步發佈於「即時通信技術圈」公衆號。

▲ 本文在公衆號上的連接是:點此進入。同步發佈連接是:http://www.52im.net/thread-3483-1-1.html

相關文章
相關標籤/搜索