Spring Cloud 微服務架構下的 WebSocket 解決方案

WebSocket在現代瀏覽器中的應用已經算是比較廣泛了,在某些業務場景下,要求必須可以在服務器端推送消息至客戶端。在沒有WebSocket的年代,咱們使用過dwr,在那個時候dwr真實一個很是棒的方案。可是在WebSocket興起以後,咱們更願意使用標準實現來解決問題、html

首先交代一下,本篇文章不講解WebSocket的配置,主要講的是針對在微服務架構集羣模式下解決方案的選擇。前端

微服務架構你們應該都不陌生了,在微服務架構下,服務是分佈式的,並且爲了保證業務的可用性,每一個服務都是以集羣的形式存在。在集羣模式下,要保證集羣的每個節點的訪問獲得相同的結果就須要作到數據一致性,如緩存、session等。web

微服務集羣緩存一般使用分佈式緩存redis解決,session一致性也一般會經過redis解決,可是如今更流行的是無狀態的Http,即無session化,最多見的解決方案就是OAuth。redis

WebSocket有所不一樣,它是與服務端創建一個長鏈接,在集羣模式下,顯然不可能把前端與服務集羣中的每個節點創建鏈接,一個可行的思路是像解決http session的共享同樣,經過redis來實現websocket的session共享,可是websocket session的數量是遠多於http session的數量的(由於每打開一個頁面都會創建一個websocket鏈接),因此隨着用戶量的增加,共享的數據量太大,很容易形成瓶頸。spring

另外一個思路是,websocket總歸會與集羣中某個節點創建鏈接,那麼,只要找到鏈接所在的節點,就能夠向服務端推送消息了,那麼要解決的問題就是如何找到一個websocket鏈接所在的節點。要找到鏈接在哪一個節點上,咱們須要一個惟一的標識符用於尋找鏈接,然而在基於stomp的發佈-訂閱模式下,一個消息的推送多是面向若干個鏈接的,可能分佈在集羣中的每個節點上,這樣去尋找鏈接的代價也很高。既然這樣,咱們不妨換種思路,每個websocket消息,咱們在集羣的每一個節點上都進行推送,訂閱了該消息的鏈接,無論有一個仍是一萬個,最終確定都能收到這個消息。基於這個思路,咱們作了一些技術選型:瀏覽器

  • RabbitMQ緩存

  • Spring Cloud Streambash

首先說RabbitMQ,高級消息隊列,能夠實現消息廣播(固然kafka同樣能夠作到,這裏只介紹一種),另外一項技術是Spring Cloud Stream,stream是一個用於構建高度可擴展事件驅動型微服務的框架,而且它能夠跟RabbitMQ、Kafka以及其餘多種消息服務集成,使用了stream,要把rabbitmq換成kafka只不過是改改配置的事情。接下來重點介紹使用方法:服務器

引入依賴

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
複製代碼

配置Binder

binder是stream中的重要概念,是用於配置用於stream發佈和訂閱事件的消息中間件。先看一段配置:websocket

spring:
 cloud:
 stream:
 binders:
 defaultRabbit:
 type: rabbit
 environment:
 spring:
 rabbitmq:
 host: localhost
 username: username
 password: password
 virtual-host: /
複製代碼

配置中的 defaultRabbit 是binder的名稱,一會會在其餘配置中引用,type指定了消息中間件的類型,environment是對消息中間件的配置,這裏的配置結構和spring.rabbitmq命名空間下的配置項如出一轍的,能夠參照着進行配置(這樣配置的做用是能夠把stream的rabbitmq配置和項目中其餘地方使用的rabbitmq區分開,若是這裏不配置environment,binder會沿用spring.rabbitmq命名空間下的配置),好比你的項目中的rabbitmq的配置是這樣的:

spring:
 rabbitmq:
 host: localhost
 username: username
 password: password
 virtual-host: /
複製代碼

那上門的binder的environment配置徹底能夠去掉。

消息流與binder的綁定

微服務要接收揮着發佈事件消息,根據spring cloud stream的名字,顧名思義,須要使用流,因此須要在配置中聲明兩個事件流,一個輸入流,一個輸出流:

spring:
 cloud:
 stream:
 bindings:
 websocketMessageIn:
 destination: websocketMessage
 binder: defaultRabbit
 websocketMessageOut:
 destination: websocketMessage
 binder: defaultRabbit
複製代碼

這裏咱們看到,事件流引用了binder,表示這兩個流使用rabbitmq這個中間件(看到這裏想必你們已經明白了,在一個項目中徹底能夠同時使用rabbit和kafka做爲事件流的消息中間件)。

