IoT Simulator(publisher)----> MQTT broker---->Telegraf(subscriber)---->InfluxDB---->Hosted Grafana(Cloud)
數據來源在IoT Case下通常來自各個傳感設備。 由於身邊沒有可用的傳感器設備,因而在github上搜了個小工具來模擬數據發射器。該工具可輸出自定義的json格式數據,而且支持MQTT,HTTP(s),Azure IoT hub, Kafka等主流協議/工具,應用範圍和場景普遍是我選擇該工具的主要緣由。
惟一的缺點是輸出的json默認爲object, 不支持對array of object json的擴展,在API config的時候可能會遇到一些工具只能識別array of json object 的狀況(好比power bi的rest api)。
若是你的case中有使用真實的iot設備和通信協議可自動忽略此part。
安裝和配置相關請參照readme
在mySimConfigjson中對MQTT進行配置,語法參見[配置中使用的MQTT]html
{ "type": "mqtt", "broker.server": "tcp://localhost", "broker.port": 1883, "topic": "sensors/iot_simulator", "clientId": "iot_simulator", "qos": 2 }
使用如下命令開始模擬數據 java -jar json-data-generator-1.3.1-SNAPSHOT.jar mySimConfig.json
前端
MQTT爲MQ Telemetry Transport的縮寫,該協議定義了在機器對機器或物聯網環境下的通訊規則。它採用發佈/訂閱的模式傳輸數據,設計思想是在盡力保證必定程度的數據可達性及穩定性的同時,減小對網絡帶寬和設備資源的依賴。MQTT協議簡潔且輕量,適用於低帶寬,高延遲或不穩定的網絡環境中的設備,同時也適用於帶寬和電源受限的移動應用.
ref:http://mqtt.org/faqjava
在使用Power BI做爲可視化方案時,曾成功使用REST API直接將數據源經過HTTP推送到POWER BI所提供的endpoint。所以在架構這套服務之初,我並無打算使用MQTT做爲數據傳輸協議。但通過一番研究後,在個人案例當中,發現使用HTTP協議有如下侷限性:node
ref:https://docs.influxdata.com/t...git
MQTT client1(sub)--->MQTT broker<----MQTT Client2(pub) (message center) ^ | MQTT client2(pub)
two wildcards: # and +
"+" for a single level of hierarchy,+/+/+/HARDDRIVE_NAME表示了包含上述例子的一個父集
"#" for all remaining levels of hierarchy,sensors/COMPUTER_NAME/temperature/# 表示了包含上述例子的一個父集程序員
0: The broker/client will deliver the message once, with no confirmation.
1: The broker/client will deliver the message at least once, with confirmation required.
2: The broker/client will deliver the message exactly once by using a four-step handshake. github
消息訂閱者容許在消息發佈者制定的QoS級別上進行降級;例如mqtt_pub規定qos=2, 則mqtt_sub可使qos=2 or 1 or 0;
ref: https://mosquitto.org/man/mqt...數據庫
step 1: 安裝並解壓telegraf (若是沒有wget請自行下載(恕在下直言,windows簡直就是個辣雞))
step 2: 修改配置文件telegraf.conf(主要配置input,output&processor plugins)
配置主要參見:[InfluxDB HTTP API和Hosted Grafana HTTPS 通信的衝突問題]及[配置中使用的MQTT]
processor plugin的功能主要是打印從mqtt broker訂閱的數據並顯示在console中json
[[outputs.influxdb]] urls = ["https://localhost:8086"] # required # The target database for metrics (telegraf will create it if not exists) database = "telegraf" # required precision = "s" ## Name of existing retention policy to write to. Empty string writes to ## the default retention policy. retention_policy = "" ## Write consistency (clusters only), can be: "any", "one", "quorum", "all" write_consistency = "any" ## Write timeout (for the InfluxDB client), formatted as a string. ## If not provided, will default to 5s. 0s means no timeout (not recommended). timeout = "5s" [[inputs.mqtt_consumer]] ## MQTT broker URLs to be used. The format should be scheme://host:port, ## schema can be tcp, ssl, or ws. servers = ["tcp://localhost:1883"] ## MQTT QoS, must be 0, 1, or 2 qos = 2 ## Connection timeout for initial connection in seconds connection_timeout = "30s" ## Topics to subscribe to topics = [ "sensors/iot_simulator" ] # if true, messages that can't be delivered while the subscriber is offline # will be delivered when it comes back (such as on service restart). # NOTE: if true, client_id MUST be set persistent_session = false # If empty, a random client ID will be generated. client_id = "" ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" # ssl_key = "/etc/telegraf/key.pem" ## Use SSL but skip chain & host verification insecure_skip_verify = true ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "json" ## List of tag names to extract from top-level of JSON server response tag_keys = [ "equippment name", "timestamp" ] [[inputs.exec]] ## Commands array commands = [] #"/tmp/test.sh", "/usr/bin/mycollector --foo=bar" ## measurement name suffix (for separating different commands) name_suffix = "mqtt_consumer" ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "json" [[processors.printer]]
step 3: 運行telegraf,運行前先開啓數據模擬發射器和MQTT broker確保influxdb能訂閱到穩定的數據流,不然influxdb有可能會報錯監聽不到數據寫入。 to\your\dir: telegraf --config telegraf.conf
因爲配置了printer plugin,在telegraf正常運行的狀況下能夠看到數據流打印在console中
step 4: 檢查數據是否已寫入數據庫
ref: https://docs.influxdata.com/t...windows
influxDB做爲數據和終端可視化工具之間的橋樑,角色尤其重要。influxDB做爲一個time-series database很是適合實時IoT數據的存儲。 配置influxdb的過程較爲簡單,主要解決的問題集中在從http到https協議轉換問題。
step 1: 按照官網文檔下載並解壓influxdb
step 2: 運行influxdb(若是不須要修改任何influxdb的config文件) to\your\dir:influxd
Influx DB默認採用HTTP協議進行Client和Server端的通訊,而云端的Grafana服務則強制採用HTTPS確保數據傳輸的安全性。 衆所周知,HTTPS協議是HTTP協議的安全版本,其安全性能的實現主要依靠在Transport Layer之上增長的TLS/SSL層實現文本及數據的加密。HTTPS與HTTP一個重要的區別在於HTTPS增長了對身份的驗證功能,所以第三方沒法僞造服務端或客戶端身份,引入的證書認證機制就是用來確保這一功能的實現。
爲了確保網絡間通信的安全,我將InfluxDB的接口也進行了相關配置,讓其利用TLS層使用HTTPS協議進行數據的傳輸。
在配置過程當中我使用的證書是self-signed-certificate,使用windows系統的配置過程稍有不一樣(windows真是傷不起,連個配置說明都沒有)
step 1:使用openssl(沒有的朋友要安裝一下)生成證書和密鑰;sudo openssl req -x509 -nodes -newkey rsa:2048 -keyout /route/to/your/dir/influxdb-selfsigned.key -out /route/to/your/route/influxdb-selfsigned.crt -days <NUMBER_OF_DAYS>
進入文件所在目錄,雙擊證書文件並安裝證書(注意!要安裝在受信任的根目錄下)
step 2:配置influxDB;
和官方文檔一致,點[這裏](https://docs.influxdata.com/i...
)
step 3:重啓influxdb /influxdb/folder: influxd --config influxdb.conf
step 4:測試
和官方文檔一致,點這裏
step 5:若是有使用telegraf,記得要將telegraf中output plugin的相關API也改爲https!
這一步也是卡了好久,grafana的錯誤提示基本形同虛設,最好inpect一下頁面看看dev tool的錯誤提示。(這點是否是太不程序員友好了,瘋狂diss )
在沒有使用https以前grafana報錯(誰能知道這個undefined是什麼鬼意思!!
inspect後終於在console看到了錯誤詳細狀況:
使用以後!Bang!
注意此處不要選擇proxy模式,讓grafana的後端服務代理請求,這樣處於本地服務器的influxdb沒法接受到grafana的請求;
最後一個折騰了我好久才獲得的一個很粗略的demo。