This question already has an answer here:
DataFrame / Dataset groupBy behaviour/optimization
(1个答案)
2年前关闭。
我用group by和sum函数编写了pyspark代码。我觉得表现会受到分组的影响。相反,我想使用reducebykey。但是我是这个领域的新手。请在下面找到我的情况,
步骤1:通过sqlcontext读取Hive表联接查询数据并存储在DataFrame中
步骤2:输入列的总数为15。其中5个是键字段,其余为数字值。
第3步:除了上面的输入列外,还需要从数字列中派生更多列。很少有默认值的列。
步骤4:我使用了分组和和功能。如何使用map和reducebykey选项的spark方法执行类似的逻辑。
(1个答案)
2年前关闭。
我用group by和sum函数编写了pyspark代码。我觉得表现会受到分组的影响。相反,我想使用reducebykey。但是我是这个领域的新手。请在下面找到我的情况,
步骤1:通过sqlcontext读取Hive表联接查询数据并存储在DataFrame中
步骤2:输入列的总数为15。其中5个是键字段,其余为数字值。
第3步:除了上面的输入列外,还需要从数字列中派生更多列。很少有默认值的列。
步骤4:我使用了分组和和功能。如何使用map和reducebykey选项的spark方法执行类似的逻辑。
from pyspark.sql.functions import col, when, lit, concat, round, sum
#sample data
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"])
#populate col5, col6, col7
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col7 = col('col2')
df1 = df.withColumn("col5", col5).\
withColumn("col6", col6).\
withColumn("col7", col7)
#populate col8, col9, col10
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col10= concat(col('col2'), lit("_NEW"))
df2 = df.withColumn("col5", col8).\
withColumn("col6", col9).\
withColumn("col7", col10)
#final dataframe
final_df = df1.union(df2)
final_df.show()
#groupBy calculation
#final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")).show()from pyspark.sql.functions import col, when, lit, concat, round, sum
#sample data
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"])
#populate col5, col6, col7
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col7 = col('col2')
df1 = df.withColumn("col5", col5).\
withColumn("col6", col6).\
withColumn("col7", col7)
#populate col8, col9, col10
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col10= concat(col('col2'), lit("_NEW"))
df2 = df.withColumn("col5", col8).\
withColumn("col6", col9).\
withColumn("col7", col10)
#final dataframe
final_df = df1.union(df2)
final_df.show()
#groupBy calculation
final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")........sum("coln")).show()
最佳答案
Spark SQL中没有reduceByKey
。groupBy
+聚合函数将与RDD.reduceByKey几乎相同。 Spark会自动选择是类似于RDD.groupByKey
(即collect_list)还是RDD.reduceByKey
Dataset.groupBy +聚合函数的性能应优于或等于RDD.reduceByKey。 Catalyst Optimizer会注意如何在后台进行聚合
关于python - 如何在pyspark数据帧中将groupby转换为reducebykey? ,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46331077/
10-11 01:29