物聯網寵兒mqtt.js那些事兒

常見的mq有Kafka,RocketMQ和RabbitMQ,你們也很常見。 前者很常見,MQTT是什麼呢?MQTT屬於IoT也就是物聯網的概念。html

快來和使用mqtt.js開發IM功能2年的做者一探究竟吧~前端

常見的mq有Kafka,RocketMQ和RabbitMQ,你們也很常見。MQTT是什麼呢? Kafka,RocketMQ和RabbitMQ屬於微服務間的mq,而MQTT則屬於IoT也就是物聯網的概念。vue

mqtt.js是MQTT在nodejs端的實現。vue技術棧下的前端也可用。 mqtt.js官方爲微信小程序和支付寶小程序也作了支持。微信小程序的MQTT協議名爲wxs,支付寶小程序則是alisjava

若是仍是一臉懵逼,那麼就跟隨我經過mqtt.js去認識一下這個物聯網領域的寵兒吧。node

  • 什麼是微消息隊列?
  • MQTT關鍵名詞解釋
  • P2P消息和Pub/Sub消息
  • 封裝的mqtt.js通用class
  • 客戶端發包函數sendPacket
  • 客戶端鏈接 mqtt.connect()
  • 訂閱topic mqtt.Client#subscribe()
  • 發送消息 mqtt.Client#publish()
  • 接收消息 mqtt.Client#「message」事件

什麼是微消息隊列?

消息隊列通常分爲兩種:python

  • 微服務消息隊列(微服務間信息傳遞,典型表明有RabbitMQ,Kafka,RocketMQ)
  • 物聯網消息隊列(物聯網端與雲端消息傳遞,表明有MQTT)

目前我實踐過的,也就是咱們本篇博文深刻分析的,是物聯網消息隊列的mqtt.js。ios

傳統的消息隊列(微服務間信息傳遞)

傳統的微服務間(多個子系統服務端間)消息隊列是一種很是常見的服務端間消息傳遞的方式。git

典型表明有:RabbitMQ,Kafka,RocketMQ。 阿里雲官網擁有AMQP(兼容RabbitMQ),Kafka,和RocketMQ這三種微服務消息隊列,對於咱們從此在實際項目中落地提供了很大的幫助。github

使用場景多種多樣:web

  • 高併發:秒殺、搶票(FIFO)
  • 共享型:積分兌換(多子系統共用積分模塊)
  • 通訊型:服務端間消息傳遞(nodejs,java,python,go等等)

MQTT消息隊列(物聯網端與雲間消息傳遞)

MQTT是一個物聯網MQTT協議,主要解決的是物聯網IoT網絡狀況複雜的問題。

阿里雲有MQTT消息隊列服務。通訊協議支持MQTT,STOMP,GB-808等。數據傳輸層支持TCP長鏈接、SSL加密、Websocket等。

使用場景主要爲數據傳輸:

  1. 車聯網(遠程控制,汽車數據上傳)
  2. IM通信(1對1單聊,1對多朋友圈)
  3. 視頻直播(彈幕通知,聊天互動)
  4. 智能家居(電器數據上傳,遙控指令)

目前我手上負責的運行了2年的聊天系統就是使用的這個服務,咱們主要按照設備<->server<->PC的方式,MQTT協議,Websocket傳輸協議進行設備與PC間的數據通訊。

MQTT關鍵名詞解釋

實例(Instance)

每一個MQTT實例都對應一個全局惟一的服務接入點。 肉眼可見的區別就是在經過mqtt.connect(url)與server(broker)創建鏈接時,broker的url都是一致的。 假設有saleman1,salesman2···他們本地的前端與服務端間創建鏈接的url都是統一的,只是在clientId進行區分便可。

客戶端Id(Client ID)

MQTT的Client ID是每一個客戶端的惟一標識,要求全局都是惟一的,使用同一個Client ID鏈接會被拒絕。 阿里雲的ClientID由兩部分組成<GroupID>@@@<DeviceID>。 一般狀況下Group ID是多前端統一的,好比PC端,安卓移動端,ios移動端,DeviceID也是多前端統一的。 那麼如何區分多端呢?能夠對Client ID中間的@@@作修改。 好比:

let CID_PC = `<GroupID>@@@-PC<DeviceID>`
let CID_Android = `<GroupID>@@@-Android<DeviceID>`
let CID_IOS = `<GroupID>@@@-IOS<DeviceID>`
複製代碼

