MQTT,是:php
- 輕量級的消息訂閱和發佈(publish/subscribe)協議
- 創建在TCP/IP協議之上
IoT,internet of things,物聯網,MQTT在這方面應用較多。html
官方網站:http://mqtt.org/java
MQTT協議是針對以下狀況設計的:node
- M2M(Machine to Machine) communication,機器端到端通訊,好比傳感器之間的數據通信
- 由於是Machine to Machine,須要考慮:
- Machine,或者叫設備,好比溫度傳感器,硬件能力很弱,協議要考慮儘可能小的資源消耗,好比計算能力和存儲等
- M2M多是無線鏈接,網絡不穩定,帶寬也比較小
MQTT協議的架構,用一個示例說明。好比有1個溫度傳感器(1個Machine),2個小的顯示屏(2個Machine),顯示屏要顯示溫度傳感器的溫度值。git
可經過MQTT V3.1 Protocol Specification查閱詳細規範的細節。github
顯示器須要先經過MQTT協議subscribe(訂閱)一個好比叫temperature
的topic(主題):web
當溫度傳感器publish(發佈)溫度數據,顯示器就能夠收到了:redis
注:以上兩張圖,取自MQTT and CoAP, IoT Protocolsmongodb
協議裏還有2個主要的角色:npm
- client,客戶端
- broker,服務器端
它們是經過TCP/IP協議鏈接的。
由於MQTT是協議,因此不能拿來直接用的,就比如HTTP協議同樣。須要找實現這個協議的庫或者服務器來運行。
這裏是官方的Server support。
我服務器端使用nodejs開發,所以選擇了:
MQTT.js最基本使用
安裝是很簡單的:
npm install mqtt
MQTT.js實現的服務器端
代碼以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
var mqtt = require('mqtt');
//{'topicName':[clientObj,clientObj ..]}
var subscribeTopics={};
//建立服務器對象
var server = mqtt.createServer(function(client) {
//創建鏈接時觸發
client.on(
'connect', function(packet) {
client.connack({returnCode:
0});
});
//客戶端發佈主題時觸發
client.on(
'publish', function(packet) {
var topic=packet.topic;
var payload=packet.payload;
//若是沒有建立空的主題對應的client數組
if(subscribeTopics[topic]==null){
subscribeTopics[topic]=[];
}
else{
//遍歷該主題下所有client,並逐一發送消息
for(var i in subscribeTopics[topic]){
var client=subscribeTopics[topic][i];
client.publish({
topic: topic,
payload: payload
});
}
}
});
//當客戶端訂閱時觸發
client.on(
'subscribe', function(packet) {
var topic=packet.subscriptions[0].topic;
//如沒有,建立空的主題對應的client數組
if(subscribeTopics[topic]==null){
subscribeTopics[topic]=[];
}
//若是client數組中沒有當前client,加入
if(subscribeTopics[topic].indexOf(client)==-1){
subscribeTopics[topic].push(client);
}
});
client.on(
'pingreq', function(packet) {
client.pingresp();
});
client.on(
'disconnect', function(packet) {
//遍歷全部主題,檢查對應的數組中是否有當前client,從數組中刪除
for (var topic in subscribeTopics){
var index=subscribeTopics[topic].indexOf(client);
if(index>-1){
subscribeTopics[topic].splice(index,
1);
}
}
});
});
//監聽端口
server.listen(
1883);
|
這是一個最基本的服務器端,消息的存儲和查詢都須要本身編程處理。
好比你若是須要用redis保存和觸發數據,可參考這篇中文文章:node mqtt server (redis pub/sub)。
MQTT.js實現的客戶端
代碼:
1
2
3
4
5
6
7
8
9
10
11
12
|
var mqtt = require('mqtt');
client = mqtt.createClient(
1883, 'localhost');
client.subscribe(
'testMessage');
client.publish(
'testMessage', '發佈測試信息');
client.on(
'message', function (topic, message) {
console.log(message);
client.end();
});
|
寫的很簡易,訂閱了主題,而後向相同主題發佈消息,接收到消息後client中止。
使用Mosca
MQTT.js只是實現了最基礎的MQTT協議部分,對於服務器端的處理須要本身完成。
有關MQTT.js是否實現了MQTT server,詳細的說明,可參見MQTT Server: MQTT.js or Mosca?
正好,Mosca在MQTT基礎上實現了這些,它能夠:
- 做爲獨立運行的MQTT服務器運行
- 集成到nodejs程序裏使用
安裝很簡單:
npm install mosca bunyan -g
做爲獨立服務器運行
運行:
mosca -v | bunyan
而後,還能夠用我上文的客戶端代碼運行測試。
集成在本身程序中使用
我考慮的後端持久化,是用MongoDB。Mosca另外幾個選項:
- Redis,缺點是更注重做爲緩存,而不適合可靠持久化
- LevelUp,頭一次據說,不打算作技術準備了,是用nodejs的包裝起來的LevelDB
- Memory,使用內存,估計默認的就是這個,不適合我使用的狀況
首先要安裝mosca的庫:
npm install mosca
而後,在本機將mongodb運行起來,應該就能夠執行下面的代碼了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
var mosca = require('mosca')
var settings = {
port:
1883,
backend:{
type:
'mongo',
url:
'mongodb://localhost:27017/mqtt',
pubsubCollection:
'ascoltatori',
mongo: {}
},
persistence:{
factory: mosca.persistence.Mongo,
url:
"mongodb://localhost:27017/mosca"
}
};
var server = new mosca.Server(settings);
server.on(
'ready', function(){
console.log('Mosca server is up and running');
});
server.on(
'published', function(packet, client) {
console.log('Published', packet.payload);
});
|
直接運行做者文檔中的代碼會在屢次運行客戶端後出現錯誤,我是參考了他2天前加上的示例代碼。
做者Matteo Collina生活在乎大利的博洛尼亞,寫代碼很勤奮,這個項目更新很快,是否是說明這個方向(mqtt)很活躍呢?
做者也寫了個幻燈片,MQTT and Node.js
MQTT高級問題
keepalive和PING
從這篇文章MQTT協議筆記之鏈接和心跳:
心跳時間(Keep Alive timer)
以秒爲單位,定義服務器端從客戶端接收消息的最大時間間隔。通常應用服務會在業務層次檢測客戶端網絡是否鏈接,不是TCP/IP協議層面的 心跳機制(好比開啓SOCKET的SO_KEEPALIVE選項)。 通常來說,在一個心跳間隔內,客戶端發送一個PINGREQ消息到服務器,服務器返回PINGRESP消息,完成一次心跳交互,繼而等待下一輪。若客戶端 沒有收到心跳反饋,會關閉掉TCP/IP端口鏈接,離線。 16位兩個字節,可看作一個無符號的short類型值。最大值,2^16-1 = 65535秒 = 18小時。最小值能夠爲0,表示客戶端不斷開。通常設爲幾分鐘,好比微信心跳週期爲300秒。
下面的代碼中我設置的是10秒:
1
2
3
4
5
6
7
8
9
10
11
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-b',
clean:
false
}
client = mqtt.createClient(
1883, 'localhost',settings);
|
可使用MQTT.js編寫簡單的服務器代碼,觀察到服務器端接收到PING請求,併發回PING響應:
1
2
3
4
|
client.
on('pingreq', function(packet) {
client.pingresp();
console.
log('pingreq & resp');
});
|
完整代碼上面已經貼過,另見Gist
QoS
QoS在MQTT中有(摘自MQ 遙測傳輸 (MQTT) V3.1 協議規範):
- 「至多一次」,消息發佈徹底依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於以下狀況,環境傳感器數據,丟失一次讀記錄無所謂,由於不久後還會有第二次發送。
- 「至少一次」,確保消息到達,但消息重複可能會發生。
- 「只有一次」,確保消息到達一次。這一級別可用於以下狀況,在計費系統中,消息重複或丟失會致使不正確的結果。
MQTT.js只是支持了MQTT協議,並無支持QoS,也就是說,只支持最低級別的「至多一次」(QoS0)。
Mosca支持QoS0和1,但不支持2,見Add support QOS 2
接收離線消息
我在應用中的一個主要場景是,使用MQTT.js+Mosca作聊天服務器。
默認Mosca是不支持離線消息的,表現的現象是,若是是有人(client-a)先在主題上發佈了消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-a'
}
client = mqtt.createClient(
1883, 'localhost',settings);
client.publish(
'testMessage', '發佈new測試信息0',{qos:1,retain: true});
client.publish(
'testMessage', '發佈new測試信息1',{qos:1,retain: true});
client.publish(
'testMessage', '發佈new測試信息2',{qos:1,retain: true});
client.publish(
'testMessage', '發佈new測試信息3',{qos:1,retain: true});
setTimeout(
function(){
client.end();
},
1000);
|
那麼另一我的(client-b),隨後訂閱,僅能看到最後一條消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-b'
}
client = mqtt.createClient(
1883, 'localhost',settings);
client.subscribe(
'testMessage',{qos:1},function(){
console.log('subscribe ok.');
});
client.on(
"message", function(topic, payload) {
console.log('message: '+payload);
});
|
運行結果相似這樣:
subscribe ok.
message: 發佈new測試信息3
離線消息,須要如下幾點:
- 客戶端訂閱設置QoS=1
- 客戶端鏈接屬性
clean: false
,做用是斷開鏈接重連的時候服務器端幫助恢復session,不須要再次訂閱
用代碼說明如下,先運行這段代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-b',
clean:
false
}
client = mqtt.createClient(
1883, 'localhost',settings);
client.subscribe(
'testMessage',{qos:1},function(){
console.log('subscribe ok.');
client.end();
});
|
而後執行剛纔發佈多條消息的代碼。再執行下面的代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
var mqtt = require('mqtt');
var settings = {
keepalive:
10,
protocolId:
'MQIsdp',
protocolVersion:
3,
clientId:
'client-b',
clean:
false
}
client = mqtt.createClient(
1883, 'localhost',settings);
client.on(
"message", function(topic, payload) {
console.log('message: '+payload);
});
|
運行結果相似這樣:
message: 發佈new測試信息1 message: 發佈new測試信息3 message: 發佈new測試信息2 message: 發佈new測試信息0
收到消息的順序是亂的,爲何會這樣,其實很好理解,爲了小型受限設備以及網絡不穩定的狀況,消息是很差保證順序的。
解決辦法是發送的消息帶時間戳,接收後再作排序。
另外,擔憂客戶端沒有作client.end()
而非正常退出,那麼再次鏈接是否能恢復session,測試了一下,註釋client.end()
,沒有問題,正常收到多條離線消息。
SSL鏈接
Mosca支持SSL鏈接,可根據Nodejs TLS建立公鑰私鑰。
而後相似這樣啓動:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
var mosca = require('mosca')
var SECURE_KEY = __dirname + '/../../test/secure/tls-key.pem';
var SECURE_CERT = __dirname + '/../../test/secure/tls-cert.pem';
var settings = {
port:
8443,
logger: {
name:
"secureExample",
level:
40,
},
secure : {
keyPath: SECURE_KEY,
certPath: SECURE_CERT,
}
};
var server = new mosca.Server(settings);
server.on(
'ready', setup);
// fired when the mqtt server is ready
function setup() {
console.log('Mosca server is up and running')
}
|
這部分我沒有測試,直接轉自Mosca Encryption Support。
認證和受權
在Mosca Authentication提供了個簡易的命令行,可建立帳號用於認證並受權。
可是它不適合個人需求場景,我須要本身編寫認證和受權的邏輯。
雖然在做者官方網站上未找到,但在問題管理記錄中提交了這方面的支持:Authentication & Authorization。
有下面兩條支持,應該能夠寫出本身的回調,並集成到Mosca中:
- add a callback to authorize a publish.
- add a callback to authorize a subscribe.
不過這塊沒有寫代碼,只是大體能肯定。
性能問題
MQTT.js並非完整解決方案,不須要考慮它的性能問題。
說一下Mosca,有一個這方面問題做者的答覆,what about mosca’s performance,問問題的仍是個中國人,我前面還引用了他的文章。做者基本意思是:
It basically depends on the RAM. On an AWS large instance it can reach 10k concurrent connections, with roughly 10k messages/second.