问题描述
如果我们有一个由一列类别和一列值组成的Pandas数据框,则可以通过执行以下操作来删除每个类别中的均值:
If we have a Pandas data frame consisting of a column of categories and a column of values, we can remove the mean in each category by doing the following:
df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))
据我了解,Spark数据帧不直接提供此按组/转换操作(我在Spark 1.5.0上使用PySpark).那么,实现此计算的最佳方法是什么?
As far as I understand, Spark dataframes do not directly offer this group-by/transform operation (I am using PySpark on Spark 1.5.0). So, what is the best way to implement this computation?
我尝试使用分组方式/加入方式,如下所示:
I have tried using a group-by/join as follows:
df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)
但是它非常慢,因为据我所知,每个类别都需要对DataFrame进行全面扫描.
But it is very slow since, as I understand, each category requires a full scan of the DataFrame.
我认为(但尚未验证)如果将分组依据/均值"的结果收集到字典中,然后按如下所示在UDF中使用该字典,则可以大大提高这一速度:
I think (but have not verified) that I can speed this up a great deal if I collect the result of the group-by/mean into a dictionary, and then use that dictionary in a UDF as follows:
nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))
是否有一种惯用的方式来表达这种类型的操作而不牺牲性能?
Is there an idiomatic way to express this type of operation without sacrificing performance?
推荐答案
不,不是.使用类似于aggregateByKey
的逻辑执行DataFrame聚合.请参见 DataFrame组通过行为/优化较慢的部分是join
,它需要进行排序/改组.但这仍然不需要按组扫描.
No it doesn't. DataFrame aggregations are performed using a logic similar to aggregateByKey
. See DataFrame groupBy behaviour/optimization A slower part is join
which requires sorting / shuffling. But it still doesn't require scan per group.
如果您使用的是准确的代码,则由于未提供联接表达式,因此使用起来很慢.因此,它仅执行笛卡尔积.因此,这不仅效率低下,而且不正确.您想要这样的东西:
If this is an exact code you use it is slow because you don't provide a join expression. Because of that it simply performs a Cartesian product. So it is not only inefficient but also incorrect. You want something like this:
from pyspark.sql.functions import col
means = df.groupBy("Category").mean("Values").alias("means")
df.alias("df").join(means, col("df.Category") == col("means.Category"))
这是可能的,尽管性能会根据具体情况而有所不同.使用Python UDF的一个问题是它必须在Python之间来回移动数据.不过,绝对值得尝试.不过,您应该考虑为nameToMean
使用广播变量.
It is possible although performance will vary on case by case basis. A problem with using Python UDFs is that it has to move data to and from Python. Still, it is definitely worth trying. You should consider using a broadcast variable for nameToMean
though.
在PySpark 1.6中,您可以使用broadcast
功能:
In PySpark 1.6 you can use broadcast
function:
df.alias("df").join(
broadcast(means), col("df.Category") == col("means.Category"))
,但在< = 1.5中不可用.
but it is not available in <= 1.5.
这篇关于PySpark DataFrame上分组数据的 pandas 式转换的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!