This article covers how to compute the mean and corr of 10,000 columns in a PySpark DataFrame.

Problem description

I have a DF with 10K columns and 70 million rows. I want to calculate the mean and corr of the 10K columns. I wrote the code below, but it doesn't work due to the 64KB code-size issue (https://issues.apache.org/jira/browse/SPARK-16845).

Data:

region dept week sal  val1 val2 val3 ... val10000
 US    CS   1    1    2    1    1   ...  2
 US    CS   2    1.5  2    3    1   ...  2
 US    CS   3    1    2    2    2.1 ...  2
 US    ELE  1    1.1  2    2    2.1 ...  2
 US    ELE  2    2.1  2    2    2.1 ...  2
 US    ELE  3    1    2    1    2   ...  2
 UE    CS   1    2    2    1    2   ...  2

Code:

from pyspark.sql import functions as func

aggList = [func.mean(col) for col in df.columns if col not in ('region', 'dept')]  # exclude the key columns
df2 = df.groupBy('region', 'dept').agg(*aggList)

Code 2:

aggList = [func.corr('sal', col).alias(col) for col in df.columns if col not in ('region', 'dept', 'week', 'sal')]  # exclude the key columns
df2 = df.groupBy('region', 'dept', 'week').agg(*aggList)

This fails. Is there any alternative way to work around this bug? Has anyone tried a DF with 10K columns? Are there any suggestions for improving performance?

Recommended answer

We also ran into the 64KB issue, but in a where clause, which is filed under another bug report. The workaround we used was simply to do the operations/transformations in several steps.

In your case, this means that you don't do all the aggregations in one step. Instead, loop over the relevant columns in an outer operation:

  • Use select to create a temporary dataframe that contains only the columns you need for the operation.
  • Use groupBy and agg as you did, except not with the full list of aggregations, but with just one (or two: you could combine the mean and the corr).
  • Once you have references to all the temporary dataframes, use withColumn to append the aggregated columns from the temporary dataframes to a result df (see the sketch after this list).
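A minimal sketch of this looped workaround in PySpark. The column lists (key_cols, val_cols) and the mean_/corr_ result names are illustrative assumptions, and the grouping keys are taken from the question's second snippet; the answer mentions withColumn, but because each partial result is a separate dataframe, this sketch stitches them back together with an equi-join on the grouping keys instead.

from functools import reduce
from pyspark.sql import functions as func

# Illustrative column lists; adjust to the real schema.
key_cols = ['region', 'dept', 'week']
val_cols = [c for c in df.columns if c not in key_cols + ['sal']]

partial_dfs = []
for c in val_cols:
    # One small aggregation per column keeps each generated code block well below the 64KB limit.
    tmp = (df.select(key_cols + ['sal', c])
             .groupBy(key_cols)
             .agg(func.mean(c).alias('mean_' + c),
                  func.corr('sal', c).alias('corr_' + c)))
    partial_dfs.append(tmp)

# Combine the per-column results on the grouping keys.
result = reduce(lambda left, right: left.join(right, key_cols), partial_dfs)

Each aggregation touches only the grouping keys, sal, and a single value column, so the code generated per stage stays small, and the final joins only have to align one aggregated row per (region, dept, week) group from each partial result.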

Due to the lazy evaluation of the Spark DAG, this is of course slower than doing it in one operation. But it should be able to evaluate the whole analysis in one run.
