spring websocket中 STOMP

26.4 基於WebSocket消息架構STOMP

WebSocket協議定義了兩種消息類型,文本或字節,可是沒定義它們的內容.它有意讓客戶端和服務端經過通用的子協議(例如,更高水平的協議)來定義消息語法.可是WebSocket協議的使用是可選的,客戶端和服務端須要贊成某些種類的協議來翻譯這些消息.html

26.4.1 STOMP概述

STOMP是一個簡單的面向文本的消息協議,原來是爲Ruby,Python,Perl等腳本語言建立的,用來鏈接企業消息代理器.設計它的目的是解決一部分通用的消息格式問題.STOMP能夠任何可靠的雙向流協議中使用,好比TCP或WebSocket.儘管STOMP是一個面向文本的協議,可是消息的內容能夠是文本或字節.html5

STOMP 是一個基於Http框架的協議.STOMP框架的結構以下:java

COMMAND
header1:value1
header2:value2

Body^@

客戶端可使用"SEND"或"SUBSCRIBE"命名來發送或訂閱消息,須要一個目的地,用來描述這個消息是什麼和誰來接收它.它會激活一個簡單的發佈訂閱機制,能夠用來經過代理向其餘鏈接的客戶端來發送消息或者向服務端發送消息來要求執行一些工做.react

當使用spring的STOMP支持時,spring webSocket應用扮演了客戶端的STOMP代理器的角色.消息被路由到@Controller裏的消息處理方法,或一個簡單的,內置的用來跟蹤訂閱或廣播消息的代理器來轉發給訂閱者.你也能夠配置一個專用的STOMP代理器(如RabbitMQ,ActiveMQ等)進行消息廣播.在這種狀況下,spring會保持代理的TCP鏈接,傳遞消息給它,也經過它把消息傳遞給鏈接的webSocket客戶端.因此spring web應用能夠依賴統一的基於http的安全,通用驗證;相同的編程模式在消息處理上也起做用.git

這裏有一個例子,客戶端訂閱接收服務端按期發佈的股票報價,例如定時經過SimpMessagingTemplate將消息發送到代理器;github

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

^@

下面例子:一個客戶端發送交易請求,服務端經過@NessageMapping的方法處理,處理完以後,將交易信息和細節廣播給客戶端.web

SEND
destination:/queue/trade
content-type:application/json
content-length:44

{"action":"BUY","ticker":"MMM","shares",44}^@

目的地意味着在STOMP規則中有意遺留印記.它能夠是任何符合STOMP服務器支持的目的地的語法和語義的字符串.可是,一般來講,路徑字符串中"/topic/.."是發佈訂閱(一對多),"/queue/"意味着點到點的信息交換(一對一).spring

STOMP服務器能夠用Message命令向全部訂閱者發佈消息.這裏是一個服務器向一個訂閱客戶端發送股票交易的例子:數據庫

MESSAGE
message-id:nxahklf6-1
subscription:sub-1

destination:/topic/price.stock.MMM

{"ticker":"MMM","price":129.45}^@

你須要知道服務器不會發送未被訂閱的消息.服務器端全部的消息都對應一個定義的客戶端訂閱,而且服務器消息的"subscription-id"頭必須與客戶端訂閱的Id頭一致的.編程

上面的簡介是爲了提供對STOMP協議最簡單的理解.你能夠去閱讀該協議的說明書瞭解更多.

使用STOMP做爲WebSocket子協議的好處:

  • 無需建立一個自定義消息格式
  • 能夠在瀏覽器中使用已存在的stomp.js
  • 能夠根據端點來回路消息
  • 可使用成熟的消息代理器,如RabbitMQ,ActiveMQ等進行廣播

最重要的一點是使用STOMP能夠像spring-mvc提供的基於HTTP的編程模式來啓用spring提供的應用級別的編程模式.

26.4.2 在WebSocket上啓用STOMP

spring框架經過spring-message,spring-websocket模塊提供了對基於WebSocket的STOMP協議的支持.下面有個經過在"/portfolio"暴露STOMP webSocket/SockJS端點例子,這裏終點以"app"開頭的消息會迴路到消息處理方法(如應用業務);另外的以"/topic","/queue"開頭的消息會迴路到消息代理器(如,廣播到其餘在線客戶端)

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic", "/queue");
    }

}

在xml中的配置

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/topic, /queue"/>
    </websocket:message-broker>

</beans>

這個"/app"前綴是任選的,它只是用來分區那些應該被迴路到消息處理方法中由應用處理的消息,和那麼由代理器廣播到訂閱客戶端的消息.

而"/topic","/queue"前綴依賴於使用中的代理器.在簡單的,內置代理下這個前綴沒有特定的含義,它沒法區分這個終點如何使用.(發佈-訂閱目標有不少訂閱者或通常只針對單個接受者的點對點消息).若是使用備用代理器,大多數代理器使用"/topic"做爲發佈/訂閱語義終點的前綴,並把"/queue"做爲點作點語義終點的前綴.

