【SpringBoot WEB 系列】SSE 服務器發送事件詳解

【SpringBoot WEB系列】SSE 服務器發送事件詳解html

SSE 全稱Server Sent Event,直譯一下就是服務器發送事件,通常的項目開發中,用到的機會很少,可能不少小夥伴不太清楚這個東西,究竟是幹啥的,有啥用前端

本文主要知識點以下:html5

  • SSE 掃盲,應用場景分析
  • 藉助異步請求實現 sse 功能,加深概念理解
  • 使用SseEmitter實現一個簡單的推送示例

<!-- more -->java

I. SSE 掃盲

對於 sse 基礎概念比較清楚的能夠跳過本節git

1. 概念介紹

sse(Server Sent Event),直譯爲服務器發送事件,顧名思義,也就是客戶端能夠獲取到服務器發送的事件github

咱們常見的 http 交互方式是客戶端發起請求,服務端響應,而後一次請求完畢;可是在 sse 的場景下,客戶端發起請求,鏈接一直保持,服務端有數據就能夠返回數據給客戶端,這個返回能夠是屢次間隔的方式web

2. 特色分析

SSE 最大的特色,能夠簡單規劃爲兩個spring

  • 長鏈接
  • 服務端能夠向客戶端推送信息

瞭解 websocket 的小夥伴,可能也知道它也是長鏈接,能夠推送信息,可是它們有一個明顯的區別後端

sse 是單通道,只能服務端向客戶端發消息;而 webscoket 是雙通道服務器

那麼爲何有了 webscoket 還要搞出一個 sse 呢?既然存在,必然有着它的優越之處

sse websocket
http 協議 獨立的 websocket 協議
輕量,使用簡單 相對複雜
默認支持斷線重連 須要本身實現斷線重連
文本傳輸 二進制傳輸
支持自定義發送的消息類型 -

3. 應用場景

從 sse 的特色出發,咱們能夠大體的判斷出它的應用場景,須要輪詢獲取服務端最新數據的 case 下,多半是能夠用它的

好比顯示當前網站在線的實時人數,法幣匯率顯示當前實時匯率,電商大促的實時成交額等等...

II. 手動實現 sse 功能

sse 自己是有本身的一套玩法的,後面會進行說明,這一小節,則主要針對 sse 的兩個特色長鏈接 + 後端推送數據,若是讓咱們本身來實現這樣的一個接口,能夠怎麼作?

1. 項目建立

藉助 SpringBoot 2.2.1.RELEASE來建立一個用於演示的工程項目,核心的 xml 依賴以下

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/libs-snapshot-local</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-releases</id>
        <name>Spring Releases</name>
        <url>https://repo.spring.io/libs-release-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

2. 功能實現

在 Http1.1 支持了長鏈接,請求頭添加一個Connection: keep-alive便可

在這裏咱們藉助異步請求來實現 sse 功能,至於什麼是異步請求,推薦查看博文: 【WEB 系列】異步請求知識點與使用姿式小結

由於後端能夠不定時返回數據,因此咱們須要注意的就是須要保持鏈接,不要返回一次數據以後就斷開了;其次就是須要設置請求頭Content-Type: text/event-stream;charset=UTF-8 (若是不是流的話會怎樣?)

// 新建一個容器,保存鏈接,用於輸出返回
private Map<String, PrintWriter> responseMap = new ConcurrentHashMap<>();

// 發送數據給客戶端
private void writeData(String id, String msg, boolean over) throws IOException {
    PrintWriter writer = responseMap.get(id);
    if (writer == null) {
        return;
    }

    writer.println(msg);
    writer.flush();
    if (over) {
        responseMap.remove(id);
    }
}

// 推送
@ResponseBody
@GetMapping(path = "subscribe")
public WebAsyncTask<Void> subscribe(String id, HttpServletResponse response) {

    Callable<Void> callable = () -> {
        response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
        responseMap.put(id, response.getWriter());
        writeData(id, "訂閱成功", false);
        while (true) {
            Thread.sleep(1000);
            if (!responseMap.containsKey(id)) {
                break;
            }
        }
        return null;
    };

    // 採用WebAsyncTask 返回 這樣能夠處理超時和錯誤 同時也能夠指定使用的Excutor名稱
    WebAsyncTask<Void> webAsyncTask = new WebAsyncTask<>(30000, callable);
    // 注意:onCompletion表示完成,無論你是否超時、是否拋出異常,這個函數都會執行的
    webAsyncTask.onCompletion(() -> System.out.println("程序[正常執行]完成的回調"));

    // 這兩個返回的內容,最終都會放進response裏面去===========
    webAsyncTask.onTimeout(() -> {
        responseMap.remove(id);
        System.out.println("超時了!!!");
        return null;
    });
    // 備註:這個是Spring5新增的
    webAsyncTask.onError(() -> {
        System.out.println("出現異常!!!");
        return null;
    });


    return webAsyncTask;
}

看一下上面的實現,基本上仍是異步請求的那一套邏輯,請仔細看一下callable中的邏輯,有一個 while 循環,來保證長鏈接不中斷

接下來咱們新增兩個接口,用來模擬後端給客戶端發送消息,關閉鏈接的場景

