物聯網架構成長之路(33)-EMQ數據存儲到influxDB

1、前言
  時隔一年半,技術變化特別快,學習也要跟上才行。之前寫過EMQ數據轉存問題,當時用了比較笨的方法,經過寫插件的方式,把MQTT裏面的數據發送到數據庫進行存儲。當時也是爲了學習erlang和emq。如今隨着對物聯網的深刻,也結合實際需求,不停的學習。
下面將介紹我實驗測試可行的物聯網數據分析解決方案。採用的仍是開源方案。經過訂閱MQTT的根Topic,把全部物聯網數據轉存到InfluxDB時序數據庫,而後經過Grafana進行圖表顯示。這應該是目前比較流行的方案。
2、安裝InfluxDB
  InfluxDB是時序數據庫,特別適合作數據監控和物聯網數據存儲。【也能夠說適合我如今參與架構的物聯網平臺的技術選型】
  針對InfluxDB也沒有什麼能夠多說的,詳細能夠查閱官方文檔,或者網上的博客文章。我寫的都是平時實踐過程的操做記錄,寫博客,主要是爲了之後忘記的時候,回看查閱用的。另外一方面是增強跟同行讀者交流的渠道。有一點要注意,一開始爲了新,我用InfluxDB 2.0 版本,發現不行,那個太新的,不少對應的開發庫沒有完善好。因此仍是採用InfluxDB 1.x版本。這樣在spring boot 裏面也有自帶的starter庫可使用,操做起來特別方便。
  InfluxDB官方文檔: https://docs.influxdata.com/influxdb/v1.7/   安裝:html

1 wget -qO- https://repos.influxdata.com/influxdb.key | sudo apt-key add -
2 echo "deb https://repos.influxdata.com/debian stretch stable" | sudo tee /etc/apt/sources.list.d/influxdb.list
3 apt-get update
4 apt-get install influxdb

 

3、InfluxDB基礎命令使用
  修改配置文件 /etc/influxdb/influxdb.confjava

1 [http]
2     enabled = true
3     bind-address = ":8086"
4     auth-enabled = false
5     log-enabled = true
6     write-tracing = false
7     pprof-enabled = true

  這裏先設置不受權,等一下建立用戶後,再修改成 auth-enabled=true,這個通常也是屬於內部應用,不用ssl加密了。即便要也是經過Nginx進行反向代理。spring

  用戶管理數據庫

1 --顯示全部用戶:
2 show users
3 --新增用戶:
4 --普通用戶 (注意:用戶名用雙引號,密碼用單引號)
5 create user "user" with password 'user'
6 --管理員用戶
7 create user "admin" with password 'admin' with all privileges
8 --刪除用戶
9 drop user "user"

  建立好後,注意修改influxdb.conf 中的 auth-enable=true, 而後重啓服務 service influxdb restartjson

1 --建立數據庫
2 create database wunaozai
3 --建立好後,就能夠不用管了。一些簡單的操做,能夠參考其餘博客資料。
4 --刪除數據庫
5 drop database wunaozai
6 --切換使用數據庫
7 use wunaozai
1 --顯示全部表
2 show measurements
3 --新建表(往表裏面插入數據,就是新建表了)
4 --插入數據的語法有點特殊,採用的是InfluxDB特有的語法:
5 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
6 insert wnztable,tag=mqtt value=33
7 --刪除表
8 drop measurements wunaozai

  其餘的高級語法,不如查詢還有策略就不展開,暫時不是重點,等之後深刻研究後,在寫博客介紹。緩存

4、EMQ轉存InfluxDB
  EMQ如何把消息轉存到InfluxDB呢,就是本章節的重點,利用上一篇博客中提到的,SpringBoot客戶端監聽EMQ的根Topic,而後把須要進行轉存的Topic及其對應的Payload,構形成InfluxDB表數據,而後插入到InfluxDB中。
  下面介紹一下用到的InfluxDB工具類
  先在pom.xml中引入InfluxDB相關jar包架構

