本文介绍了窗口函数不适用于 Pyspark sqlcontext的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个数据框,我想将数据汇总到 7 天,并对某些函数进行一些聚合.

I have a data frame and I want to roll up the data into 7days and do some aggregation on some of the function.

我有一个 pyspark sql 数据框,例如 ------

I have a pyspark sql dataframe like ------

Sale_Date|P_1|P_2|P_3|G_1|G_2|G_3|Total_Sale|Sale_Amt|Promo_Disc_Amt  |

|2013-04-10| 1| 9| 1| 1| 1| 1| 1| 295.0|0.0|
|2013-04-11| 1| 9| 1| 1| 1| 1| 3| 567.0|0.0|
|2013-04-12| 1| 9| 1| 1| 1| 1| 2| 500.0|200.0|
|2013-04-13| 1| 9| 1| 1| 1| 1| 1| 245.0|20.0|
|2013-04-14| 1| 9| 1| 1| 1| 1| 1| 245.0|0.0|
|2013-04-15| 1| 9| 1| 1| 1| 1| 2| 500.0|200.0|
|2013-04-16| 1| 9| 1| 1| 1| 1| 1| 250.0|0.0|

我在数据框上应用了一个窗口函数,如下所示 -

I have applied a window function over the data frame as follows -

days = lambda i: i * 86400
windowSp = Window().partitionBy(dataframeOfquery3["P_1"],dataframeOfquery3["P_2"],dataframeOfquery3["P_3"],dataframeOfquery3["G_1"],dataframeOfquery3["G_2"],dataframeOfquery3["G_3"])\
          .orderBy(dataframeOfquery3["Sale_Date"].cast("timestamp").cast("long").desc())\
          .rangeBetween(-(days(7)), 0)

现在我想执行一些聚合,即应用一些像下面这样的 Windows 函数 --

Now I want to perform some aggregation i.e. applying some windows functions like the following --

df = dataframeOfquery3.select(min(dataframeOfquery3["Sale_Date"].over(windowSp).alias("Sale_Date")))
df.show()

但它给出了以下错误.

py4j.protocol.Py4JJavaError: An error occurred while calling o138.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'min'. Note that, using window functions currently requires a HiveContext;

我使用的是基于 Hadoop 的 Apache Spark 1.6.0 预构建版.

I am using Apache Spark 1.6.0 Pre-built on Hadoop.

推荐答案

错误说明了一切:

py4j.protocol.Py4JJavaError: An error occurred while calling o138.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'min'. Note that, using window functions currently requires a HiveContext;

您需要一个支持 hive(使用 hive 构建)的 spark 版本,而不是您可以声明 hivecontext :

You'll need a version of spark that supports hive (build with hive) than you can declare a hivecontext :

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

然后使用该上下文来执行您的窗口函数.

and then use that context to perform your window function.

在python中:

# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

您可以进一步了解 SQLContextHiveContext 之间的区别 此处.

You can read further about the difference between SQLContextand HiveContext here.

SparkSQL 有一个 SQLContext 和一个 HiveContext.HiveContext 是 SQLContext 的超集.Spark 社区建议使用 HiveContext.您可以看到,当您运行 spark-shell(您的交互式驱动程序应用程序)时,它会自动创建一个定义为 sc 的 SparkContext 和一个定义为 sqlContext 的 HiveContext.HiveContext 允许您执行 SQL 查询以及 Hive 命令.pyspark 也会发生相同的行为.

SparkSQL has a SQLContext and a HiveContext. HiveContext is a super set of the SQLContext. The Spark community suggest using the HiveContext. You can see that when you run spark-shell, which is your interactive driver application, it automatically creates a SparkContext defined as sc and a HiveContext defined as sqlContext. The HiveContext allows you to execute SQL queries as well as Hive commands. The same behavior occurs for pyspark.

这篇关于窗口函数不适用于 Pyspark sqlcontext的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-28 22:49