我正在将财务分析应用程序数据从MongoDB迁移到InfluxDB,因为数据和分析呈指数增长。

我当前的情况是:

1)每秒从交易所获取报价,并将其存储在称为“报价”的度量中;

2)每10秒运行一次连续查询,将每分钟的“滴答”数据分组为一个称为“ohlc”(烛台数据)的度量;

当我使用Mongo作为数据库时,在我获得报价的那一刻,我已经将其转换为烛台数据,并计算了一些指标(MACD,EMA,BB,RSI)并将其存储。

我看到InfluxDB将Kapacitor用作数据处理器,有一种方法可以在Kapacitor中编写一些脚本来计算该指标,还是应该将数据流式传输到NodeJS并自己计算?

如果我必须流式传输数据,最佳做法是什么?

最佳答案

使用InfluxDB时,有一些选择。使用Kapacitor,您可以以任何支持 Protocol Buffer 的语言来合并用户定义的功能,也可以编写TICKscript进行数据转换。

您也可以使用数据库的连续查询功能,尽管根据查询和时间间隔的不同,有时它们可​​能是昂贵的查询。

如果要在NodeJS中编写自己的函数,则基本上只需编写一些在Unix域套接字上监听的代码,Kapacitor连接到该套接字,然后可以通过该套接字连接写入数据(完整文档here)。

如果要编写TICKscript,请举几个例子:

// {alert_name}

// metric: {alert_metric}
// available_fields: [[other_telegraf_fields]]

// TELEGRAF CONFIGURATION
// [inputs.{plugin}]
//   # full configuration

// DEFINE: kapacitor define {alert_name} -type batch -tick
//{plugin}/{alert_name}.tick -dbrp telegraf.autogen
// ENABLE: kapacitor enable {alert_name}

// Parameters
var info = {info_level}
var warn = {warn_level}
var crit = {crit_level}
var infoSig = 2.5
var warnSig = 3
var critSig = 3.5
var period = 10s
var every = 10s

// Dataframe
var data = stream
  |from()
    .database('telegraf')
    .retentionPolicy('autogen')
    .measurement({plugin})
    .groupBy('host')
  |window()
    .period(period)
    .every(every)
  |mean({alert_metric})
    .as("stat")

// Thresholds
var alert = data
  |eval(lambda: sigma("stat"))
    .as('sigma')
    .keep()
  |alert()
    .id('{{ index .Tags "host"}}/{alert_metric}')
    .message('{{ .ID }}:{{ index .Fields "stat" }}')
    .info(lambda: "stat" > info OR "sigma" > infoSig)
    .warn(lambda: "stat" > warn OR "sigma" > warnSig)
    .crit(lambda: "stat" > crit OR "sigma" > critSig)

// Alert
alert
  .log('/tmp/{alert_name}_log.txt')

希望对您有所帮助!

10-06 05:48