InfluxDB 聚合函數實用案例html
InfluxDB是GO語言編寫的分佈式時間序列化數據庫,很是適合對數據(跟隨時間變化而變化的數據)的跟蹤、監控和分析。在咱們的項目中,主要是用來收集設備實時上傳的值。從而分析該設備值的趨勢圖和各個設備的能耗佔比等一系列功能。InfluxDB的功能很強大,文檔也很詳細。可美中不足的是,它的單機性能並非很理想。由於InfluxDB存儲的數據量自己是很是巨大的,在執行一些時間範圍比較大的sql語句,耗時會很長,甚至直接崩潰。而開源的InfluxDB目前已經再也不支持集羣。若要經過搭建集羣提高性能問題,能夠考慮企業版。固然,咱們寫的程序也有很大的性能優化空間。java
需求:統計指定設備、指定區域、指定分項或者指定能耗類型的能耗趨勢圖。以下圖所示,縱座標是能耗值,橫座標是時刻(每小時、天天、每週、每個月)。spring
分析:獲取某個區間時刻的值,能夠用GROUP BY time 進行時間分組。再用聚合函數LAST或者SUM統計。但這個看似很簡單的需求卻暗藏殺機。SQL語句以下sql
SELECT LAST("currentValue"), * FROM "$TABLE_NAME" WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' GROUP BY time($timeSpan) ORDER BY time DESC
爲了記錄設備能耗的實時數據,咱們會經過訂閱MQTT通道,當值發生變化後存儲到InfluxDB數據庫中,或者在指定時間範圍內沒有變化也會上傳。這樣作的好處能夠避免一些冗餘數據,同時也埋下了一個坑。數據庫
例如:一臺設備在InfluxDB數據庫中最後一次記錄的時間是15分鐘前。可是sql語句是從5分鐘前開始統計。這會致使該設備的其點值就是null。簡單來講:設備的存儲的值正好在分組統計的時間範圍外。解決方法有不少:好比用FILL(previous)函數填充;好比使用time(time_interval,offset_interval)進行時間推移等。可是我比較推薦下面的方法:性能優化
先獲取指定開始時間以前的最後值(lastValue),而後再根據返回值是否爲null,來決定是否替換或者更新lastValue。僞代碼以下。bash
## 獲取該設備的最後記錄值 val lastValue = "SELECT LAST("currentValue") FROM "$TABLE_NAME" WHERE time <= '$startTime'" ## 遍歷查詢結果,將currentValue爲 null的值替換 "SELECT LAST("currentValue"), * FROM "$TABLE_NAME" WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' GROUP BY time($timeSpan) ORDER BY time DESC".forEach { lastValue = currentValue?: lastValue result[time] = currentValue?: lastValue }
你覺得這樣就結束了嗎?還不夠,返回的time格式化後,你會發現有8小時的時區問題。app
InfluxDB 默認以UTC時間存儲並返回時間戳,查詢返回的時間戳對應的也是UTC時間。咱們須要經過tz()子句指定時區名稱,好比Asia/Shanghai。若InfluxDB安裝在Windows環境上,可能還會出現 error parsing query: unable to find time zone 錯誤,解決方法是安裝GO語言環境,文章也詳細介紹過。分佈式
SELECT LAST("currentValue"), * FROM "$TABLE_NAME" WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' GROUP BY time($timeSpan) ORDER BY time DESC tz('Asia/Shanghai')
實用tz() 子句後,返回的時間格式:"2019-11-18T13:50:00+08:00"。須要經過 "yyyy-MM-dd'T'HH:mm:ss" 將其格式化。函數
group by time 支持秒、分鐘、小時、天和周,卻惟獨不支持天然月。若是對數據的精準性要求不高,能夠考慮使用30d實現。或者分12次統計。或者有更好的方法,請不吝賜教😲!
整合分三步:導包、配置、初始化鏈接
compile('org.influxdb:influxdb-java:2.8')
influx.server=http://IP influx.port=8086 influx.username=admin influx.password=admin influx.dbname=database
import org.influxdb.InfluxDB import org.influxdb.InfluxDBFactory import org.influxdb.dto.Point import org.influxdb.dto.Query import org.influxdb.impl.InfluxDBResultMapper import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import java.util.concurrent.TimeUnit import javax.annotation.PostConstruct import javax.annotation.PreDestroy @Component class InfluxDbConnector { val logger: Logger = LoggerFactory.getLogger(InfluxDbConnector::class.java) @Value("\${influx.server}") lateinit var serverUrl: String @Value("\${influx.port}") lateinit var serverPort: String @Value("\${influx.db-name}") lateinit var dbName: String @Value("\${influx.user-name}") lateinit var userName: String @Value("\${influx.password}") lateinit var password: String lateinit var connection: InfluxDB val resultMapper: InfluxDBResultMapper = InfluxDBResultMapper() @PostConstruct fun initConnection() { val connectionUrl = "$serverUrl:$serverPort" connection = InfluxDBFactory.connect(connectionUrl, userName, password) connection.setDatabase(dbName) connection.enableBatch(1000, 1000, TimeUnit.MILLISECONDS) } @PreDestroy fun closeConnection() { connection.close() } fun <T> query(sql: String, type: Class<T>): List<T> { logger.info("exec influx query: {}", sql) val result = connection.query(Query(sql, dbName)) return resultMapper.toPOJO(result, type) } fun query(sql: String) { logger.info("exec influx query: {}", sql) connection.query(Query(sql, dbName)) } fun save(points: List<Point>) { points.forEach { connection.write(it) } } }
定義實體
import java.time.Instant; @Measurement(name = "tableName") public class StringVariableResultJ { @Column(name = "currentValue") public String value; @Column(name = "time") public Instant time; // ...... }
批量保存數據
val points = equipmentEnergies.map { Point.measurement(TABLE_NAME_EQUIPMENT) .tag("equipmentId", it.equipmentId) .tag("locationId", it.locationId) .tag("subItemInstanceId", it.subItemInstanceId) .tag("subItemId", it.subItemId) .tag("projectId", it.projectId) .time(it.lastSavedTime?.toEpochMilli()?:0, TimeUnit.MILLISECONDS) .addField("currentValue", it.value.toString().toBigDecimalOrNull()).build() } influxDbConnector.save(points)
查詢數據
influxDbConnector.query(sql, StringVariableResultJ::class.java).sortedBy { it.time }
項目是用kotlin寫的,但是在用InfluxDBResultMapper.toPOJO 時會出現數據轉換異常的問題。若換成Java的實體類就沒有問題。緣由目前沒有找到。
我在官網文檔上並無找到刪除數據的內容,只有修改數據庫存儲策略。但實際上執行delete sql語句是生效的😂。數據保留策略目的是讓InfluxDB可以知道哪些數據是能夠丟棄的,從而節省空間,更高效的處理數據。默認是不限制。如下是常見的命令。
# 查看庫存儲規則 > SHOW RETENTION POLICIES ON 數據庫名稱; [out]: name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 720h0m0s 168h0m0s 1 true # 修改存儲規則 > ALTER RETENTION POLICY autogen ON 數據庫名稱 DURATION 0; # 設爲默認 > ALTER RETENTION POLICY autogen ON 數據庫名稱 DEFAULT; #建立規則 > CREATE RETENTION POLICY "規則名" ON 數據庫名稱 DURATION 360h REPLICATION 1; # 刪除規則 > DROP RETENTION POLICY 規則名 ON 數據庫名稱;
duration 表示在這個時間外的數據將不會被保留,0表示不限制。default 表示是否爲默認規則。其它含義沒有深究。
實際場景中,不一樣表的數據須要保留的時間也不同。此時能夠考慮用sql語句,用程序定時刪除數據。
influxDbConnector.query("DELETE FROM \"tableName" WHERE time < '$時間' ")
文章到這裏就結束了,更多的聚合函數能夠看官方文檔:https://docs.influxdata.com/influxdb/v1.7/query_language/functions/