Spring Boot 二三事:WEB 應用消息推送的那點事

閱讀對象:本文適合SpringBoot 初學者及對SpringBoot感興趣的童鞋閱讀。javascript

背景介紹:在企業級 WEB 應用開發中,爲了更好的用戶體驗&提高響應速度,每每會將一些耗時費力的請求 (Excel導入or導出,複雜計算, etc.) 進行***異步化***處理。 由此帶來的一個重要的問題是***如何通知用戶任務狀態***,常見的方法大體分爲2類4種:html

  • HTTP Polling client pull
  • HTTP Long-Polling client pull
  • Server-Sent Events (SSE) server push
  • WebSocket server push

1. Polling 短輪詢

是一種很是簡單的實現方式。就是client經過***定時任務***不斷得重複請求服務器,從而獲取新消息,而server按時間順序提供自上次請求之後發生的單個或多個消息。前端

Polling

短輪詢的優勢很是明顯,就是實現簡單。當兩個方向上的數據都很是少,而且請求間隔不是很是密集時,這種方法就會很是有效。例如,新聞評論信息能夠每半分鐘更新一次,這對用戶來講是能夠的。vue

它得缺點也是很是明顯,一旦咱們對數據實時性要求很是高時,爲了保證消息的及時送達,請求間隔必須縮短,在這種狀況下,會加重服務器資源的浪費,下降服務的可用性。另外一個缺點就是在消息的數量較少時,將會有大量的 request作無用功,進而也致使服務器資源的浪費。java

2. Long-Polling 長輪詢

長輪詢的官方定義是:git

The server attempts to "hold open" (notimmediately reply to) each HTTP request, responding only when there are events to deliver. In this way, there is always a pending request to which the server can reply for the purpose of delivering events as they occur, thereby minimizing the latency in message delivery.github

若是與Polling的方式相比,會發現Long-Polling的優勢是經過hold open HTTP request 從而減小了無用的請求。web

大體步驟爲:spring

  1. client向server請求並等待響應。
  2. 服務端將請求阻塞,並不斷檢查是否有新消息。若是在這個期間有新消息產生時就當即返回。不然一直等待至請求超時
  3. 當client 獲取到新消息請求超時,進行消息處理併發起下一次請求。

Long Polling

Long-Polling的缺點之一也是服務器資源的浪費,由於它和Polling的同樣都屬於***被動獲取***,都須要不斷的向服務器請求。在併發很高的狀況下,對服務器性能是個嚴峻的考驗。npm

Note:由於以上2兩種方式的實現都比較簡單,因此咱們這裏就不作代碼演示了。接下來咱們重點介紹一下Server-Sent EventsWebSocket

3. Demo概要

下面咱們將經過一個***下載文件***的案例進行演示SSEWebSocket的消息推送,在這以前,咱們先簡單說一下咱們項目的結構,整個項目基於SpringBoot 構建。

首先咱們定義一個供前端訪問的APIDownloadController

@RestController
public class DownloadController {
    private static final Logger log = getLogger(DownloadController.class);
    @Autowired
    private MockDownloadComponent downloadComponent;  

    @GetMapping("/api/download/{type}")
    public String download(@PathVariable String type, HttpServletRequest request) {  // (A)
        HttpSession session = request.getSession();
        String sessionid = session.getId();
        log.info("sessionid=[{}]", sessionid);
        downloadComponent.mockDownload(type, sessionid);  // (B)
        return "success"; // (C)
    }
}
複製代碼
  • (A) type參數用於區分使用哪一種推送方式,這裏爲sse,ws,stomp這三種類型。
  • (B) MockDownloadComponent用於異步模擬下載文件的過程。
  • (C) 由於下載過程爲異步化,因此該方法不會被阻塞並當即向客戶端返回success,用於代表下載開始

DownloadController中咱們調用MockDownloadComponentmockDownload()的方法進行模擬真正的下載邏輯。

