InfluxDB 聚合函數實用案例

InfluxDB 聚合函數實用案例html

文章大綱

InfluxDB 簡介

InfluxDB是GO語言編寫的分佈式時間序列化數據庫,很是適合對數據(跟隨時間變化而變化的數據)的跟蹤、監控和分析。在咱們的項目中,主要是用來收集設備實時上傳的值。從而分析該設備值的趨勢圖和各個設備的能耗佔比等一系列功能。InfluxDB的功能很強大,文檔也很詳細。可美中不足的是,它的單機性能並非很理想。由於InfluxDB存儲的數據量自己是很是巨大的,在執行一些時間範圍比較大的sql語句,耗時會很長,甚至直接崩潰。而開源的InfluxDB目前已經再也不支持集羣。若要經過搭建集羣提高性能問題,能夠考慮企業版。固然,咱們寫的程序也有很大的性能優化空間。java

能耗趨勢圖分析

需求:統計指定設備、指定區域、指定分項或者指定能耗類型的能耗趨勢圖。以下圖所示,縱座標是能耗值,橫座標是時刻(每小時、天天、每週、每個月)。spring

1574150324712

分析:獲取某個區間時刻的值,能夠用GROUP BY time 進行時間分組。再用聚合函數LAST或者SUM統計。但這個看似很簡單的需求卻暗藏殺機。SQL語句以下sql

SELECT LAST("currentValue"), * FROM "$TABLE_NAME" 
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' 
GROUP BY time($timeSpan) 
ORDER BY time DESC

第一:先要清楚,數據是經過什麼規則保存到InfluxDB數據庫

爲了記錄設備能耗的實時數據,咱們會經過訂閱MQTT通道,當值發生變化後存儲到InfluxDB數據庫中,或者在指定時間範圍內沒有變化也會上傳。這樣作的好處能夠避免一些冗餘數據,同時也埋下了一個坑。數據庫

例如:一臺設備在InfluxDB數據庫中最後一次記錄的時間是15分鐘前。可是sql語句是從5分鐘前開始統計。這會致使該設備的其點值就是null。簡單來講:設備的存儲的值正好在分組統計的時間範圍外。解決方法有不少:好比用FILL(previous)函數填充;好比使用time(time_interval,offset_interval)進行時間推移等。可是我比較推薦下面的方法:性能優化

先獲取指定開始時間以前的最後值(lastValue),而後再根據返回值是否爲null,來決定是否替換或者更新lastValue。僞代碼以下。bash

## 獲取該設備的最後記錄值
val lastValue = "SELECT LAST("currentValue") FROM "$TABLE_NAME" WHERE time <= '$startTime'"
## 遍歷查詢結果,將currentValue爲 null的值替換
"SELECT LAST("currentValue"), * FROM "$TABLE_NAME" 
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' 
GROUP BY time($timeSpan) 
ORDER BY time DESC".forEach {
    lastValue = currentValue?: lastValue
    result[time] = currentValue?: lastValue
}

你覺得這樣就結束了嗎?還不夠,返回的time格式化後,你會發現有8小時的時區問題。app

第二:解決InfluxDB時區問題

InfluxDB 默認以UTC時間存儲並返回時間戳,查詢返回的時間戳對應的也是UTC時間。咱們須要經過tz()子句指定時區名稱,好比Asia/Shanghai。若InfluxDB安裝在Windows環境上,可能還會出現 error parsing query: unable to find time zone 錯誤,解決方法是安裝GO語言環境,文章也詳細介紹過。分佈式

SELECT LAST("currentValue"), * FROM "$TABLE_NAME" 
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' 
GROUP BY time($timeSpan) 
ORDER BY time DESC tz('Asia/Shanghai')

實用tz() 子句後,返回的時間格式:"2019-11-18T13:50:00+08:00"。須要經過 "yyyy-MM-dd'T'HH:mm:ss" 將其格式化。函數

第三:GROUP BY time 天然月

group by time 支持秒、分鐘、小時、天和周,卻惟獨不支持天然月。若是對數據的精準性要求不高,能夠考慮使用30d實現。或者分12次統計。或者有更好的方法,請不吝賜教😲!

Spring 整合 InfluxDB

初始化配置

整合分三步:導包、配置、初始化鏈接