在瀏覽器裏,客戶端能夠經過使用Stomp.js和sockjs-client鏈接

var socket=new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient =Stomp.over(socket);
stompClient.connect({},function(frame){
});

又或者經過WebSocket(不經過SockJS)鏈接

var socket=new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient=Stomp.over(socket);

stompClient.connect({},function(frame){
});

記住stompClient不須要指定login或passcode頭.即便加了,客戶端也會忽略或者會覆蓋掉它.能夠經過查看 26.4.8節,成熟代理器的連接,和26.4.10 安全,來獲取更多安全信息

26.4.3 Flow of Messages

當STOMP端點被配置,spring應用會扮演連接客戶端的STOMP代理器的角色.本節提供了一個大圖來展現消息是如何在應用內部流動的.

spring-messaging模塊聽過異步消息處理的基礎.它包含了大量源於spring Integration項目裏的抽象,並把它們做爲消息應用裏的基礎模塊.

  • Message 一個有頭和內容的消息

  • MessageHandler 消息處理的協議

  • MessageChannel 消息發送的協議,能夠減小發送者和接受者之間的耦合.

  • SubScribableChannel (訂閱頻道)繼承了消息協議,將消息發送到已註冊的消息處理的訂閱者們.

  • ExecutorSubscribableChannel 訂閱協議的具體實現,能夠經過線程池實現異步分發消息.

@EnableWebSocketMessageBroker java配置和websocket:message-broker的xml配置共同構成一個複雜消息流.下面的圖表展現的是使用簡單,內存代理器的狀況.

輸入圖片說明

上面的圖片包含了三個消息頻道:

  • ClientInboundChannel用於接收從webSocket客戶端發出的消息.
  • clientOutboundChanel 用於向WebSocket客戶端發送消息
  • brokerChannel 應用內部消息的代理器

這三個頻道一樣在專用代理器裏使用,除了"代理器替身"替代了簡單代理器.

輸入圖片說明

"clientInboundChannel"裏的消息流向註解方法給應用處理(例如股票交易執行請求)或直接發送給代理器(例如客戶端訂閱股票信息報價).這個STOMP目的地用於簡單的前綴迴路.例如,"/app"前綴的消息迴路到註解方法,而"/topic","/queue"前綴的消息用於迴路到代理器.

當一個消息處理註解的方法有返回值時,它的返回值會做爲spring消息的內容發送到"brokerChannel".這個代理器會把這個消息廣播到客戶端.只要藉助消息模板,你能夠在應用的任何地方將消息發送到任何一個目的地.例如,一個http的POST處理方法能夠廣播一個消息到全部的關聯客戶端,或者一個服務組件能夠定時廣播股票價格.

下面是一個簡單說明消息的例子:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");
    }

}

@Controller
public class GreetingController {

    @MessageMapping("/greeting") {
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }

}

下面是關於上例中消息流的解釋:

  • WebSocket 客戶端連接WebSocket端點"/portfolio"

  • 關於"/topic/greeting"訂閱經過"clientInboundChannel"傳播到代理器

  • 發送到"/app/greeting"的擁抱經過"clientInboundChannel"進入並跳轉到GreetingController.控制器添加了當前時間,它的返回值經過"brokerChannel"做爲消息發送到"/topic/greeting"端口.(終點通常根據協議選擇,可是能夠經過@SendTo來重寫)

  • 代理器隨後把消息廣播給訂閱者,他們經過"clientOutboundChannel"實現傳輸.

下節提供註解方法更多的細節,包括參數的種類和返回值的支持.

26.4.4 註解消息處理

@MessageMapping註解能夠在@Controller類的方法上使用.它既能夠來用表示消息目的地的映射方法,也能夠組合成類級別的@MessageMapping,用來表示控制器裏的全部映射方法的共用映射.

默認的目的地映射被認爲是Ant風格,斜線分割,路徑格式.例如"/foo*","/foo/**".這裏還包括模板變量,如"/foo/{id}",它能夠被@DestinationVariable註解來引用.

也可使用逗號做爲分隔符.

@MessageMapping標誌的方法支持一下方法參數:

  • Message 方法參數,用來獲取要處理的完整消息

  • @Payload 用來獲取消息內容的註解參數,由org.springframework.messaging.converter.MessageConverter轉化.這個註解不是必須的,由於默認它是存在的.帶有驗證註解的負載方法的參數註解受限於JSR-303驗證.

  • @Header 必要時可使用org.springframework.messaging.converter.Converter類來訪問特定的頭的值

  • @Headers 該註解的方法參數必須指定爲java.util.Map用來訪問消息裏全部的頭

  • MessageHeaders 用來獲取消息裏全部頭的Map的方法參數

  • MessageHeaderAccessor,SimpMessageHeaderAccessor,StompHeaderAccessor都是通過類型訪問方法來訪問頭信息.

  • @DestinationVariable 用來訪問消息目的地中模板參數的註解參數.他的值能夠更具須要轉爲成申明的方法參數類型.

  • java.security.Principal 在webSocket進行http握手時,用來反射用戶日誌的方法參數

