rabbitmq做爲mqtt服務器實現websocket消息推送給瀏覽器

rabbitmq的RabbitMQ Web MQTT插件能夠用來支持將rabbitmq做爲MQTT協議的服務器,而websocket支持mqtt協議通訊實現消息推送。由於咱們目前使用rabbitmq,因此採用其做爲ws的服務端(原來有過activemq的作法,其原生也支持MQTT協議)。html

首先安裝RabbitMQ Web MQTT插件,以下:java

rabbitmq-plugins enable rabbitmq_web_mqtt

MQTT在15675端口下的ws命名空間暴露WebSocket端點。以下:web

http://IP:15675/ws

Eclipse旗下的Paho JavaScript客戶端能夠使用MQTT協議實現ws通訊,其使用以下:服務器

<script src="mqttws31.js"></script>
<script>

    var wsbroker = location.hostname;  // mqtt websocket enabled broker
    var wsport = 15675; // port for above
    client = new Paho.MQTT.Client(wsbroker, wsport, "/ws", "myclientid_" + parseInt(Math.random() * 100, 10)); client.onConnectionLost = function (responseObject) { debug("CONNECTION LOST - " + responseObject.errorMessage); }; client.onMessageArrived = function (message) { debug("RECEIVE ON " + message.destinationName + " PAYLOAD " + message.payloadString); print_first(message.payloadString); }; var sessionId = getSessionId(); var options = { timeout: 3, keepAliveInterval: 30, onSuccess: function () { debug("CONNECTION SUCCESS");
// 這樣就能夠作到點對點通訊 client.subscribe(sessionId
, {qos: 1}); }, onFailure: function (message) { debug("CONNECTION FAILURE - " + message.errorMessage); } }; if (location.protocol == "https:") { options.useSSL = true; } debug("CONNECT TO " + wsbroker + ":" + wsport); client.connect(options);

java服務端發送消息給rabbitmq:websocket

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; private static void testSendMqtt() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); // factory.setVirtualHost("");
        factory.setHost("127.0.0.1"); factory.setPort(5672); Connection conn = null; Channel channel = null; try { conn = factory.newConnection(); channel = conn.createChannel(); String sessionId = getSessionId(); byte[] messageBodyBytes = "{'text':'Hello, world!中文'}".getBytes(); // 這樣就能夠作到點對點通訊了,amq.topic是默認exchange
            channel.basicPublish("amq.topic", sessionId, null, messageBodyBytes); }finally { if (channel != null) { channel.close(); } if (conn != null) { conn.close(); } } }

 

參考:session

https://www.rabbitmq.com/web-mqtt.htmldom

https://www.eclipse.org/paho/clients/js/eclipse

相關文章
相關標籤/搜索