@ResponseBody
@GetMapping(path = "push")
public String pushData(String id, String content) throws IOException {
    writeData(id, content, false);
    return "over!";
}

@ResponseBody
@GetMapping(path = "over")
public String over(String id) throws IOException {
    writeData(id, "over", true);
    return "over!";
}

咱們簡單的來演示下操做過程

III. SseEmitter

上面只是簡單實現了 sse 的長鏈接 + 後端推送消息,可是與標準的 SSE 仍是有區別的,sse 有本身的規範,而咱們上面的實現,實際上並無管這個,致使的問題是前端按照 sse 的玩法來請求數據,可能並不能正常工做

1. sse 規範

在 html5 的定義中,服務端 sse,通常須要遵循如下要求

請求頭

開啓長鏈接 + 流方式傳遞

Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

數據格式

服務端發送的消息,由 message 組成,其格式以下:

field:value\n\n

其中 field 有五種可能

  • 空: 即以:開頭,表示註釋,能夠理解爲服務端向客戶端發送的心跳,確保鏈接不中斷
  • data:數據
  • event: 事件,默認值
  • id: 數據標識符用 id 字段表示,至關於每一條數據的編號
  • retry: 重連時間

2. 實現

SpringBoot 利用 SseEmitter 來支持 sse,能夠說很是簡單了,直接返回SseEmitter對象便可;重寫一下上面的邏輯

@RestController
@RequestMapping(path = "sse")
public class SseRest {
    private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
    @GetMapping(path = "subscribe")
    public SseEmitter push(String id) {
        // 超時時間設置爲1小時
        SseEmitter sseEmitter = new SseEmitter(3600_000L);
        sseCache.put(id, sseEmitter);
        sseEmitter.onTimeout(() -> sseCache.remove(id));
        sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
        return sseEmitter;
    }

    @GetMapping(path = "push")
    public String push(String id, String content) throws IOException {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            sseEmitter.send(content);
        }
        return "over";
    }

    @GetMapping(path = "over")
    public String over(String id) {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            sseEmitter.complete();
            sseCache.remove(id);
        }
        return "over";
    }
}

上面的實現,用到了 SseEmitter 的幾個方法,解釋以下

  • send(): 發送數據,若是傳入的是一個非SseEventBuilder對象,那麼傳遞參數會被封裝到 data 中
  • complete(): 表示執行完畢,會斷開鏈接
  • onTimeout(): 超時回調觸發
  • onCompletion(): 結束以後的回調觸發

一樣演示一下訪問請求

上圖總的效果和前面的效果差很少,並且輸出還待上了前綴,接下來咱們寫一個簡單的 html 消費端,用來演示一下完整的 sse 的更多特性

<!doctype html>
<html lang="en">
<head>
    <title>Sse測試文檔</title>
</head>
<body>
<div>sse測試</div>
<div id="result"></div>
</body>
</html>
<script>
    var source = new EventSource('http://localhost:8080/sse/subscribe?id=yihuihui');
    source.onmessage = function (event) {
        text = document.getElementById('result').innerText;
        text += '\n' + event.data;
        document.getElementById('result').innerText = text;
    };
    <!-- 添加一個開啓回調 -->
    source.onopen = function (event) {
        text = document.getElementById('result').innerText;
        text += '\n 開啓: ';
        console.log(event);
        document.getElementById('result').innerText = text;
    };
</script>

將上面的 html 文件放在項目的resources/static目錄下;而後修改一下前面的SseRest

@Controller
@RequestMapping(path = "sse")
public class SseRest {
    @GetMapping(path = "")
    public String index() {
        return "index.html";
    }

    @ResponseBody
    @GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter push(String id) {
        // 超時時間設置爲3s,用於演示客戶端自動重連
        SseEmitter sseEmitter = new SseEmitter(1_000L);
        // 設置前端的重試時間爲1s
        sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("鏈接成功"));
        sseCache.put(id, sseEmitter);
        System.out.println("add " + id);
        sseEmitter.onTimeout(() -> {
            System.out.println(id + "超時");
            sseCache.remove(id);
        });
        sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
        return sseEmitter;
    }
}

咱們上面超時時間設置的比較短,用來測試下客戶端的自動重連,以下,開啓的日誌不斷增長

其次將 SseEmitter 的超時時間設長一點,再試一下數據推送功能

請注意上面的演示,當後端結束了長鏈接以後,客戶端會自動從新再次鏈接,不用寫額外的重試邏輯了,就這麼神奇

3. 小結

本篇文章介紹了 SSE 的相關知識點,並對比 websocket 給出了 sse 的優勢(至於啥優勢請往上翻)

請注意,本文雖然介紹了兩種 sse 的方式,第一種藉助異步請求來實現,若是須要完成 sse 的規範要求,須要本身作一些適配,若是須要了解 sse 底層實現原理的話,能夠參考一下;在實際的業務開發中,推薦使用SseEmitter

IV. 其餘

0. 項目

系列博文

源碼

1. 一灰灰 Blog

盡信書則不如,以上內容,純屬一家之言,因我的能力有限,不免有疏漏和錯誤之處,如發現 bug 或者有更好的建議,歡迎批評指正,不吝感激

下面一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛

一灰灰blog

相關文章
相關標籤/搜索