1、前言
時隔一年半,技術變化特別快,學習也要跟上才行。之前寫過EMQ數據轉存問題,當時用了比較笨的方法,經過寫插件的方式,把MQTT裏面的數據發送到數據庫進行存儲。當時也是爲了學習erlang和emq。如今隨着對物聯網的深刻,也結合實際需求,不停的學習。
下面將介紹我實驗測試可行的物聯網數據分析解決方案。採用的仍是開源方案。經過訂閱MQTT的根Topic,把全部物聯網數據轉存到InfluxDB時序數據庫,而後經過Grafana進行圖表顯示。這應該是目前比較流行的方案。
2、安裝InfluxDB
InfluxDB是時序數據庫,特別適合作數據監控和物聯網數據存儲。【也能夠說適合我如今參與架構的物聯網平臺的技術選型】
針對InfluxDB也沒有什麼能夠多說的,詳細能夠查閱官方文檔,或者網上的博客文章。我寫的都是平時實踐過程的操做記錄,寫博客,主要是爲了之後忘記的時候,回看查閱用的。另外一方面是增強跟同行讀者交流的渠道。有一點要注意,一開始爲了新,我用InfluxDB 2.0 版本,發現不行,那個太新的,不少對應的開發庫沒有完善好。因此仍是採用InfluxDB 1.x版本。這樣在spring boot 裏面也有自帶的starter庫可使用,操做起來特別方便。
InfluxDB官方文檔: https://docs.influxdata.com/influxdb/v1.7/ 安裝:html
1 wget -qO- https://repos.influxdata.com/influxdb.key | sudo apt-key add - 2 echo "deb https://repos.influxdata.com/debian stretch stable" | sudo tee /etc/apt/sources.list.d/influxdb.list 3 apt-get update 4 apt-get install influxdb
3、InfluxDB基礎命令使用
修改配置文件 /etc/influxdb/influxdb.confjava
1 [http] 2 enabled = true 3 bind-address = ":8086" 4 auth-enabled = false 5 log-enabled = true 6 write-tracing = false 7 pprof-enabled = true
這裏先設置不受權,等一下建立用戶後,再修改成 auth-enabled=true,這個通常也是屬於內部應用,不用ssl加密了。即便要也是經過Nginx進行反向代理。spring
用戶管理數據庫
1 --顯示全部用戶: 2 show users 3 --新增用戶: 4 --普通用戶 (注意:用戶名用雙引號,密碼用單引號) 5 create user "user" with password 'user' 6 --管理員用戶 7 create user "admin" with password 'admin' with all privileges 8 --刪除用戶 9 drop user "user"
建立好後,注意修改influxdb.conf 中的 auth-enable=true, 而後重啓服務 service influxdb restartjson
1 --建立數據庫 2 create database wunaozai 3 --建立好後,就能夠不用管了。一些簡單的操做,能夠參考其餘博客資料。 4 --刪除數據庫 5 drop database wunaozai 6 --切換使用數據庫 7 use wunaozai
1 --顯示全部表 2 show measurements 3 --新建表(往表裏面插入數據,就是新建表了) 4 --插入數據的語法有點特殊,採用的是InfluxDB特有的語法: 5 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>] 6 insert wnztable,tag=mqtt value=33 7 --刪除表 8 drop measurements wunaozai
其餘的高級語法,不如查詢還有策略就不展開,暫時不是重點,等之後深刻研究後,在寫博客介紹。緩存
4、EMQ轉存InfluxDB
EMQ如何把消息轉存到InfluxDB呢,就是本章節的重點,利用上一篇博客中提到的,SpringBoot客戶端監聽EMQ的根Topic,而後把須要進行轉存的Topic及其對應的Payload,構形成InfluxDB表數據,而後插入到InfluxDB中。
下面介紹一下用到的InfluxDB工具類
先在pom.xml中引入InfluxDB相關jar包架構
1 <!-- https://mvnrepository.com/artifact/org.influxdb/influxdb-java --> 2 <dependency> 3 <groupId>org.influxdb</groupId> 4 <artifactId>influxdb-java</artifactId> 5 <version>2.15</version> 6 </dependency>
相關工具類代碼socket
1 import org.influxdb.InfluxDB; 2 import org.influxdb.InfluxDBFactory; 3 import org.influxdb.dto.Point; 4 5 /** 6 * 數據緩存至InfluxDB 7 * @author wunaozai 8 * 9 */ 10 public class InfluxDBService { 11 12 private static String INFLUXDB_URL = "http://127.0.0.1:8086"; 13 private static String INFLUXDB_USERNAME = "admin"; 14 private static String INFLUXDB_PASSWORD = "admin"; 15 private static String INFLUXDB_DATABASE = "wunaozai"; //注意這裏對應數據庫,通常要先在命令行中建立數據庫 16 private static InfluxDB influxDB = null; 17 18 private InfluxDBService(){ 19 20 } 21 22 public static InfluxDB getInstance(){ 23 if(influxDB == null){ 24 influxDB = InfluxDBFactory.connect(INFLUXDB_URL, INFLUXDB_USERNAME, INFLUXDB_PASSWORD); 25 influxDB.setDatabase(INFLUXDB_DATABASE); 26 influxDB.setLogLevel(InfluxDB.LogLevel.BASIC); 27 } 28 return influxDB; 29 } 30 public static int writePoint(Point point){ 31 getInstance().write(point); 32 return 0; 33 } 34 }
在上一篇博客中的MqttPushCallback.java中的ide
public void messageArrived(String topic, MqttMessage message);
這個函數來轉存。函數
1 @Override 2 public void messageArrived(String topic, MqttMessage message) throws Exception { 3 try{ 4 System.out.println(topic); 5 String json = new String(message.getPayload()); 6 MQTTProtocolVoModel protocol = BaseModel.parseJSON(json, MQTTProtocolVoModel.class); 7 8 String cmd = protocol.getCmd(); 9 String customer_id = protocol.getProfile().getCustomer_id(); //廠商ID 10 String product_id = protocol.getProfile().getProduct_id(); //產品ID 11 String device_sn = protocol.getProfile().getDevice_sn(); //設備ID 12 Map<String, String> para = protocol.getDatapoint().getPara(); 13 Map<String, Object> fields = new HashMap<>(); //這裏是客戶端傳過來的數據點,就是須要被顯示和監控的數據 14 for (Map.Entry<String, String> entry : para.entrySet()) { 15 fields.put(entry.getKey(), entry.getValue()); 16 } 17 Map<String, String> tag = new HashMap<>(); 18 tag.put("customer_id", customer_id); 19 tag.put("product_id", product_id); 20 tag.put("device_sn", device_sn); 21 //這裏能夠添加不少Tag,爲了簡單演示,這裏隱藏部分Tag 22 //構造數據點 23 Point point = Point.measurement("datapoint") 24 .tag(tag).fields(fields).build(); 25 InfluxDBService.writePoint(point); 26 }catch (Exception e) { 27 e.printStackTrace(); 28 } 29 }
這裏能夠經過EMQ Dashboard自帶的Websocket進行發送,也能夠經過前面小節用到的PC工具,網上Web端MQTT客戶也不少,能夠經過任意MQTT工具進行測試。
下面這個是查詢InfluxDB獲得的表數據。
參考資料:
http://www.javashuo.com/article/p-xbkysrtm-n.html
https://blog.csdn.net/caodanwang/article/details/51967393
https://docs.influxdata.com/influxdb/v1.7/
http://www.javashuo.com/article/p-kscihaur-n.html