Question
I have a PySpark DataFrame (not pandas) called df that is too large to call collect() on. Therefore the code below is not efficient; it worked with a smaller amount of data, but now it fails.
import numpy as np

myList = df.collect()
total = []
for product, nb in myList:
    for p2, score in nb:
        total.append(score)

mean = np.mean(total)
std = np.std(total)
Is there any way to get mean and std as two variables by using pyspark.sql.functions or similar?
from pyspark.sql.functions import mean as mean_, std as std_
I could use withColumn; however, that approach applies the calculation row by row and does not return a single variable.
UPDATE:
Example contents of df:
+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|
+----------+------------------+
I need to calculate the mean and standard deviation of the score values; e.g. the value 1 in [691,1] is one of the scores.
Answer
You can use the built-in functions to get aggregate statistics. Here's how to get the mean and standard deviation.
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']
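Equivalently, the same statistics can be pulled out with agg instead of select; a minimal sketch, reusing the same placeholder column name 'columnName':

# Sketch: agg with a single .first() call instead of select().collect()
row = df.agg(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).first()

mean, std = row['mean'], row['std']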
Note that there are three different standard deviation functions: stddev, stddev_samp, and stddev_pop. From the docs, the one I used (stddev) returns the unbiased sample standard deviation of the expression in a group.
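A quick sketch comparing the three variants (stddev is an alias for stddev_samp, while stddev_pop gives the population value):

from pyspark.sql.functions import stddev, stddev_samp, stddev_pop

df.select(
    stddev('columnName'),       # sample standard deviation (the default)
    stddev_samp('columnName'),  # explicit sample standard deviation, same as stddev
    stddev_pop('columnName')    # population standard deviation (divides by n, not n-1)
).show()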
You can also use the describe() method:
df.describe().show()
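One caveat with describe() is that it returns its statistics as strings, so you have to cast them if you want numbers back as variables. A minimal sketch, assuming the same placeholder column 'columnName':

# describe() returns a DataFrame of strings; index the rows by the 'summary' column
stats = {row['summary']: row['columnName'] for row in df.describe('columnName').collect()}
mean = float(stats['mean'])
std = float(stats['stddev'])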
Refer to this link for more info: pyspark.sql.functions
UPDATE: This is how you can work through the nested data.
Use explode to extract the values into separate rows, then call mean and stddev as shown above.
Here is an MWE:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev

# mock up sample dataframe
df = sqlCtx.createDataFrame(
    [(680, [[691, 1], [692, 5]]), (685, [[691, 2], [692, 2]]), (684, [[691, 1], [692, 3]])],
    ["product_PK", "products"]
)

# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())

# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']
print([mean, std])
This outputs:
[2.3333333333333335, 1.505545305418162]
You can verify that these values are correct using numpy:
import numpy as np

vals = [1, 5, 2, 2, 1, 3]
print([np.mean(vals), np.std(vals, ddof=1)])
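The ddof=1 argument makes numpy compute the sample standard deviation (dividing by n - 1), which is what Spark's stddev computes; numpy's default ddof=0 would give the population value instead.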
Explanation: Your "products" column is a list of lists. Calling explode will make a new row for each element of the outer list. Then grab the "score" value from each of the exploded rows, which you have defined as the second element in a 2-element list. Finally, call the aggregate functions on this new column.
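As a side note, the udf is not strictly necessary: array elements can also be accessed with Column.getItem. A sketch of the same computation under that assumption, reusing df from the MWE above:

from pyspark.sql.functions import explode, col, mean as _mean, stddev as _stddev

# same pipeline without the udf: getItem(1) grabs the second element of each pair
df_stats = df.select(explode(col('products')).alias('exploded'))\
    .select(col('exploded').getItem(1).alias('score'))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()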