@MessageMapping 方法能夠被org.springframework.messaging.converter.MessageConverter轉化器轉爲有一個新的消息主體,默認,它們會看成客戶端消息用相同的目的地,但使用"/topic"前綴來發送到"brokerChannel"頻道.用@SendTo能夠指定其餘目的地.他還能夠設置一個類級別的共用目的地.

返回的消息能夠異步提供,經過ListenableFuture或CompletableFutrue/ComplentionStage等返回類型簽名,相似springmvc類似的方法.

@SubscribeMapping註解能夠將訂閱請求映射到@Controller方法裏.它支持方法級別,可是能夠被組合爲類別的@MessageMapping註解,可讓相同控制器裏處理方法共享..

默認@SubscribeMapping方法的返回值會直接返回到鏈接的客戶端,不須要經過代理器中轉.這對請求回覆的消息交互很是有用,例如,應用UI初始化時,能夠方便的獲取應用數據.另外@SubscribeMapping方法也能夠被@SendTo方法註解,結果消息被被髮送到"brokerChannel"用特定的目標目的地.

有時一個控制器在運行時須要用AOP代理裝飾.例如,你在本身的控制器上選擇添加@Transactional註解.在這種狀況下,對於這些控制器,咱們建議使用類級別的代理.這是控制器的典型默認選擇.可是若是一個控制器實現了一個spring上下文沒有回調的接口,你須要明確的配置基於類的代理.例如,tx:annotation-driven/,須要轉化爲<tx:annotation-driven proxy-target-class="true"/>

26.4.5 Sending messages 發送消息

你是否想從應用的任何地方都想已鏈接的客戶端發送消息.每一個應用組件均可以把消息發送到"brokerChannel"中.最簡單的實現方式是進行SimpMessagingTemplate注入,並用他發送消息.通常它很容易按類型注入,例如:

@Controller
public class GreetingController{
@Autowired
private SimpMessagingTemplate template;

@RequestMapping(path="/greetings",method=POST)
public void greet(String greeting){
   String text="["+getTimestamp()+"]"+greeting;
   template.convertAndSend("/tpic/greetings",text)
}

}

可是這個等同"brokerMessagingTemplate".

26.4.6 Simple Broker 簡單代理器

這個簡單的內置的代理器處理客戶端的訂閱請求,把它們儲存到內存,並根據目的地匹配來廣播消息到在線客戶端.這個代理器支持路徑風格目的地,包括ANT風格的目的地匹配.

ant風格路徑:http://blog.csdn.net/wangshfa/article/details/26471641

26.4.7 成熟代理器

簡單代理器容易上手,但只支持部分STOMP命令,它基於一個簡單的消息發送池,且不支持集羣.相應的,應用能夠經過使用成熟的消息代理器來升級.

檢查STOMP的文檔,選擇適合的消息代理器,安裝他們,並在STOMP支持下運行他們.接着用spring配置中的STOMP代理器替身來替代簡單代理器. 下面的例子是如何配置一個成熟的代理器

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }

}

xml配置以下:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio" />
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>

</beans>

上述例子中配置的"STOMP broker relay"是一個spring的消息處理器,他將消息轉發給外部的消息代理器.他會創建到代理器的TCp連接,轉發全部的消息給他,並把全部從代理器收到的消息經過webSocket session轉發給客戶端.基本上他扮演了一箇中介器的做用,向兩邊發送消息.

spring使用org.projecteactor:reactor-net和io.netty:netty-all來管理TCP鏈接,須要時能夠將他們做爲依賴添加.

spring4.3支持的STOMP代理器兼容2.0x的反應器,因此它不支持與spring-cloud-stream-reactive聚合,它須要3.0X的反應器.

spring5依賴於Reactor3和Reactor Netty,他們有獨立的版本,支持STOMP的代理器,還對活性編程模式提供了大量支持.

另外,服務組件仍是能夠向代理器替身發送信息,以進行廣播的

實際上,代理器替身使得消息廣播更加健壯和可擴展.

26.4.8 Connections To Full-Featured Broker鏈接到成熟代理器

STOMP代理器中介器保持這地代理器的一個系統級別的TCP連接.這個鏈接用來接收服務端產生的消息,而不是接收其餘消息.你能夠設置該連接的憑證,如STOMP框架的登錄和密碼頭.這個能夠在XML命令空間或java配置裏經過設置SystemLogin/systemPasscode屬性來設置,默認值是guest/guest.

STOMP代理器中介器也爲每一個鏈接的webSocket客戶端建立了一個單獨的TCP鏈接.你能夠爲全部的客戶端鏈接配置STOMP憑證.這個能夠在XML命令空間或java配置裏經過設置SystemLogin/systemPasscode屬性來設置,默認值是guest/guest.

