InfluxDB 聚合函数实用案例
文章大纲
InfluxDB 简介
InfluxDB是GO语言编写的分布式时间序列化数据库,非常适合对数据(跟随时间变化而变化的数据)的跟踪、监控和分析。在我们的项目中,主要是用来收集设备实时上传的值。从而分析该设备值的趋势图和各个设备的能耗占比等一系列功能。InfluxDB的功能很强大,文档也很详细。可美中不足的是,它的单机性能并不是很理想。因为InfluxDB存储的数据量本身是非常巨大的,在执行一些时间范围比较大的sql语句,耗时会很长,甚至直接崩溃。而开源的InfluxDB目前已经不再支持集群。若要通过搭建集群提升性能问题,可以考虑企业版。当然,我们写的程序也有很大的性能优化空间。
能耗趋势图分析
需求:统计指定设备、指定区域、指定分项或者指定能耗类型的能耗趋势图。如下图所示,纵坐标是能耗值,横坐标是时刻(每小时、每天、每周、每月)。
分析:获取某个区间时刻的值,可以用GROUP BY time 进行时间分组。再用聚合函数LAST或者SUM统计。但这个看似很简单的需求却暗藏杀机。SQL语句如下
SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC
第一:先要清楚,数据是通过什么规则保存到InfluxDB数据库
为了记录设备能耗的实时数据,我们会通过订阅MQTT通道,当值发生变化后存储到InfluxDB数据库中,或者在指定时间范围内没有变化也会上传。这样做的好处可以避免一些冗余数据,同时也埋下了一个坑。
例如:一台设备在InfluxDB数据库中最后一次记录的时间是15分钟前。但是sql语句是从5分钟前开始统计。这会导致该设备的其点值就是null。简单来说:设备的存储的值正好在分组统计的时间范围外。解决方法有很多:比如用FILL(previous)函数填充;比如使用time(time_interval,offset_interval)进行时间推移等。但是我比较推荐下面的方法:
先获取指定开始时间之前的最后值(lastValue),然后再根据返回值是否为null,来决定是否替换或者更新lastValue。伪代码如下。
## 获取该设备的最后记录值
val lastValue = "SELECT LAST("currentValue") FROM "$TABLE_NAME" WHERE time <= '$startTime'"
## 遍历查询结果,将currentValue为 null的值替换
"SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC".forEach {
lastValue = currentValue?: lastValue
result[time] = currentValue?: lastValue
}
你以为这样就结束了吗?还不够,返回的time格式化后,你会发现有8小时的时区问题。
第二:解决InfluxDB时区问题
InfluxDB 默认以UTC时间存储并返回时间戳,查询返回的时间戳对应的也是UTC时间。我们需要通过tz()子句指定时区名称,比如Asia/Shanghai。若InfluxDB安装在Windows环境上,可能还会出现 error parsing query: unable to find time zone 错误,解决方法是安装GO语言环境,文章也详细介绍过。
SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC tz('Asia/Shanghai')
实用tz() 子句后,返回的时间格式:"2019-11-18T13:50:00+08:00"。需要通过 "yyyy-MM-dd'T'HH:mm:ss" 将其格式化。
第三:GROUP BY time 自然月
group by time 支持秒、分钟、小时、天和周,却唯独不支持自然月。如果对数据的精准性要求不高,可以考虑使用30d实现。或者分12次统计。或者有更好的方法,请不吝赐教😲!
Spring 整合 InfluxDB
初始化配置
整合分三步:导包、配置、初始化连接
compile('org.influxdb:influxdb-java:2.8')
influx.server=http://IP
influx.port=8086
influx.username=admin
influx.password=admin
influx.dbname=database
import org.influxdb.InfluxDB
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import org.influxdb.dto.Query
import org.influxdb.impl.InfluxDBResultMapper
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy
@Component
class InfluxDbConnector {
val logger: Logger = LoggerFactory.getLogger(InfluxDbConnector::class.java)
@Value("\${influx.server}")
lateinit var serverUrl: String
@Value("\${influx.port}")
lateinit var serverPort: String
@Value("\${influx.db-name}")
lateinit var dbName: String
@Value("\${influx.user-name}")
lateinit var userName: String
@Value("\${influx.password}")
lateinit var password: String
lateinit var connection: InfluxDB
val resultMapper: InfluxDBResultMapper = InfluxDBResultMapper()
@PostConstruct
fun initConnection() {
val connectionUrl = "$serverUrl:$serverPort"
connection = InfluxDBFactory.connect(connectionUrl, userName, password)
connection.setDatabase(dbName)
connection.enableBatch(1000, 1000, TimeUnit.MILLISECONDS)
}
@PreDestroy
fun closeConnection() {
connection.close()
}
fun <T> query(sql: String, type: Class<T>): List<T> {
logger.info("exec influx query: {}", sql)
val result = connection.query(Query(sql, dbName))
return resultMapper.toPOJO(result, type)
}
fun query(sql: String) {
logger.info("exec influx query: {}", sql)
connection.query(Query(sql, dbName))
}
fun save(points: List<Point>) {
points.forEach { connection.write(it) }
}
}
存储和查询数据
定义实体
import java.time.Instant;
@Measurement(name = "tableName")
public class StringVariableResultJ {
@Column(name = "currentValue")
public String value;
@Column(name = "time")
public Instant time;
// ......
}
批量保存数据
val points = equipmentEnergies.map {
Point.measurement(TABLE_NAME_EQUIPMENT)
.tag("equipmentId", it.equipmentId)
.tag("locationId", it.locationId)
.tag("subItemInstanceId", it.subItemInstanceId)
.tag("subItemId", it.subItemId)
.tag("projectId", it.projectId)
.time(it.lastSavedTime?.toEpochMilli()?:0, TimeUnit.MILLISECONDS)
.addField("currentValue", it.value.toString().toBigDecimalOrNull()).build()
}
influxDbConnector.save(points)
查询数据
influxDbConnector.query(sql, StringVariableResultJ::class.java).sortedBy { it.time }
项目是用kotlin写的,可是在用InfluxDBResultMapper.toPOJO 时会出现数据转换异常的问题。若换成Java的实体类就没有问题。原因目前没有找到。
删除数据
我在官网文档上并没有找到删除数据的内容,只有修改数据库存储策略。但实际上执行delete sql语句是生效的😂。数据保留策略目的是让InfluxDB能够知道哪些数据是可以丢弃的,从而节省空间,更高效的处理数据。默认是不限制。以下是常见的命令。
# 查看库存储规则
> SHOW RETENTION POLICIES ON 数据库名称;
[out]:
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 720h0m0s 168h0m0s 1 true
# 修改存储规则
> ALTER RETENTION POLICY autogen ON 数据库名称 DURATION 0;
# 设为默认
> ALTER RETENTION POLICY autogen ON 数据库名称 DEFAULT;
#创建规则
> CREATE RETENTION POLICY "规则名" ON 数据库名称 DURATION 360h REPLICATION 1;
# 删除规则
> DROP RETENTION POLICY 规则名 ON 数据库名称;
duration 表示在这个时间外的数据将不会被保留,0表示不限制。default 表示是否为默认规则。其它含义没有深究。
实际场景中,不同表的数据需要保留的时间也不一样。此时可以考虑用sql语句,用程序定时删除数据。
influxDbConnector.query("DELETE FROM \"tableName" WHERE time < '$时间' ")
文章到这里就结束了,更多的聚合函数可以看官方文档:https://docs.influxdata.com/influxdb/v1.7/query_language/functions/