Spring消息之STOMP

1、STOMP 簡介

    直接使用WebSocket(或SockJS)就很相似於使用TCP套接字來編寫Web應用。由於沒有高層級的線路協議(wire protocol),所以就須要咱們定義應用之間所發送消息的語義,還須要確保鏈接的兩端都能遵循這些語義。javascript

    就像HTTP在TCP套接字之上添加了請求-響應模型層同樣,STOMP在WebSocket之上提供了一個基於幀的線路格式(frame-based wire format)層,用來定義消息的語義。css

    與HTTP請求和響應相似,STOMP幀由命令、一個或多個頭信息以及負載所組成。例如,以下就是發送數據的一個STOMP幀:java

>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20

{"message":"Marco!"}

    在這個例子中,STOMP命令是send,代表會發送一些內容。緊接着是三個頭信息:一個表示消息的的事務機制,一個用來表示消息要發送到哪裏的目的地,另一個則包含了負載的大小。而後,緊接着是一個空行,STOMP幀的最後是負載內容。git

2、服務端實現

一、啓用STOMP功能

    STOMP 的消息根據前綴的不一樣分爲三種。以下,以 /app 開頭的消息都會被路由到帶有@MessageMapping 或 @SubscribeMapping 註解的方法中;以/topic 或 /queue 開頭的消息都會發送到STOMP代理中,根據你所選擇的STOMP代理不一樣,目的地的可選前綴也會有所限制;以/user開頭的消息會將消息重路由到某個用戶獨有的目的地上。github

@Configuration
@EnableWebSocketMessageBroker
@PropertySource("classpath:resources.properties")
public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Value("${rabbitmq.host}")
    private String host;

    @Value("${rabbitmq.port}")
    private Integer port;

    @Value("${rabbitmq.userName}")
    private String userName;

    @Value("${rabbitmq.password}")
    private String password;

    /**
     * 將 "/stomp" 註冊爲一個 STOMP 端點。這個路徑與以前發送和接收消息的目的地路徑有所
     * 不一樣。這是一個端點,客戶端在訂閱或發佈消息到目的地路徑前,要鏈接到該端點。
     *
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp").withSockJS();
    }


    /**
     * 若是不重載它的話,將會自動配置一個簡單的內存消息代理,用它來處理以"/topic"爲前綴的消息
     *
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //基於內存的STOMP消息代理
        registry.enableSimpleBroker("/queue", "/topic");

        //基於RabbitMQ 的STOMP消息代理
/*        registry.enableStompBrokerRelay("/queue", "/topic")
                .setRelayHost(host)
                .setRelayPort(port)
                .setClientLogin(userName)
                .setClientPasscode(password);*/

        registry.setApplicationDestinationPrefixes("/app", "/foo");
        registry.setUserDestinationPrefix("/user");
    }
}
View Code

二、處理來自客戶端的STOMP消息

    服務端處理客戶端發來的STOMP消息,主要用的是 @MessageMapping 註解。以下:web

  @MessageMapping("/marco")
  @SendTo("/topic/marco")
  public Shout stompHandle(Shout shout){
      LOGGER.info("接收到消息:" + shout.getMessage());
      Shout s = new Shout();
      s.setMessage("Polo!");
      return s;
  }

    2.一、@MessageMapping 指定目的地是「/app/marco」(「/app」前綴是隱含的,由於咱們將其配置爲應用的目的地前綴)。spring

    2.二、方法接收一個Shout參數,由於Spring的某一個消息轉換器會將STOMP消息的負載轉換爲Shout對象。Spring 4.0提供了幾個消息轉換器,做爲其消息API的一部分:websocket

    2.三、尤爲注意,這個處理器方法有一個返回值,這個返回值並非返回給客戶端的,而是轉發給消息代理的,若是客戶端想要這個返回值的話,只能從消息代理訂閱。@SendTo 註解重寫了消息代理的目的地,若是不指定@SendTo,幀所發往的目的地會與觸發處理器方法的目的地相同,只不過會添加上「/topic」前綴。app

    2.四、若是客戶端就是想要服務端直接返回消息呢?聽起來不就是HTTP作的事情!即便這樣,STOMP 仍然爲這種一次性的響應提供了支持,用的是@SubscribeMapping註解,與HTTP不一樣的是,這種請求-響應模式是異步的...異步

   @SubscribeMapping("/getShout")
   public Shout getShout(){
       Shout shout = new Shout();
       shout.setMessage("Hello STOMP");
       return shout;
   }

