基於MQTT的物聯網雲測量解決方案

1. 問題描述

最近,本實驗室大量上馬雲測量,雲監控方面的項目,大概是屬於物聯網應用的一個分支。老闆也有將舊有儀器改造的想法,因此要實現儀器設備的雲控制。本文是其中的一個解決方案。javascript

2. 技術選型

  1. 消息隊列:MQTT,服務器使用centos,安裝mosquitto
  2. 客戶端使用C#,窗體框架使用WPF,MQTT的客戶端使用MQTTNet
  3. 服務端採用spring-cloud微服務框架
  4. 前端採用Vue,使用Element-admin-ui後臺框架,使用MQTTJS組件(MQTTJS採用websocket鏈接方式)
  5. 客戶端的測量採集數據程序使用TPL Dataflow

3. 架構設計

這個解決方案加入如圖(請原諒個人懶惰):html

以上只是本demo的架構,很簡單,可是也有很大的問題。前端

3.1 客戶端:

客戶端跟隨儀器,原則上一臺儀器一個控制程序,固然也能夠有多個。客戶端實現了對儀器全部硬件設備的控制,對全部數據的採集。客戶端能夠有界面,也能夠沒有界面,通常來講,咱們是須要一個界面的,客戶端能夠獨立完成測量任務。java

客戶端集成了網絡通訊功能,能夠徹底替代用戶對客戶端的操做使用。具體架構以下(請原諒個人懶惰):
node

其中,客戶端界面和網絡接口是等效的,能夠單獨控制,也能夠共同控制。git

3.2 服務端:

服務端基於spring-cloud微服務框架,主要提供服務發現,用戶管理,權限管理,設備管理,MQTT節點管理等管理功能github

3.3 前端網頁:

前端網頁是用戶經過網絡操做儀器設備的交互接口。採用日前流行的Vue框架,因爲是後臺管理模式,就使用Element-admin-ui這個框架。web

4. Demo地址

客戶端: https://github.com/spartajet/IotWpfClient
服務端:https://github.com/spartajet/iot-demo-server
前端網頁:https://github.com/spartajet/iot-demo-web算法

5. MQTT介紹

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通信協議,有可能成爲物聯網的重要組成部分。該協議支持全部平臺,幾乎能夠把全部聯網物品和外部鏈接起來,被用來當作傳感器和制動器(好比經過Twitter讓房屋聯網)的通訊協議。spring

MQTT協議是爲大量計算能力有限,且工做在低帶寬、不可靠的網絡的遠程傳感器和控制設備通信而設計的協議,它具備如下主要的幾項特性:
一、使用發佈/訂閱消息模式,提供一對多的消息發佈,解除應用程序耦合;
二、對負載內容屏蔽的消息傳輸;
三、使用 TCP/IP 提供網絡鏈接;
四、有三種消息發佈服務質量:
「至多一次」,消息發佈徹底依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於以下狀況,環境傳感器數據,丟失一次讀記錄無所謂,由於不久後還會有第二次發送。
「至少一次」,確保消息到達,但消息重複可能會發生。
「只有一次」,確保消息到達一次。這一級別可用於以下狀況,在計費系統中,消息重複或丟失會致使不正確的結果。
五、小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以下降網絡流量;
六、使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制;

詳細的MQTT協議內容請參考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html

6. MQTT安裝

本文使用開源的MQTT 服務器mosquitto,介紹以下:

Eclipse Mosquitto is an open source (EPL/EDL licensed) message broker that implements the MQTT protocol versions 3.1 and 3.1.1. Mosquitto is lightweight and is suitable for use on all devices from low power single board computers to full servers.

The MQTT protocol provides a lightweight method of carrying out messaging using a publish/subscribe model. This makes it suitable for Internet of Things messaging such as with low power sensors or mobile devices such as phones, embedded computers or microcontrollers.

6.1 centos 安裝 mosquitto

CentOS 7沒有mosquitto包。要安裝它,咱們將首先安裝一個名爲Extra Packages for Enterprise Linux或EPEL的額外軟件存儲庫。

sudo yum -y install epel-release

而後安裝

sudo yum -y install mosquitto

6.2 MAC 安裝 mosquitto

直接brew開路:

brew install mosquitto

6.3 mosquitto配置

6.3.1 配置文件

配置文件mosquitto.conf路徑:

mac :/usr/local/etc/mosquitto/mosquitto.conf

centos:/etc/mosquitto/mosquitto.conf

6.3.2 配置MQTT端口:

bind_address 127.0.0.1
port 1883
protocol mqtt

bind_address能夠不配置,

6.3.3 開啓websocket支持:

listener 1884
protocol websockets

6.3.4 配置密碼

首先,要禁止匿名登陸

allow_anonymous false

配置密碼文件:

