MQTT簡單介紹與實現

1. MQTT 介紹
它是一種 機器之間通信 machine-to-machine (M2M)、物聯網 Internet of Things (IoT)經常使用的一種輕量級消息傳輸協議
適用於網絡帶寬較低的場合
包含發佈、訂閱模式,經過一個代理服務器(broker),任何一個客戶端(client)均可以訂閱或者發佈某個主題的消息,而後訂閱了該主題的客戶端則會收到該消息java

1.1 消息主題
發佈消息或者訂閱消息都要選定一個消息主題,消息主題能夠任意定製,相似文件系統,用 「/」 進行分隔,例如主題爲 /a/b/c/d 的消息
客戶端可使用徹底字符匹配消息,也可使用通配符進行消息匹配
通配符 + :替換任意單個層級。好比訂閱 /a/b/c/d、/a/+/c/d 、+/+/+/+ 主題的消息便可收到主題爲 /a/b/c/d 的消息,而 b/+/c/d 、 +/+/+ 不會匹配
通配符 # :匹配任意層級,只能用於末尾, #、a/# 能夠匹配上面的主題消息
長度爲 0 的主題層級也是容許的。好比發佈主題爲 a//topic 的消息,客戶端能夠用 a/+/topic 進行匹配。/a/topic 的主題用 +/a/topic、#、/# 能夠匹配。spring

1.2 服務質量(Quality of Service,QoS)
MQTT 定義了三種客戶端與代理服務器之間消息到達的難度服務器

0:broker/client 之間消息傳一次,並不確認傳到沒有,消息可能丟失
1:broker/client 之間消息至少一次,帶確認消息的傳輸,可能重複收到
2:broker/client 之間消息僅有一次,利用四次握手進行確認,網絡延遲可能會增長
當客戶端訂閱的消息質量與代理服務器發佈主題的質量不一樣時,客戶端會選擇難度最小的 QoS 接收消息網絡

發佈等級爲 2 ,客戶端訂閱等級爲 0, 那麼客戶端接收到的 QoS = 0
發佈等級爲 0 ,訂閱等級爲 2,那麼客戶端接收到的 QoS = 0session

1.3 消息保留
即當 broker 正在發送消息給 client 時,消息會保存,若是此時有新的 client 訂閱了該主題的消息,那麼它也會收到消息。這種作法的好處就是當消息主題常常變換的時候,若是有新的 client 訂閱該消息,那麼它不用等待太長的時間就能夠收到消息
1.4 會話清除
client 能夠設置 clean session 標誌位,當 clean session = false 時,client 失去鏈接時, broker 會一直保留消息直到 client 從新鏈接。而 clean session = true 時,broker 會清除全部的消息當這個 client 失去鏈接。
1.5 消息意願
當 client 鏈接上 broker 時,client 會提示 broker 它有一個意願消息,這個意願消息將會在 client 失去鏈接時,broker 發送出去。消息意願和普通消息同樣都包含主題和內容。dom

2. 實例

js實現過程eclipse

function RndNum(n){
var rnd="";
for(var i=0;i<n;i++)
rnd+=Math.floor(Math.random()*10);
return rnd;
}


var hostname = '服務器',
port = 「端口號」,
clientId = RndNum(5),
keepAlive = 100,
cleanSession = false,
userName = '',
password = '',
topic = '訂閱消息';
client = new Paho.MQTT.Client(hostname, port, clientId);
//創建客戶端實例
var options = {
invocationContext: {
host: hostname,
port: port,
path: client.path,
clientId: clientId
},
keepAliveInterval: keepAlive,
cleanSession: cleanSession,
userName: userName,
password: password,
onSuccess: onConnect,
};
client.connect(options);
//鏈接服務器並註冊鏈接成功處理事件
function onConnect() {
console.log("onConnected");
client.subscribe(topic);
}
client.onConnectionLost = onConnectionLost;
//註冊鏈接斷開處理事件
client.onMessageArrived = onMessageArrived;

//註冊消息接收處理事件
function onConnectionLost(responseObject) {
console.log(responseObject);
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:" + responseObject.errorMessage);
console.log("鏈接已斷開");
}
}
function onMessageArrived(message) {
let msg = message.payloadString;
// 消息處理

}


var count = 0;
function start() {
window.tester = window.setInterval(function () {
if(count!=0){
if (client.isConnected) {
var s = "content:" + (count++) ;
message = new Paho.MQTT.Message(s);
message.destinationName = topic;
client.send(message);
}
}else
{
window.clearInterval(window.tester);
}
});
}

java spring boot實現過程
1.添加依賴
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>
</dependency>
2.建立類
public class mqttutil {    private  String serviceURI = "服務器端口號";    private  String clientID = "客戶端id";    private MqttClientPersistence persistence = new MemoryPersistence();    private  String username = "";    private  String password = "";    private  String topic = "訂閱";    private  String MessageStr = "";    private  int qos = 0;    /**     * 消息訂閱     **/    public  void subscribe() {        try {            MqttClient client = new MqttClient(serviceURI, clientID, persistence);            client.setCallback(new MqttCallback() {                public void connectionLost(Throwable cause) {                    System.out.println("訂閱者鏈接丟失...");                    System.out.println(cause.getMessage());                }                public void messageArrived(String topic, MqttMessage message) {                    MessageStr = message.toString();//                    System.out.println("訂閱者接收到消息:"+MessageStr);                }                public void deliveryComplete(IMqttDeliveryToken token) {                }            });            MqttConnectOptions connectOptions = new MqttConnectOptions();            connectOptions.setUserName(username);            connectOptions.setPassword(password.toCharArray());            connectOptions.setCleanSession(false);            //訂閱者鏈接訂閱主題            client.connect(connectOptions);            client.subscribe(topic, qos);            System.out.println("訂閱者鏈接狀態: " + client.isConnected());        } catch (MqttException e) {            e.printStackTrace();        }    }
相關文章
相關標籤/搜索