Problem Description
I need to pivot more than one column in a pyspark dataframe. Sample dataframe,
>>> import pyspark.sql.functions as F
>>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)]
>>> mydf = spark.createDataFrame(d,['id','day','price','units'])
>>> mydf.show()
+---+---+-----+-----+
| id|day|price|units|
+---+---+-----+-----+
|100| 1| 23| 10|
|100| 2| 45| 11|
|100| 3| 67| 12|
|100| 4| 78| 13|
|101| 1| 23| 10|
|101| 2| 45| 13|
|101| 3| 67| 14|
|101| 4| 78| 15|
|102| 1| 23| 10|
|102| 2| 45| 11|
|102| 3| 67| 16|
|102| 4| 78| 18|
+---+---+-----+-----+
Now, if I need to get the price column into a row for each id based on day, I can use the pivot method:
>>> pvtdf = mydf.withColumn('combcol',F.concat(F.lit('price_'),mydf['day'])).groupby('id').pivot('combcol').agg(F.first('price'))
>>> pvtdf.show()
+---+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|
+---+-------+-------+-------+-------+
|100| 23| 45| 67| 78|
|101| 23| 45| 67| 78|
|102| 23| 45| 67| 78|
+---+-------+-------+-------+-------+
So when I need the units column transposed in the same way as price, I would have to create one more dataframe as above for units and then join the two on id. Since that gets tedious with more columns, I tried a function to do it:
>>> def pivot_udf(df, *cols):
...     mydf = df.select('id').drop_duplicates()
...     for c in cols:
...         mydf = mydf.join(df.withColumn('combcol', F.concat(F.lit('{}_'.format(c)), df['day'])).groupby('id').pivot('combcol').agg(F.first(c)), 'id')
...     return mydf
...
>>> pivot_udf(mydf,'price','units').show()
+---+-------+-------+-------+-------+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|units_1|units_2|units_3|units_4|
+---+-------+-------+-------+-------+-------+-------+-------+-------+
|100| 23| 45| 67| 78| 10| 11| 12| 13|
|101| 23| 45| 67| 78| 10| 13| 14| 15|
|102| 23| 45| 67| 78| 10| 11| 16| 18|
+---+-------+-------+-------+-------+-------+-------+-------+-------+
I need suggestions on whether this is good practice and whether there is a better way of doing it. Thanks in advance!
Here's a non-UDF way involving a single pivot (hence, just a single column scan to identify all the unique days).
dff = mydf.groupBy('id').pivot('day').agg(F.first('price').alias('price'),F.first('units').alias('unit'))
Here's the result (apologies for the non-matching ordering and naming):
+---+-------+------+-------+------+-------+------+-------+------+
| id|1_price|1_unit|2_price|2_unit|3_price|3_unit|4_price|4_unit|
+---+-------+------+-------+------+-------+------+-------+------+
|100| 23| 10| 45| 11| 67| 12| 78| 13|
|101| 23| 10| 45| 13| 67| 14| 78| 15|
|102| 23| 10| 45| 11| 67| 16| 78| 18|
+---+-------+------+-------+------+-------+------+-------+------+
We just aggregate on both the price and the unit columns after pivoting on the day.
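If the set of days is known up front, one variant is to pass the pivot values explicitly, which lets Spark skip the scan for distinct days. This is only a sketch, reusing the same mydf and F alias from above (the name dff2 is just for illustration):
# Listing the pivot values avoids the extra job that collects the distinct days.
dff2 = mydf.groupBy('id').pivot('day', [1, 2, 3, 4]).agg(
    F.first('price').alias('price'),
    F.first('units').alias('unit'))
The resulting columns have the same 1_price, 1_unit, ... layout as above, so the renaming step below applies unchanged.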
If you need the naming as in the question:
dff.select([F.col(c).name('_'.join(x for x in c.split('_')[::-1])) for c in dff.columns]).show()
+---+-------+------+-------+------+-------+------+-------+------+
| id|price_1|unit_1|price_2|unit_2|price_3|unit_3|price_4|unit_4|
+---+-------+------+-------+------+-------+------+-------+------+
|100| 23| 10| 45| 11| 67| 12| 78| 13|
|101| 23| 10| 45| 13| 67| 14| 78| 15|
|102| 23| 10| 45| 11| 67| 16| 78| 18|
+---+-------+------+-------+------+-------+------+-------+------+
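For reference, the single pivot and the column renaming can be wrapped into one helper. This is only a sketch under the assumptions above (pivot_wide is a hypothetical name, and it assumes at least two value columns so that Spark appends the aggregation alias to the pivoted column names):
from pyspark.sql import functions as F

def pivot_wide(df, key_col, pivot_col, value_cols):
    # Hypothetical helper: one pivot, aggregating every value column at once.
    aggs = [F.first(c).alias(c) for c in value_cols]
    wide = df.groupBy(key_col).pivot(pivot_col).agg(*aggs)
    # With multiple aggregations Spark names the pivoted columns
    # '<pivot value>_<alias>'; flip them to '<alias>_<pivot value>' as in the question.
    renamed = [F.col(c).alias('_'.join(reversed(c.split('_', 1)))) if '_' in c else F.col(c)
               for c in wide.columns]
    return wide.select(renamed)

pivot_wide(mydf, 'id', 'day', ['price', 'units']).show()
On the sample data this yields the same id, price_1 ... price_4, units_1 ... units_4 layout as in the question, but with a single pivot instead of one pivot and one join per column.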