組Id(Group ID)

用於指定一組邏輯功能徹底一致的節點公用的組名,表明的是一類相同功能的設備。

Device ID

每一個設備獨一無二的標識。這個須要保證全局惟一,能夠是每一個傳感器設備的序列號,能夠是登陸PC的userId。

父主題(Parent Topic)

MQTT協議基於Pub/Sub模型,任何消息都屬於一個Topic。 Topic能夠存在多級,第一級爲父級Topic。 須要控制檯單首創建。

子主題(Subtopic)

MQTT能夠有二級Topic,也能夠有三級Topic。 無需建立,代碼中直接寫便可。

P2P消息和Pub/Sub消息

Pub/Sub消息就是訂閱和發佈的模式,相似事件監聽和廣播。 若是對發佈訂閱不理解,能夠去看Webhook究竟是個啥? MQTT除了支持Pub/Sub的模式,還支持P2P的模式。

什麼是P2P消息?

  • P2P,全稱爲(Point to Point)。
  • 一對一的消息收發模式,只有一個消息發送者和一個消息接收者。
  • P2P模式下,消息發送者明確知道消息的預期接收者,而且這個消息只能被這個特定的客戶端消費
  • 發送者發送消息時,經過Topic指定接收者,接收者無需訂閱便可得到該消息。
  • P2P 模式不只下降註冊訂閱的成本,並且由於對鏈路有優化,因此下降推送延遲。

P2P模式和Pub/Sub模式的區別

發送消息時

  • Pub/Sub模式下,發送者須要按照與接受者約定好的Topic發送消息
  • P2P模式下,發送者無需按照Tpic發送,能夠直接按照規範進行發送

接收消息時

  • Pub/Sub模式下,接收者須要提早訂閱topic才能接消息
  • P2P模式下無需訂閱便可接收消息

nodejs發送P2P消息

const p2pTopic =topic+"/p2p/GID_xxxx@@@DEVICEID_001";
mqtt.client.publish(p2pTopic);
複製代碼

封裝的mqtt.js通用class

  • 客戶端鏈接 initClient(config)
  • 訂閱topic subscribeTopic(topic, config)
  • 發送消息 publishMessage(message)
  • 接收消息 handleMessage(callback)
import mqtt from 'mqtt';
import config from '@/config';

export default class MQTT {
  constructor(options) {
    this.name = options.name;
    this.connecting = false;
  }
  /** * 客戶端鏈接 */
  initClient(config) {
    const { url, groupId, key, password, topic: { publish: publishTopic }} = config;
    return new Promise((resolve) => {
      this.client = mqtt.connect(
        {
          url,
          clientId: `${groupId}@@@${deviceId}`,
          username: key,
          password,
        }
      );
      this.client.on('connect', () => {
        this.connecting = true;
        resolve(this);
      });
    });
  }

  /** * 訂閱topic */
  subscribeTopic(topic, config) {
    if (this.connecting) {
      this.client.subscribe(topic, config);
    }
    return this;
  }

  /** * 發送消息 */
  publishMessage(message) {
    this.client.publish(publishTopic, message, { qos: 1 });
  }

  /** * 接收消息 */
  handleMessage(callback) {
    if (!this.client._events.message) {
      this.client.on('message', callback);
    }
  }

}
複製代碼

客戶端發包函數sendPacket

mqtt-packet生成一個可傳輸buffer

var mqtt = require('mqtt-packet')
var object = {
  cmd: 'publish',
  retain: false,
  qos: 0,
  dup: false,
  length: 10,
  topic: 'test',
  payload: 'test' // Can also be a Buffer
}
var opts = { protocolVersion: 4 } // default is 4. Usually, opts is a connect packet

console.log(mqtt.generate(object))
// Prints:
//
// <Buffer 30 0a 00 04 74 65 73 74 74 65 73 74>
//
// Which is the same as:
//
// new Buffer([
// 48, 10, // Header (publish)
// 0, 4, // Topic length
// 116, 101, 115, 116, // Topic (test)
// 116, 101, 115, 116 // Payload (test)
// ])
複製代碼

sendPacket函數

發出packetsend事件而且經過mqtt.writeToStream將packet寫入client的stream中。

var mqttPacket = require('mqtt-packet')

function sendPacket (client, packet) {
  client.emit('packetsend', packet)
  mqttPacket.writeToStream(packet, client.stream, client.options)
}
複製代碼

_sendPack方法