websocketMessageIn,websocketMessageOut是事件流的名字(能夠本身隨便起),destination指定了兩個事件流的destination是同一個,這決定了寫入和讀取是指向同一個地方(不必定是同一個消息隊列)。

事件流聲明

事件流使用接口進行定義:

/** * websocket消息事件流接口 * Created by 吳昊 on 18-11-8. * * @author 吳昊 * @since 1.4.3 */
interface WebSocketMessageStream {
  companion object {
    const val INPUT: String = "webSocketMessageIn"
    const val OUTPUT: String = "webSocketMessageOut"
  }

  /** * 輸入 */
  @Input(INPUT)
  fun input(): SubscribableChannel

  /** * 輸出 */
  @Output(OUTPUT)
  fun output(): MessageChannel
}
複製代碼

聲明事件流接口,這裏面定義了兩個常量,分別對應配置中的兩個流名稱,經過調用input()方法獲取輸入流,經過調用output()獲取輸出流。

該接口的實現由spring cloud stream完成,不須要本身實現。

使用事件流

聲明一個bean:

@Component
@EnableBinding(WebSocketMessageStream::class)
class WebSocketMessageService {
……
複製代碼

這裏的@EnableBinding 註解指明瞭事件流接口類,只有添加了這個註解(要能被Spring識別到,能夠加在入口類上,也能夠加在@Configuration註解的類上),該接口才會被實現,而且加入到Spring的容器中(能夠注入)。

上面WebSocketMessageService的內容以下:

@Autowired
  private lateinit var stream: WebSocketMessageStream
  @Autowired
  private lateinit var template: SimpMessagingTemplate

  @StreamListener(WebSocketMessageStream.INPUT)
  fun messageReceived(message: WebSocketMessage) {
    template.convertAndSend(message.destination, message.body)
  }

  fun send(destination: String, body: Any) {
    stream.output().send(
        MutableMessage(WebSocketMessage(destination, body))
    )
  }
複製代碼

接收消息

@StreamListener 註解指明瞭要監聽的事件流,方法接收的參數即事件的消息內容(使用jackson反序列化),這裏的messageReceived方法直接將接收到的消息直接用websocket發送給前端

發送消息

一樣,發送也很簡單,將消息直接發送到輸入流中,上面的send方法便是將本來應該用SimpMessagingTemplate發送給websocket的消息發送到spring cloud stream的事件流中。這樣作之後,項目中全部須要向前端推送webSocket消息的操做都應該調用send方法來進行。

講到這裏你們可能還有點糊塗,也有一些疑問,爲何這樣每一個微服務節點就能收到事件消息了?或者單個節點接收事件消息和多個節點接收的配置是怎麼控制的。各位不要着急,待我慢慢道來,接下來就要結合rabbit的知識來說解 了:

首先看一下rabbit的消息隊列:

從圖中看到,存在多個以webSocketMessage開頭的隊列,這是每個微服務節點建立了一個消息隊列,再來看exchange:

exchange綁定的消息隊列

這裏的exchange名稱和上面消息隊列的名稱前綴均是webSocketMessage, 這個都是由前面的binding配置中的destination指定的,和destination名稱保持一致

當應用向輸入流中寫入事件時,使用destination做爲key(即webSocketMessage),將消息寫入名爲webSocketMessage的exchange,因爲exchange綁定的消息隊列前綴均爲webSocketMessage且routing key都是#,因此exchange會將消息路由到每個webSocketMessage開頭的消息隊列上(這裏涉及到rabbitmq的知識點,如過不懂請自行查閱資料),這樣每個微服務都能接收到相同的消息。

咱們再來看前面提出的問題,這樣的配置能夠把消息推送到每個微服務節點,那麼若是須要一個消息只被一個節點接收,該怎麼配置呢?很簡單,一個配置項就能夠搞定:

spring:
 cloud:
 stream:
 bindings:
 websocketMessageIn:
 group: test
 destination: websocketMessage
 binder: defaultRabbit
複製代碼

能夠看到,相比前面的配置,僅僅多了一個group的配置,這樣配置以後,rabbitmq會生成一個名爲websocketMessage.test的消息隊列(前面講到的每一個微服務創建的消息隊列是自動刪除的,即微服務斷開鏈接後消息隊列就被刪除,而這個消息隊列是持久化的,也就是即便全部的微服務節點所有斷開鏈接也不會被刪除),全部的微服務節點監聽這一個隊列,當隊列中有消息時,只會被一個節點消費。

要講的內容到此結束,spring cloud stream的配置遠不止這些,可是這些配置已足夠完成我所須要作的事情,其餘的配置請參考spring cloud stream官方文檔:

cloud.spring.io/spring-clou…

相關文章
相關標籤/搜索