compile('org.influxdb:influxdb-java:2.8')
influx.server=http://IP
influx.port=8086
influx.username=admin
influx.password=admin
influx.dbname=database
import org.influxdb.InfluxDB
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import org.influxdb.dto.Query
import org.influxdb.impl.InfluxDBResultMapper
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy

@Component
class InfluxDbConnector {

    val logger: Logger = LoggerFactory.getLogger(InfluxDbConnector::class.java)

    @Value("\${influx.server}")
    lateinit var serverUrl: String

    @Value("\${influx.port}")
    lateinit var serverPort: String

    @Value("\${influx.db-name}")
    lateinit var dbName: String

    @Value("\${influx.user-name}")
    lateinit var userName: String

    @Value("\${influx.password}")
    lateinit var password: String

    lateinit var connection: InfluxDB
    val resultMapper: InfluxDBResultMapper = InfluxDBResultMapper()

    @PostConstruct
    fun initConnection() {
        val connectionUrl = "$serverUrl:$serverPort"
        connection = InfluxDBFactory.connect(connectionUrl, userName, password)
        connection.setDatabase(dbName)
        connection.enableBatch(1000, 1000, TimeUnit.MILLISECONDS)
    }

    @PreDestroy
    fun closeConnection() {
        connection.close()
    }


    fun <T> query(sql: String, type: Class<T>): List<T> {
        logger.info("exec influx query: {}", sql)
        val result = connection.query(Query(sql, dbName))
        return resultMapper.toPOJO(result, type)
    }

    fun query(sql: String) {
        logger.info("exec influx query: {}", sql)
        connection.query(Query(sql, dbName))
    }

    fun save(points: List<Point>) {
        points.forEach { connection.write(it) }
    }
}

存儲和查詢數據

定義實體

import java.time.Instant;

@Measurement(name = "tableName")
public class StringVariableResultJ {

    @Column(name = "currentValue")
    public String value;
    @Column(name = "time")
    public Instant time;

    // ......

}

批量保存數據

val points = equipmentEnergies.map {
    Point.measurement(TABLE_NAME_EQUIPMENT)
    .tag("equipmentId", it.equipmentId)
    .tag("locationId", it.locationId)
    .tag("subItemInstanceId", it.subItemInstanceId)
    .tag("subItemId", it.subItemId)
    .tag("projectId", it.projectId)
    .time(it.lastSavedTime?.toEpochMilli()?:0, TimeUnit.MILLISECONDS)
    .addField("currentValue", it.value.toString().toBigDecimalOrNull()).build()
}
influxDbConnector.save(points)

查詢數據

influxDbConnector.query(sql, StringVariableResultJ::class.java).sortedBy { it.time }

項目是用kotlin寫的,但是在用InfluxDBResultMapper.toPOJO 時會出現數據轉換異常的問題。若換成Java的實體類就沒有問題。緣由目前沒有找到。

刪除數據

我在官網文檔上並無找到刪除數據的內容,只有修改數據庫存儲策略。但實際上執行delete sql語句是生效的😂。數據保留策略目的是讓InfluxDB可以知道哪些數據是能夠丟棄的,從而節省空間,更高效的處理數據。默認是不限制。如下是常見的命令。

# 查看庫存儲規則
> SHOW RETENTION POLICIES ON 數據庫名稱;
[out]: 
name    duration shardGroupDuration replicaN default
----    -------- ------------------ -------- -------
autogen 720h0m0s 168h0m0s           1        true
# 修改存儲規則
> ALTER RETENTION POLICY autogen ON 數據庫名稱 DURATION 0;
# 設爲默認
> ALTER RETENTION POLICY autogen ON 數據庫名稱 DEFAULT;
#建立規則
> CREATE RETENTION POLICY "規則名" ON 數據庫名稱 DURATION 360h REPLICATION 1;
# 刪除規則
> DROP RETENTION POLICY 規則名 ON 數據庫名稱;

duration 表示在這個時間外的數據將不會被保留,0表示不限制。default 表示是否爲默認規則。其它含義沒有深究。

實際場景中,不一樣表的數據須要保留的時間也不同。此時能夠考慮用sql語句,用程序定時刪除數據。

influxDbConnector.query("DELETE FROM \"tableName" WHERE time < '$時間' ")

文章到這裏就結束了,更多的聚合函數能夠看官方文檔:https://docs.influxdata.com/influxdb/v1.7/query_language/functions/

相關文章
相關標籤/搜索