password_file usr/local/etc/mosquitto/pwfile

配置密碼的方法:

mosquitto提供了mosquitto_passwd工具設置密碼,使用方法以下:

➜  ~ mosquitto_passwd help
mosquitto_passwd is a tool for managing password files for mosquitto.

Usage: mosquitto_passwd [-c | -D] passwordfile username
       mosquitto_passwd -b passwordfile username password
       mosquitto_passwd -U passwordfile
 -b : run in batch mode to allow passing passwords on the command line.
 -c : create a new password file. This will overwrite existing files.
 -D : delete the username rather than adding/updating its password.
 -U : update a plain text password file to use hashed passwords.

See http://mosquitto.org/ for more information.

個人作法是:先在usr/local/etc/mosquitto/pwfile中把用的密碼寫下來,以下:

user1:passwd1
user2:passwd2
user3:passwd3
user4:passwd4
user5:passwd5
user6:passwd6

而後使用mosquitto_passwd -U usr/local/etc/mosquitto/pwfile指令生成密碼

更多配置請參考 https://mosquitto.org

6.3.5 完整配置

# Config file for mosquitto 
# 
# See mosquitto.conf(5) for more information. 
# 
# Default values are shown, uncomment to change. 
# 
# Use the 
# character to indicate a comment, but only if it is the 
# very first character on the line. 
# ================================================================= 
# General configuration 
# ================================================================= 
# 從新發送已經發出去的Qos 爲1或者2的消息的等待時間 
retry_interval 20 
# 系統狀態的刷新時間,設置爲0表示不刷新 
sys_interval 10 
#清除在內部消息存儲裏面的未引用的消息的時間。 
#較低的值將佔用較少的內存,但處理器時間較長, 
#越高的值將產生相反的效果。 
#設置值爲0意味着未引用的消息將以儘量快的速度處理。 
store_clean_interval 10
#pid_file 
# 以什麼用戶啓動 mosquitto,此配置在 windows 下無效,以非 root 運行無效 
#user mosquitto 
#當前每一個客戶端正在傳輸的Qo1和2消息的最大數量。 
#這包括經過握手信息,以及那些正在重試的信息。 
#默認爲20。設置爲0表示無上限。 
#設置爲1將保證QoS 1 和2的消息按順序傳遞 max_inflight_messages 20 
#當前正在進行的隊列中Qos 1和2條消息的最大數量。默認爲100。 
#設置到0表示沒有上限(不推薦)。一樣可參見queue_qos0_messages max_queued_messages 100 
#設置爲true,當一個持久客戶端被斷開鏈接時,以Qos爲0將消息放到隊列中。 
#這些消息受max_queued_messages限制 queue_qos0_messages false 
#此選項設置被代理容許發佈的消息的大小。 
#超過這個尺寸的消息將不會被代理接受。 
#默認值爲0,這意味着全部有效的MQTT消息都被接受。 
#MQTT的最大有效大小爲268435455字節 
message_size_limit 0 
# 用於設置客戶端長鏈接的過時時間,默認永不過時,必須以h d w m y爲單位 
#分別表明 小時,天,星期,月,念 
#persistent_client_expiration 
# 若是客戶端訂閱了多個重疊的訂閱,例如foo/
#和foo/+/baz,而後MQTT指望當代理接 
#收到一個與兩個訂閱相匹配的主題的消息時,例如foo/bar/baz,那麼客戶端應該只接 
#收一次消息。爲了知足這一要求,mosquitto不斷跟蹤發送給客戶的消息。容許重複的 
#消息選項容許禁用此行爲,若是您有大量的客戶端訂閱相同的主題集合,而且很是關注 
#最小化內存使用的狀況,那麼這個選項多是有用的。若是你事先知道你的客戶端永不 
#會有重疊的訂閱,那麼你的客戶必須可以正確處理重複的信息,即便在Qo = 2的時候, 
#你的客戶端也必須可以正確地處理重複的信息 
#allow_duplicate_messages false 
# ================================================================= 
# Default listener 
# ================================================================= 
# 服務綁定的IP地址 
#bind_address 
# 服務綁定的端口號 
#port 1883 
# 容許的最大鏈接數,-1表示沒有限制 
#max_connections -1 
# cafile:CA證書文件 
# capath:CA證書目錄 
# certfile:PEM證書文件 
# keyfile:PEM密鑰文件 
#cafile 
#capath 
#certfile 
#keyfile 
# 必須提供證書以保證數據安全性 
#require_certificate false 
# 若require_certificate值爲true,use_identity_as_username也必須爲true #use_identity_as_username false 
# 啓用PSK(Pre-shared-key)支持 
#psk_hint 
# SSL/TSL加密算法,可使用「openssl ciphers」命令獲取 
# as the output of that command. 
#ciphers 
# ================================================================= 
# Persistence 
# ================================================================= 
# 消息自動保存的間隔時間 
#autosave_interval 1800 
# 消息自動保存功能的開關 
#autosave_on_changes false 
# 持久化功能的開關 persistence true 
# 持久化DB文件
#persistence_file mosquitto.db 
# 持久化DB文件目錄 
#persistence_location /var/lib/mosquitto/ 
# ================================================================= 
# Logging 
# ================================================================= 
# 4種日誌模式:stdout、stderr、syslog、topic 
# none 則表示不記日誌,此配置能夠提高些許性能 log_dest none 
# 選擇日誌的級別(可設置多項) 
#log_type error 
#log_type warning 
#log_type notice 
#log_type information 
# 是否記錄客戶端鏈接信息 
#connection_messages true 
# 是否記錄日誌時間 
#log_timestamp true 
# ================================================================= 
# Security 
# ================================================================= 
# 客戶端ID的前綴限制,可用於保證安全性 
#clientid_prefixes 
# 容許匿名用戶 
#allow_anonymous true 
# 用戶/密碼文件,默認格式:username:password 
#password_file 
# PSK格式密碼文件,默認格式:identity:key 
#psk_file 
# pattern write sensor/%u/data 
# ACL權限配置,經常使用語法以下: 
# 用戶限制:user <username> 
# 話題限制:topic [read|write] <topic> 
# 正則限制:pattern write sensor/%u/data 
#acl_file 
# ================================================================= 
# Bridges 
# ================================================================= 
# 容許服務之間使用「橋接」模式(可用於分佈式部署) 
#connection <name> 
#address <host>[:<port>] 
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix] 
# 設置橋接的客戶端ID 
#clientid 
# 橋接斷開時,是否清除遠程服務器中的消息 
#cleansession false 
# 是否發佈橋接的狀態信息 
#notifications true 
# 設置橋接模式下,消息將會發布到的話題地址 
# $SYS/broker/connection/<clientid>/state #notification_topic 
# 設置橋接的keepalive數值 
#keepalive_interval 60 
# 橋接模式,目前有三種:automatic、lazy、once 
#start_type automatic 
# 橋接模式automatic的超時時間 
#restart_timeout 30 
# 橋接模式lazy的超時時間 
#idle_timeout 60 
# 橋接客戶端的用戶名 
#username 
# 橋接客戶端的密碼 
#password 
# bridge_cafile:橋接客戶端的CA證書文件 
# bridge_capath:橋接客戶端的CA證書目錄 
# bridge_certfile:橋接客戶端的PEM證書文件 
# bridge_keyfile:橋接客戶端的PEM密鑰文件 
#bridge_cafile 
#bridge_capath 
#bridge_certfile 
#bridge_keyfile
參考文獻: https://www.imooc.com/article/19459