MqttClient.prototype._sendPacket = function (packet) {
     sendPacket(this, packet);
}
複製代碼

客戶端鏈接 mqtt.connect()

mqtt client創建與mqtt server(broker)的鏈接,一般是經過給定一個'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'爲協議的url進行鏈接。

mqtt.connect([url], options)
複製代碼

官方說明:

  • 經過給定的url和配置鏈接到一個broker,而且返回一個Client。
  • url能夠遵循如下協議:'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'。(mqtt.js支持微信小程序和支付寶小程序,協議分別爲wxs和alis。
  • url也能夠是經過URL.parse()返回的對象。
  • 能夠傳入一個單對象,既包含url又包含選項。

再來看一下我手上項目的鏈接配置,鏈接結果。 敏感信息已經過foo,bar,baz或者xxxx的組合進行數據脫敏處理。

鏈接配置

{
    key: 'xxxxxxxx',
    secret: 'xxxxxxxx',
    url: 'wss://foo-bar.mqtt.baz.com/mqtt',
    groupId: 'FOO_BAR_BAZ_GID',
    topic: {
      publish: 'PUBLISH_TOPIC',
      subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/p2p'],
      unsubscribe: 'PUBLISH_TOPIC/noticeMobile/',
    },
}
複製代碼
  • key 帳號
  • secret 密碼
  • url 用於創建client與server(broker)mqtt鏈接的連接
  • groupId 組id
  • topic 發送消息的topic,訂閱的topic,取消訂閱的topic

鏈接結果

包括總覽,響應頭和請求頭。

General
Request URL: wss://foo-bar.mqtt.baz.com
Request Method: GET
Status Code: 101 Switching Protocols
複製代碼
Response Header
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: xxxxxxx
sec-websocket-protocol: mqtt
複製代碼
Request Header
GET wss://foo-bar.mqtt.baz.com/ HTTP/1.1
Host: foo-bar.mqtt.baz.com
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36
Upgrade: websocket
Origin: https://xxx.xxx.com
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6
Sec-WebSocket-Key: xxxxxxxxx
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Protocol: mqtt
複製代碼
源碼分析

下面來看這段mqtt鏈接的代碼。

this.client = mqtt.connect(
  {
    url,
    clientId: `${groupId}@@@${deviceId}`,
    username: key,
    password,
  }
);
複製代碼
function parseAuthOptions (opts) {
  var matches
  if (opts.auth) {
    matches = opts.auth.match(/^(.+):(.+)$/)
    if (matches) {
      opts.username = matches[1]
      opts.password = matches[2]
    } else {
      opts.username = opts.auth
    }
  }
}
/** * connect - connect to an MQTT broker. * * @param {String} [brokerUrl] - url of the broker, optional * @param {Object} opts - see MqttClient#constructor */
function connect (brokerUrl, opts) {
  if ((typeof brokerUrl === 'object') && !opts) {
    // 能夠傳入一個單對象,既包含url又包含選項
    opts = brokerUrl
    brokerUrl = null
  }
  opts = opts || {}
  // 設置username和password
  parseAuthOptions(opts)
  if (opts.query && typeof opts.query.clientId === 'string') {
    // 設置Client Id
    opts.clientId = opts.query.clientId
  }
  function wrapper (client) {
   ...
    return protocols[opts.protocol](client, opts)
  }
  // 最終返回一個mqtt client實例
  return new MqttClient(wrapper, opts)
}
複製代碼

訂閱topic mqtt.Client#subscribe()

實際代碼

const topic =  {
      subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/p2p'],
      unsubscribe: 'PUBLISH_TOPIC/noticeMobile/',
};
const config = { qos:1 };
this.client.subscribe(topic.subscribe, config)
複製代碼

源碼分析

MqttClient.prototype.subscribe = function () {
  var packet
  var args = new Array(arguments.length)
  for (var i = 0; i < arguments.length; i++) {
    args[i] = arguments[i]
  }
  var subs = []
   // obj爲訂閱的topic列表
  var obj = args.shift()
  // qos等配置
  var opts = args.pop()
  var defaultOpts = {
    qos: 0
  }
  opts = xtend(defaultOpts, opts)
  // 數組類型的訂閱的topic列表 
  if (Array.isArray(obj)) {
    obj.forEach(function (topic) {
      if (!that._resubscribeTopics.hasOwnProperty(topic) ||
        that._resubscribeTopics[topic].qos < opts.qos ||
          resubscribe) {
        var currentOpts = {
          topic: topic,
          qos: opts.qos
        }
        // subs是最終的訂閱的topic列表
        subs.push(currentOpts)
      }
    })
  }
  // 這個packet很重要
  packet = {
    // 發出訂閱命令
    cmd: 'subscribe',
    subscriptions: subs,
    qos: 1,
    retain: false,
    dup: false,
    messageId: this._nextId()
  }
  // 發出訂閱包
  this._sendPacket(packet)
  return this
}
複製代碼

發送消息 mqtt.Client#publish()

實際代碼

const topic = {
      publish: 'PUBLISH_TOPIC',
};
const messge = {
   foo: '',
   bar: '',
   baz: '',
   ...
}
const msgStr = JSON.stringify(message);
this.client.publish(topic.publish, msgStr);
複製代碼

注意publish的消息須要使用JSON.stringify進行序列化,而後再發到指定的topic。

源碼分析

MqttClient.prototype.publish = function (topic, message, opts, callback) {
  var packet
  var options = this.options
  var defaultOpts = {qos: 0, retain: false, dup: false}
  opts = xtend(defaultOpts, opts)

  // 將消息傳入packet的payload
  packet = {
    cmd: 'publish',
    topic: topic,
    payload: message,
    qos: opts.qos,
    retain: opts.retain,
    messageId: this._nextId(),
    dup: opts.dup
  }
  // 處理不一樣qos
  switch (opts.qos) {
    case 1:
    case 2:
       // 發出publish packet
       this._sendPacketI(packet);
        ...
    default:
       this._sendPacket(packet);
        ...
  }
  return this
}
複製代碼

接收消息 mqtt.Client 「message」事件

實際代碼

this.client.on('message', callback);
複製代碼

數據以callback的方式接收。

function (topic, message, packet) {}
複製代碼

topic表明接收到的topic,buffer則是具體的數據。 message是接收到的數據,謹記經過JSON.parse()對buffer作解析。

handleMessage(callback) {
    this.client.on('message', callback);
}
this.client.handleMessage((topic, buffer) => {
  let receiveMsg = null;
  try {
   receiveMsg = JSON.parse(buffer.toString());
  } catch (e) {
   receiveMsg = null;
  }
  if (!receiveMsg) {
    return;
  }
  ...do something with receiveMsg...
});
複製代碼

源碼分析

MqttClient繼承了EventEmitter。 從而進行可使用on監聽「message」事件。

inherits(MqttClient, EventEmitter)
複製代碼

那麼究竟是在哪裏間發出message事件的呢?>emit the message event

  1. 基於websocket-stream創建websocket鏈接
  2. 使用pipe鏈接基於readable-stream.Writable建立的可寫流
  3. nextTick調用_handlePacket
  4. 在handlePacket中調用handlePublish發出message事件
1.基於websocket-stream創建websocket鏈接
this.stream = this.streamBuilder(this)
function streamBuilder (client, opts) {
  return createWebSocket(client, opts)
}
var websocket = require('websocket-stream')
function createWebSocket (client, opts) {
  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'

  setDefaultOpts(opts)
  var url = buildUrl(opts, client)
  return websocket(url, [websocketSubProtocol], opts.wsOptions)
}
複製代碼
2. 使用pipe鏈接基於readable-stream.Writable建立的可寫流
var Writable = require('readable-stream').Writable
var writable = new Writable();
this.stream.pipe(writable);
複製代碼
3.nextTick調用_handlePacket
writable._write = function (buf, enc, done) {
    completeParse = done
    parser.parse(buf)
    work()
}
function work () {
    var packet = packets.shift()
    if (packet) {
      that._handlePacket(packet, nextTickWork)
    }
}
function nextTickWork () {
    if (packets.length) {
      process.nextTick(work)
    } else {
      var done = completeParse
      completeParse = null
      done()
    }
}
複製代碼
4. 在handlePacket中調用handlePublish發出message事件
MqttClient.prototype._handlePacket = function (packet, done) {
  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done)
      break
   ...
}
// emit the message event
MqttClient.prototype._handlePublish = function (packet, done) {
  switch (qos) {
    case 1: {
      // emit the message event
        if (!code) { that.emit('message', topic, message, packet) }
    }
}
複製代碼

參考資料:

期待和你們交流,共同進步,歡迎你們加入我建立的與前端開發密切相關的技術討論小組:

努力成爲優秀前端工程師!

相關文章
相關標籤/搜索