我正在将财务分析应用程序数据从MongoDB迁移到InfluxDB,因为数据和分析呈指数增长。
我当前的情况是:
1)每秒从交易所获取报价,并将其存储在称为“报价”的度量中;
2)每10秒运行一次连续查询,将每分钟的“滴答”数据分组为一个称为“ohlc”(烛台数据)的度量;
当我使用Mongo作为数据库时,在我获得报价的那一刻,我已经将其转换为烛台数据,并计算了一些指标(MACD,EMA,BB,RSI)并将其存储。
我看到InfluxDB将Kapacitor用作数据处理器,有一种方法可以在Kapacitor中编写一些脚本来计算该指标,还是应该将数据流式传输到NodeJS并自己计算?
如果我必须流式传输数据,最佳做法是什么?
最佳答案
使用InfluxDB时,有一些选择。使用Kapacitor,您可以以任何支持 Protocol Buffer 的语言来合并用户定义的功能,也可以编写TICKscript进行数据转换。
您也可以使用数据库的连续查询功能,尽管根据查询和时间间隔的不同,有时它们可能是昂贵的查询。
如果要在NodeJS中编写自己的函数,则基本上只需编写一些在Unix域套接字上监听的代码,Kapacitor连接到该套接字,然后可以通过该套接字连接写入数据(完整文档here)。
如果要编写TICKscript,请举几个例子:
// {alert_name}
// metric: {alert_metric}
// available_fields: [[other_telegraf_fields]]
// TELEGRAF CONFIGURATION
// [inputs.{plugin}]
// # full configuration
// DEFINE: kapacitor define {alert_name} -type batch -tick
//{plugin}/{alert_name}.tick -dbrp telegraf.autogen
// ENABLE: kapacitor enable {alert_name}
// Parameters
var info = {info_level}
var warn = {warn_level}
var crit = {crit_level}
var infoSig = 2.5
var warnSig = 3
var critSig = 3.5
var period = 10s
var every = 10s
// Dataframe
var data = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement({plugin})
.groupBy('host')
|window()
.period(period)
.every(every)
|mean({alert_metric})
.as("stat")
// Thresholds
var alert = data
|eval(lambda: sigma("stat"))
.as('sigma')
.keep()
|alert()
.id('{{ index .Tags "host"}}/{alert_metric}')
.message('{{ .ID }}:{{ index .Fields "stat" }}')
.info(lambda: "stat" > info OR "sigma" > infoSig)
.warn(lambda: "stat" > warn OR "sigma" > warnSig)
.crit(lambda: "stat" > crit OR "sigma" > critSig)
// Alert
alert
.log('/tmp/{alert_name}_log.txt')
希望对您有所帮助!