STOMP代理器中介器通常表明客戶端給每一個跳轉到的代理器的鏈接框架設置login和passcode頭.因此WebSocket客戶端不須要設置這麼頭,他們會被忽略.下面的部分,webSocket客戶端能夠依賴HTTP安全來保護WebSocket端點和建立客戶端身份.

STOMP代理器中繼會經過"系統"的TCP鏈接向消息代理器發送和接受心跳消息.你能夠設置發送和接受心跳的頻率(默認10秒每次).若是指向代理器的連接消失了,代理器中介會每5分鐘一次,持續嘗試重連,直到成功.

spring的bean能夠實現爲ApplicationListener<BrokerAvailabilityEvent>的接口,用來接收指向代理器的系統鏈接中斷或重連的通知.例如,當這裏沒有可用的系統鏈接時,一個股票價格服務能夠中止嘗試發送消息.

STOMP代理器中介能夠經過virtualHost來配置.這個屬性的值能夠被設置到每一個鏈接框架的host頭裏,這會頗有用,例如在一個雲環境裏,每個TCP鏈接的實際地址會根據雲基礎STOMP服務提供host的不一樣而差別.

26.4.9 在@MessageMapping 目的地裏使用句號分隔符

儘管斜線分割的路徑格式被web開發者熟知,但在消息開發裏,"."是主流,例如,在主題的名字,隊列,交換者等.應用也能夠經過配置自定義的AntPathMatcher在@MessageMapping映射中使用句號來代替斜線.

java配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

  // ...

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableStompBrokerRelay("/queue/", "/topic/");
    registry.setApplicationDestinationPrefixes("/app");
    registry.setPathMatcher(new AntPathMatcher("."));
  }

}

在xml配置

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:websocket="http://www.springframework.org/schema/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/websocket
    http://www.springframework.org/schema/websocket/spring-websocket.xsd">

  <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
    <websocket:stomp-endpoint path="/stomp" />
    <websocket:simple-broker prefix="/topic, /queue"/>
  </websocket:message-broker>

  <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
    <constructor-arg index="0" value="." />
  </bean>

</beans>

下面是控制器中使用"."分隔符的簡單例子

@Controller
@MessageMapping("foo")
public class FooController {

  @MessageMapping("bar.{baz}")
  public void handleBaz(@DestinationVariable String baz) {
  }

}

若是這個應用的前綴是"/app",那麼這個方法能夠被映射爲"/app/foo.bar.{baz}";

26.4.10 Authentication 認證

每個WebSocket的消息的STOMP session開始於一個http請求,而後它能夠被升級爲WebSockets,或者被回退爲一系列的SockJS http傳輸請求.

web應用早已用認證和權限來保護HTTp請求.通常一個用戶會通過spring安全的一些機制,如登錄也,http基礎認證或其餘來認真.已認證用戶的認證的上下文被保存到Http session裏,並與基於相同cookie的session相關聯.

所以對於一個WebSocket握手,或者SockJS的http傳輸請求,通常它們已經經過HttpServletRequest#getUserPrincipal()認證爲一個認證用戶.spring 會自動給這些用戶建立WebSocket或SockJS session,隨後的STOMP消息傳輸經過一個用戶頭來進行傳輸.

上面對於一個典型的web應用來言沒有什麼特別的,它們已經爲安全這麼作了.用戶經過http請求進行認證,並經過基於cookie的http session持有這個上下文.這樣SockJS或WebSocket就會爲用戶建立Session,並添加一個用戶頭的戳,這樣在應用中它們就可與進行消息傳輸了.

記住 STOMP協議在鏈接框架中有一個"login"或"passcode"頭.這些開始就是爲,如今也是爲基於TCP的STOMP設計的.可是,spring通常會忽略STOMP協議頭的認證信息,它認爲這個用戶已經被http傳輸請求認證過了,並指望WebSocket或SockJs session包含認證用戶.

spring 安全提供了webSocket 子協議認證:經過一個 ChannelInterceptor基於消息中的用戶頭來認證消息.另外,spring session提供了WebSocket integration確保使用webSocket session時,http session不會過時.

26.4.11 基於令牌的認證

Spring Security OAuth 支持基於令牌的安全包括JSON Web Token.這個能夠做爲web應用的安全機制,包括基於WebSocket的STOMP交互,如上下文所述同樣,經過一個基於cookie的session來保持認證.

同時基於Cookie的session並非最好的選擇,例如在應用裏,他們不但願持有來自服務器端的session,在手機應用中,它們更傾向於用用戶頭進行安全認真.

在 webSocket protocol RFC 6455中說起,在webSocket握手期間,服務器對客戶端的身份驗證,不須要指定特別的方式.實際上,雖然瀏覽器客戶端能夠是用標準的認證頭或cookie,但沒法使用自定義頭.例如SockJS js客戶端沒法提供一個方式來發送Http頭,查看 sockjs-client issue 196. 可是它容許發送查詢參數,這能夠用來發送token.這個也有缺點,如,這個令牌帶着URL可能無心中被服務器日誌記錄下來.

