Computing the value difference between two consecutive events using event time

This post covers how to compute, with Apache Flink, the difference between two consecutive values of the same key in event time. The question and the recommended answer follow below.

问题描述

I have some energy meters that keep producing a counter value, which is a cumulative metric, i.e. it keeps increasing until the counter resets.

Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", Kwh: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", Kwh: 21}
Sensor1            {timestamp: "10-10-2019 10:20:55", Kwh: 25}
Sensor1            {timestamp: "10-10-2019 10:21:05", Kwh: 37}
Sensor1            {timestamp: "10-10-2019 10:21:08", Kwh: 43}
.
.
.

There is a real-time ETL job which computes the subtraction between two consecutive values in event time.

For example:

10-10-2019 10:20:30  = 21 - 10 = 11
10-10-2019 10:20:40  = 25 - 21 = 4
10-10-2019 10:20:55  = 37 - 25 = 12
.
.
.
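The subtraction itself is simple once the readings are in event-time order. As a minimal, Flink-independent sketch in plain Java (the class and method names are illustrative, not from the original post):

```java
import java.util.ArrayList;
import java.util.List;

public class ConsecutiveDiff {
    // Given counter readings already sorted by event time, emit the
    // difference between each pair of consecutive readings.
    static List<Long> diffs(List<Long> readings) {
        List<Long> out = new ArrayList<>();
        for (int i = 1; i < readings.size(); i++) {
            out.add(readings.get(i) - readings.get(i - 1));
        }
        return out;
    }

    public static void main(String[] args) {
        // Readings from the question: 10, 21, 25, 37, 43
        System.out.println(diffs(List.of(10L, 21L, 25L, 37L, 43L))); // [11, 4, 12, 6]
    }
}
```

The hard part in a streaming job is not this arithmetic but getting the readings into event-time order in the first place, which is what the answer below addresses.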

Moreover, sometimes the events may not be received in order.

How can I achieve this using the Apache Flink Streaming API? Preferably with an example in Java.

Recommended answer

In general, when faced with the requirement to process an out-of-order stream in order, the easiest (and performant) way to handle this is to use Flink SQL and rely on it to do the sorting. Note that it will rely on the WatermarkStrategy to determine when events can safely be considered ready to be emitted, and it will drop any late events. If you must know about the late events, then I would recommend using CEP rather than SQL with MATCH_RECOGNIZE (which is what is shown below).

For more about using watermarks for sorting, see the tutorial about watermarks in the Flink docs.

Here's an example of how to implement your use case using Flink SQL:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.sql.Timestamp;
import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class SortAndDiff {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event time is the default in Flink 1.12+; on older versions it must be set explicitly.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Sample input; note that the two sensor2 events arrive out of order.
        DataStream<Tuple3<String, Long, Long>> input = env.fromElements(
                new Tuple3<>("sensor1", "2019-10-10 10:20:30", 10L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:40", 21L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:10", 28L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:05", 20L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:55", 25L),
                new Tuple3<>("sensor1", "2019-10-10 10:21:05", 37L),
                new Tuple3<>("sensor2", "2019-10-10 10:23:00", 30L))
        .map(new MapFunction<Tuple3<String, String, Long>, Tuple3<String, Long, Long>>() {
            @Override
            public Tuple3<String, Long, Long> map(Tuple3<String, String, Long> t) throws Exception {
                // Convert the timestamp string to epoch milliseconds.
                return new Tuple3<>(t.f0, Timestamp.valueOf(t.f1).toInstant().toEpochMilli(), t.f2);
            }
        }).assignTimestampsAndWatermarks(
                // Tolerate events that are up to one minute out of order.
                WatermarkStrategy
                    .<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                    .withTimestampAssigner((event, timestamp) -> event.f1));

        Table events = tableEnv.fromDataStream(input,
                $("sensorId"),
                $("ts").rowtime(),
                $("kwh"));

        // For each sensor, match every pair of consecutive rows (ordered by
        // event time) and emit the difference between their kwh values.
        Table results = tableEnv.sqlQuery(
                "SELECT E.* " +
                    "FROM " + events + " " +
                    "MATCH_RECOGNIZE ( " +
                        "PARTITION BY sensorId " +
                        "ORDER BY ts " +
                        "MEASURES " +
                            "this_step.ts AS ts, " +
                            "next_step.kwh - this_step.kwh AS diff " +
                        "AFTER MATCH SKIP TO NEXT ROW " +
                        "PATTERN (this_step next_step) " +
                        "DEFINE " +
                            "this_step AS TRUE, " +
                            "next_step AS TRUE " +
                    ") AS E"
        );

        tableEnv
                .toAppendStream(results, Row.class)
                .print();

        env.execute();
    }

}
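For the sample input above, sorting each sensor's events by timestamp and then diffing consecutive values should yield sensor1 → 11, 4, 12 and sensor2 → 8, 2 (sensor2's out-of-order readings 20 and 28 are reordered first). A minimal, Flink-free sketch of that per-key sort-then-diff logic, with illustrative names, conceptually mirrors what the watermark-driven SQL job does per partition:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class SortThenDiff {
    record Event(String sensorId, String ts, long kwh) {}

    // Group events by sensor, sort each group by event time, then emit
    // the consecutive differences for that sensor.
    static Map<String, List<Long>> diffsPerSensor(List<Event> events) {
        Map<String, List<Event>> byKey = events.stream()
                .collect(Collectors.groupingBy(Event::sensorId));
        Map<String, List<Long>> result = new TreeMap<>();
        for (Map.Entry<String, List<Event>> e : byKey.entrySet()) {
            List<Event> sorted = new ArrayList<>(e.getValue());
            // Lexicographic order works for the "yyyy-MM-dd HH:mm:ss" format.
            sorted.sort(Comparator.comparing(Event::ts));
            List<Long> diffs = new ArrayList<>();
            for (int i = 1; i < sorted.size(); i++) {
                diffs.add(sorted.get(i).kwh() - sorted.get(i - 1).kwh());
            }
            result.put(e.getKey(), diffs);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("sensor1", "2019-10-10 10:20:30", 10L),
                new Event("sensor1", "2019-10-10 10:20:40", 21L),
                new Event("sensor2", "2019-10-10 10:20:10", 28L),
                new Event("sensor2", "2019-10-10 10:20:05", 20L),
                new Event("sensor1", "2019-10-10 10:20:55", 25L),
                new Event("sensor1", "2019-10-10 10:21:05", 37L),
                new Event("sensor2", "2019-10-10 10:23:00", 30L));
        System.out.println(diffsPerSensor(input));
        // {sensor1=[11, 4, 12], sensor2=[8, 2]}
    }
}
```

The key difference from the Flink job is that Flink never sees the whole dataset at once; watermarks tell it when it is safe to treat a sensor's buffered events as complete enough to sort and emit.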

That concludes this article on Apache Flink - computing the value difference between two consecutive events using event time. Hopefully the recommended answer above is helpful.