@Component
public class MockDownloadComponent {
    private static final Logger log = LoggerFactory.getLogger(DownloadController.class);

    @Async // (A)
    public void mockDownload(String type, String sessionid) {
        for (int i = 0; i < 100; i++) {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // (B)

                int percent = i + 1;
                String content = String.format("{\"username\":\"%s\",\"percent\":%d}", sessionid, percent); // (C)
                log.info("username={}'s file has been finished [{}]% ", sessionid, percent);

                switch (type) { // (D)
                    case "sse":
                        SseNotificationController.usesSsePush(sessionid, content);
                        break;
                    case "ws":
                        WebSocketNotificationHandler.usesWSPush(sessionid, content);
                        break;
                    case "stomp":
                        this.usesStompPush(sessionid, content);
                        break;
                    default:
                        throw new UnsupportedOperationException("");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
複製代碼
  • (A) 咱們使用@Async讓使其異步化
  • (B) 模擬下載耗時。
  • (C) 消息的格式爲{"username":"abc","percent":1}
  • (D) 根據不一樣的type選擇消息推送方式。

4. Server-Sent Events

SSE 是W3C定義的一組API規範,這使服務器可以經過HTTP將數據推送到Web頁面,它具備以下特色:

  • 單向半雙工:只能由server向client推送消息
  • 基於http:數據被編碼爲「text/event-stream」內容並使用HTTP流機制進行傳輸
  • 數據格式無限制:消息只是遵循規範定義的一組key-value格式&UTF-8編碼的文本數據流,咱們能夠在消息payload中可使用JSON或者XML或自定義數據格式。
  • http 長鏈接: 消息的實際傳遞是經過一個長期存在的HTTP鏈接完成的,消耗資源更少
  • 簡單易用的API

Server-Sent Events

瀏覽器支持狀況:

support browser

Note:IE 瀏覽器可經過第三方JS庫進行支持SSE

4.1 SpringBoot 中使用SSE

從Spring 4.2開始支持SSE規範,咱們只須要在Controller中返回SseEmitter對象便可。

Note:Spring 5 中提供了Spring Webflux 能夠更加方便的使用SSE,可是爲更貼近咱們的實際項目,因此文本僅演示使用Spring MVC SSE。

咱們在服務器端定義一個SseNotificationController用於和客戶端處理和保存SSE鏈接. 其endpoint/api/sse-notification

@RestController
public class SseNotificationController {

    public static final Map<String, SseEmitter> SSE_HOLDER = new ConcurrentHashMap<>(); // (A)

    @GetMapping("/api/sse-notification")
    public SseEmitter files(HttpServletRequest request) {
        long millis = TimeUnit.SECONDS.toMillis(60);
        SseEmitter sseEmitter = new SseEmitter(millis); // (B)

        HttpSession session = request.getSession();
        String sessionid = session.getId();

        SSE_HOLDER.put(sessionid, sseEmitter); 
        return sseEmitter;
    }

    /** * 經過sessionId獲取對應的客戶端進行推送消息 */
    public static void usesSsePush(String sessionid, String content) {  // (C)
        SseEmitter emitter = SseNotificationController.SSE_HOLDER.get(sessionid);
        if (Objects.nonNull(emitter)) {
            try {
                emitter.send(content);
            } catch (IOException | IllegalStateException e) {
                log.warn("sse send error", e);
                SseNotificationController.SSE_HOLDER.remove(sessionid);
            }
        }
    }

}
複製代碼
  • (A) SSE_HOLDER保存了全部客戶端的SseEmitter,用於後續通知對應客戶端。
  • (B) 根據指定超時時間建立一個SseEmitter對象, 它是SpringMVC提供用於操做SSE的類。
  • (C) usesSsePush()提供根據sessionId向對應客戶端發送消息。發送只須要調用SseEmittersend()方法便可。

至此服務端已經完成,咱們使用Vue編寫客戶端Download.html進行測試。核心代碼以下:

usesSSENotification: function () {
                var tt = this;
                var url = "/api/sse-notification";
                var sseClient = new EventSource(url);  // (A)
                sseClient.onopen = function () {...}; // (B)

                sseClient.onmessage = function (msg) {   // (C)
                    var jsonStr = msg.data;
                    console.log('message', jsonStr);
                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.sseMsg += 'SSE 通知您:已下載完成' + percent + "%\r\n";
                    if (percent === 100) {
                        sseClient.close();  // (D)
                    }
                };
                sseClient.onerror = function () {
                    console.log("EventSource failed.");
                };
            }
複製代碼
  • (A) 開啓一個新的 SSE connection 並訪問 /api/sse-notification
  • (B) 當鏈接成功時的callback。
  • (C) 當有新消息時的callback。
  • (D) 當下載進度爲100%時,關閉鏈接。

效果演示

SSE DEMO

4. WebSocket

WebSocket 相似於標準的TCP鏈接,它是IETF(RFC 6455)定義的經過TCP進行實時全雙工通訊一種通訊方式,這意味這它的功能更強大,經常使用於如股票報價器,聊天應用。

相比於SSE,它不只能夠雙向通訊,並且甚至還能處理音頻/視頻等二進制內容。

Note:使用WebSocket,在高併發狀況下,服務器將擁有許多長鏈接。這對網絡代理層組件及WebSocket服務器都是一個不小的性能挑戰,咱們須要考慮其負載均衡方案。同時鏈接安全等問題也不容忽視。

4.1 Spring WebSocket (低級API)

Spring 4提供了一個新的Spring-WebSocket模塊,用於適應各類WebSocket引擎,它與Java WebSocket API標準(JSR-356)兼容,而且提供了額外的加強功能。

Note: 對於應用程序來講,直接使用WebSocket API會大大增長開發難度,因此Spring爲咱們提供了 STOMP over WebSocket 更高級別的API使用WebSocket。在本文中將會分別演示經過low level API及higher level API進行演示。

若是想在SpringBoot中使用WebSocket,首先須要引入spring-boot-starter-websocket依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
複製代碼

而後就能夠配置相關信息,咱們先經過low level API進行演示。

首先須要自定義一個WebSocketNotificationHandler用於處理WebSocket 的鏈接及消息處理。咱們只須要實現WebSocketHandler或子類TextWebSocketHandler BinaryWebSocketHandler

public class WebSocketNotificationHandler extends TextWebSocketHandler {

    private static final Logger log = getLogger(WebSocketNotificationHandler.class);

    public static final Map<String, WebSocketSession> WS_HOLDER= new ConcurrentHashMap<>();  // (A)


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {   // (B)
        String httpSessionId = (String) session.getAttributes().get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        WS_HOLDER.put(httpSessionId, session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        log.info("handleTextMessage={}", message.getPayload()); 
    }

    public static void usesWSPush(String sessionid, String content) {    // (C)
        WebSocketSession wssession = WebSocketNotificationHandler.WS_HOLDER.get(sessionid);
        if (Objects.nonNull(wssession)) {
            TextMessage textMessage = new TextMessage(content);
            try {
                wssession.sendMessage(textMessage);
            } catch (IOException | IllegalStateException e) {
                WebSocketNotificationHandler.SESSIONS.remove(sessionid);
            }
        }
    }
}
複製代碼
  • (A) WS_HOLDER用於保存客戶端的WebSocket Session
  • (B) 重寫afterConnectionEstablished()方法,當鏈接創建以後,按sessionIdWebSocket Session保存至WS_HOLDER,用於後續向client推送消息。
  • (C) 根據sessionId獲取對應WebSocket Session,並調用WebSocket SessionsendMessage(textMessage)方法向client發送消息。

使用@EnableWebSocket開啓WebSocket,並實現WebSocketConfigurer進行配置。

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

        WebSocketNotificationHandler notificationHandler = new WebSocketNotificationHandler(); 
        
        registry.addHandler(notificationHandler, "/ws-notification") // (A)
                .addInterceptors(new HttpSessionHandshakeInterceptor())  // (B)
                .withSockJS();  // (C)
    }
}
複製代碼
  • (A) 將咱們自定義的WebSocketNotificationHandler註冊至WebSocketHandlerRegistry.
  • (B) HttpSessionHandshakeInterceptor是一個內置的攔截器,用於傳遞HTTP會話屬性到WebSocket會話。固然你也能夠經過HandshakeInterceptor接口實現本身的攔截器。
  • (C) 開啓SockJS的支持,SockJS的目標是讓應用程序使用WebSocket API時,當發現瀏覽器不支持時,無須要更改任何代碼,便可使用非WebSocket替代方案,儘量的模擬WebSocket。關於SockJS的更多資料,可參考github.com/sockjs/sock…

server端至此就基本大功告成,接下來咱們來完善一下client端Download.html,其核心方法以下:

usesWSNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-notification";
                var sock = new SockJS(url);   // (A)
                sock.onopen = function () {
                    console.log('open');
                    sock.send('test');
                };

                sock.onmessage = function (msg) {   // (B)
                    var jsonStr = msg.data;

                    console.log('message', jsonStr);

                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.wsMsg += 'WS 通知您:已下載完成' + percent + "%\r\n";
                    if (percent === 100) {
                        sock.close();
                    }
                };

                sock.onclose = function () { 
                    console.log('ws close');
                };
            }
複製代碼
  • (A) 首先須要在項目中引入SockJS Client , 並根據指定URL建立一個SockJS對象。
  • (B) 當有新消息時的callback,咱們能夠在該方法中處理咱們的消息。

效果演示

WebSocket

4.2 STOMP over WebSocket (高級API)

WebSocket雖然定義了兩種類型的消息,文本和二進制,可是針對消息的內容沒有定義,爲了更方便的處理消息,咱們但願Client和Server都須要就某種協議達成一致,以幫助處理消息。那麼,有沒有已經造好的輪子呢?答案確定是有的。這就是STOMP。

STOMP是一種簡單的面向文本的消息傳遞協議,它實際上是消息隊列的一種協議, 和AMQP,JMS是平級的。 只不過因爲它的簡單性恰巧能夠用於定義WS的消息體格式。雖然STOMP是面向文本的協議,但消息的內容也能夠是二進制數據。同時STOMP 可已使用任何可靠的雙向流網絡協議,如TCP和WebSocket,目前不少服務端消息隊列都已經支持了STOMP, 好比RabbitMQ, ActiveMQ等。

它結構是一種基於幀的協議,一幀由一個命令,一組可選的Header和一個可選的Body組成。

COMMAND
header1:value1
header2:value2

Body^@
複製代碼

客戶端可使用SENDSUBSCRIBE命令發送或訂閱消息。 經過destination標記述消息應由誰來接收處理,造成了相似於MQ的發佈訂閱機制。

STOMP的優點也很是明顯,即:

  1. 不須要建立自定義消息格式
  2. 咱們可使用現有的stomp.js客戶端
  3. 能夠實現消息路由及廣播
  4. 可使用第三方成熟的消息代理中間件,如RabbitMQ, ActiveMQ等

最重要的是,Spring STOMP 爲咱們提供了可以像Spring MVC同樣的編程模型,減小了咱們的學習成本。

下面將咱們的DEMO稍做調整,使用Spring STOMP來實現消息推送,在本例中咱們使用SimpleBroker模式,咱們的應用將會內置一個STOMP Broker,將全部信息保存至內存中。

具體代碼以下:

@Configuration
@EnableWebSocketMessageBroker  // (A)
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/ws-stomp-notification")
                .addInterceptors(httpSessionHandshakeInterceptor())   // (B)
                .setHandshakeHandler(httpSessionHandshakeHandler())  // (C)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app")  // (D)
                .enableSimpleBroker("/topic", "/queue");  // (E)
    }

