spring+rabbitmq+stomp搭建websocket消息推送(非spring boot方式)

前言:javascript

兩年前作過spring+activemq+stomp的ws推送,那個作起來很簡單,但如今公司用的mq中間件是rabbitmq,所以須要經過rabbitmq去作ws通訊。仔細搜了搜百度/谷歌,網上經過spring boot+rabbitmq+stomp的教程文章卻是一搜一大把,惋惜目前的項目是非spring boot的,無法套用。只好本身去搗鼓。搞了幾個小時,終於弄出來了,特此與你們分享下。html

RabbitMQ:前端

 怎麼安裝就不是本篇討論的話題了,本身百度/谷歌之。rabbitmq默認自帶了stomp插件,可是須要本身啓用。命令爲:java

rabbitmq-plugins enable rabbitmq_stomp

來來來,給個文檔地址參考參考,http://www.rabbitmq.com/stomp.html。默認用guest用戶去鏈接,密碼也是guest。web

這裏有個問題,看rabbitmq配置文件,stomp協議端口默認是61613,可是用ws協議鏈接卻始終鏈接不上,因此只能用web stomp端口,端口號是15674,這個跟activemq有所區別。(P.S. 此處最好有大神來解惑,或者告知如何用61613來連spring

Javascript:chrome

前端代碼擼起來最方便,關鍵是調試也容易,所以先來。json

var stompClient = null;

var headers = {
  login: 'guest',
  passcode: 'guest'
};

function wsConnect(url) {
    var ws = new SockJS(url);
    stompClient = Stomp.over(ws);

    //var ws = new WebSocket(url);
    //stompClient = Stomp.over(ws);

    // SockJS does not support heart-beat: disable heart-beats
    stompClient.heartbeat.outgoing = 0;
    stompClient.heartbeat.incoming = 0;

    stompClient.connect(headers, function (frame) {
        console.log('Connected: ' + frame);

        stompClient.subscribe('/topic/test', function (sms) {
            var obj = JSON.parse(sms.body)
            var count = obj.totalCount;

            console.log("count: " + count);
        });

    });
}

而後就鏈接唄。session

$(function(){
    var url = "http://host:15674/stomp";
    wsConnect(url);
});   

 擼完準備測試,固然是選擇chrome嘍,頁面加載後,打開console控制檯,能夠看到web socket連上了,前端大功告成。socket

  

Java:

定義一個StompService類專門用來發送stomp消息。注意:rabbitmq 3.7之後stomp插件再也不支持sockjs,所以寫法會有變化。

import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

/**
 * stomp服務  rabbitmq作中間件
 * @author Selwyn
 * @version $Id: WebSocketConfig.java, v 0.1 9/7/2018 9:59 AM Selwyn Exp $
 */
@Component
public class StompService {

    private static final String URL_TEMPLATE = "http://%s:%s/stomp";

    @Value("${rabbit.host}")
    private String host;

    //@Value("${rabbit.stomp.port}")
    private Integer port = 15674;

    /**
     * 鏈接用戶名
     */
    //@Value("${rabbit.stomp.login}")
    private String login = "guest";
    /**
     * 鏈接密碼
     */
    //@Value("${rabbit.stomp.passCode}")
    private String passCode = "guest";

    private String url;

    @PostConstruct
    public void init()
    {
        url = String.format(URL_TEMPLATE, host, port);
    }

    /**
     * 發送stomp消息
     * @param dest  目的地 好比/topic/test
     * @param toSend  要發送的信息
     * @param <T>
     */
    public <T> void connectAndSend(String dest, T toSend)
    {
        WebSocketClient client = new StandardWebSocketClient();

        List<Transport> transports = new ArrayList<>(1);
        transports.add(new WebSocketTransport( client) );
        //rabbitmq 3.7之後就別這麼寫了。直接new WebSocketStompClient(client)就行
        WebSocketClient transport = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(transport);
        //StompSessionHandlerAdapter默認的payload類型是String, 所以MessageConverter必須是StringMessageConverter
        stompClient.setMessageConverter(new StringMessageConverter());

        final CustomStompSessionHandler sessionHandler =
                new CustomStompSessionHandler(dest, toSend);

        WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
        headers.setSecWebSocketProtocol("13");

        //鏈接用戶名/密碼也是必須的,不然連不上
        StompHeaders sHeaders = new StompHeaders();
        sHeaders.add("login", this.login);
        sHeaders.add("passcode", this.passCode);

        //開始鏈接,回調鏈接上後發送stomp消息
        stompClient.connect(url, headers, sHeaders, sessionHandler);

        //要同步獲得發送結果的話,用CountDownLatch來作或者connect結果的future對象作get
    }

}

而後編寫回調handler類。

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;


/**
 * 自定義stomp session 回調handler
 * @author Selwyn
 * @version $Id: CustomStompSessionHandler.java, v 0.1 9/7/2018 3:43 PM Selwyn Exp $
 */
@Slf4j
public class CustomStompSessionHandler extends StompSessionHandlerAdapter {

    /**
     * 要發送的對象,將會json化傳輸出去
     */
    private Object toSend;

    /**
     * 目的地,通常是topic地址
     */
    private String dest;

    public CustomStompSessionHandler(String dest, Object toSend) {
        this.toSend = toSend;
        this.dest = dest;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        super.handleFrame(headers, payload);
    }

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        super.afterConnected(session, connectedHeaders);
        String msg = JSON.toJSONString(toSend);
        try{
            session.send(dest, msg);
        }catch(Exception e)
        {
            log.error("failed to send stomp msg({}) to destination {}", msg, dest);
        }finally {
            //作完了關閉唄
            session.disconnect();
        }
    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        super.handleException(session, command, headers, payload, exception);
        log.error("stomp error: {}", exception);
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        super.handleTransportError(session, exception);
        log.error("stomp transport error: {}", exception);
    }

    public void setToSend(Object toSend) {
        this.toSend = toSend;
    }

    public void setDest(String dest) {
        this.dest = dest;
    }
}

再本身寫個controller或者寫個單元測試方法,這裏就不給出代碼了,擼完後啓動服務,就能夠測試消息推送了,實踐證實,真香!

結尾:

其實整個過程還沒完,須要考慮到鏈接中斷等狀況,客戶端和服務後臺都須要作好重連機制。經過sockjs這種方式鏈接是沒有心跳機制的,這個比activemq帶的stomp插件要low。我的建議,若是能用spring boot的話儘可能用那種方式去實現stomp消息推送。

相關文章
相關標籤/搜索