本文介绍了Spark-流式数据帧/数据集不支持基于非时间的窗口;的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要编写带有内部select和partition by的Spark sql查询.问题是我有AnalysisException.我已经花了几个小时,但是用其他方法却没有成功.

I need to write Spark sql query with inner select and partition by. Problem is that I have AnalysisException.I already spend few hours on this but with other approach I have no success.

例外:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
Window [sum(cast(_w0#41 as bigint)) windowspecdefinition(deviceId#28, timestamp#30 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS grp#34L], [deviceId#28], [timestamp#30 ASC NULLS FIRST]
+- Project [currentTemperature#27, deviceId#28, status#29, timestamp#30, wantedTemperature#31, CASE WHEN (status#29 = cast(false as boolean)) THEN 1 ELSE 0 END AS _w0#41]

我认为这太复杂了,无法实现这样的查询.但是我不知道要解决它.

I assume that this is too complicated query to implement like this. But i don't know to to fix it.

 SparkSession spark = SparkUtils.getSparkSession("RawModel");

 Dataset<RawModel> datasetMap = readFromKafka(spark);

 datasetMap.registerTempTable("test");

 Dataset<Row> res = datasetMap.sqlContext().sql("" +
                " select deviceId, grp, avg(currentTemperature) as averageT, min(timestamp) as minTime ,max(timestamp) as maxTime, count(*) as countFrame " +
                " from (select test.*,  sum(case when status = 'false' then 1 else 0 end) over (partition by deviceId order by timestamp) as grp " +
                "  from test " +
                "  ) test " +
                " group by deviceid, grp ");

任何建议将不胜感激.谢谢.

Any suggestion would be very appreciated.Thank you.

推荐答案

我认为问题出在 windowing 规范中:

I believe the issue is in the windowing specification:

over (partition by deviceId order by timestamp)

分区需要位于基于时间的列上-在您的情况下为 timestamp .以下应该可以工作:

The partition would need to be over a time based column - in your case timestamp . The following should work:

over (partition by timestamp order by timestamp)

那当然不会解决查询的意图.可以尝试以下操作:但是不清楚spark是否会支持它:

That will not of course address the intent of your query. The following might be attempted: but it is unclear whether spark would support it:

over (partition by timestamp, deviceId order by timestamp)

即使spark 支持,它仍然会更改查询的语义.

Even if spark does support that it would still change the semantics of your query.

更新

Update

这里是权威消息来源: Tathagata Das 火花流的关键/核心提交者: http://apache -spark-user-list.1001560.n3.nabble.com/按工作区划分和仅按状态分配情况td31816.html

Here is a definitive source: from Tathagata Das who is a key/core committer on spark streaming: http://apache-spark-user-list.1001560.n3.nabble.com/Does-partition-by-and-order-by-works-only-in-stateful-case-td31816.html

这篇关于Spark-流式数据帧/数据集不支持基于非时间的窗口;的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 10:23