    @Bean
    public HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor() {
        return new HttpSessionHandshakeInterceptor();
    }

    @Bean
    public HttpSessionHandshakeHandler httpSessionHandshakeHandler() {
        return new HttpSessionHandshakeHandler();
    }

}

複製代碼
  • (A) 使用@EnableWebSocketMessageBroker註解開啓支持STOMP
  • (B) 建立一個攔截器,用於傳遞HTTP會話屬性到WebSocket會話。
  • (C) 配置一個自定義的HttpSessionHandshakeHandler,其主要做用是按sessionId標記識別鏈接。
  • (D) 設置消息處理器路由前綴,當消息的destination/app開頭時,將會把該消息路由到server端的對應的消息處理方法中。***(在本例中無實際意義)***
  • (E) 設置客戶端訂閱消息的路徑前綴

HttpSessionHandshakeHandler代碼以下:

public class HttpSessionHandshakeHandler extends DefaultHandshakeHandler {

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        String sessionId = (String) attributes.get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        return new HttpSessionPrincipal(sessionId);

    }
}
複製代碼

當咱們須要向client發送消息時,只須要注入SimpMessagingTemplate對象便可,是否是感受很是熟悉?! 沒錯,這種Template模式和咱們平常使用的RestTemplate JDBCTemplate是同樣的。 咱們只須要調用SimpMessagingTemplateconvertAndSendToUser()方法便可向對應用戶發送消息了。

