MQTT(消息隊列遙測傳輸) 是基於 TCP/IP 協議棧而構建的支持在各方之間異步通訊的消息協議。MQTT在空間和時間上將消息發送者與接收者分離,所以能夠在不可靠的網絡環境中進行擴展。雖然叫作消息隊列遙測傳輸,但它與消息隊列毫無關係,而是使用了發佈和訂閱(Pub/Sub)的模型。javascript
MQTT 是一種輕量級的、靈活的網絡協議,致力於爲 IoT 開發人員實現適當的平衡:php
MQTT Client 庫在不少語言中都有實現,包括 Embedded C、C、Java、JavaScript、Python、C++、C#、Go、iOS、Android等。Eclipse Paho的MQTT庫下載地址:www.eclipse.org/paho/downlo…html
下面開發實踐基於Nodejs版mqtt,獲取地址 www.npmjs.com/package/mqt…java
Bit
|
7
|
6
|
5
|
4
|
3
|
2
|
1
|
0
|
byte 1
|
MQTT控制報文的類型
|
用於指定控制報文類型的標誌位
|
||||||
byte 2,3,4,5
|
剩餘長度,最大4個字節
|
控制報文類型算法
名字 | 值 | 報文流動方向 | 描述 |
---|---|---|---|
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | Client -> Broker | device鏈接IoT平臺 |
CONNACK | 2 | Broker -> Client | IoT平臺確認鏈接結果 |
PUBLISH | 3 | 雙向 | 發佈消息 |
PUBACK | 4 | 雙向 | QoS=1消息發佈收到確認 |
SUBSCRIBE | 8 | Client -> Broker | device訂閱IoT平臺Topic |
SUBACK | 9 | Broker -> Client | IoT平臺確認訂閱結果 |
UNSUBSCRIBE | 10 | Client -> Broker | device取消訂閱IoT平臺Topic |
UNSUBACK | 11 | Broker -> Client | IoT平臺確認取消訂閱結果 |
PINGREQ | 12 | Client -> Broker | device發送心跳請求到IoT平臺 |
PINGRESP | 13 | Broker -> Client | IoT平臺響應device心跳 |
DISCONNECT | 14 | Client -> Broker | device斷開IoT平臺鏈接 |
Reserved | 15 | 禁止 | 保留 |
控制報文類型標誌位npm
控制報文 | 固定報頭標誌 | Bit 3 | Bit 2 | Bit 1 | Bit 0 |
---|---|---|---|---|---|
PUBLISH | MQTT 3.1.1使用 | DUP | QoS | QoS | RETAIN |
剩餘長度安全
字節數 | 最小值 | 最大值 |
---|---|---|
1 | 0 (0x00) | 127 (0x7F) |
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
注:阿里雲IoT的單個payload最大256K網絡
某些MQTT控制報文包含一個可變報頭部分。它在固定報頭和負載之間。可變報頭的內容根據報文類型的不一樣而不一樣。可變報頭的報文標識符(Packet Identifier)字段存在於在多個類型的報文裏。session
Bit | 7 - 0 |
---|---|
byte 1 | 報文標識符 MSB |
byte 2 | 報文標識符 LSB |
控制報文 | 報文標識符 |
---|---|
PUBLISH | 須要(若是QoS =1,2) |
PUBACK | 須要 |
PUBREC | 須要 |
PUBREL | 須要 |
PUBCOMP | 須要 |
SUBSCRIBE | 須要 |
SUBACK | 須要 |
UNSUBSCRIBE | 須要 |
UNSUBACK | 須要 |
如下MQTT控制報文在報文的最後部分包含一個有效載荷。對於PUBLISH來講有效載荷就是業務消息。dom
控制報文 | 有效載荷 |
---|---|
CONNECT | 須要 |
PUBLISH | 可選 |
SUBSCRIBE | 須要 |
SUBACK | 須要 |
UNSUBSCRIBE | 須要 |
阿里雲IoT物聯網平臺的MQTT協議不支持will消息,CONNECT 消息內容參數以下:
參數
|
說明
|
cleanSession
|
此標誌指定鏈接是不是持久性的。
0爲持久會話,QoS=1消息不會丟失;
1爲非持久會話,清理離線消息。
|
clientId
|
客戶端標識符
|
username
|
代理的身份驗證和受權憑證。
|
password
|
代理的身份驗證和受權憑證。
|
keepAlive
|
心跳時間,
IoT平臺約定心跳範圍 30s~1200s
|
其中clientId,username,password由設備三元組(productKey,deviceName,deviceSecret)按照規則生成,具體規則以下:
clientId
|
id+"|securemode=3,signmethod=hmacsha1,timestamp="+timestamp+"|"
|
id
:表示客戶端ID,64字符內。其中
||內爲擴展參數。
securemode:安全模式;2爲TLS加密,3爲非加密
signmethod:簽名算法類型。 timestamp:當前時間毫秒值。
|
username
|
deviceName+"&"+productKey
|
|
password
|
sign_hmac(deviceSecret,content)
|
sign_hmac爲clientId中的signmethod算法類型
content爲以下拼接字符串: "
clientId${
id}
deviceName${deviceName}
productKey${productKey}
timestamp${timestamp}"
|
官方文檔:help.aliyun.com/document_de…
設備端代碼示例(Nodejs版) client.js
/** "dependencies": { "mqtt": "2.18.8" } */
const crypto = require('crypto');
const mqtt = require('mqtt');
//設備身份三元組+區域
const deviceConfig = {
productKey: "替換",
deviceName: "替換",
deviceSecret: "替換",
regionId: "cn-shanghai"
};
//根據三元組生成mqtt鏈接參數
const options = initMqttOptions(deviceConfig);
const url = `tcp://${deviceConfig.productKey}.iot-as-mqtt.${deviceConfig.regionId}.aliyuncs.com:1883`;
//2.創建鏈接
const client = mqtt.connect(url, options);
client.on('packetsend', function (packet){
console.log('send '+packet.cmd+' packet =>',packet)
})
client.on('packetreceive', function (packet){
console.log('receive '+packet.cmd+' packet =>',packet)
})
//IoT平臺mqtt鏈接參數初始化
function initMqttOptions(deviceConfig) {
const params = {
productKey: deviceConfig.productKey,
deviceName: deviceConfig.deviceName,
timestamp: Date.now(),
clientId: Math.random().toString(36).substr(2),
}
//CONNECT參數
const options = {
keepalive: 60, //60s
clean: false, //cleanSession保持持久會話
protocolVersion: 4 //MQTT v3.1.1
}
//1.生成clientId,username,password
options.password = signHmacSha1(params, deviceConfig.deviceSecret);
options.clientId = `${params.clientId}|securemode=3,signmethod=hmacsha1,timestamp=${params.timestamp}|`;
options.username = `${params.deviceName}&${params.productKey}`;
return options;
}
/* 生成基於HmacSha1的password 參考文檔:https://help.aliyun.com/document_detail/73742.html?#h2-url-1 */
function signHmacSha1(params, deviceSecret) {
let keys = Object.keys(params).sort();
// 按字典序排序
keys = keys.sort();
const list = [];
keys.map((key) => {
list.push(`${key}${params[key]}`);
});
const contentStr = list.join('');
return crypto.createHmac('sha1', deviceSecret)
.update(contentStr)
.digest('hex');
}
複製代碼
receive connack packet => Packet {
cmd: 'connack',
retain: false,
qos: 0,
dup: false,
length: 2,
topic: null,
payload: null,
sessionPresent: false,
returnCode: 0
}
複製代碼
send pingreq packet => { cmd: 'pingreq' }
複製代碼
receive pingresp packet => Packet {
cmd: 'pingresp',
retain: false,
qos: 0,
dup: false,
length: 0,
topic: null,
payload: null
}
複製代碼
//3.屬性數據上報
const topic = `/sys/${deviceConfig.productKey}/${deviceConfig.deviceName}/thing/event/property/post`;
setInterval(function() {
//發佈數據到topic
client.publish(topic, getPostData(),{qos:1});
}, 5 * 1000);
function getPostData() {
const payloadJson = {
id: Date.now(),
params: {
temperature: Math.floor((Math.random() * 20) + 10),
humidity: Math.floor((Math.random() * 20) + 60)
},
method: "thing.event.property.post"
}
console.log("===postData\n topic=" + topic)
console.log(payloadJson)
return JSON.stringify(payloadJson);
}
複製代碼
send publish packet => { cmd: 'publish',
topic: '/sys/a1hQSwFledE/eud1jXfEgCsAiP2eId9Q/thing/event/property/post',
payload: '{"id":1543896481106,"params":{"temperature":23,"humidity":73},"method":"thing.event.property.post"}',
qos: 1,
retain: false,
messageId: 38850,
dup: false
}
複製代碼
receive puback packet => Packet {
cmd: 'puback',
retain: false,
qos: 0,
dup: false,
length: 2,
topic: null,
payload: null,
messageId: 38850
}
複製代碼
//4.訂閱主題,接收指令
const subTopic = `/${deviceConfig.productKey}/${deviceConfig.deviceName}/control`;
client.subscribe(subTopic)
client.on('message', function(topic, message) {
console.log("topic " + topic)
console.log("message " + message)
})
複製代碼
SUBSCRIBE消息體
send subscribe packet => { cmd: 'subscribe',
subscriptions:
[ { topic: '/a1hQSwFledE/eud1jXfEgCsAiP2eId9Q/control', qos: 0 } ],
qos: 1,
retain: false,
dup: false,
messageId: 38851
}
複製代碼
SUBACK消息體
receive suback packet => Packet {
cmd: 'suback',
retain: false,
qos: 0,
dup: false,
length: 3,
topic: null,
payload: null,
granted: [ 128 ],
messageId: 38851
}
複製代碼
send unsubscribe packet => { cmd: 'unsubscribe',
qos: 1,
messageId: 34323,
unsubscriptions: [ '/a1hQSwFledE/eud1jXfEgCsAiP2eId9Q/control' ]
}
複製代碼
receive unsuback packet => Packet {
cmd: 'unsuback',
retain: false,
qos: 0,
dup: false,
length: 2,
topic: null,
payload: null,
messageId: 34323
}
複製代碼
服務質量
Quality of Service
|
描述
|
阿里雲IoT
|
QoS=0
|
最多一次的傳輸,可能會收不到消息
|
支持
|
QoS=1
|
至少一次的傳輸,必定會收到消息,可能重複
|
支持
|
QoS=2
|
有且僅有一次的傳輸
|
不支持
|
設備與阿里雲IoT的訂閱關係在雲端保持,除非設備主動unsubscribe,不然訂閱關係不清理。設備重連後,依然保持以前的訂閱關係,不須要重複訂閱。
設備和IoT平臺之間的鏈路能夠經過TLS v1.2加密。 若是使用TLS加密,須要下載根證書。 CONNECT參數中clientId的securemode=2