三、發送消息到客戶端

3.1 在處理消息以後發送消息

    正如前面看到的那樣,使用 @MessageMapping 或者 @SubscribeMapping 註解能夠處理客戶端發送過來的消息,並選擇方法是否有返回值。

    若是 @MessageMapping 註解的控制器方法有返回值的話,返回值會被髮送到消息代理,只不過會添加上"/topic"前綴。可使用@SendTo 重寫消息目的地;

    若是 @SubscribeMapping 註解的控制器方法有返回值的話,返回值會直接發送到客戶端,不通過代理。若是加上@SendTo 註解的話,則要通過消息代理。

3.2 在應用的任意地方發送消息

    spring-websocket 定義了一個 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),能夠實現自由的向任意目的地發送消息,而且訂閱此目的地的全部用戶都能收到消息。

  @Autowired
  private SimpMessageSendingOperations simpMessageSendingOperations;


  /**
  * 廣播消息,不指定用戶,全部訂閱此的用戶都能收到消息
  * @param shout
  */
  @MessageMapping("/broadcastShout")
  public void broadcast(Shout shout) {
      simpMessageSendingOperations.convertAndSend("/topic/shouts", shout);
  }

3.3 爲指定用戶發送消息

    3.2介紹瞭如何廣播消息,訂閱目的地的全部用戶都能收到消息。若是消息只想發送給特定的用戶呢?spring-websocket 介紹了兩種方式來實現這種功能,一種是 基於@SendToUser註解和Principal參數,一種是SimpMessageSendingOperations 接口的convertAndSendToUser方法。

  • 基於@SendToUser註解和Principal參數

    @SendToUser 表示要將消息發送給指定的用戶,會自動在消息目的地前補上"/user"前綴。以下,最後消息會被髮布在  /user/queue/notifications-username。可是問題來了,這個username是怎麼來的呢?就是經過 principal 參數來得到的。那麼,principal 參數又是怎麼來的呢?須要在spring-websocket 的配置類中重寫 configureClientInboundChannel 方法,添加上用戶的認證。

    /**
     * 一、設置攔截器
     * 二、首次鏈接的時候,獲取其Header信息,利用Header裏面的信息進行權限認證
     * 三、經過認證的用戶,使用 accessor.setUser(user); 方法,將登錄信息綁定在該 StompHeaderAccessor 上,在Controller方法上能夠獲取 StompHeaderAccessor 的相關信息
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                //一、判斷是否首次鏈接
                if (StompCommand.CONNECT.equals(accessor.getCommand())){
                    //二、判斷用戶名和密碼
                    String username = accessor.getNativeHeader("username").get(0);
                    String password = accessor.getNativeHeader("password").get(0);

                    if ("admin".equals(username) && "admin".equals(password)){
                        Principal principal = new Principal() {
                            @Override
                            public String getName() {
                                return userName;
                            }
                        };
                        accessor.setUser(principal);
                        return message;
                    }else {
                        return null;
                    }
                }
                //不是首次鏈接,已經登錄成功
                return message;
            }

        });
    }
spring-websocket 用戶認證
  @MessageMapping("/shout")
  @SendToUser("/queue/notifications")
  public Shout userStomp(Principal principal, Shout shout) {
        String name = principal.getName();
        String message = shout.getMessage();
        LOGGER.info("認證的名字是:{},收到的消息是:{}", name, message);
        return shout;
  }
  • convertAndSendToUser方法

    除了convertAndSend()之外,SimpMessageSendingOperations 還提供了convertAndSendToUser()方法。按照名字就能夠判斷出來,convertAndSendToUser()方法可以讓咱們給特定用戶發送消息。

    @MessageMapping("/singleShout")
    public void singleUser(Shout shout, StompHeaderAccessor stompHeaderAccessor) {
        String message = shout.getMessage();
        LOGGER.info("接收到消息:" + message);
        Principal user = stompHeaderAccessor.getUser();
        simpMessageSendingOperations.convertAndSendToUser(user.getName(), "/queue/shouts", shout);
    }

    如上,這裏雖然我仍是用了認證的信息獲得用戶名。可是,其實大可沒必要這樣,由於 convertAndSendToUser 方法能夠指定要發送給哪一個用戶。也就是說,徹底能夠把用戶名的看成一個參數傳遞給控制器方法,從而繞過身份認證!convertAndSendToUser 方法最終會把消息發送到 /user/sername/queue/shouts 目的地上。

四、處理消息異常

    在處理消息的時候,有可能會出錯並拋出異常。由於STOMP消息異步的特色,發送者可能永遠也不會知道出現了錯誤。@MessageExceptionHandler標註的方法可以處理消息方法中所拋出的異常。咱們能夠把錯誤發送給用戶特定的目的地上,而後用戶從該目的地上訂閱消息,從而用戶就能知道本身出現了什麼錯誤啦...

     @MessageExceptionHandler(Exception.class)
     @SendToUser("/queue/errors")
     public Exception handleExceptions(Exception t){
         t.printStackTrace();
         return t;
     }

3、客戶端實現

一、JavaScript 依賴

    STOMP 依賴 sockjs.js 和 stomp.min.js。stomp.min.js的下載連接:http://www.bootcdn.cn/stomp.js/

    <script type="text/javascript" src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.js"></script>
    <script type="text/javascript" src="/js/stomp.min.js"></script>

二、JavaScript 客戶端實現

/*STOMP*/
var url = 'http://localhost:8080/stomp';
var sock = new SockJS(url);
var stomp = Stomp.over(sock);

