Is it possible to pivot on several different columns at once in PySpark?
I have a dataframe like this:

import pandas as pd
import pyspark.sql.functions as sf  # aggregate functions used below

# 'spark' is the active SparkSession
sdf = spark.createDataFrame(
    pd.DataFrame([[1,2,6,1],[1,3,3,2],[1,6,0,3],[2,1,0,1],
        [2,1,7,2],[2,7,8,3]], columns = ['id','val1','val2','month'])
)
+----+------+------+-------+
| id | val1 | val2 | month |
+----+------+------+-------+
|  1 |   2  |   6  |   1   |
|  1 |   3  |   3  |   2   |
|  1 |   6  |   0  |   3   |
|  2 |   1  |   0  |   1   |
|  2 |   1  |   7  |   2   |
|  2 |   7  |   8  |   3   |
+----+------+------+-------+


I want to pivot this dataframe on multiple columns (val1, val2, ...) at once, so that I end up with a dataframe like this:

+----+-------------+-------------+-------------+-------------+-------------+-------------+
| id | val1_month1 | val1_month2 | val1_month3 | val2_month1 | val2_month2 | val2_month3 |
+----+-------------+-------------+-------------+-------------+-------------+-------------+
|  1 |           2 |           3 |           6 |           6 |           3 |           0 |
|  2 |           1 |           1 |           7 |           0 |           7 |           8 |
+----+-------------+-------------+-------------+-------------+-------------+-------------+


I found a solution that works with hard-coded columns (see below), but I am looking for one that picks up val1, val2, etc. dynamically.

sdf_pivot = (
    sdf
    .groupby('id')
    .pivot('month')
    .agg(sf.mean('val1'),sf.mean('val2'))
)
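
As an aside (the exact names are what Spark typically produces for a pivot with multiple unaliased aggregations; verify on your version), this hard-coded variant prefixes each output column with the pivot value, giving names like 1_avg(val1). That is why the answer below aliases each aggregation and renames the columns afterwards:

print(sdf_pivot.columns)
# typically ['id', '1_avg(val1)', '1_avg(val2)', '2_avg(val1)', ...]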


Something like this, but unfortunately it does not work...

col_to_pivot = ['val1','val2']
sdf_pivot = (
    sdf
    .groupby('id')
    .pivot('month')
    .agg(sf.mean(col_to_pivot))
)
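
For context, the failure is not in pivot itself: pyspark.sql.functions.mean expects a single column name or Column object, so passing it a Python list raises a TypeError (a minimal sketch of the failure mode, assuming a recent PySpark):

import pyspark.sql.functions as sf

sf.mean('val1')              # fine: a single column name
# sf.mean(['val1', 'val2'])  # raises TypeError: not a string or Column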


Many thanks!

Best Answer

IIUC, you can use a list comprehension:

newdf = sdf.groupby('id').pivot('month').agg(*[ sf.mean(c).alias(c) for c in col_to_pivot ])
#+---+------+------+------+------+------+------+
#| id|1_val1|1_val2|2_val1|2_val2|3_val1|3_val2|
#+---+------+------+------+------+------+------+
#|  1|     2|     6|     3|     3|     6|     0|
#|  2|     1|     0|     1|     7|     7|     8|
#+---+------+------+------+------+------+------+
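
The * unpacks the list comprehension into separate arguments to agg, and .alias(c) keeps the generated names short (1_val1 instead of 1_avg(val1)). For the two columns here, the call is equivalent to spelling the aggregations out by hand:

# Equivalent to the list comprehension above, written out for
# col_to_pivot = ['val1', 'val2']
newdf = sdf.groupby('id').pivot('month').agg(
    sf.mean('val1').alias('val1'),
    sf.mean('val2').alias('val2')
)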

# Rename '<month>_<val>' to '<val>_month<month>'; 'id' contains no '_' and is kept as-is
col_names = [ '{}_month{}'.format(x[1], x[0]) if len(x) > 1 else x[0]
              for c in newdf.columns for x in [c.split('_')] ]
#['id',
# 'val1_month1',
# 'val2_month1',
# 'val1_month2',
# 'val2_month2',
# 'val1_month3',
# 'val2_month3']

newdf = newdf.toDF(*col_names)
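
If you also want the columns grouped by value column, as in the desired output above (all val1_month* before val2_month*), a final select reorders them; note that sorting the non-id names alphabetically is an assumption that happens to work for these particular names:

# Reorder so all val1_* columns come before val2_* (alphabetical order works here)
newdf = newdf.select('id', *sorted(c for c in newdf.columns if c != 'id'))
newdf.show()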

Regarding python - how to pivot a table with dynamic columns in PySpark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58307217/