1 <!-- https://mvnrepository.com/artifact/org.influxdb/influxdb-java -->
2 <dependency>
3 <groupId>org.influxdb</groupId>
4 <artifactId>influxdb-java</artifactId>
5 <version>2.15</version>
6 </dependency>

  相關工具類代碼socket

 1 import org.influxdb.InfluxDB;
 2 import org.influxdb.InfluxDBFactory;
 3 import org.influxdb.dto.Point;
 4 
 5 /**
 6  * 數據緩存至InfluxDB
 7  * @author wunaozai
 8  *
 9  */
10 public class InfluxDBService {
11     
12     private static String INFLUXDB_URL = "http://127.0.0.1:8086";
13     private static String INFLUXDB_USERNAME = "admin";
14     private static String INFLUXDB_PASSWORD = "admin";
15     private static String INFLUXDB_DATABASE = "wunaozai"; //注意這裏對應數據庫,通常要先在命令行中建立數據庫
16     private static InfluxDB influxDB = null;
17     
18     private InfluxDBService(){
19         
20     }
21     
22     public static InfluxDB getInstance(){
23         if(influxDB == null){
24             influxDB = InfluxDBFactory.connect(INFLUXDB_URL, INFLUXDB_USERNAME, INFLUXDB_PASSWORD);
25             influxDB.setDatabase(INFLUXDB_DATABASE);
26             influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
27         }
28         return influxDB;
29     }
30     public static int writePoint(Point point){
31         getInstance().write(point);
32         return 0;
33     }
34 }

  在上一篇博客中的MqttPushCallback.java中的ide

public void messageArrived(String topic, MqttMessage message);

  這個函數來轉存。函數

 1     @Override
 2     public void messageArrived(String topic, MqttMessage message) throws Exception {
 3         try{
 4             System.out.println(topic);
 5             String json = new String(message.getPayload());
 6             MQTTProtocolVoModel protocol = BaseModel.parseJSON(json, MQTTProtocolVoModel.class);
 7             
 8             String cmd = protocol.getCmd();
 9             String customer_id = protocol.getProfile().getCustomer_id(); //廠商ID
10             String product_id = protocol.getProfile().getProduct_id(); //產品ID
11             String device_sn = protocol.getProfile().getDevice_sn(); //設備ID
12             Map<String, String> para = protocol.getDatapoint().getPara();
13             Map<String, Object> fields = new HashMap<>(); //這裏是客戶端傳過來的數據點,就是須要被顯示和監控的數據
14             for (Map.Entry<String, String> entry : para.entrySet()) {
15                 fields.put(entry.getKey(), entry.getValue());
16             }
17             Map<String, String> tag = new HashMap<>();
18             tag.put("customer_id", customer_id);
19             tag.put("product_id", product_id);
20             tag.put("device_sn", device_sn);
21             //這裏能夠添加不少Tag,爲了簡單演示,這裏隱藏部分Tag
22             //構造數據點
23             Point point = Point.measurement("datapoint")
24                     .tag(tag).fields(fields).build();
25             InfluxDBService.writePoint(point);
26         }catch (Exception e) {
27             e.printStackTrace();
28         }
29     }

  這裏能夠經過EMQ Dashboard自帶的Websocket進行發送,也能夠經過前面小節用到的PC工具,網上Web端MQTT客戶也不少,能夠經過任意MQTT工具進行測試。
  下面這個是查詢InfluxDB獲得的表數據。

 


參考資料:
  http://www.javashuo.com/article/p-xbkysrtm-n.html
  https://blog.csdn.net/caodanwang/article/details/51967393
  https://docs.influxdata.com/influxdb/v1.7/
  http://www.javashuo.com/article/p-kscihaur-n.html

本文地址: http://www.javashuo.com/article/p-uobvtnhe-cv.html

相關文章
相關標籤/搜索