6.4 開啓MQTT

6.4.1 MAC

brew services start/stop mosquitto

6.4.2 CentOs

systemctl start/stop/restart mosquitto

6.5 MQTT的Topic

與消息隊列相比,主題很是輕量級。 客戶端不須要在發佈或訂閱以前建立所需的主題,由於代理接受每一個有效主題時不須要進行任何預初始化。

主題使用/來分層次,如下是幾個主題的示例:

sensors/COMPUTER_NAME/temperature/HARDDRIVE_NAME

更重要的是,MQTT提供了通配符

6.5.1 單層通配符 +

+號僅僅匹配一個主題層次。例如,finance/stock/+匹配finance/stock/ibm與finance/stock/xyz,可是不匹配finance/stock/ibm/closingprice。由於單層次通配符僅僅匹配一個層次,finance/+不匹配finance。
單層次通配符可用於主題樹內任何層次,並與多層次通配符一塊兒使用。必須用於在頂層分隔符以後,除了當本身指定時。所以,+和finance/+ 都是有效的,可是finance+無效。單層通配符可用於主題樹最後或者在主題樹內,例如,finance/+與finance/+/ibm都是有效的。

示例以下:

* a/b/c/d
* +/b/c/d
* a/+/c/d
* a/+/+/d
* +/+/+/+

6.5.2 多層通配符

#號能夠匹配主題內任何層次

單層次通配符可用於主題樹內任何層次,並與多層次通配符一塊兒使用。必須用於在頂層分隔符以後,除了當本身指定時。所以,+和finance/+ 都是有效的,可是finance+無效。單層通配符可用於主題樹最後或者在主題樹內,例如,finance/+與finance/+/ibm都是有效的。

示例以下:

* a/b/c/d
* #
* a/#
* a/b/#
* a/b/c/#
* +/b/c/#

7. 客戶端程序

目前,我手上尚未一個趁手的採樣設備,因此只能模擬生成數據,可是,預計下週,我就能夠解決這個問題了

