计算两个连续事件与事件时间之间的值差

计算两个连续事件与事件时间之间的值差

本文介绍了Apache Flink-计算两个连续事件与事件时间之间的值差的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一些电能表,它们会不断产生计数器值,这是一个累积量度.即保持增长直到计数器重置.

I have some energy meters that will keep producing counter value which is a cumulative metric . i.e. Keep increasing until counter reset.

Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", Kwh: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", Kwh: 21}
Sensor1            {timestamp: "10-10-2019 10:20:55", Kwh: 25}
Sensor1            {timestamp: "10-10-2019 10:21:05", Kwh: 37}
Sensor1            {timestamp: "10-10-2019 10:21:08", Kwh: 43}
.
.
.

有一个实时ETL作业,可以在事件时间的两个连续值之间进行减法运算.

There is a real-time ETL job which to do subtraction between two consecutive values in event time.

例如

10-10-2019 10:20:30  = 21 - 10 = 11
10-10-2019 10:20:40  = 25 - 21 = 4
10-10-2019 10:20:55  = 37 - 25 = 12
.
.
.

此外,有时可能无法按顺序接收事件.

Moreover, sometimes the event may not be received in order.

如何使用Apache Flink Streaming API来实现?Java示例更好.

How can I achieve by using Apache Flink Streaming API? Better with example in Java.

推荐答案

通常,当面对按顺序处理乱序流的要求时,处理此问题的最简单(且性能最佳)的方法是使用Flink SQL,并依靠它进行排序.请注意,它将依靠 WatermarkStrategy 来确定何时可以安全地认为事件准备就绪可以发出,并且将删除任何较晚的事件.如果您必须了解最近发生的事件,那么我建议使用 CEP ,而不是带有MATCH_RECOGNIZE的SQL(如下所示).

In general, when faced with the requirement to process an out-of-order stream in order, the easiest (and performant) way to handle this is to use Flink SQL, and rely on it to do the sorting. Note that it will rely on the WatermarkStrategy to determine when events can safely be considered ready to be emitted, and will drop any late events. If you must know about the late events, then I would recommend using CEP rather than SQL with MATCH_RECOGNIZE (as shown below).

有关使用水印进行排序的更多信息,请参见 Flink文档中有关水印的教程.

For more about using Watermarks for sorting, see the tutorial about Watermarks in the Flink docs.

这是一个如何使用Flink SQL来实现用例的示例:

Here's an example of how to implement your use case using Flink SQL:

public class SortAndDiff {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple3<String, Long, Long>> input = env.fromElements(
                new Tuple3<>("sensor1", "2019-10-10 10:20:30", 10L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:40", 21L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:10", 28L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:05", 20L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:55", 25L),
                new Tuple3<>("sensor1", "2019-10-10 10:21:05", 37L),
                new Tuple3<>("sensor2", "2019-10-10 10:23:00", 30L))
        .map(new MapFunction<Tuple3<String, String, Long>, Tuple3<String, Long, Long>>() {
            @Override
            public Tuple3<String, Long, Long> map(Tuple3<String, String, Long> t) throws Exception {
                return new Tuple3<>(t.f0, Timestamp.valueOf(t.f1).toInstant().toEpochMilli(), t.f2);
            }
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                    .withTimestampAssigner((event, timestamp) -> event.f1));

        Table events = tableEnv.fromDataStream(input,
                $("sensorId"),
                $("ts").rowtime(),
                $("kwh"));

        Table results = tableEnv.sqlQuery(
                "SELECT E.* " +
                    "FROM " + events + " " +
                    "MATCH_RECOGNIZE ( " +
                        "PARTITION BY sensorId " +
                        "ORDER BY ts " +
                        "MEASURES " +
                            "this_step.ts AS ts, " +
                            "next_step.kwh - this_step.kwh AS diff " +
                        "AFTER MATCH SKIP TO NEXT ROW " +
                        "PATTERN (this_step next_step) " +
                        "DEFINE " +
                            "this_step AS TRUE, " +
                            "next_step AS TRUE " +
                    ") AS E"
        );


        tableEnv
                .toAppendStream(results, Row.class)
                .print();

        env.execute();
    }

}

这篇关于Apache Flink-计算两个连续事件与事件时间之间的值差的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-06 03:49
查看更多