private void usesStompPush(String sessionid, String content) {
        String destination = "/queue/download-notification";
        messagingTemplate.convertAndSendToUser(sessionid, destination, content);
    }
複製代碼

在瀏覽器端,client可使用stomp.js和sockjs-client進行以下鏈接:

usesStompNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-stomp-notification";
                // 公共topic
                // var notificationTopic = "/topic/download-notification";
                // 點對點廣播
                var notificationTopic = "/user/queue/download-notification"; // (A)

                var socket = new SockJS(url);
                var stompClient = Stomp.over(socket);

                stompClient.connect({}, function (frame) {
                    console.log("STOMP connection successful");

                    stompClient.subscribe(notificationTopic, function (msg) {   // (B)
                        var jsonStr = msg.body;

                        var obj = JSON.parse(jsonStr);
                        var percent = obj.percent;
                        tt.stompMsg += 'STOMP 通知您:已下載完成' + percent + "%\r\n";
                        if (percent === 100) {
                            stompClient.disconnect()
                        }

                    });

                }, function (error) {
                    console.log("STOMP protocol error " + error)
                })
            }
複製代碼
  • (A) 若是想針對特定用戶接收消息,咱們須要以/user/爲前綴,Spring STOMP會把以/user/爲前綴的消息交給UserDestinationMessageHandler進行處理併發給特定的用戶,固然這個/user/是能夠經過WebSocketBrokerConfig進行個性化配置的,爲了簡單起見,咱們這裏就使用默認配置,因此咱們的topic url就是/user/queue/download-notification
  • (B) 設置stompClient消息處理callback進行消息處理。

效果演示

STOMP

5 總結

在文中爲你們簡單講解了幾種經常使用的消息推送方案,並經過一個下載案例重點演示了SSEWebSocket這兩種server push模式的消息推送。固然還有不少細節並無在文中說明,建議你們下載源碼對照參考。

相比較這幾種模式,小編認爲若是咱們的需求僅僅是向客戶端推送消息,那麼使用SSE的性價比更高一些,Long-Polling次之。使用WebSocket有一種殺雞用牛刀的感受,而且給咱們系統也帶來了更多的複雜性,得不償失,因此不太推薦。而Polling雖然實現方式最簡單且兼容性最強,可是其效率太低,因此不建議使用。固然若是您有其餘看法,歡迎留言討論交流。

文中示例源碼:github.com/leven-space…

若是您以爲這篇文章有用,請留下您的小💗💗,我是一枚Java小學生,歡迎你們吐槽留言。

相關文章
相關標籤/搜索