關於如何生成數據以及使用TPL Dataflow數據採集和處理,請參考個人另外一篇博客:

像Labview同樣,使用C#構建測量數據流式處理框架

這裏重點介紹MQTTnet的使用

7.1 MQTTnet介紹

MQTTnet是一個高性能的MQTT基礎鏈接.NET庫。提供了MQTT服務端和客戶端支持。

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.

特性以下:

  • Async support
  • TLS 1.2 support for client and server (but not UWP servers)
  • Extensible communication channels (i.e. In-Memory, TCP, TCP+TLS, WS)
  • Lightweight (only the low level implementation of MQTT, no overhead)
  • Performance optimized (processing ~70.000 messages / second)*
  • Interfaces included for mocking and testing
  • Access to internal trace messages
  • Unit tested (~90 tests)

目前支持的版本以下:

  • .NET Standard 1.3+
  • .NET Core 1.1+
  • .NET Core App 1.1+
  • .NET Framework 4.5.2+ (x86, x64, AnyCPU)
  • Mono 5.2+
  • Universal Windows Platform (UWP) 10.0.10240+ (x86, x64, ARM, AnyCPU, Windows 10 IoT Core)
  • Xamarin.Android 7.5+
  • Xamarin.iOS 10.14+

重點是:支持異步

7.2 MQTTnet客戶端使用

鏈接MQTT

/// <summary>
/// 初始化初始化MQTT
/// </summary>
private void InitialMqtt()
{
   this._mqttClient = new MqttFactory().CreateMqttClient();
   this._mqttClient.ConnectAsync(new MqttClientOptionsBuilder()
       .WithClientId(Guid.NewGuid().ToString("N"))
       .WithTcpServer("*****",1883)
       .WithCredentials("admin", "admin")
       .WithCleanSession()
       .Build());
}

能夠看到,咱們使用的ConnectAsync()方法,是異步鏈接。

發佈消息

首先要構建消息

var message = new MqttApplicationMessageBuilder()
                    .WithTopic("measure/force")
                    .WithPayload(t.ToString(CultureInfo.InvariantCulture))
                    .WithExactlyOnceQoS()
                    .WithRetainFlag()
                    .Build();

而後異步發送

this._mqttClient.PublishAsync(message);

詳細代碼請參考

客戶端: https://github.com/spartajet/IotWpfClient

8. 服務端程序

服務端主要是提供用戶管理,參考源代碼便可,涉及到CORS跨域問題,請參考個人另外一篇博客:

Spring boot 和Vue開發中CORS跨域問題

9. 前端MQTTJS使用

9.1 mqttjs 介紹

mqttjs是支持MQTT協議的客戶端javascript庫,注意只是客戶端,而且通訊方式是websockt,因此要在mosquitto服務器開啓websocket支持。

MQTT.js is a client library for the MQTT protocol, written in JavaScript for node.js and the browser.

9.2 mqttjs安裝

npm install mqtt

9.3 mqttjs的API

mqtt.connect()
mqtt.Client()
mqtt.Client#publish()
mqtt.Client#subscribe()
mqtt.Client#unsubscribe()
mqtt.Client#end()
mqtt.Client#removeOutgoingMessage()
mqtt.Client#reconnect()
mqtt.Client#handleMessage()
mqtt.Client#connected
mqtt.Client#reconnecting
mqtt.Client#getLastMessageId()
mqtt.Store()
mqtt.Store#put()
mqtt.Store#del()
mqtt.Store#createStream()
mqtt.Store#close()

9.4 mqttjs的使用

鏈接服務器:

const client = mqtt.connect('ws://ip:1884', {
          clientid: 'fdafdafas',
          username: 'admin',
          password: 'admin'
        })

設置鏈接後的事件,要訂閱相關主題的消息

client.on('connect', function() {
          client.subscribe('measure/force', function(err) {
            if (!err) {
               client.publish('measure/force', 'Hello mqtt')
            }
          })
        })

消息推送通知事件

client.on('message', function (topic, message) {
  // message is Buffer
  console.log(message.toString())
  client.end()
})

其餘內容請參考項目源碼:

前端網頁:https://github.com/spartajet/iot-demo-web

10 總結&展望

  1. 目前的demo只是完成了mqtt的使用基礎範例,沒有其餘功能
  2. 對於設備管理,用戶權限等功能,打算用hsweb大神的物聯網框架hsweb-iot-cloud,也不排除本身開發的可能,看個人時間和項目需求
  3. mqtt的壓力測試尚未測試,可是從目前的狀況來看(個人MQTT服務器用的華爲雲),我客戶端每秒生成100個數據,網頁端顯示基本沒什麼延時,但並不表明實時性很好
相關文章
相關標籤/搜索