InfluxDB 聚合函数实用案例

文章大纲

InfluxDB 聚合函数实用案例-LMLPHP

InfluxDB 简介

InfluxDB是GO语言编写的分布式时间序列化数据库,非常适合对数据(跟随时间变化而变化的数据)的跟踪、监控和分析。在我们的项目中,主要是用来收集设备实时上传的值。从而分析该设备值的趋势图和各个设备的能耗占比等一系列功能。InfluxDB的功能很强大,文档也很详细。可美中不足的是,它的单机性能并不是很理想。因为InfluxDB存储的数据量本身是非常巨大的,在执行一些时间范围比较大的sql语句,耗时会很长,甚至直接崩溃。而开源的InfluxDB目前已经不再支持集群。若要通过搭建集群提升性能问题,可以考虑企业版。当然,我们写的程序也有很大的性能优化空间。

能耗趋势图分析

需求:统计指定设备、指定区域、指定分项或者指定能耗类型的能耗趋势图。如下图所示,纵坐标是能耗值,横坐标是时刻(每小时、每天、每周、每月)。

InfluxDB 聚合函数实用案例-LMLPHP

分析:获取某个区间时刻的值,可以用GROUP BY time 进行时间分组。再用聚合函数LAST或者SUM统计。但这个看似很简单的需求却暗藏杀机。SQL语句如下

SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC

第一:先要清楚,数据是通过什么规则保存到InfluxDB数据库

为了记录设备能耗的实时数据,我们会通过订阅MQTT通道,当值发生变化后存储到InfluxDB数据库中,或者在指定时间范围内没有变化也会上传。这样做的好处可以避免一些冗余数据,同时也埋下了一个坑。

例如:一台设备在InfluxDB数据库中最后一次记录的时间是15分钟前。但是sql语句是从5分钟前开始统计。这会导致该设备的其点值就是null。简单来说:设备的存储的值正好在分组统计的时间范围外。解决方法有很多:比如用FILL(previous)函数填充;比如使用time(time_interval,offset_interval)进行时间推移等。但是我比较推荐下面的方法:

先获取指定开始时间之前的最后值(lastValue),然后再根据返回值是否为null,来决定是否替换或者更新lastValue。伪代码如下。

## 获取该设备的最后记录值
val lastValue = "SELECT LAST("currentValue") FROM "$TABLE_NAME" WHERE time <= '$startTime'"
## 遍历查询结果,将currentValue为 null的值替换
"SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC".forEach {
    lastValue = currentValue?: lastValue
    result[time] = currentValue?: lastValue
}

你以为这样就结束了吗?还不够,返回的time格式化后,你会发现有8小时的时区问题。

第二:解决InfluxDB时区问题

InfluxDB 默认以UTC时间存储并返回时间戳,查询返回的时间戳对应的也是UTC时间。我们需要通过tz()子句指定时区名称,比如Asia/Shanghai。若InfluxDB安装在Windows环境上,可能还会出现 error parsing query: unable to find time zone 错误,解决方法是安装GO语言环境,文章也详细介绍过。

SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC tz('Asia/Shanghai')

实用tz() 子句后,返回的时间格式:"2019-11-18T13:50:00+08:00"。需要通过 "yyyy-MM-dd'T'HH:mm:ss" 将其格式化。

第三:GROUP BY time 自然月

group by time 支持秒、分钟、小时、天和周,却唯独不支持自然月。如果对数据的精准性要求不高,可以考虑使用30d实现。或者分12次统计。或者有更好的方法,请不吝赐教😲!

Spring 整合 InfluxDB

初始化配置

整合分三步:导包、配置、初始化连接

compile('org.influxdb:influxdb-java:2.8')
influx.server=http://IP
influx.port=8086
influx.username=admin
influx.password=admin
influx.dbname=database
import org.influxdb.InfluxDB
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import org.influxdb.dto.Query
import org.influxdb.impl.InfluxDBResultMapper
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy

@Component
class InfluxDbConnector {