以上的缺點只針對基於瀏覽器的客戶端,而不是基於java的客戶端.它支持在webSocket或SockJS請求中發送頭消息.

不傾向於使用cookie,但在http協議水平上沒有更好的方法.傾向於使用安全頭,這裏有兩個步驟:

    1. 在鏈接期間使用STOMP 客戶端發送認證頭
    1. 使用ChannelInterceptor處理認證頭

下面的例子中服務側配置會註冊一個自定義認證攔截器.記住這個攔截器須要認證並在鏈接信息中設置用戶頭.spring會概率並存這些認證用戶,在隨後的相同session的STOMP消息會用到他們.

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.setInterceptors(new ChannelInterceptorAdapter() {

        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {

            StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

            if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                Principal user = ... ; // access authentication header(s)
                accessor.setUser(user);
            }

            return message;
        }
    });
  }
}

還要記住在消息中使用spring Security的安全機制,首先你要保證你的認證的ChannelInterceptor配置優先於spring安全的.最好的作法就是在AbstatctWebSocketMessageBrokerConfigurer的子類中宣佈自定義攔截的標記爲@Order(Ordered.HIGHEST_PRECEDENCE + 99)

26.4.12 用戶目的地

一個應用能夠發送消息給特定的用戶,spring的STOMP經過在目的地添加前綴"/user/"來支持它.例如,一個客戶端可能訂閱目的地"/user/queue/position-updates".這個目的地能夠被UserDestinationMessageHandler處理,被會加上惟一的用戶session,例如"/queue/position-updates-user123".它提供了通用的訂閱名稱目的地的便利,同時保證與其餘訂閱相同的目的地的用戶沒有衝突,因此每一個用戶都會收到惟一的股票位置更新.

在發送端消息能夠被髮送到目的地例如:"/user/{username}/queue/position-updates",這個目的地會被UserDestinationMessageHandler轉譯爲一個或多個目的地,對應用戶的每一個session.這容許應用中的全部組件發送消息給一個特定的用戶,只要知道這個用戶名字和原始的目的地.它還支持像消息模板同樣的註解.

例如,一個消息處理方法能夠在@SendToUser註解的協助處理下將消息發送給用戶.(它還支持類級別的共享公共目的地)

@Controller
public class PortfolioController {

    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates")
    public TradeResult executeTrade(Trade trade, Principal principal) {
        // ...
        return tradeResult;
    }
}

若是用戶有多個session,那麼默認的全部session訂閱對於特定的目的地都是可標誌的.可是有時候,它必須只能指向那個發送了要處理消息的session.這個能夠經過設置broadCast屬性爲false來實現.

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handleAction() throws Exception{
        // raise MyBusinessException here
    }

    @MessageExceptionHandler
    @SendToUser(destinations="/queue/errors", broadcast=false)
    public ApplicationError handleException(MyBusinessException exception) {
        // ...
        return appError;
    }
}

雖然用戶的目的地通常意味着是認證用戶,但這並不嚴格.一個與認證用戶無關的session也能夠訂閱用戶目的地.在這種狀況下,@SendToUser會默認爲broadcast=false;即只面向那麼發送了消息的session.

還能夠經過注入SimpMessagingTemplate的bean發送用戶目的地消息.通常經過java配置或XML命名空間注入.若是bean的名字是"brokeMessagingTemplate",則須要匹配@Qualifier.

@Service
public class TradeServiceImpl implements TradeService {

	private final SimpMessagingTemplate messagingTemplate;

	@Autowired
	public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
		this.messagingTemplate = messagingTemplate;
	}

	// ...

	public void afterTradeExecuted(Trade trade) {
		this.messagingTemplate.convertAndSendToUser(
				trade.getUserName(), "/queue/position-updates", trade.getResult());
	}
}

在額外消息代理器下使用用戶目的地時,檢查代理器的文檔瞭解如何管理未被激活的隊列.當用戶的session結束,全部惟一的用戶隊列會被移除.例如,當像/exchange/amq.direct/position-updates目的地被使用時,RabbitMQ建立自動刪除的隊列.這種狀況下,客戶端能夠訂閱/user/exchange/amq.direct/position-updates.相似的,ActiveMQ也有相同的配置選項來清除未啓動的目的地.

在混合應用服務器場景下,一個用戶目的地能夠保持未釋放由於用戶在關聯其餘服務器.這種狀況下,你須要配置一個能夠廣播未釋放的消息,這樣其餘服務器有機會去嘗試.這個能夠經過java配置裏的MessageBrokerRegistry類裏的userDestinationBroadcast屬性配置,或者經過message-broker元素的user-destination-broadcast屬性配置.

26.4.13 監聽應用上下文事件和攔截消息

