This article describes how to divide columns by their respective sums in PySpark. It should be a useful reference for anyone facing the same problem.
Problem Description
I am trying to divide columns in PySpark by their respective sums. My dataframe (using only one column here) looks like this:
event_rates = [[1,10.461016949152542], [2, 10.38953488372093], [3, 10.609418282548477]]
event_rates = spark.createDataFrame(event_rates, ['cluster_id','mean_encoded'])
event_rates.show()
+----------+------------------+
|cluster_id| mean_encoded|
+----------+------------------+
| 1|10.461016949152542|
| 2| 10.38953488372093|
| 3|10.609418282548477|
+----------+------------------+
I tried two methods to do this but failed to get results.
from pyspark.sql.functions import sum as spark_sum
cols = event_rates.columns[1:]
for each in cols:
    event_rates = event_rates.withColumn(each + "_scaled", event_rates[each] / spark_sum(event_rates[each]))
This gives me the following error:
org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and '`cluster_id`' is not an aggregate function. Wrap '((`mean_encoded` / sum(`mean_encoded`)) AS `mean_encoded_scaled`)' in windowing function(s) or wrap '`cluster_id`' in first() (or first_value) if you don't care which value you get.;;
Aggregate [cluster_id#22356L, mean_encoded#22357, (mean_encoded#22357 / sum(mean_encoded#22357)) AS mean_encoded_scaled#2
Following the question here, I tried the following:
stats = (event_rates.agg([spark_sum(x).alias(x + '_sum') for x in cols]))
event_rates = event_rates.join(broadcast(stats))
exprs = [event_rates[x] / event_rates[event_rates + '_sum'] for x in cols]
event_rates.select(exprs)
But the first line gives an error:
AssertionError: all exprs should be Column
How do I fix this?
Recommended Answer
This is an example of how to divide the column mean_encoded by its sum. You need to sum the column first, then crossJoin the sum back onto the original dataframe. After that, you can divide any column by its sum.
import pyspark.sql.functions as fn
from pyspark.sql.types import *
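# compute the grand total of mean_encoded with an empty groupby (one-row result),
# then crossJoin it so every row carries the sum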
event_rates = event_rates.crossJoin(event_rates.groupby().agg(fn.sum('mean_encoded').alias('sum_mean_encoded')))
event_rates_div = event_rates.select('cluster_id',
'mean_encoded',
fn.col('mean_encoded') / fn.col('sum_mean_encoded'))
Output
+----------+------------------+---------------------------------+
|cluster_id| mean_encoded|(mean_encoded / sum_mean_encoded)|
+----------+------------------+---------------------------------+
| 1|10.461016949152542| 0.3325183371367686|
| 2| 10.38953488372093| 0.3302461777809474|
| 3|10.609418282548477| 0.3372354850822839|
+----------+------------------+---------------------------------+
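As the AnalysisException in the question hints, the same division can also be expressed with a window function instead of a crossJoin. The sketch below is not part of the original answer; it assumes the same event_rates dataframe, and the column name mean_encoded_scaled is just illustrative. An unpartitioned window makes the sum run over the whole dataframe, which, like the crossJoin, gathers all rows together to compute the total:
import pyspark.sql.functions as fn
from pyspark.sql import Window

# window over the entire dataframe (no partition columns), so sum() sees every row
w = Window.partitionBy()
event_rates_div = event_rates.withColumn(
    'mean_encoded_scaled',
    fn.col('mean_encoded') / fn.sum('mean_encoded').over(w))
Either way, each value is divided by the column total; the window version simply avoids the extra join.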
That concludes this article on dividing columns by their sums in PySpark. We hope the recommended answer is helpful, and thank you for your support!