常見的mq有Kafka,RocketMQ和RabbitMQ,你們也很常見。 前者很常見,MQTT是什麼呢?MQTT屬於IoT也就是物聯網的概念。html
快來和使用mqtt.js開發IM功能2年的做者一探究竟吧~前端
常見的mq有Kafka,RocketMQ和RabbitMQ,你們也很常見。MQTT是什麼呢? Kafka,RocketMQ和RabbitMQ屬於微服務間的mq,而MQTT則屬於IoT也就是物聯網的概念。vue
mqtt.js是MQTT在nodejs端的實現。vue技術棧下的前端也可用。 mqtt.js官方爲微信小程序和支付寶小程序也作了支持。微信小程序的MQTT協議名爲wxs
,支付寶小程序則是alis
。java
若是仍是一臉懵逼,那麼就跟隨我經過mqtt.js去認識一下這個物聯網領域的寵兒吧。node
消息隊列通常分爲兩種:python
目前我實踐過的,也就是咱們本篇博文深刻分析的,是物聯網消息隊列的mqtt.js。ios
傳統的微服務間(多個子系統服務端間)消息隊列是一種很是常見的服務端間消息傳遞的方式。git
典型表明有:RabbitMQ,Kafka,RocketMQ。 阿里雲官網擁有AMQP(兼容RabbitMQ),Kafka,和RocketMQ這三種微服務消息隊列,對於咱們從此在實際項目中落地提供了很大的幫助。github
使用場景多種多樣:web
MQTT是一個物聯網MQTT協議,主要解決的是物聯網IoT網絡狀況複雜的問題。
阿里雲有MQTT消息隊列服務。通訊協議支持MQTT,STOMP,GB-808等。數據傳輸層支持TCP長鏈接、SSL加密、Websocket等。
使用場景主要爲數據傳輸:
目前我手上負責的運行了2年的聊天系統就是使用的這個服務,咱們主要按照設備<->server<->PC
的方式,MQTT協議,Websocket傳輸協議進行設備與PC間的數據通訊。
每一個MQTT實例都對應一個全局惟一的服務接入點。 肉眼可見的區別就是在經過mqtt.connect(url)
與server(broker)創建鏈接時,broker的url都是一致的。 假設有saleman1,salesman2···他們本地的前端與服務端間創建鏈接的url都是統一的,只是在clientId進行區分便可。
MQTT的Client ID是每一個客戶端的惟一標識,要求全局都是惟一的,使用同一個Client ID鏈接會被拒絕。 阿里雲的ClientID由兩部分組成<GroupID>@@@<DeviceID>
。 一般狀況下Group ID是多前端統一的,好比PC端,安卓移動端,ios移動端,DeviceID也是多前端統一的。 那麼如何區分多端呢?能夠對Client ID中間的@@@作修改。 好比:
let CID_PC = `<GroupID>@@@-PC<DeviceID>`
let CID_Android = `<GroupID>@@@-Android<DeviceID>`
let CID_IOS = `<GroupID>@@@-IOS<DeviceID>`
複製代碼
用於指定一組邏輯功能徹底一致的節點公用的組名,表明的是一類相同功能的設備。
每一個設備獨一無二的標識。這個須要保證全局惟一,能夠是每一個傳感器設備的序列號,能夠是登陸PC的userId。
MQTT協議基於Pub/Sub模型,任何消息都屬於一個Topic。 Topic能夠存在多級,第一級爲父級Topic。 須要控制檯單首創建。
MQTT能夠有二級Topic,也能夠有三級Topic。 無需建立,代碼中直接寫便可。
Pub/Sub消息就是訂閱和發佈的模式,相似事件監聽和廣播。 若是對發佈訂閱不理解,能夠去看Webhook究竟是個啥? MQTT除了支持Pub/Sub的模式,還支持P2P的模式。
發送消息時
接收消息時
const p2pTopic =topic+"/p2p/GID_xxxx@@@DEVICEID_001";
mqtt.client.publish(p2pTopic);
複製代碼
import mqtt from 'mqtt';
import config from '@/config';
export default class MQTT {
constructor(options) {
this.name = options.name;
this.connecting = false;
}
/** * 客戶端鏈接 */
initClient(config) {
const { url, groupId, key, password, topic: { publish: publishTopic }} = config;
return new Promise((resolve) => {
this.client = mqtt.connect(
{
url,
clientId: `${groupId}@@@${deviceId}`,
username: key,
password,
}
);
this.client.on('connect', () => {
this.connecting = true;
resolve(this);
});
});
}
/** * 訂閱topic */
subscribeTopic(topic, config) {
if (this.connecting) {
this.client.subscribe(topic, config);
}
return this;
}
/** * 發送消息 */
publishMessage(message) {
this.client.publish(publishTopic, message, { qos: 1 });
}
/** * 接收消息 */
handleMessage(callback) {
if (!this.client._events.message) {
this.client.on('message', callback);
}
}
}
複製代碼
var mqtt = require('mqtt-packet')
var object = {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: 10,
topic: 'test',
payload: 'test' // Can also be a Buffer
}
var opts = { protocolVersion: 4 } // default is 4. Usually, opts is a connect packet
console.log(mqtt.generate(object))
// Prints:
//
// <Buffer 30 0a 00 04 74 65 73 74 74 65 73 74>
//
// Which is the same as:
//
// new Buffer([
// 48, 10, // Header (publish)
// 0, 4, // Topic length
// 116, 101, 115, 116, // Topic (test)
// 116, 101, 115, 116 // Payload (test)
// ])
複製代碼
發出packetsend事件而且經過mqtt.writeToStream將packet寫入client的stream中。
var mqttPacket = require('mqtt-packet')
function sendPacket (client, packet) {
client.emit('packetsend', packet)
mqttPacket.writeToStream(packet, client.stream, client.options)
}
複製代碼
MqttClient.prototype._sendPacket = function (packet) {
sendPacket(this, packet);
}
複製代碼
mqtt client創建與mqtt server(broker)的鏈接,一般是經過給定一個'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'爲協議的url進行鏈接。
mqtt.connect([url], options)
複製代碼
官方說明:
再來看一下我手上項目的鏈接配置,鏈接結果。 敏感信息已經過foo,bar,baz或者xxxx的組合進行數據脫敏處理。
{
key: 'xxxxxxxx',
secret: 'xxxxxxxx',
url: 'wss://foo-bar.mqtt.baz.com/mqtt',
groupId: 'FOO_BAR_BAZ_GID',
topic: {
publish: 'PUBLISH_TOPIC',
subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/p2p'],
unsubscribe: 'PUBLISH_TOPIC/noticeMobile/',
},
}
複製代碼
包括總覽,響應頭和請求頭。
Request URL: wss://foo-bar.mqtt.baz.com
Request Method: GET
Status Code: 101 Switching Protocols
複製代碼
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: xxxxxxx
sec-websocket-protocol: mqtt
複製代碼
GET wss://foo-bar.mqtt.baz.com/ HTTP/1.1
Host: foo-bar.mqtt.baz.com
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36
Upgrade: websocket
Origin: https://xxx.xxx.com
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6
Sec-WebSocket-Key: xxxxxxxxx
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Protocol: mqtt
複製代碼
下面來看這段mqtt鏈接的代碼。
this.client = mqtt.connect(
{
url,
clientId: `${groupId}@@@${deviceId}`,
username: key,
password,
}
);
複製代碼
function parseAuthOptions (opts) {
var matches
if (opts.auth) {
matches = opts.auth.match(/^(.+):(.+)$/)
if (matches) {
opts.username = matches[1]
opts.password = matches[2]
} else {
opts.username = opts.auth
}
}
}
/** * connect - connect to an MQTT broker. * * @param {String} [brokerUrl] - url of the broker, optional * @param {Object} opts - see MqttClient#constructor */
function connect (brokerUrl, opts) {
if ((typeof brokerUrl === 'object') && !opts) {
// 能夠傳入一個單對象,既包含url又包含選項
opts = brokerUrl
brokerUrl = null
}
opts = opts || {}
// 設置username和password
parseAuthOptions(opts)
if (opts.query && typeof opts.query.clientId === 'string') {
// 設置Client Id
opts.clientId = opts.query.clientId
}
function wrapper (client) {
...
return protocols[opts.protocol](client, opts)
}
// 最終返回一個mqtt client實例
return new MqttClient(wrapper, opts)
}
複製代碼
const topic = {
subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/p2p'],
unsubscribe: 'PUBLISH_TOPIC/noticeMobile/',
};
const config = { qos:1 };
this.client.subscribe(topic.subscribe, config)
複製代碼
MqttClient.prototype.subscribe = function () {
var packet
var args = new Array(arguments.length)
for (var i = 0; i < arguments.length; i++) {
args[i] = arguments[i]
}
var subs = []
// obj爲訂閱的topic列表
var obj = args.shift()
// qos等配置
var opts = args.pop()
var defaultOpts = {
qos: 0
}
opts = xtend(defaultOpts, opts)
// 數組類型的訂閱的topic列表
if (Array.isArray(obj)) {
obj.forEach(function (topic) {
if (!that._resubscribeTopics.hasOwnProperty(topic) ||
that._resubscribeTopics[topic].qos < opts.qos ||
resubscribe) {
var currentOpts = {
topic: topic,
qos: opts.qos
}
// subs是最終的訂閱的topic列表
subs.push(currentOpts)
}
})
}
// 這個packet很重要
packet = {
// 發出訂閱命令
cmd: 'subscribe',
subscriptions: subs,
qos: 1,
retain: false,
dup: false,
messageId: this._nextId()
}
// 發出訂閱包
this._sendPacket(packet)
return this
}
複製代碼
const topic = {
publish: 'PUBLISH_TOPIC',
};
const messge = {
foo: '',
bar: '',
baz: '',
...
}
const msgStr = JSON.stringify(message);
this.client.publish(topic.publish, msgStr);
複製代碼
注意publish的消息須要使用JSON.stringify進行序列化,而後再發到指定的topic。
MqttClient.prototype.publish = function (topic, message, opts, callback) {
var packet
var options = this.options
var defaultOpts = {qos: 0, retain: false, dup: false}
opts = xtend(defaultOpts, opts)
// 將消息傳入packet的payload
packet = {
cmd: 'publish',
topic: topic,
payload: message,
qos: opts.qos,
retain: opts.retain,
messageId: this._nextId(),
dup: opts.dup
}
// 處理不一樣qos
switch (opts.qos) {
case 1:
case 2:
// 發出publish packet
this._sendPacketI(packet);
...
default:
this._sendPacket(packet);
...
}
return this
}
複製代碼
this.client.on('message', callback);
複製代碼
數據以callback的方式接收。
function (topic, message, packet) {}
複製代碼
topic表明接收到的topic,buffer則是具體的數據。 message是接收到的數據,謹記經過JSON.parse()對buffer作解析。
handleMessage(callback) {
this.client.on('message', callback);
}
this.client.handleMessage((topic, buffer) => {
let receiveMsg = null;
try {
receiveMsg = JSON.parse(buffer.toString());
} catch (e) {
receiveMsg = null;
}
if (!receiveMsg) {
return;
}
...do something with receiveMsg...
});
複製代碼
MqttClient繼承了EventEmitter。 從而進行可使用on監聽「message」事件。
inherits(MqttClient, EventEmitter)
複製代碼
那麼究竟是在哪裏間發出message事件的呢?>emit the message event
this.stream = this.streamBuilder(this)
function streamBuilder (client, opts) {
return createWebSocket(client, opts)
}
var websocket = require('websocket-stream')
function createWebSocket (client, opts) {
var websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'
setDefaultOpts(opts)
var url = buildUrl(opts, client)
return websocket(url, [websocketSubProtocol], opts.wsOptions)
}
複製代碼
var Writable = require('readable-stream').Writable
var writable = new Writable();
this.stream.pipe(writable);
複製代碼
writable._write = function (buf, enc, done) {
completeParse = done
parser.parse(buf)
work()
}
function work () {
var packet = packets.shift()
if (packet) {
that._handlePacket(packet, nextTickWork)
}
}
function nextTickWork () {
if (packets.length) {
process.nextTick(work)
} else {
var done = completeParse
completeParse = null
done()
}
}
複製代碼
MqttClient.prototype._handlePacket = function (packet, done) {
switch (packet.cmd) {
case 'publish':
this._handlePublish(packet, done)
break
...
}
// emit the message event
MqttClient.prototype._handlePublish = function (packet, done) {
switch (qos) {
case 1: {
// emit the message event
if (!code) { that.emit('message', topic, message, packet) }
}
}
複製代碼
參考資料:
期待和你們交流,共同進步,歡迎你們加入我建立的與前端開發密切相關的技術討論小組:
- SegmentFault技術圈:ES新規範語法糖
- SegmentFault專欄:趁你還年輕,作個優秀的前端工程師
- 知乎專欄:趁你還年輕,作個優秀的前端工程師
- Github博客: 趁你還年輕233的我的博客
- 前端開發QQ羣:660634678
- 微信公衆號: 生活在瀏覽器裏的咱們 / excellent_developers
努力成爲優秀前端工程師!