一些ApplicationContext事件能夠被髮布,能夠被實現了spring的ApplicationListener接口的類接受.

  • BrokerAvailabilityEvent 說明代理器變成可用/不可用狀態.當應用運行時,簡單的代理器當即啓用,變得可行,那麼STOMP 代理器中介會斷掉它和成熟代理器的連接,例如當這個成熟代理器重啓時.代理器中介有重連機制,當代理器重啓後會從新創建連接,因此在連到斷鏈,以及斷鏈到連上他們都會發布事件.使用SimpMessagingTemplate組件應該訂閱這個事件,在代理器不可用時避免發送消息.在任何狀況下,當發送消息是,他們須要準備處理MessageDeliverException

  • SessionConnectEvent 當一個新的STOMP鏈接被接收時發佈,代表一個新的客戶端session.這個事件包含了表明鏈接的session id,用戶信息,全部客戶端發送的自定義消息頭.這對跟蹤客戶端session很是有用.訂閱這些事件的組件可使用SimpMessageHeaderAccessor或StompmessageHeaderAccessor包裹包含的消息.

  • SessionConnectedEvent 在SessionConnectEvent事件以後,當代理器向已經發布了一個STOMP Connection框架回應CONNECT以後,就會當即發佈.此時,會認爲STOMP的session已徹底創建.

  • SessionSubscribeEvent 當收到一個新的STOMP訂閱時發佈.

  • SessionUnsubscribeEvent 當收到一個新的STOMP退訂時發佈.

  • SessionDisconnectEvent 當STOMP的session關閉時發佈.這個關閉多是從客戶端發送的,也多是WebSocket關閉自動產生的.在某些狀況下,每一個session的這個事件會發生屢次.組件須要在處理混合關閉事件上作到冪等操做.

當你使用一個成熟的代理器,當代理器臨時變得不可靠時,這個STOMP代理器中介會自動重連繫統鏈接.但客戶端鏈接不會重連.若是心跳可用,客戶端通常會在10秒內發現代理器沒法迴應.客戶端須要實現本身的重連邏輯.

另外,應用能夠經過註冊ChannelInterceptor在相應的消息頻道上攔截進出消息.例如攔截進入的消息:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.setInterceptors(new MyChannelInterceptor());
  }
}

一個自定義的ChannelInterceptor能夠實現基類的ChannelInterceptorAdapter,並使用StompHeaderAccessor或SimpMessageHeaderAccessor來訪問消息裏的信息.

public class MyChannelInterceptor extends ChannelInterceptorAdapter {

  @Override
  public Message<?> preSend(Message<?> message, MessageChannel channel) {
    StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
    StompCommand command = accessor.getStompCommand();
    // ...
    return message;
  }
}

26.4.14 STOMP客戶端

spring提供了一個基於WebSocket的STOMP客戶端和基於TCP客戶端的STOMP.

開始建立和配置WebSocketStompClient

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在上面的例子StandardWebSocketClient能夠被SockJsClient替代,由於它也是WebSocketClient的一個實現.這個SockJsClient可使用WebSocket或http-based傳輸做爲回退.更多的細節查看26.3.7.

下一步是創建一個鏈接並提供了一個STOMP session的處理器.

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

當這個session可使用時,這個handler會被通知.

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // ...
    }
}

當session創建以後,它的內容就會發送,並經過已配置的MessageConverter序列化.

session.send("/topic/foo", "payload");

你能夠訂閱目的地.這個訂閱方法要求一個關於訂閱信息的處理器並返回能夠用來退訂的訂閱處理.對於每個收到的信息這個處理器能夠指定每一個能夠被序列化的負載內容的目標對象類型.

session.subscribe("/topic/foo", new StompFrameHandler() {

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        // ...
    }

});

要啓用STOMP心跳,須要用TaskScheduler配置webSocketStompClient,能夠自定義心跳頻率,10秒鐘寫入不活躍的,它將引起一次心跳發送;10秒讀取補貨要的,它會關閉鏈接.

若是在同一機器上進行數千個客戶端模擬,則須要考慮關閉心跳.由於每一個鏈接都有本身的心跳任務,這裏沒有爲一個機器上的大量客戶端運行作優化.

STOMP協議還支持收據,即客戶端必須添加收據頭以對應服務器處理髮送或訂閱以後返回的RECEIPT框架.爲了支持它,StompSession提供了setAutoReceipt(boolean)屬性,它能夠在隨後的每次發送和訂閱都添加"receipt"頭.你還能夠手動選擇添加一個"receipt"頭到StompHeaders裏.發送和訂閱能夠返回一個Receiptable實例,這樣能夠被用來註冊收據成功或失敗回調.對於該功能,客戶端必須配置TaskScheduler,其時間量必需要小於訂閱過時時間(默認15秒過時).

記住StompSessionHandler自己也是一個StompFrameHandler.它能夠處理ERROR框架相應回調處理異常,這些異常來自處理消息,傳輸級別的HandleTransportError,包括ConnectionLostException.

