我有以下来自Rabbit MQ的JSON数据

{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:30","data":{"RunStatus":1"}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":3"}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:40","data":{"RunStatus":2"}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3"}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2"}}

{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":1"}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3"}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2"}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:55","data":{"RunStatus":3"}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:22:00","data":{"RunStatus":2"}}


我正在尝试获取设备所在的每个RunStatus的持续时间,因此对于上面的数据说,对于Device-MACH-101,RunStatus看起来像这样

在Runstatus 1中,设备处于-5秒钟(30-35)
在Runstatus 2中,设备处于-5秒(40-45)
在Runstatus 3中,设备处于-10秒(35-40 + 45-50)

上面相同的逻辑也适用于第二设备数据。

以下是我正在尝试的Apache Spark SQL查询,但未获得所需的结果。请提出一些替代方案;我也不介意以非SQL方式进行操作。

public static void main(String[] args) {

        try {

            mconf = new SparkConf();
            mconf.setAppName("RabbitMqReceiver");
            mconf.setMaster("local[*]");

            jssc = new JavaStreamingContext(mconf,Durations.seconds(10));

            SparkSession spksess = SparkSession
                    .builder()
                    .master("local[*]")
                    .appName("RabbitMqReceiver2")
                    .getOrCreate();

            SQLContext sqlctxt = new SQLContext(spksess);

            JavaDStream<String> strmData = jssc.receiverStream(new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));

            JavaDStream<String> machineData = strmData.window(Durations.minutes(1),Durations.seconds(10));

            sqlctxt.udf().register("custdatediff", new UDF2<String, String, String>() {

                @Override public String call(String argdt1,String argdt2) {

                        DateTimeFormatter formatter = DateTimeFormat.forPattern("dd-MM-yyyy HH:mm:ss");
                        DateTime dt1 = formatter.parseDateTime(argdt1);
                        DateTime dt2 = formatter.parseDateTime(argdt2);

                        Seconds retsec = org.joda.time.Seconds.secondsBetween(dt2, dt1);
                        return retsec.toString();

                 }
            },DataTypes.StringType);

            machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() {

                @Override
                public void call(JavaRDD<String> rdd) {
                    if(!rdd.isEmpty()){

                        Dataset<Row> df = sqlctxt.jsonRDD(rdd);
                        df.createOrReplaceTempView("DeviceData");

                        // I DONT WANT to GROUP by timestamp, but query requires I pass it.

                        Dataset<Row> searchResult = sqlctxt.sql("select t1.DeviceId,t1.data.runstatus,"
                                + " custdatediff(CAST((t1.timestamp) as STRING),CAST((t2.timestamp) as STRING)) as duration from DeviceData t1"
                                + " join DeviceData t2 on t1.DeviceId = t2.DeviceId group by t1.DeviceId,t1.data.runstatus,t1.timestamp,t2.timestamp");

                        searchResult.show();

                    }
                }
            });

            jssc.start();

            jssc.awaitTermination();

        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }


上面的代码/ sql执行的示例结果如下



+--------+---------+--------+
|DeviceId|runstatus|duration|
+--------+---------+--------+
| NTC-167|        2|    PT0S|
| NTC-168|        2|    PT0S|
| NTC-168|        2|  PT-10S|
| NTC-168|        2|  PT-15S|
| NTC-168|        1|   PT10S|
| NTC-168|        1|    PT0S|
| NTC-168|        1|   PT-5S|
| NTC-168|        1|   PT15S|
| NTC-168|        1|    PT5S|
| NTC-168|        1|    PT0S|
+--------+---------+--------+





因此,您可以看到状态正在重复,并且在重复的行中,其中之一具有正确的结果。我写的查询也迫使我按时间戳分组,我想我是否可以避免按时间戳分组结果可能是正确的……对此不确定。

最佳答案

您可以尝试使用数据框和窗口功能。使用窗口功能中的“引线”,您可以将当前行时间戳与下一行时间戳进行比较,并找到每个设备和运行状态的差异。
像下面一样

 val windowSpec_wk = Window.partitionBy(df1("DeviceID")).orderBy(df1("timestamp"))
 val df2 = df1.withColumn("period", lead(df1("timestamp"), 1).over(windowSpec_wk))

09-27 17:09