var strJson = JSON.stringify({'message': 'Marco!'});

//默認的和STOMP端點鏈接
/*stomp.connect("guest", "guest", function (franme) {

});*/

var headers={
    username:'admin',
    password:'admin'
};

stomp.connect(headers, function (frame) {

    //發送消息
    //第二個參數是一個頭信息的Map,它會包含在STOMP的幀中
    //事務支持
    var tx = stomp.begin();
    stomp.send("/app/marco", {transaction: tx.id}, strJson);
    tx.commit();


    //訂閱服務端消息 subscribe(destination url, callback[, headers])
    stomp.subscribe("/topic/marco", function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("訂閱的服務端消息:" + obj.message);
    }, {});


    stomp.subscribe("/app/getShout", function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("訂閱的服務端直接返回的消息:" + obj.message);
    }, {});


    /*如下是針對特定用戶的訂閱*/
    var adminJSON = JSON.stringify({'message': 'ADMIN'});
    /*第一種*/
    stomp.send("/app/singleShout", {}, adminJSON);
    stomp.subscribe("/user/queue/shouts",function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("admin用戶特定的消息1:" + obj.message);
    });
    /*第二種*/
    stomp.send("/app/shout", {}, adminJSON);
    stomp.subscribe("/user/queue/notifications",function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("admin用戶特定的消息2:" + obj.message);
    });

    /*訂閱異常消息*/
    stomp.subscribe("/user/queue/errors", function (message) {
        console.log(message.body);
    });

    //若使用STOMP 1.1 版本,默認開啓了心跳檢測機制(默認值都是10000ms)
    stomp.heartbeat.outgoing = 20000;

    stomp.heartbeat.incoming = 0; //客戶端不從服務端接收心跳包
});
View Code

 

演示源代碼連接:https://github.com/JMCuixy/SpringWebSocket

相關文章
相關標籤/搜索