本文介绍了PySpark DataFrame上分组数据的 pandas 式转换的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果我们有一个由一列类别和一列值组成的Pandas数据框,则可以通过执行以下操作来删除每个类别中的均值:

If we have a Pandas data frame consisting of a column of categories and a column of values, we can remove the mean in each category by doing the following:

df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))

据我了解,Spark数据帧不直接提供此按组/转换操作(我在Spark 1.5.0上使用PySpark).那么,实现此计算的最佳方法是什么?

As far as I understand, Spark dataframes do not directly offer this group-by/transform operation (I am using PySpark on Spark 1.5.0). So, what is the best way to implement this computation?

我尝试使用分组方式/加入方式,如下所示:

I have tried using a group-by/join as follows:

df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)

但是它非常慢,因为据我所知,每个类别都需要对DataFrame进行全面扫描.

But it is very slow since, as I understand, each category requires a full scan of the DataFrame.

我认为(但尚未验证)如果将分组依据/均值"的结果收集到字典中,然后按如下所示在UDF中使用该字典,则可以大大提高这一速度:

I think (but have not verified) that I can speed this up a great deal if I collect the result of the group-by/mean into a dictionary, and then use that dictionary in a UDF as follows:

nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))

是否有一种惯用的方式来表达这种类型的操作而不牺牲性能?

Is there an idiomatic way to express this type of operation without sacrificing performance?

推荐答案

不,不是.使用类似于aggregateByKey的逻辑执行DataFrame聚合.请参见 DataFrame组通过行为/优化较慢的部分是join,它需要进行排序/改组.但这仍然不需要按组扫描.

No it doesn't. DataFrame aggregations are performed using a logic similar to aggregateByKey. See DataFrame groupBy behaviour/optimization A slower part is join which requires sorting / shuffling. But it still doesn't require scan per group.

如果您使用的是准确的代码,则由于未提供联接表达式,因此使用起来很慢.因此,它仅执行笛卡尔积.因此,这不仅效率低下,而且不正确.您想要这样的东西:

If this is an exact code you use it is slow because you don't provide a join expression. Because of that it simply performs a Cartesian product. So it is not only inefficient but also incorrect. You want something like this:

from pyspark.sql.functions import col

means = df.groupBy("Category").mean("Values").alias("means")
df.alias("df").join(means, col("df.Category") == col("means.Category"))

这是可能的,尽管性能会根据具体情况而有所不同.使用Python UDF的一个问题是它必须在Python之间来回移动数据.不过,绝对值得尝试.不过,您应该考虑为nameToMean使用广播变量.

It is possible although performance will vary on case by case basis. A problem with using Python UDFs is that it has to move data to and from Python. Still, it is definitely worth trying. You should consider using a broadcast variable for nameToMean though.

在PySpark 1.6中,您可以使用broadcast功能:

In PySpark 1.6 you can use broadcast function:

df.alias("df").join(
    broadcast(means), col("df.Category") == col("means.Category"))

,但在< = 1.5中不可用.

but it is not available in <= 1.5.

这篇关于PySpark DataFrame上分组数据的 pandas 式转换的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-11 13:22