    val logger: Logger = LoggerFactory.getLogger(InfluxDbConnector::class.java)

    @Value("\${influx.server}")
    lateinit var serverUrl: String

    @Value("\${influx.port}")
    lateinit var serverPort: String

    @Value("\${influx.db-name}")
    lateinit var dbName: String

    @Value("\${influx.user-name}")
    lateinit var userName: String

    @Value("\${influx.password}")
    lateinit var password: String

    lateinit var connection: InfluxDB
    val resultMapper: InfluxDBResultMapper = InfluxDBResultMapper()

    @PostConstruct
    fun initConnection() {
        val connectionUrl = "$serverUrl:$serverPort"
        connection = InfluxDBFactory.connect(connectionUrl, userName, password)
        connection.setDatabase(dbName)
        connection.enableBatch(1000, 1000, TimeUnit.MILLISECONDS)
    }

    @PreDestroy
    fun closeConnection() {
        connection.close()
    }


    fun <T> query(sql: String, type: Class<T>): List<T> {
        logger.info("exec influx query: {}", sql)
        val result = connection.query(Query(sql, dbName))
        return resultMapper.toPOJO(result, type)
    }

    fun query(sql: String) {
        logger.info("exec influx query: {}", sql)
        connection.query(Query(sql, dbName))
    }

    fun save(points: List<Point>) {
        points.forEach { connection.write(it) }
    }
}

存储和查询数据

定义实体

import java.time.Instant;

@Measurement(name = "tableName")
public class StringVariableResultJ {

    @Column(name = "currentValue")
    public String value;
    @Column(name = "time")
    public Instant time;

    // ......

}

批量保存数据

val points = equipmentEnergies.map {
    Point.measurement(TABLE_NAME_EQUIPMENT)
    .tag("equipmentId", it.equipmentId)
    .tag("locationId", it.locationId)
    .tag("subItemInstanceId", it.subItemInstanceId)
    .tag("subItemId", it.subItemId)
    .tag("projectId", it.projectId)
    .time(it.lastSavedTime?.toEpochMilli()?:0, TimeUnit.MILLISECONDS)
    .addField("currentValue", it.value.toString().toBigDecimalOrNull()).build()
}
influxDbConnector.save(points)

查询数据

influxDbConnector.query(sql, StringVariableResultJ::class.java).sortedBy { it.time }

项目是用kotlin写的,可是在用InfluxDBResultMapper.toPOJO 时会出现数据转换异常的问题。若换成Java的实体类就没有问题。原因目前没有找到。

删除数据

我在官网文档上并没有找到删除数据的内容,只有修改数据库存储策略。但实际上执行delete sql语句是生效的😂。数据保留策略目的是让InfluxDB能够知道哪些数据是可以丢弃的,从而节省空间,更高效的处理数据。默认是不限制。以下是常见的命令。

# 查看库存储规则
> SHOW RETENTION POLICIES ON 数据库名称;
[out]:
name    duration shardGroupDuration replicaN default
----    -------- ------------------ -------- -------
autogen 720h0m0s 168h0m0s           1        true
# 修改存储规则
> ALTER RETENTION POLICY autogen ON 数据库名称 DURATION 0;
# 设为默认
> ALTER RETENTION POLICY autogen ON 数据库名称 DEFAULT;
#创建规则
> CREATE RETENTION POLICY "规则名" ON 数据库名称 DURATION 360h REPLICATION 1;
# 删除规则
> DROP RETENTION POLICY 规则名 ON 数据库名称;

duration 表示在这个时间外的数据将不会被保留,0表示不限制。default 表示是否为默认规则。其它含义没有深究。

实际场景中,不同表的数据需要保留的时间也不一样。此时可以考虑用sql语句,用程序定时删除数据。

influxDbConnector.query("DELETE FROM \"tableName" WHERE time < '$时间' ")

文章到这里就结束了,更多的聚合函数可以看官方文档:https://docs.influxdata.com/influxdb/v1.7/query_language/functions/

11-24 01:30