I have the following JSON data coming in from RabbitMQ:
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:30","data":{"RunStatus":1}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":3}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:40","data":{"RunStatus":2}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":1}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:55","data":{"RunStatus":3}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:22:00","data":{"RunStatus":2}}
I am trying to compute how long a device stays in each RunStatus. For the data above, device MACH-101 would break down like this:
In RunStatus 1 the device was for 5 seconds (30-35)
In RunStatus 2 the device was for 5 seconds (40-45)
In RunStatus 3 the device was for 10 seconds (35-40 + 45-50)
The same logic applies to the second device's data.
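The per-status durations above can be sketched in plain Java, without Spark: sort a device's rows by timestamp, take the difference between each row and the next one (the same idea as a `lead` window function), and sum those gaps per status. The class name `RunStatusDurations` is mine, and the rows are hard-coded from the MACH-101 sample rather than parsed from JSON:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.TreeMap;

public class RunStatusDurations {
    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");
        // {device, timestamp, runStatus} -- the MACH-101 rows from the sample
        String[][] rows = {
            {"MACH-101", "29-06-2017 15:21:30", "1"},
            {"MACH-101", "29-06-2017 15:21:35", "3"},
            {"MACH-101", "29-06-2017 15:21:40", "2"},
            {"MACH-101", "29-06-2017 15:21:45", "3"},
            {"MACH-101", "29-06-2017 15:21:50", "2"},
        };
        // Rows are already sorted by timestamp; in general, sort per device first.
        Map<String, Long> secondsByStatus = new TreeMap<>();
        for (int i = 0; i + 1 < rows.length; i++) {
            LocalDateTime cur = LocalDateTime.parse(rows[i][1], fmt);
            LocalDateTime next = LocalDateTime.parse(rows[i + 1][1], fmt); // the "lead" row
            long secs = ChronoUnit.SECONDS.between(cur, next);
            secondsByStatus.merge(rows[i][2], secs, Long::sum);
        }
        System.out.println(secondsByStatus); // {1=5, 2=5, 3=10}
    }
}
```

The last row per device has no successor, so it contributes no duration, which matches the expected breakdown (status 1: 5 s, status 2: 5 s, status 3: 10 s).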
Below is the Apache Spark SQL query I am trying, but it is not producing the desired result. Please suggest some alternatives; I don't mind doing it in a non-SQL way either.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.DateTime;
import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public static void main(String[] args) {
    try {
        SparkConf mconf = new SparkConf();
        mconf.setAppName("RabbitMqReceiver");
        mconf.setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(mconf, Durations.seconds(10));
        SparkSession spksess = SparkSession
                .builder()
                .master("local[*]")
                .appName("RabbitMqReceiver2")
                .getOrCreate();
        SQLContext sqlctxt = new SQLContext(spksess);
        JavaDStream<String> strmData =
                jssc.receiverStream(new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));
        JavaDStream<String> machineData =
                strmData.window(Durations.minutes(1), Durations.seconds(10));
        sqlctxt.udf().register("custdatediff", new UDF2<String, String, String>() {
            @Override
            public String call(String argdt1, String argdt2) {
                DateTimeFormatter formatter = DateTimeFormat.forPattern("dd-MM-yyyy HH:mm:ss");
                DateTime dt1 = formatter.parseDateTime(argdt1);
                DateTime dt2 = formatter.parseDateTime(argdt2);
                Seconds retsec = Seconds.secondsBetween(dt2, dt1);
                // Note: Seconds.toString() yields an ISO-8601 period such as "PT-10S"
                return retsec.toString();
            }
        }, DataTypes.StringType);
        machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) {
                if (!rdd.isEmpty()) {
                    Dataset<Row> df = sqlctxt.jsonRDD(rdd);
                    df.createOrReplaceTempView("DeviceData");
                    // I DON'T WANT to GROUP BY timestamp, but the query requires I pass it.
                    Dataset<Row> searchResult = sqlctxt.sql("select t1.DeviceId,t1.data.runstatus,"
                            + " custdatediff(CAST((t1.timestamp) as STRING),CAST((t2.timestamp) as STRING)) as duration from DeviceData t1"
                            + " join DeviceData t2 on t1.DeviceId = t2.DeviceId group by t1.DeviceId,t1.data.runstatus,t1.timestamp,t2.timestamp");
                    searchResult.show();
                }
            }
        });
        jssc.start();
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
A sample result from running the above code/SQL looks like this:
+--------+---------+--------+
|DeviceId|runstatus|duration|
+--------+---------+--------+
| NTC-167| 2| PT0S|
| NTC-168| 2| PT0S|
| NTC-168| 2| PT-10S|
| NTC-168| 2| PT-15S|
| NTC-168| 1| PT10S|
| NTC-168| 1| PT0S|
| NTC-168| 1| PT-5S|
| NTC-168| 1| PT15S|
| NTC-168| 1| PT5S|
| NTC-168| 1| PT0S|
+--------+---------+--------+
As you can see, statuses are repeated, and among the duplicated rows one of them has the correct result. The query I wrote also forces me to group by timestamp; I suspect that if I could avoid grouping by timestamp the results might come out right... but I am not sure about that.
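As a side note, the `duration` values like `PT0S` and `PT-10S` are the ISO-8601 period form that Joda-Time's `Seconds.toString()` produces. If a plain, signed seconds count is wanted instead, the UDF body could be sketched with `java.time` (the class and method names here are mine, assuming the same `dd-MM-yyyy HH:mm:ss` format):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class DateDiffDemo {
    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    // Returns the signed number of seconds from argdt1 to argdt2,
    // as a plain number rather than an ISO-8601 period like "PT-10S".
    static long custDateDiff(String argdt1, String argdt2) {
        return ChronoUnit.SECONDS.between(LocalDateTime.parse(argdt1, FMT),
                                          LocalDateTime.parse(argdt2, FMT));
    }

    public static void main(String[] args) {
        System.out.println(custDateDiff("29-06-2017 15:21:30", "29-06-2017 15:21:40")); // 10
        System.out.println(custDateDiff("29-06-2017 15:21:40", "29-06-2017 15:21:30")); // -10
    }
}
```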
Best answer
You can try using DataFrames and window functions. With the `lead` window function you can compare the current row's timestamp with the next row's timestamp and find the difference for each device and run status.
Something like the following:
val windowSpec_wk = Window.partitionBy(df1("DeviceID")).orderBy(df1("timestamp"))
val df2 = df1.withColumn("period", lead(df1("timestamp"), 1).over(windowSpec_wk))