26.4.15 WebSocket Scope(做用域)

每個WebSocket的session都是屬性的集合.這個map與客戶端傳入消息相連,還能夠被控制器裏的方法訪問.例如:

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handle(SimpMessageHeaderAccessor headerAccessor) {
        Map<String, Object> attrs = headerAccessor.getSessionAttributes();
        // ...
    }
}

你也能夠申明一個spring管理的bean是websocket做用域.Websocket-scope的bean能夠被注入到控制器和任何被註冊爲"clientInboundChannel"的頻道攔截器裏.這裏bean通常是單例的,存活時間大於單個的webSocket session.因此你須要對WebSocket-scope的bean使用做用域代理模式.

@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {

    @PostConstruct
    public void init() {
        // Invoked after dependencies injected
    }

    // ...

    @PreDestroy
    public void destroy() {
        // Invoked when the WebSocket session ends
    }
}

@Controller
public class MyController {

    private final MyBean myBean;

    @Autowired
    public MyController(MyBean myBean) {
        this.myBean = myBean;
    }

    @MessageMapping("/action")
    public void handle() {
        // this.myBean from the current WebSocket session
    }
}

同任何自定義做用域同樣,spring初始化一個新的Mybean實例,它能夠被控制器獲取,並存儲到webScoket seesion屬性裏.每次相同的實例都會返回直到session終結.WebSocket做用域的bean會擁有全部的spring生命週期方法調用,如上例示.

26.4.16 配置和性能

談到性能時,這裏沒有銀彈.許多因素會影響它,包括消息大小,體積,應用方法工做是否須要阻塞,以及外部因素,包括網絡速度和其餘.本節主要提供了一些可配置選項的簡介,還有關於如何合理縮放的思考.

在一個消息應用裏,消息在線程池支持下被頻道傳輸並能夠被異步操做.配置這樣一個應用須要消息頻道和流相關的只是.具體能夠參考26.4.3節.

顯然,首先配置線程池支持的"clientInboundChannel"和"clientOutboundChannel".默認配置的數目是可用處理器的兩倍.

若是註解方法處理消息受限於CPU,那麼"clientInboundChannel"裏的線程的數量應該和處理器數量相同.若是他們的工做更可能是首先IO,須要阻塞,等待數據庫或其餘外部系統,那麼線程池容量就能夠增長.

ThreadPoolExecutor有三個重要屬性.這裏是核心線程池容量,線程池最大容量,等待線程最大容量.

最打困擾就是配置核心線程池容量(10)或最大線程池容量(20)這樣就配置出一個10到20個線程的線程池.即便你把等待capacity加到最大,即Integer.MAX_VALUE,核心線程池裏的線程數量也不會增長,由於只是增長了排隊任務的數量.

請查看THreadPoolExecutor的文檔來學習這些屬性,並理解不一樣的排隊策略.

在"clientOutboundChannel",是關於發送消息到WebSocket客戶端.若是客戶端的網絡比較快,那麼線程數量應該與可用處理器數量保持一致.但若是他們太慢或低帶寬,接受消息會耗費太長時間,並給線程池添加負擔.因此增長線程池容量是有必要的.

"clientInBoundChannel"的工做量很好預測-這個依賴於應用自己,但如何配置"clientOutboundChannel"卻很難,由於有太多超出應用自己的因素.所以這裏有兩個與信息發送相關屬性.這是"sendTimeLimit","sendBufferSizeLimit".分別用來配置發送消息到客戶端時,發送能夠花費多長時間或多大的數據會被緩存.

基本觀點,任何單個線程用來發送消息給客戶端的時間都是有限制的.同時全部其餘消息會被緩衝,你須要使用屬性來配置須要消耗多久時間發送一條消息,同時能夠緩衝多大的數據.能夠經過javadoc或xml文檔來配置這些重要的額外細節.

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport send-timeout="15000" send-buffer-size="524288" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

上面的webSocket傳輸配置還能夠配置接受的STOMP信息的最大數量.儘管理論上webSocket 消息在容量上是無限的,實際上有webSocket服務器限制,例如:tomcat是8K,jetty是64K.基於這個緣由,stomp.js把大的STOMP消息分割爲16K一份,而後把它們做爲混合webSocket消息發送,這要求服務器緩衝並從新組裝.

基於webSocket的spring STOMP支持這樣,因此應用能夠配置消息的最大容量,而無關webSocket server指定的消息容量.記住webSocket消息的容量能夠自動調整,只要能保證他們最多傳輸16K大小的webSocket消息.

例子以下:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport message-size="131072" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

關於收放的最重要一點是使用混成應用實例.目前這個不適於簡單代理器.可是適用如RabbitMQ等成熟的代理器.即每一個應用實例能夠鏈接到代理器,一個應用的消息廣播能夠經過代理器廣播到其餘應用實例相連的WebSocket客戶端上.

26.4.17 運行監控

當使用@EnableWebSocketMessageBroker或websocket:message-broker時,關鍵框架組件會自動收集統計和數據,這能夠對整個應用的內在狀態提供洞察.這個配置也聲明一個WebSocketMessageBrokerStats的bean在某處收集各類可用的信息,默認每30分鐘記錄爲INFO級別.這個bean在運行時能夠被經過spring的MBeanExporterd導出到JMX,例如經過JDK的Jsonsole.下面是可用信息的總結.

Client WebSocket Session(客戶端 webSocket Session)

  • Current

    代表當前有多少客戶端session經過代理器的方式 WebSocket VS Http流和SockJS session長輪詢.

  • Total

    目前總計創建了多少session

  • Abnormal Closed(不正常關閉)

  • connect Failure (鏈接失敗) 一個session已經創建,但在60秒內沒有收到消息會被關閉.一般是網絡或網絡代理問題.

  • Send Limit Exceeded (發送過限)

    發送消息超過配置的時間或緩存數量致使session關閉,通常多是客戶端網速過慢/
  • Transport Errors (傳輸錯誤)

    當發生傳輸錯誤時,seesion關閉.好比讀寫WebSocket鏈接或Http請求/相應失敗.
  • STOMP Frames

    CONNECT,CONNECTED,DISCONNECT框架週期的數量代表有多少客戶端經過STOMP層次鏈接.記住鏈接關閉的數量要小於session非正常關閉的數量,或未經過發送DISCONNECT框架關閉的數量.

STOMP Broker Relay (stomp 代理器中介)

  • Tcp Conenctions (TCP鏈接)

說明有多少表明webSocket客戶端的session的指向代理器的TCp鏈接.這個應該是客戶端session的數量+1,另外一個是應用內部用於發送消息的系統鏈接.

  • STOMP Frames (STOMP框架)

    從代理器接受或發送的表明客戶端的CONENCT,CONNECTED,DISCONNECT框架的總數.記住一個DISCONECT是發送到代理器的,而與客戶端WebSocket sessoin是否關閉無關.因此更少DISCONNECT框架的數量代表代理器掙積極關閉鏈接,多是由於心跳爲及時到達,一個無效的輸入框架,或其餘.

  • Client Inbound Channel(客戶端輸入頻道)

來自支持"clientInboundChannel"端的線程池統計提供了入庫消息進程的健康狀態的審視.任務隊列上升代表應用處理消息太慢了.若是這裏有I/O方面的任務(例如下降數據庫查詢,第三方的REST API的http請求),則須要考慮增長線程池容量.

  • Client Outbound Channel (客戶輸出方面頻道)

    從支持"clientOutboundChannel"的線程池獲得的統計提供了想客戶端廣播消息相關的健康情況.排隊數量變大,則說明客戶端接受消息太慢.一種解決辦法是增長線程池的數量來適應當前緩慢客戶端的數量.另外一種方法是下降發送時間間隔和發送緩衝限制.(查看上節內容)

  • SockJS Task Scheduler(SockJS 任務調度器)

    這個是用來發送心跳的SockJS任務調度器的線程池數量的統計.記住小心跳協商爲STOMP層次時,SockJS心跳會失效.

26.4.18 測試標誌的控制器方法

這裏有兩種主要步驟經過spring STOMP對WebSocket的支持來測試應用.一個是服務器測試控制器和它註解的消息處理方法的功能.另外一個是寫一個全功能的端對端的測試,包括運行着的客戶端和服務器.

這兩個步驟不是互相排斥的.相反,每個都有總體測試策略的地點.服務端的測試更加集中在寫和處理.端對端集成測試一方面更加完整,測試更多,但他們更多的是啊是啊寫和處理.

服務器端測試最簡單的方案是寫一個控制器單元測試.可是由於控制器依賴它的註解,因此這個不是特別好用.單純的單元測試沒法檢測他們.

理想的控制器測試應該在運行時喚醒,就像使用spring mvc測試框架來測試控制器處理http請求同樣,無需運行在Servlet容器裏但能夠經過spring框架來喚醒這個標誌控制器.項spring mvc測試同樣,這裏有兩種可能的選擇.要麼使用"context-based"或"standalone"安裝:

    1. 經過spring TestContext framework的幫助來加載真實的spring配置.將"clientInboundChannel"做爲一個測試域,使用它發送消息會被控制器方法處理.
  • 2.手動安裝能喚醒控制器(名爲SimpAnnotationMethodMessageHandler)的最小的spring框架組件,並經過控制器直接發送消息.

這兩種計劃方案都表如今tests for the stock portfolio項目裏.

第二種方案是建立端對端的集成測試.對此,你須要在可行狀態下運行一個webSocket服務器,並經過WebSocket客戶端鏈接它,併發送包含了STOMP框架的消息.tests for the stock portfolio項目裏也介紹了一種方案,使用Tomcat做爲可用服務器和一個簡單STOMP客戶端以達到測試目的.

相關文章
相關標籤/搜索